console/SWSCloudAdministrator/Administrator/__init__.py
2024-12-10 21:05:37 +03:00

410 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
from uuid import uuid4
import validators
from flask import Blueprint, flash, g, jsonify, redirect, render_template, request, session, url_for
from SWSCloudAdministrator.Administrator.common import requires_login
from SWSCloudCore.controllers.administrators import ControllerAdministrators
# from SWSCloudCore.controllers.billing import ControllerBilling
from SWSCloudCore.controllers.common import ControllerMessagesEmail
from SWSCloudCore.controllers.datacenters.manage import ControllerManageDatacenters
from SWSCloudCore.controllers.ips.manage import ControllerManageIPs
from SWSCloudCore.controllers.servers.manage import ControllerManageServer
# from SWSCloudCore.controllers.users.manage import ControllerManageUsers
# from SWSCloudCore.controllers.users.manage import ControllerManageUsersBalance
# from SWSCloudCore.controllers.users.manage import ControllerManageUsersDetails
from SWSCloudCore.controllers.plans import ControllerPlans
from SWSCloudCore import models
# Flask blueprint named 'administrator'; the views registered on it in this
# module are served under the '/administrator' URL prefix.
viewAdministrator = Blueprint('administrator', __name__, url_prefix='/administrator')
@viewAdministrator.route('/login.html', methods=['GET'])
def login():
    """Render the administrator sign-in page."""
    template_name = 'administrator/login.html'
    return render_template(template_name)
@viewAdministrator.route('/login.html', methods=['POST'])
def login_post():
    """Authenticate an administrator from the submitted login form.

    Reads the ``email`` and ``password`` form fields. If the e-mail is not
    well-formed or authentication fails, a message is flashed and the client
    is redirected back to the login page. On success the administrator's id,
    e-mail and password are written to the session and the client is
    redirected to the dashboard.
    """
    # Fall back to an empty string so that a request without these fields is
    # rejected by the validation below instead of raising AttributeError on
    # ``None.encode`` (which would surface as a 500 error).
    admin_email = request.form.get('email', '').encode('utf-8')
    admin_password = request.form.get('password', '').encode('utf-8')

    # Validate the entered data.
    if not validators.email(admin_email):
        flash('Invalid registration data')
        return redirect(url_for('administrator.login'))

    # Authenticate only active administrators (status code 1).
    if not models.Admins.auth(admin_email, admin_password, 1):
        flash('Invalid login. Please try again.')
        return redirect(url_for('administrator.login'))

    user_id = ControllerAdministrators().get_id_by_email(admin_email)

    # Persist the administrator's identity in the session.
    session['admin_id'] = str(user_id)
    session['admin_email'] = admin_email
    # NOTE(review): the plaintext password is kept in the session. It is left
    # in place because code outside this view may read it (verify against
    # requires_login), but it should be removed if nothing depends on it.
    session['admin_password'] = admin_password

    return redirect(url_for('administrator.dashboard'))
@viewAdministrator.route('/logout.html')
def logout():
    """Remove the administrator keys from the session and go to the login page."""
    for session_key in ('admin_id', 'admin_email', 'admin_password'):
        # A default is passed so that an absent key is silently ignored.
        session.pop(session_key, None)
    return redirect(url_for('administrator.login'))
@viewAdministrator.route('/dashboard.html')
@requires_login
def dashboard():
    """Render the dashboard with a row count for each listed model."""
    # (template key, model attribute on ``models``) pairs, counted in order.
    counted_models = (
        ('users', 'Users'),
        ('datacenters', 'DataCenters'),
        ('servers', 'Servers'),
        ('containers', 'Containers'),
        ('ips', 'Ips'),
        ('vms', 'Vms'),
        ('payments', 'UsersBalanceTransactions'),
    )
    stats = {}
    for stat_key, model_name in counted_models:
        stats[stat_key] = getattr(models, model_name).select().count()
    return render_template('administrator/dashboard.html', stats=stats)
@viewAdministrator.route('/payments')
@requires_login
def payments():
    """Render the payments page with all balance transactions and their count."""
    transactions = models.UsersBalanceTransactions
    total = transactions.select().count()
    items = transactions.select()
    return render_template(
        'administrator/payments.html',
        payments={'total': total, 'items': items},
    )
@viewAdministrator.route('/datacenters')
@requires_login
def datacenters():
    """Render the datacenter list page."""
    datacenter_items = ControllerManageDatacenters().items_get()
    return render_template(
        'administrator/datacenters/index.html',
        datacenters=datacenter_items,
    )
