221 lines
6.8 KiB
Python
221 lines
6.8 KiB
Python
# coding: utf-8
|
|
|
|
import random
|
|
import string
|
|
import uuid
|
|
from hashlib import md5
|
|
|
|
from SWSCloudCore import models
|
|
|
|
|
|
class ControllerUsers:
    """Account operations backed by ``models.Users``.

    Covers lookup by e-mail, authentication, registration and password
    changes.  Passwords are stored as hex MD5 digests.

    NOTE(review): unsalted MD5 is not an adequate password hash.  It is kept
    here only so that digests already stored in the database keep matching;
    moving to a real KDF needs a migration plan for existing rows.
    """

    def __init__(self, user_id=None):
        """Remember the user this controller operates on.

        :param user_id: identifier of the user, or None when the controller
            is used only for e-mail based helpers.
        """
        # Always bind the attribute: binding it only for truthy ids made
        # ``get()`` fail with AttributeError instead of a normal lookup miss.
        self.user_id = user_id

    @staticmethod
    def _hash_password(password):
        """Return the hex MD5 digest of ``password``.

        Text is encoded as UTF-8 first because ``hashlib`` accepts only
        bytes-like input on Python 3; byte strings are hashed unchanged.
        """
        if isinstance(password, type(u'')):
            password = password.encode('utf-8')
        return md5(password).hexdigest()

    def user_id_by_email(self, email):
        """Alias of :meth:`get_id_by_email`, kept for existing callers."""
        return self.get_id_by_email(email)

    def get_id_by_email(self, email):
        """Return the id of the user registered with ``email``.

        The lookup uses ``models.Users.get``, so whatever that call raises
        for a missing row propagates to the caller.
        """
        return models.Users.get(models.Users.email == email).id

    def user_exists_by_email(self, email):
        """Return True when at least one user row has this ``email``."""
        matches = models.Users.select().where(models.Users.email == email)
        return matches.count() != 0

    def get(self):
        """Return the ``models.Users`` row for ``self.user_id``."""
        return models.Users.select().where(models.Users.id == self.user_id).get()

    def update(self, user_id, **kwargs):
        """Update fields of the user identified by ``user_id``.

        Only the ``password`` keyword is acted upon (it is hashed before
        being stored); any other keyword is ignored.

        :return: always True
        """
        if 'password' in kwargs:
            query = models.Users.update(
                password=self._hash_password(kwargs['password'])
            ).where(
                models.Users.id == user_id
            )
            query.execute()
        return True

    def generate_password(self, size=14, chars=string.ascii_uppercase + string.digits):
        """Return a random password of ``size`` characters drawn from ``chars``.

        Uses the operating system's entropy source (``random.SystemRandom``)
        rather than the default, predictable Mersenne Twister generator.
        """
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(size))

    def check_session(self):
        """
        Check that the Flask session contains all required keys

        :return: bool
        """
        from flask import session

        required = ['email', 'password', 'user_id']
        return all(key in session for key in required)

    def auth(self, email, password, status=1):
        """
        Check credentials against the users table

        :param email: e-mail of the account
        :param password: plain-text password; hashed before comparison
        :param status: required value of the ``status`` column (default 1)
        :return: True when a row matches e-mail, password hash and status
        """
        password_hash = self._hash_password(password)
        matches = models.Users.select().where(
            models.Users.email == email,
            models.Users.password == password_hash,
            models.Users.status == status
        ).count()
        return matches != 0

    def registration(self, email, password):
        """
        Create a new user row with a fresh UUID4 id and status 1

        :param email: e-mail of the new account
        :param password: plain-text password; only its hash is stored
        :return: True when the created row reports a truthy id
        """
        password_hash = self._hash_password(password)
        user_id = uuid.uuid4()
        # TODO: add date registration and update of date - =datetime.datetime.now()
        user = models.Users.create(id=user_id, email=email, password=password_hash, status=1)
        return bool(user.id)

    def password_update(self, user_id, password):
        """Store the hash of ``password`` for the user ``user_id``.

        :return: always True (a missing user surfaces as the exception
            raised by ``models.Users.get``)
        """
        user = models.Users.get(models.Users.id == user_id)
        user.password = self._hash_password(password)
        user.save()
        return True
