console/SWSCloudCore/controllers/tasks/manage.py

40 lines
1.1 KiB
Python

import json
from SWSCloudCore import models
class ControllerManageTasks:
"""
"""
def __init__(self):
pass
def get(self, limit=100):
return models.Tasks.select().limit(limit)
def get_task(self, task_id):
"""
Get specified task
:param task_id:
:return:
"""
return models.Tasks.select().where(models.Tasks.id == task_id).get()
def get_by_server(self, server_id):
task = models.Tasks.select(models.Tasks.plain).\
where(models.Tasks.server_id == server_id).order_by(models.Tasks.created.desc()).limit(1)
return json.loads(task[0].plain)
def delete(self, task_id):
return models.Tasks.delete().where(models.Tasks.id == task_id).execute()
def check_exists_by_id(self, task_id):
if models.Tasks.select().where(models.Tasks.id == task_id).count() == 1:
return True
return False
def check_exists_by_server(self, server_id):
result = models.Tasks.select().where(models.Tasks.server_id == server_id).count()
# logging.debug("Function: check_exists_by_server(): %s" % result)
return result