console/SWSCloudCore/controllers/common/__init__.py
2016-06-11 14:20:56 +03:00

298 lines
8.4 KiB
Python

# coding: utf-8
import datetime
import json
import random
import re
import smtplib
import string
import uuid
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from hashlib import md5
from flask import g, render_template
from SWSCloudCore import models
class ControllerCommon:
    """Helpers shared by the controllers in this module."""

    def __init__(self):
        pass

    @staticmethod
    def generate_password(size=14, chars=string.ascii_uppercase + string.digits):
        """
        Build a random password.

        Characters are drawn from the operating system's entropy source
        (``random.SystemRandom``) instead of the default Mersenne Twister
        generator, whose output is predictable and therefore unsuitable for
        credentials.

        :param size: number of characters in the result
        :param chars: alphabet the characters are picked from
        :return: string of ``size`` characters taken from ``chars``
        """
        secure_rng = random.SystemRandom()
        return ''.join(secure_rng.choice(chars) for _ in range(size))
class ControllerManagement:
    """Namespace grouping the management-side controllers."""

    class Servers:
        """Operations backed by the ``models.Servers`` table."""

        def create(self, server_id, datacenter_id, ip, hostname, secret, status=0):
            """
            Insert a new server row stamped with the current local time.

            :param server_id: value stored in the ``id`` column
            :param datacenter_id: value stored in the ``datacenter`` column
            :param ip: value stored in the ``ip`` column
            :param hostname: value stored in the ``hostname`` column
            :param secret: value stored in the ``secret`` column
            :param status: value stored in the ``status`` column
            :return: always True
            """
            row = dict(
                id=server_id,
                datacenter=datacenter_id,
                ip=ip,
                hostname=hostname,
                status=status,
                secret=secret,
                created=datetime.datetime.now(),
            )
            models.Servers.create(**row)
            return True

        def get_all(self):
            """
            :return: the ``models.Rules.select()`` query
            """
            # NOTE(review): this selects from ``Rules`` although the class is
            # named ``Servers`` - confirm that this is intended.
            query = models.Rules.select()
            return query
class ControllerClient:
    """Placeholder client controller; nothing is implemented yet."""

    def __init__(self):
        pass

    def get(self):
        """
        Stub.

        :return: None
        """
        return None
# https://docs.python.org/2/library/email-examples.html
class ControllerMessages:
    """Base class for message senders; loads its settings on construction."""

    def __init__(self):
        """
        Fetch the settings rows whose key is listed below and keep them in
        ``self.settings`` as a ``{key: val}`` dictionary.
        """
        wanted_keys = [
            'mail.template_name',
            'mail.logotype',
            'mail.company_name',
            'contacts.email',
            'contacts.phone',
            'social.googleplus',
            'social.facebook',
            'social.twitter',
            'SMTP_SERVER',
            'SMTP_PORT',
            'SMTP_USERNAME',
            'SMTP_PASSWORD',
            'SMTP_TIMEOUT',
            'SMTP_FROM',
            'SMTP_SENDER',
            'SMTP_SSL',
        ]
        rows = (
            models.Settings
            .select(models.Settings.key, models.Settings.val)
            .where(models.Settings.key << wanted_keys)
            .execute()
        )
        self.settings = {row.key: row.val for row in rows}
class ControllerMessagesEmail(ControllerMessages):
    """Sends a multipart (plain text + HTML) e-mail through the configured SMTP server."""

    def send(self, **content):
        """
        Build and send one e-mail.

        The keys read from ``content`` are ``title`` (subject), ``to``
        (recipient address) and ``message`` (plain-text body); the whole
        ``content`` dict is also handed to the HTML template.

        :param content: message fields, see above
        :return: None
        :raises KeyError: when a required content or settings key is missing
        :raises smtplib.SMTPException: when the server rejects a step
        """
        # Create message container - the correct MIME type is multipart/alternative.
        msg = MIMEMultipart('alternative')
        msg['Subject'] = content['title'].encode('utf-8')
        msg['From'] = self.settings['contacts.email'].encode('utf-8')
        msg['To'] = content['to'].encode('utf-8')

        text = content['message'].encode('utf-8')
        html = render_template(
            'email/%s/layout.html' % self.settings['mail.template_name'],
            content=content,
            settings=self.settings
        ).encode('utf-8')

        # According to RFC 2046, the last part of a multipart message, in this
        # case the HTML message, is best and preferred - so attach it last.
        msg.attach(MIMEText(text, 'plain', 'UTF-8'))
        msg.attach(MIMEText(html, 'html', 'UTF-8'))

        # int() is applied to the other SMTP_* values, which suggests the
        # settings are stored as text. A plain truth test would treat the
        # string "0" as "SSL enabled", so interpret the flag explicitly
        # (Mail.send in this module also reads SMTP_SSL as a 0/non-0 number).
        ssl_setting = self.settings['SMTP_SSL']
        use_ssl = str(ssl_setting).strip().lower() not in ('', '0', 'false', 'none')
        smtp_class = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP

        server = smtp_class(
            self.settings['SMTP_SERVER'],
            int(self.settings['SMTP_PORT']),
            timeout=int(self.settings['SMTP_TIMEOUT'])
        )
        try:
            server.ehlo()
            server.login(self.settings['SMTP_USERNAME'], self.settings['SMTP_PASSWORD'])
            server.sendmail(self.settings['SMTP_FROM'], content['to'], msg.as_string())
            server.quit()
        finally:
            # Harmless after a successful quit(); after a failure it releases
            # the socket without raising a second SMTP error that would mask
            # the original one.
            server.close()
