console/SWSCloudCore/controllers/common/__init__.py

299 lines
8.4 KiB
Python
Raw Normal View History

2015-12-01 02:43:10 +03:00
# coding: utf-8
import datetime
import json
import random
2016-04-02 00:42:22 +03:00
import re
2015-12-01 02:43:10 +03:00
import smtplib
2016-04-02 00:42:22 +03:00
import string
import uuid
2015-12-01 02:43:10 +03:00
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
2016-04-02 00:42:22 +03:00
from hashlib import md5
2015-12-01 02:43:10 +03:00
2016-04-02 00:42:22 +03:00
from flask import g, render_template
2015-12-01 02:43:10 +03:00
2016-04-02 00:42:22 +03:00
from SWSCloudCore import models
2015-12-01 02:43:10 +03:00
class ControllerCommon:
    """Miscellaneous helpers shared by the controllers."""

    def __init__(self):
        pass

    @staticmethod
    def generate_password(size=14, chars=string.ascii_uppercase + string.digits):
        """Return a random password.

        :param size: number of characters to generate
        :param chars: alphabet the characters are drawn from
        :return: string of ``size`` characters taken from ``chars``
        """
        # Passwords are security-sensitive, so draw from the operating
        # system's entropy source rather than the default, predictable
        # Mersenne Twister generator behind ``random.choice``.
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(size))
class ControllerManagement:
    """Namespace grouping the management-side controllers."""

    class Servers:
        """Operations on server records."""

        def create(self, server_id, datacenter_id, ip, hostname, secret, status=0):
            """Insert a server row stamped with the current local time.

            :param server_id: value stored in the ``id`` column
            :param datacenter_id: value stored in the ``datacenter`` column
            :param ip: value stored in the ``ip`` column
            :param hostname: value stored in the ``hostname`` column
            :param secret: value stored in the ``secret`` column
            :param status: value stored in the ``status`` column
            :return: always True
            """
            fields = dict(
                id=server_id,
                datacenter=datacenter_id,
                ip=ip,
                hostname=hostname,
                status=status,
                secret=secret,
                created=datetime.datetime.now(),
            )
            models.Servers.create(**fields)
            return True

        def get_all(self):
            """Return the select query built on ``models.Rules``."""
            # NOTE(review): this selects from ``models.Rules`` although the
            # surrounding class deals with servers - confirm this is intended.
            all_rows = models.Rules.select()
            return all_rows
class ControllerClient:
    """Placeholder controller for client operations."""

    def __init__(self):
        """Nothing to initialise."""

    def get(self):
        """Not implemented yet.

        :return: always None
        """
        return
# https://docs.python.org/2/library/email-examples.html
class ControllerMessages:
    """Base class for message senders.

    On construction the mail, contact, social and SMTP settings are read
    from ``models.Settings`` into the ``self.settings`` dictionary.
    """

    def __init__(self):
        """Load the settings rows whose key is in the list below."""
        wanted_keys = [
            "mail.template_name",
            'mail.logotype',
            "mail.company_name",
            "contacts.email",
            "contacts.phone",
            "social.googleplus",
            "social.facebook",
            "social.twitter",
            'SMTP_SERVER',
            'SMTP_PORT',
            'SMTP_USERNAME',
            'SMTP_PASSWORD',
            'SMTP_TIMEOUT',
            'SMTP_FROM',
            'SMTP_SENDER',
            'SMTP_SSL',
        ]
        query = models.Settings.select(models.Settings.key, models.Settings.val)
        rows = query.where(models.Settings.key << wanted_keys).execute()
        # Map every returned key to its stored value.
        self.settings = {row.key: row.val for row in rows}