@viewAdministrator.route('/datacenters/create', methods=['GET', 'POST'])
@requires_login
def datacenters_create():
    """Show the datacenter creation form (GET) or create a datacenter (POST).

    On POST, if a datacenter with the submitted name already exists the
    client is redirected back to the creation form; otherwise a new record is
    created with a fresh UUID and the client is redirected to the list.
    """
    if request.method != "POST":
        return render_template('administrator/datacenters/create.html')

    # TODO: validate the submitted fields.
    dc_name = request.form.get('name')
    dc_code = request.form.get('code')
    dc_country = request.form.get('country')
    dc_city = request.form.get('city')
    dc_status = request.form.get('status')

    # Reject the name when any record already uses it. Comparing with ``> 0``
    # (rather than ``== 1``) keeps the check effective even if duplicates
    # already exist in the table.
    same_name = models.DataCenters.select().where(models.DataCenters.name == dc_name)
    if same_name.count() > 0:
        return redirect(url_for('administrator.datacenters_create'))

    models.DataCenters.create(
        id=uuid4(),
        code=dc_code,
        name=dc_name,
        country=dc_country,
        city=dc_city,
        status=dc_status,
    )
    return redirect(url_for('administrator.datacenters'))
@viewAdministrator.route('/datacenters/edit/<uuid:dc_id>', methods=['GET', 'POST'])
@requires_login
def datacenters_edit(dc_id):
    """Show the edit page for datacenter ``dc_id``.

    Redirects to the datacenter list when ``check_exists`` reports the id as
    unknown. A POST currently performs no update and only redirects back to
    this page.
    """
    is_known = ControllerManageDatacenters().check_exists(dc_id)
    if not is_known:
        return redirect(url_for('administrator.datacenters'))

    if request.method == "POST":
        # TODO: update the datacenter record.
        return redirect(url_for('administrator.datacenters_edit', dc_id=dc_id))

    datacenter = ControllerManageDatacenters().item_get(dc_id)
    return render_template(
        'administrator/datacenters/edit.html',
        datacenter=datacenter,
    )
@viewAdministrator.route('/servers/')
@requires_login
def servers_index():
    """Render the server list page."""
    server_items = ControllerManageServer().items_get()
    return render_template(
        'administrator/servers/index.html',
        servers=server_items,
    )
@viewAdministrator.route('/ips/')
@requires_login
def ips_index():
    """Render the IP address list page."""
    ip_items = ControllerManageIPs().items_get()
    return render_template(
        'administrator/ips/index.html',
        ips=ip_items,
    )
@viewAdministrator.route('/ips/create', methods=['POST', 'GET'])
@requires_login
def ips_create():
    """Show the IP creation form and, on POST, create the submitted addresses.

    An IPv4 record is created when both ``ipv4`` and ``ipv4_gateway`` pass the
    IPv4 check, and an IPv6 record is created when both ``ipv6`` and
    ``ipv6_gateway`` pass the IPv6 check. The form is rendered again in every
    case.
    """
    if request.method == "POST":
        form = request.form
        datacenter = form.get('datacenter')
        server = form.get('server')
        # Read every address field once with an empty-string default. The
        # original indexed ``request.form[...]`` for some fields, so a form
        # that omitted the IPv6 fields was answered with 400 *after* the IPv4
        # record had already been created.
        # NOTE(review): assumes the validators return False for '' — confirm
        # against ControllerManageIPs.
        ipv4 = form.get('ipv4', '')
        ipv4_gateway = form.get('ipv4_gateway', '')
        ipv6 = form.get('ipv6', '')
        ipv6_gateway = form.get('ipv6_gateway', '')

        if ControllerManageIPs().is_valid_ipv4_address(ipv4) \
                and ControllerManageIPs().is_valid_ipv4_address(ipv4_gateway):
            # IPv6 columns are left empty; the trailing 0 is passed through
            # unchanged from the original call (presumably a status — verify).
            ControllerManageIPs().item_create(
                datacenter, server, ipv4, ipv4_gateway, '', '', 0)

        if ControllerManageIPs().is_valid_ipv6_address(ipv6) \
                and ControllerManageIPs().is_valid_ipv6_address(ipv6_gateway):
            # IPv4 columns are left empty for the IPv6 record.
            ControllerManageIPs().item_create(
                datacenter, server, '', '', ipv6, ipv6_gateway, 0)

    return render_template(
        'administrator/ips/create.html',
        datacenters=ControllerManageDatacenters().items_get(),
        servers=ControllerManageServer().items_get()
    )
