console/SWSCloudCore/controllers/users/__init__.py

221 lines
6.8 KiB
Python

# coding: utf-8
import random
import string
import uuid
from hashlib import md5
from SWSCloudCore import models
class ControllerUsers:
    """Account-level operations on ``models.Users`` rows.

    NOTE(review): passwords are stored as unsalted MD5 hex digests. MD5 is
    not suitable for password storage, but the algorithm cannot be changed
    here without migrating the digests already stored, so it is kept.
    """

    def __init__(self, user_id=None):
        """Remember the user the instance operates on.

        :param user_id: identifier of the user, or None when not bound to one
        """
        # Always define the attribute: previously it was only set for truthy
        # values, so methods reading self.user_id failed with AttributeError.
        self.user_id = user_id

    @staticmethod
    def _hash_password(password):
        """Return the MD5 hex digest of ``password``.

        Text is encoded as UTF-8 before hashing, because hashlib only accepts
        bytes (Python 3) and the implicit ASCII conversion fails on non-ASCII
        text (Python 2). Byte strings are hashed as they are, so digests of
        already stored ASCII passwords do not change.

        :param password: plain-text password (text or bytes)
        :return: hexadecimal MD5 digest
        """
        if not isinstance(password, bytes):
            password = password.encode('utf-8')
        return md5(password).hexdigest()

    def user_id_by_email(self, email):
        """Return the id of the user with the given email.

        :param email: email to look up
        :return: value of the ``id`` field of the matching ``models.Users`` row
        """
        return models.Users.get(models.Users.email == email).id

    def get_id_by_email(self, email):
        """Alias of :meth:`user_id_by_email`, kept for existing callers."""
        return self.user_id_by_email(email)

    def user_exists_by_email(self, email):
        """Tell whether at least one user row has the given email.

        :param email: email to look up
        :return: bool
        """
        return models.Users.select().where(models.Users.email == email).count() != 0

    def get(self):
        """Return the ``models.Users`` row whose id equals ``self.user_id``."""
        return models.Users.select().where(models.Users.id == self.user_id).get()

    def update(self, user_id, **kwargs):
        """Update fields of a user; only ``password`` is currently handled.

        :param user_id: id of the user to update
        :param kwargs: may contain ``password`` (plain text); other keys are ignored
        :return: always True
        """
        if 'password' in kwargs:
            query = models.Users.update(
                password=self._hash_password(kwargs['password'])
            ).where(
                models.Users.id == user_id
            )
            query.execute()
        return True

    def generate_password(self, size=14, chars=string.ascii_uppercase + string.digits):
        """Generate a random password.

        :param size: number of characters to produce
        :param chars: alphabet to draw characters from
        :return: string of ``size`` characters taken from ``chars``
        """
        # SystemRandom draws from the OS entropy source; the default generator
        # of the random module is predictable and must not be used for secrets.
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(size))

    def check_session(self):
        """Check that the Flask session contains all required keys.

        :return: bool
        """
        from flask import session

        required = ('email', 'password', 'user_id')
        return all(key in session for key in required)

    def auth(self, email, password, status=1):
        """Check user credentials.

        :param email: email of the user
        :param password: plain-text password
        :param status: value the ``status`` field of the user must have
        :return: True when a row matches email, password digest and status
        """
        matches = models.Users.select().where(
            models.Users.email == email,
            models.Users.password == self._hash_password(password),
            models.Users.status == status
        ).count()
        return matches != 0

    def registration(self, email, password):
        """Create a new user with a random UUID as id and status 1.

        :param email: email of the new user
        :param password: plain-text password; only its digest is stored
        :return: True when the created row has a truthy id, False otherwise
        """
        user_id = uuid.uuid4()
        # TODO: add date registration and update of date - =datetime.datetime.now()
        user = models.Users.create(
            id=user_id,
            email=email,
            password=self._hash_password(password),
            status=1
        )
        return bool(user.id)

    def password_update(self, user_id, password):
        """Replace the stored password digest of a user.

        :param user_id: id of the user
        :param password: new plain-text password
        :return: always True
        """
        user = models.Users.get(models.Users.id == user_id)
        user.password = self._hash_password(password)
        user.save()
        return True