class ControllerMessagesEmail(ControllerMessages):
    """Sends a multipart (plain text + HTML) e-mail through the configured SMTP server."""

    def send(self, **content):
        """Build and send an e-mail.

        :param content: must contain ``title``, ``to`` and ``message``; the
            whole mapping is also handed to the HTML template as ``content``
        :return: None
        :raises: any exception raised while rendering or talking to the SMTP
            server; the connection is closed before the error propagates
        """
        # Create message container - the correct MIME type is multipart/alternative.
        msg = MIMEMultipart('alternative')
        msg['Subject'] = content['title'].encode('utf-8')
        msg['From'] = self.settings['contacts.email'].encode('utf-8')
        msg['To'] = content['to'].encode('utf-8')

        text = content['message'].encode('utf-8')
        html = render_template(
            'email/%s/layout.html' % self.settings['mail.template_name'],
            content=content,
            settings=self.settings
        ).encode('utf-8')

        # Attach both representations. According to RFC 2046, the last part
        # of a multipart message, in this case the HTML one, is preferred.
        msg.attach(MIMEText(text, 'plain', 'UTF-8'))
        msg.attach(MIMEText(html, 'html', 'UTF-8'))

        # The other SMTP settings are converted with int() before use, so the
        # flag is treated the same way: a stored "0" must select the plain
        # connection instead of being truthy as a non-empty string. Values
        # that are not numeric keep the previous truthiness behaviour.
        ssl_flag = self.settings['SMTP_SSL']
        try:
            use_ssl = int(ssl_flag) != 0
        except (TypeError, ValueError):
            use_ssl = bool(ssl_flag)

        smtp_class = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        server = smtp_class(
            self.settings['SMTP_SERVER'],
            int(self.settings['SMTP_PORT']),
            timeout=int(self.settings['SMTP_TIMEOUT'])
        )
        try:
            server.ehlo()
            # STARTTLS is not negotiated here.
            server.login(self.settings['SMTP_USERNAME'], self.settings['SMTP_PASSWORD'])
            server.sendmail(self.settings['SMTP_FROM'], content['to'], msg.as_string())
            server.quit()
        except Exception:
            # Do not leak the socket when the SMTP conversation fails.
            server.close()
            raise
class ControllerMessagesSMS(ControllerMessages):
    """Placeholder for an SMS sender."""

    def send(self):
        """Not implemented yet.

        :return: always None
        """
        return
class ControllerMessagesInternal(ControllerMessages):
    """
    Placeholder for internal messages; nothing is implemented yet.
    """

    def send(self):
        """Not implemented yet.

        :return: always None
        """
        return

    def list(self):
        """Not implemented yet.

        :return: always None
        """
        return
class Settings:
    """Access to the key/value rows of ``models.Settings``."""

    # Setting keys listed by this class (SMTP and Robokassa payment entries).
    SETTINGS_KEYS = [
        'SMTP_SERVER',
        'SMTP_PORT',
        'SMTP_USERNAME',
        'SMTP_PASSWORD',
        'SMTP_FROM',
        'SMTP_TIMEOUT',
        'PAY_ROBOKASA_LOGIN',
        'PAY_ROBOKASA_PASSWORD1',
        'PAY_ROBOKASA_PASSWORD2',
    ]

    def get_items(self, keys):
        """Return the executed select of all settings whose key is in ``keys``.

        :param keys: iterable of setting keys
        """
        return models.Settings.select().where(models.Settings.key << keys).execute()

    def get(self, key):
        """Not implemented yet.

        :return: always None
        """
        return None

    def set(self, key, value):
        """Create the setting ``key`` or overwrite its value when it already exists.

        :param key: setting key
        :param value: value stored in the ``val`` column
        """
        if models.Settings.select().where(models.Settings.key == key).count() == 0:
            models.Settings.create(key=key, val=value)
        else:
            # The update has to name the column, be limited to this key and
            # be executed; a bare ``update()`` builds a query that is never run.
            models.Settings.update(val=value).where(models.Settings.key == key).execute()

    def delete(self, key):
        """Not implemented yet.

        :return: always None
        """
        return None
class Mail:
    """Sends plain-text e-mail using the SMTP settings found in ``g.settings``."""

    def send(self, to, subject, message):
        """Send ``message`` to ``to``.

        The ``From`` header uses ``SMTP_FROM`` as display name and
        ``SMTP_SENDER`` as address; ``SMTP_SENDER`` is also the envelope sender.

        :param to: recipient address
        :param subject: subject line
        :param message: message body
        :return: None
        :raises: any exception raised by smtplib; the connection is closed
            before the error propagates
        """
        msg = "\r\n".join([
            "From: %s <%s>" % (g.settings['SMTP_FROM'], g.settings['SMTP_SENDER']),
            "To: %s" % to,
            "Subject: %s" % subject,
            "",
            message
        ]).encode('utf-8')

        host = g.settings['SMTP_SERVER']
        port = int(g.settings['SMTP_PORT'])
        timeout = int(g.settings['SMTP_TIMEOUT'])

        if int(g.settings['SMTP_SSL']) == 0:
            server = smtplib.SMTP(host, port, timeout=timeout)
        else:
            server = smtplib.SMTP_SSL(host, port, timeout=timeout)

        try:
            server.ehlo()
            # STARTTLS is not negotiated here.
            server.login(g.settings['SMTP_USERNAME'], g.settings['SMTP_PASSWORD'])
            server.sendmail(g.settings['SMTP_SENDER'], to, msg)
            server.quit()
        except Exception:
            # Do not leak the socket when the SMTP conversation fails.
            server.close()
            raise