@viewAdministrator.route('/ips/edit/<int:ip_id>', methods=['GET', 'POST'])
@requires_login
def ips_edit(ip_id):
    """Show the edit form for IP ``ip_id`` and, on POST, apply the submitted update.

    The update is driven by the form's own ``ip_id`` field; the URL ``ip_id``
    is only used to load the record shown on the page.
    """
    if request.method == 'POST':
        submitted = request.form
        print(submitted)
        # Positional order expected by item_update, as used in the original call.
        field_order = (
            'ip_id',
            'server',
            'ipv4',
            'ipv4_gateway',
            'ipv6',
            'ipv6_gateway',
            'status',
        )
        ControllerManageIPs().item_update(*[submitted[name] for name in field_order])

    server_items = ControllerManageServer().items_get()
    ip_record = ControllerManageIPs().item_get(ip_id)
    return render_template(
        'administrator/ips/edit.html',
        servers=server_items,
        ip=ip_record,
    )
@viewAdministrator.route('/ips/delete', methods=['GET', 'POST'])
@requires_login
def ips_delete():
    """Render the IP deletion page.

    The view is protected with ``requires_login`` like the other
    administrator pages in this module; it was previously reachable without
    authentication.
    """
    # TODO: this view does not yet load or delete an IP record; it only
    # renders the template.
    return render_template('administrator/ips/delete.html')
@viewAdministrator.route('/servers/create', methods=['GET', 'POST'])
@requires_login
def servers_create():
    """Show the server creation form and, on POST, register a new server.

    The new server receives freshly generated UUIDs for both its id and its
    secret. The form is rendered again after a POST.
    """
    if request.method == "POST":
        form = request.form
        print(form)
        datacenter_id = form['datacenter_id']
        server_id = uuid4()
        secret = uuid4()
        hostname = form['hostname']
        ipv4 = form['ip']
        status = form['status']
        # The sixth positional argument is passed as None, as in the original call.
        ControllerManageServer().item_create(
            datacenter_id, server_id, secret, hostname, ipv4, None, status)

    return render_template(
        'administrator/servers/create.html',
        datacenters=ControllerManageDatacenters().items_get(),
    )
@viewAdministrator.route('/servers/edit/<uuid:server_id>.html', methods=['GET', 'POST'])
@requires_login
def server_edit(server_id):
    """Show the edit page for server ``server_id`` and, on POST, update its status.

    Redirects to the server list when no server has the given id. On POST the
    ``status`` form field (default 0) is stored and the client is redirected
    back to this page.
    """
    # Bail out to the list when the server does not exist. The list view's
    # endpoint is ``servers_index``; the previous target
    # ``administrator.servers`` is not defined among the views in this module,
    # so building its URL would fail.
    if models.Servers.select().where(models.Servers.id == server_id).count() == 0:
        return redirect(url_for('administrator.servers_index'))

    if request.method == "POST":
        status_update = models.Servers.update(
            status=request.form.get('status', 0)
        ).where(models.Servers.id == server_id)
        status_update.execute()
        return redirect(url_for('administrator.server_edit', server_id=server_id))

    server_details = models.Servers.select().where(models.Servers.id == server_id).get()
    return render_template('administrator/servers/edit.html', server=server_details)
@viewAdministrator.route('/settings/')
@requires_login
def settings_index():
    """Render the settings list page with all settings and their count."""
    total = models.Settings.select().count()
    items = models.Settings.select()
    return render_template(
        'administrator/settings/index.html',
        settings={'total': total, 'items': items},
    )
@viewAdministrator.route('/settings/create', methods=['GET', 'POST'])
@requires_login
def settings_create():
    """Show the setting creation form and, on POST, add a new key/value setting.

    When the submitted key is not yet present, the setting is created and the
    client is redirected to the settings list. Otherwise an error is recorded
    in ``g.errors`` and the form is rendered again.
    """
    if request.method == 'POST':
        key = request.form['key']
        key_is_free = models.Settings.select().where(models.Settings.key == key).count() == 0
        if key_is_free:
            # The key is not in the table yet: add it, then go to the list page.
            models.Settings.create(key=key, val=request.form['val'])
            return redirect(url_for('administrator.settings_index'))
        # The key already exists: report it through the shared error container.
        g.errors['total'] += 1
        g.errors['items'].append(u"Параметр уже существует")
    return render_template('administrator/settings/create.html')