class ControllerUsersRecoveryCodes():
    """Management of recovery codes stored in ``models.UsersRecoveryCodes``."""

    def create(self, user_id, code):
        """Store a recovery code for a user.

        :param user_id: id of the user the code belongs to
        :param code: recovery code to store
        :return: always True
        """
        models.UsersRecoveryCodes.create(user=user_id, recovery_code=code)
        return True

    def delete(self, user_id):
        """Delete all recovery codes created for the given user.

        :param user_id: id of the user
        :return: always True
        """
        query = models.UsersRecoveryCodes.delete().where(models.UsersRecoveryCodes.user == user_id)
        query.execute()
        return True

    def check(self, user_id, code):
        """Check that the given recovery code exists for the user.

        :param user_id: id of the user
        :param code: recovery code to look for
        :return: bool
        """
        matches = models.UsersRecoveryCodes.select().where(
            models.UsersRecoveryCodes.user == user_id,
            models.UsersRecoveryCodes.recovery_code == code
        ).count()
        return matches != 0

    # http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
    def code_generate(self, size=6, chars=string.ascii_uppercase + string.digits):
        """Generate a random recovery code.

        :param size: number of characters to produce
        :param chars: alphabet to draw characters from
        :return: string of ``size`` characters taken from ``chars``
        """
        # A recovery code is a secret: use the OS entropy source instead of
        # the predictable default generator of the random module.
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(size))
class ControllerUsersDetails(ControllerUsers):
    """Access to the ``models.UsersDetails`` row of the user ``self.user_id``."""

    def details_exists(self):
        """Tell whether a details row exists for the user.

        :return: bool
        """
        rows = models.UsersDetails.select().where(models.UsersDetails.user == self.user_id)
        return rows.count() != 0

    def details_create(self):
        """Create a details row for the user.

        :return: always True
        """
        models.UsersDetails.create(user=self.user_id)
        return True

    def details_get(self):
        """Return the details row of the user, creating it first when missing."""
        if not self.details_exists():
            self.details_create()
        rows = models.UsersDetails.select().where(models.UsersDetails.user == self.user_id).limit(1)
        return rows[0]

    def details_update(self, **kwargs):
        """Update the details row of the user and save it.

        :param kwargs: any of ``fname``, ``lname``, ``address``, ``city``,
            ``country``, ``state``, ``zipcode``; other keys are ignored
        """
        details = models.UsersDetails.get(models.UsersDetails.user == self.user_id)
        # Only these fields may be changed; they are applied in this order.
        editable = ('fname', 'lname', 'address', 'city', 'country', 'state', 'zipcode')
        for field in editable:
            if field in kwargs:
                setattr(details, field, kwargs[field])
        details.save()
class ControllerSSHKey:
    """Access to the SSH key stored in ``models.SSHKeys`` for one user."""

    def __init__(self, user_id):
        """Remember the user the instance operates on.

        :param user_id: id of the user owning the key
        """
        self.user_id = user_id

    def check(self):
        """Tell whether at least one key row exists for the user.

        :return: bool
        """
        stored = models.SSHKeys.select().where(models.SSHKeys.user == self.user_id)
        return stored.count() != 0

    def get(self):
        """Return the ``sshkey`` value of the first key row of the user.

        :return: the key, or an empty string when the user has no key row
        """
        if not self.check():
            return ""
        rows = models.SSHKeys.select().where(models.SSHKeys.user == self.user_id).limit(1)
        return rows[0].sshkey

    def set(self, key):
        """Create a key row for the user.

        :param key: value stored in the ``sshkey`` field
        :return: None
        """
        models.SSHKeys.create(sshkey=key, user=self.user_id)
        return None

    def delete(self):
        """Delete every key row of the user.

        :return: always True
        """
        models.SSHKeys.delete().where(models.SSHKeys.user == self.user_id).execute()
        return True
class ControllerAPI:
    """Access to API secrets stored in ``models.UsersSecrets``."""

    def auth(self, email, secret):
        """Check an email/secret pair.

        :param email: email of the user
        :param secret: API secret
        :return: True when a secret row joined with the user row matches both values
        """
        matches = models.UsersSecrets.select().join(models.Users).where(
            models.Users.email == email,
            models.UsersSecrets.secret == secret
        ).count()
        return matches != 0

    def get(self, user_id):
        """Return the result of ``first()`` on the secret rows of the user.

        :param user_id: id of the user
        """
        rows = models.UsersSecrets.select().where(models.UsersSecrets.user == user_id)
        return rows.first()

    def check(self, user_id):
        """Tell whether the user has at least one secret row.

        :param user_id: id of the user
        :return: bool
        """
        rows = models.UsersSecrets.select().where(models.UsersSecrets.user == user_id)
        return rows.count() != 0

    def set(self, user_id, secret, acl, status):
        """Create the secret row of the user, or update the existing one(s).

        :param user_id: id of the user
        :param secret: value for the ``secret`` field
        :param acl: value for the ``acl`` field
        :param status: value for the ``status`` field
        :return: always True
        """
        if not self.check(user_id):
            models.UsersSecrets.create(user=user_id, secret=secret, acl=acl, status=status)
            return True

        query = models.UsersSecrets.update(secret=secret, acl=acl, status=status).where(
            models.UsersSecrets.user == user_id
        )
        query.execute()
        return True