def special_match(string, search=r'[^a-z0-9.@]'):
    r"""Match the regular expression ``search`` at the beginning of ``string``.

    With the default pattern the result is a match object when the first
    character is something other than ``a-z``, ``0-9``, ``.`` or ``@``,
    and None otherwise.

    Example pattern for an e-mail address:
    ``^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$``

    :param string: text to test
    :param search: regular expression pattern
    :return: match object, or None when the pattern does not match at the start
    """
    return re.match(search, string)
class Datacenters:
    """Read access to the data center records."""

    def get_items(self, status=1):
        """Return the data centers that have the given status.

        :param status: value the ``status`` column must equal (default 1)
        :return: dict with ``total`` (number of matching rows) and ``items``
            (list of dicts holding ``id``, ``name`` and ``status``)
        """
        # Filter by the requested status instead of a hard-coded 1; the
        # default keeps the previous behaviour for existing callers.
        matching = models.DataCenters.select().where(models.DataCenters.status == status)
        return {
            "total": matching.count(),
            "items": [
                {"id": dc.id, "name": dc.name, "status": dc.status}
                for dc in matching
            ],
        }
class Control:
    """User secrets and task creation."""

    def user_secret_update(self, user_id):
        """Store a freshly generated UUID4 as the secret of ``user_id``.

        A new record (empty ``acl``, ``status`` 1) is created when the user
        has none; otherwise the secret of the first existing record is replaced.

        :param user_id: user the secret belongs to
        :return: always True
        """
        new_secret = uuid.uuid4()
        existing = models.UsersSecrets.select().where(models.UsersSecrets.user == user_id)

        if existing.count() == 0:
            # No record yet: insert one.
            models.UsersSecrets.create(user=user_id, secret=new_secret, acl='', status=1)
            return True

        # Record present: overwrite its secret.
        record = existing.limit(1)[0]
        record.secret = new_secret
        record.save()
        return True

    def user_secret_get(self, user_id):
        """Return the first secret record of ``user_id`` as a dictionary.

        :param user_id: user the secret belongs to
        :return: dict with the keys ``id``, ``secret``, ``acl`` and ``status``
        """
        by_user = models.UsersSecrets.user == user_id
        record = models.UsersSecrets.select().where(by_user).limit(1)[0]
        return dict(
            id=record.id,
            secret=record.secret,
            acl=record.acl,
            status=record.status,
        )

    def task_create(self, datacenter_id, server_id, user_id, plain):
        """Create a task row with a new UUID4 id and ``plain`` serialised as JSON.

        :param datacenter_id: value stored in the ``datacenter`` column
        :param server_id: value stored in the ``server`` column
        :param user_id: value stored in the ``user`` column
        :param plain: JSON-serialisable payload
        :return: None
        """
        new_id = uuid.uuid4()
        payload = json.dumps(plain)
        models.Tasks.create(
            id=new_id,
            datacenter=datacenter_id,
            server=server_id,
            user=user_id,
            plain=payload
        )
class Admins:
    """Administrator session checks and authentication."""

    def check_session(self):
        """
        Check that the session contains all required keys
        :return: bool
        """
        from flask import session
        required = ['admin_email', 'admin_password', 'admin_id']
        return all(key in session for key in required)

    def auth(self, email, password, status=1):
        """
        Check whether an administrator with these credentials exists
        :param email: administrator e-mail
        :param password: plain-text password (text or bytes)
        :param status: required value of the ``status`` column
        :return: True when at least one matching record exists, else False
        """
        # Hash bytes explicitly: a text password containing non-ASCII
        # characters (or any text password on Python 3) cannot be fed to
        # md5 directly. ASCII passwords produce the same digest as before.
        if not isinstance(password, bytes):
            password = password.encode('utf-8')
        # NOTE(review): unsalted MD5 is a weak password hash; it is kept
        # because the stored hashes are compared against this digest.
        password_hash = md5(password).hexdigest()
        matches = models.Admins.select().where(
            models.Admins.email == email,
            models.Admins.password == password_hash,
            models.Admins.status == status
        ).count()
        return matches != 0

    def get_id_by_email(self, email):
        """
        Return the id of the administrator with the given e-mail
        :param email: administrator e-mail
        :return: value of the ``id`` column of the record returned by ``get()``
        """
        admin = models.Admins.select(models.Admins.id).where(models.Admins.email == email).get()
        return admin.id