|
|
|
|
|
|
class ControllerUsersRecoveryCodes():
    """Create, verify and remove rows of ``models.UsersRecoveryCodes``."""

    def create(self, user_id, code):
        """Store ``code`` as a recovery code for ``user_id``.

        :param user_id: user the code belongs to
        :param code: recovery code to store
        :return: always True
        """
        models.UsersRecoveryCodes.create(user=user_id, recovery_code=code)
        return True

    def delete(self, user_id):
        """
        Delete all recovery codes created for the given user

        :param user_id:
        :return: Bool (always True)
        """
        query = models.UsersRecoveryCodes.delete().where(models.UsersRecoveryCodes.user == user_id)
        query.execute()
        return True

    def check(self, user_id, code):
        """
        Check whether the code exists for the given user

        :param user_id:
        :param code:
        :return: bool
        """
        matches = models.UsersRecoveryCodes.select().where(
            models.UsersRecoveryCodes.user == user_id, models.UsersRecoveryCodes.recovery_code == code
        ).count()
        return matches != 0

    # http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
    def code_generate(self, size=6, chars=string.ascii_uppercase + string.digits):
        """Return a random code of ``size`` characters drawn from ``chars``.

        The code is drawn from ``random.SystemRandom`` (OS entropy) instead
        of the module-level generator, whose output is reproducible from
        its seed.
        """
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(size))
|
|
|
|
|
|
class ControllerUsersDetails(ControllerUsers):
    """Profile details (``models.UsersDetails``) of the user ``self.user_id``."""

    def details_exists(self):
        """Return True when a details row exists for ``self.user_id``."""
        rows = models.UsersDetails.select().where(models.UsersDetails.user == self.user_id)
        return rows.count() != 0

    def details_create(self):
        """Create a details row for ``self.user_id``; always returns True."""
        models.UsersDetails.create(user=self.user_id)
        return True

    def details_get(self):
        """Return the details row of ``self.user_id``, creating it if absent."""
        exists = self.details_exists()
        if not exists:
            self.details_create()
        rows = models.UsersDetails.select().where(models.UsersDetails.user == self.user_id)
        return rows.limit(1)[0]

    def details_update(self, **kwargs):
        """Copy the recognised keyword arguments onto the details row and save it.

        Recognised keys: fname, lname, address, city, country, state,
        zipcode.  Any other keyword is ignored.  The row is fetched with
        ``models.UsersDetails.get``, so it must already exist.
        """
        details = models.UsersDetails.get(models.UsersDetails.user == self.user_id)
        # Same fields, in the same order, as the attributes assigned one by one before.
        for field in ('fname', 'lname', 'address', 'city', 'country', 'state', 'zipcode'):
            if field in kwargs:
                setattr(details, field, kwargs[field])
        details.save()
|
|
|
|
|
|
class ControllerSSHKey:
    """SSH key storage (``models.SSHKeys``) for a single user."""

    def __init__(self, user_id):
        """Bind the controller to ``user_id``."""
        self.user_id = user_id

    def check(self):
        """Return True when the user has at least one stored key."""
        stored = models.SSHKeys.select().where(models.SSHKeys.user == self.user_id)
        return stored.count() != 0

    def get(self):
        """Return the first stored key of the user, or "" when there is none."""
        if not self.check():
            return ""
        stored = models.SSHKeys.select().where(models.SSHKeys.user == self.user_id)
        return stored.limit(1)[0].sshkey

    def set(self, key):
        """Insert ``key`` as a new row for the user; returns None."""
        models.SSHKeys.create(sshkey=key, user=self.user_id)
        return None

    def delete(self):
        """Delete every stored key of the user; always returns True."""
        models.SSHKeys.delete().where(models.SSHKeys.user == self.user_id).execute()
        return True
|
|
|
|
|
|
class ControllerAPI:
    """API secret management backed by ``models.UsersSecrets``."""

    def auth(self, email, secret):
        """Return True when ``secret`` is stored for the user with ``email``."""
        matches = models.UsersSecrets.select().join(models.Users).where(
            models.Users.email == email,
            models.UsersSecrets.secret == secret
        )
        return matches.count() != 0

    def get(self, user_id):
        """Return the first secret row of ``user_id`` (result of ``.first()``)."""
        rows = models.UsersSecrets.select().where(models.UsersSecrets.user == user_id)
        return rows.first()

    def check(self, user_id):
        """Return True when ``user_id`` has at least one secret row."""
        rows = models.UsersSecrets.select().where(models.UsersSecrets.user == user_id)
        return rows.count() != 0

    def set(self, user_id, secret, acl, status):
        """Store ``secret``, ``acl`` and ``status`` for ``user_id``.

        Existing rows of the user are updated; otherwise a new row is
        created.  Always returns True.
        """
        if not self.check(user_id):
            models.UsersSecrets.create(user=user_id, secret=secret, acl=acl, status=status)
            return True

        query = models.UsersSecrets.update(secret=secret, acl=acl, status=status).where(
            models.UsersSecrets.user == user_id
        )
        query.execute()
        return True
|