class ControllerMessagesSMS(ControllerMessages):
    """SMS sender placeholder; sending is not implemented."""

    def send(self):
        """
        Stub.

        :return: None
        """
        return None
class ControllerMessagesInternal(ControllerMessages):
    """
    Internal messages placeholder (the original description was left
    unfinished); neither method is implemented.
    """

    def send(self):
        """
        Stub.

        :return: None
        """
        return None

    def list(self):
        """
        Stub.

        :return: None
        """
        return None
class Settings:
    """Access to the key/value rows of ``models.Settings``."""

    SETTINGS_KEYS = [
        'SMTP_SERVER',
        'SMTP_PORT',
        'SMTP_USERNAME',
        'SMTP_PASSWORD',
        'SMTP_FROM',
        'SMTP_TIMEOUT',
        'PAY_ROBOKASA_LOGIN',
        'PAY_ROBOKASA_PASSWORD1',
        'PAY_ROBOKASA_PASSWORD2',
    ]

    def get_items(self, keys):
        """
        :param keys: collection of setting keys to look up
        :return: executed query over the rows whose key is in ``keys``
        """
        return models.Settings.select().where(models.Settings.key << keys).execute()

    def get(self, key):
        """
        Stub, not implemented.

        :param key: setting key
        :return: None
        """
        return None

    def set(self, key, value):
        """
        Create the setting ``key`` or overwrite its value when it exists.

        :param key: setting key
        :param value: value stored in the ``val`` column
        :return: None
        """
        already_stored = models.Settings.select().where(models.Settings.key == key).count() != 0
        if not already_stored:
            models.Settings.create(key=key, val=value)
        else:
            # The previous code built an empty update query and never executed
            # it, so existing settings silently kept their old value.
            models.Settings.update(val=value).where(models.Settings.key == key).execute()

    def delete(self, key):
        """
        Stub, not implemented.

        :param key: setting key
        :return: None
        """
        return None
class Mail:
    """Sends a plain-text e-mail using the SMTP settings found in ``g.settings``."""

    def send(self, to, subject, message):
        """
        Send ``message`` to ``to`` through the configured SMTP server.

        :param to: recipient address
        :param subject: subject header value
        :param message: plain-text body
        :return: None
        :raises smtplib.SMTPException: when the server rejects a step
        """
        conf = g.settings
        msg = "\r\n".join([
            "From: %s <%s>" % (conf['SMTP_FROM'], conf['SMTP_SENDER']),
            "To: %s" % to,
            "Subject: %s" % subject,
            "",
            message
        ]).encode('utf-8')

        # SMTP_SSL is a numeric flag: 0 selects a plain connection, anything
        # else an implicit-TLS one.
        smtp_class = smtplib.SMTP if int(conf['SMTP_SSL']) == 0 else smtplib.SMTP_SSL
        server = smtp_class(
            conf['SMTP_SERVER'],
            int(conf['SMTP_PORT']),
            timeout=int(conf['SMTP_TIMEOUT'])
        )
        try:
            server.ehlo()
            server.login(conf['SMTP_USERNAME'], conf['SMTP_PASSWORD'])
            server.sendmail(conf['SMTP_SENDER'], to, msg)
            server.quit()
        finally:
            # Harmless after a successful quit(); after a failure it releases
            # the socket, which the previous code leaked.
            server.close()
