40 lines
1.1 KiB
Python
40 lines
1.1 KiB
Python
import json
|
|
|
|
from SWSCloudCore import models
|
|
|
|
|
|
class ControllerManageTasks:
|
|
"""
|
|
|
|
"""
|
|
def __init__(self):
|
|
pass
|
|
|
|
def get(self, limit=100):
|
|
return models.Tasks.select().limit(limit)
|
|
|
|
def get_task(self, task_id):
|
|
"""
|
|
Get specified task
|
|
:param task_id:
|
|
:return:
|
|
"""
|
|
return models.Tasks.select().where(models.Tasks.id == task_id).get()
|
|
|
|
def get_by_server(self, server_id):
|
|
task = models.Tasks.select(models.Tasks.plain).\
|
|
where(models.Tasks.server_id == server_id).order_by(models.Tasks.created.desc()).limit(1)
|
|
return json.loads(task[0].plain)
|
|
|
|
def delete(self, task_id):
|
|
return models.Tasks.delete().where(models.Tasks.id == task_id).execute()
|
|
|
|
def check_exists_by_id(self, task_id):
|
|
if models.Tasks.select().where(models.Tasks.id == task_id).count() == 1:
|
|
return True
|
|
return False
|
|
|
|
def check_exists_by_server(self, server_id):
|
|
result = models.Tasks.select().where(models.Tasks.server_id == server_id).count()
|
|
# logging.debug("Function: check_exists_by_server(): %s" % result)
|
|
return result
|