@viewAdministrator.route('/settings/delete', methods=['GET', 'POST'])
@requires_login
def settings_delete():
    """Show the delete confirmation for a setting (GET) or delete it (POST).

    GET reads the setting id from the ``id`` query argument; POST reads it
    from the ``id`` form field. Both paths end on the settings list when there
    is nothing to show.
    """
    if request.method == 'POST':
        delete_query = models.Settings.delete().where(models.Settings.id == request.form['id'])
        delete_query.execute()
        return redirect(url_for('administrator.settings_index'))

    try:
        setting = models.Settings.select().where(models.Settings.id == request.args['id']).limit(1)[0]
    except IndexError:
        # No setting has this id: indexing the empty result raises IndexError.
        # Return to the list instead of failing with a server error.
        return redirect(url_for('administrator.settings_index'))

    return render_template('administrator/settings/delete.html', setting=setting)
@viewAdministrator.route('/settings/update', methods=['GET', 'POST'])
@requires_login
def settings_update():
    """Show the edit form for a setting (GET) or store its new value (POST).

    GET reads the setting id from the ``id`` query argument; POST reads ``id``
    and ``val`` from the form, updates the value and redirects to the
    settings list.
    """
    if request.method == 'POST':
        update_query = models.Settings.update(val=request.form['val']).where(
            models.Settings.id == request.form['id'])
        update_query.execute()
        return redirect(url_for('administrator.settings_index'))

    try:
        # Fetch the setting by id.
        setting = models.Settings.select().where(models.Settings.id == request.args['id']).limit(1)[0]
    except IndexError:
        # No setting has this id: indexing the empty result raises IndexError.
        # Return to the list instead of failing with a server error.
        return redirect(url_for('administrator.settings_index'))

    return render_template('administrator/settings/update.html', setting=setting)
@viewAdministrator.route('/settings/messages/email_test.html', methods=['GET', 'POST'])
@requires_login
def settings_messages_email_test():
    """Send a fixed test e-mail and render the e-mail settings page.

    The message is sent on every request to this view, regardless of method,
    with hard-coded recipient and content.
    """
    mailer = ControllerMessagesEmail()
    test_message = {
        'title': 'test',
        'to': 'vanzhiganov@ya.ru',
        'lead': 'qwdqwd',
        'message': 'qwdqwd',
        'callout': 'qwdqwd',
    }
    mailer.send(**test_message)
    return render_template('administrator/settings/messages/email.html')
@viewAdministrator.route('/json/datacenter/list', methods=['GET'])
@requires_login
def json_datacenter_list():
    """Return the datacenter list as a JSON response."""
    datacenter_items = ControllerManageDatacenters().items_get()
    return jsonify(datacenter_items)
@viewAdministrator.route('/plans/', methods=['GET'])
@requires_login
def plans_index():
    """Render the plan list page."""
    plan_items = ControllerPlans().get()
    return render_template(
        'administrator/plans/index.html',
        plans=plan_items,
    )
@viewAdministrator.route('/plans/create.html', methods=['GET', 'POST'])
@requires_login
def plans_create():
    """Show the plan creation form (GET) or create a VM plan (POST).

    On POST a plan is created under a new UUID string from the submitted form
    fields, a confirmation is flashed and the client is redirected to the
    plan list.
    """
    if request.method != "POST":
        return render_template('administrator/plans/create.html')

    plan_id = str(uuid4())
    # Form fields copied one-to-one onto the model's columns of the same name.
    column_names = ('name', 'price', 'status', 'storage', 'swap', 'memory', 'cores')
    column_values = {}
    for column in column_names:
        column_values[column] = request.form.get(column)
    models.PlansVMs.create(id=plan_id, **column_values)

    flash('Plan has been created')
    return redirect(url_for('administrator.plans_index'))
@viewAdministrator.route('/plans/<uuid:plan_id>/edit.html', methods=['GET', 'POST'])
@requires_login
def plan_edit(plan_id):
    """Show the edit form for plan ``plan_id`` (GET) or save its fields (POST).

    Redirects to the plan list when no plan has the given id, and again after
    a successful save.
    """
    # Bail out to the list when the plan does not exist.
    if not models.PlansVMs.select().where(models.PlansVMs.id == plan_id).count():
        return redirect(url_for('administrator.plans_index'))

    if request.method != 'POST':
        plan_details = models.PlansVMs.select().where(models.PlansVMs.id == plan_id).get()
        return render_template(
            'administrator/plans/edit.html',
            plan_details=plan_details,
        )

    plan = models.PlansVMs.get(models.PlansVMs.id == plan_id)
    # Copy each submitted form field onto the model attribute of the same name.
    for column in ('name', 'status', 'price', 'storage', 'swap', 'memory', 'cores'):
        setattr(plan, column, request.form.get(column))
    plan.save()
    return redirect(url_for('administrator.plans_index'))