def special_match(string, search=r'[^a-z0-9.@]'):
    r"""
    Try to match the regular expression ``search`` at the start of ``string``.

    With the default pattern a match means that the first character of
    ``string`` is something other than a lowercase ASCII letter, a digit,
    ``.`` or ``@``.

    Example of a pattern for e-mail addresses:
    ``^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$``

    :param string: text to test
    :param search: regular expression source
    :return: match object, or None when the pattern does not match at position 0
    """
    return re.match(search, string)
class Datacenters:
    """Read access to ``models.DataCenters``."""

    def get_items(self, status=1):
        """
        List the data centers that have the given status.

        The ``status`` argument used to be ignored (the filter was hard-coded
        to 1); it is now applied, and the default keeps the old result.

        :param status: value the ``status`` column must equal
        :return: dict with ``total`` (number of items) and ``items``
                 (list of dicts with ``id``, ``name`` and ``status``)
        """
        rows = models.DataCenters.select().where(models.DataCenters.status == status)
        items = [
            {
                "id": row.id,
                "name": row.name,
                "status": row.status
            }
            for row in rows
        ]
        # The total is derived from the fetched rows instead of issuing a
        # second COUNT query with the same filter.
        return {
            "total": len(items),
            "items": items
        }
class Control:
    """User secrets and task creation on top of ``models.UsersSecrets`` / ``models.Tasks``."""

    def user_secret_update(self, user_id):
        """
        Give the user a fresh random secret, creating the record when the
        user has none yet.

        :param user_id: user whose secret is (re)generated
        :return: always True
        """
        new_secret = uuid.uuid4()
        secrets_table = models.UsersSecrets

        has_record = secrets_table.select().where(secrets_table.user == user_id).count() != 0
        if not has_record:
            # First secret for this user: insert with empty ACL, status 1.
            secrets_table.create(user=user_id, secret=new_secret, acl='', status=1)
            return True

        # Otherwise overwrite the secret on the first existing record.
        record = secrets_table.select().where(secrets_table.user == user_id).limit(1)[0]
        record.secret = new_secret
        record.save()
        return True

    def user_secret_get(self, user_id):
        """
        Return the user's secret record as a plain dict.

        :param user_id: user to look up
        :return: dict with ``id``, ``secret``, ``acl`` and ``status``
        :raises IndexError: when the user has no secret record
        """
        secrets_table = models.UsersSecrets
        record = secrets_table.select().where(secrets_table.user == user_id).limit(1)[0]
        result = dict()
        result["id"] = record.id
        result["secret"] = record.secret
        result["acl"] = record.acl
        result["status"] = record.status
        return result

    def task_create(self, datacenter_id, server_id, user_id, plain):
        """
        Store a task whose payload is ``plain`` serialized as JSON.

        :param datacenter_id: value stored in the ``datacenter`` column
        :param server_id: value stored in the ``server`` column
        :param user_id: value stored in the ``user`` column
        :param plain: JSON-serializable payload
        :return: None
        """
        payload = json.dumps(plain)
        new_id = uuid.uuid4()
        models.Tasks.create(
            id=new_id,
            datacenter=datacenter_id,
            server=server_id,
            user=user_id,
            plain=payload
        )
class Admins:
    """Administrator session check and credential lookup."""

    @staticmethod
    def _hash_password(password):
        """
        Return the hex MD5 digest used for the stored admin password.

        Text input is encoded as UTF-8 first: hashing text directly fails on
        Python 3 and on non-ASCII unicode under Python 2. Byte strings are
        hashed unchanged, so existing hashes still match.

        :param password: password as text or bytes
        :return: hexadecimal digest string
        """
        # NOTE(review): unsalted MD5 is a weak password hash; changing it
        # requires migrating the stored hashes, so it is kept as is here.
        if not isinstance(password, bytes):
            password = password.encode('utf-8')
        return md5(password).hexdigest()

    def check_session(self):
        """
        Check that the session contains all required keys.

        :return: bool
        """
        from flask import session
        required = ('admin_email', 'admin_password', 'admin_id')
        return all(key in session for key in required)

    def auth(self, email, password, status=1):
        """
        Check whether an admin with these credentials and status exists.

        :param email: admin e-mail
        :param password: plain password, hashed before comparison
        :param status: required value of the ``status`` column
        :return: bool
        """
        password_hash = self._hash_password(password)
        matches = models.Admins.select().where(
            models.Admins.email == email,
            models.Admins.password == password_hash,
            models.Admins.status == status
        ).count()
        return matches != 0

    def get_id_by_email(self, email):
        """
        :param email: admin e-mail
        :return: id of the admin with that e-mail (``.get()`` raises when no
                 row matches)
        """
        admin = models.Admins.select(models.Admins.id).where(models.Admins.email == email).get()
        return admin.id