statistics report function added

This commit is contained in:
Vyacheslav Anzhiganov 2016-10-09 22:59:21 +03:00
parent a45384be4a
commit b5d77d553e
13 changed files with 902 additions and 721 deletions

View file

@ -1,303 +1,9 @@
# coding: utf-8 # coding: utf-8
import sys
import json
import os
import subprocess
import commands
import requests
from SWSCloudNode.settings import settings from SWSCloudNode.settings import settings
from SWSCloudNode.logger import logging from SWSCloudNode.compute.lxc import LXC
from SWSCloudNode import lxc from SWSCloudNode.compute.qemu import Qemu
from SWSCloudNode.compute.qemu import QemuStats
from .node import Node, StatisticsReporter
class Tasks: from .tasks import Tasks
def __init__(self):
self.endpoint = settings.get('server', 'endpoint')
self.id = settings.get('server', 'id')
self.secret = settings.get('server', 'secret')
def get_item(self):
try:
response = requests.get('%s/server_api/task' % self.endpoint, auth=(self.id, self.secret))
except Exception as e:
logging.error('no connection with %s' % self.endpoint)
return None
else:
if response.status_code == 200:
return response.json()
logging.error("Unexpected status code: %d" % response.status_code)
return None
class Node:
def tasks_get(self):
try:
response = requests.get(
'%s/server_api/tasks' % settings.get('server', 'endpoint'),
auth=(settings.get('server', 'id'), settings.get('server', 'secret'))
)
except Exception as e:
sys.exit('no connection with %s' % settings.get('server', 'endpoint'))
# print e
else:
return {
'status': response.status_code,
'results': response.json()
}
def task_status_update(self, task_id, status):
response = requests.put(
'%s/server_api/tasks/%s' % (
settings.get('server', 'endpoint'),
task_id
),
auth=(
settings.get('server', 'id'),
settings.get('server', 'secret'),
),
data={
"status": status
}
)
return response.json()
def report_container_stats(self, container_id, statistics):
response = requests.post(
'%s/server_api/containers/stats/%s' % (
settings.get('server', 'endpoint'),
container_id
),
auth=(settings.get('server', 'id'), settings.get('server', 'secret')),
data={
'status': json.dumps(statistics)
}
)
if response.status_code == 200:
return response.json()
return False
# TODO: подумать куда переместить
def container_config_create(self, container_id, link, ipv4, ipv6):
cfg = list()
cfg.append("lxc.network.type = veth")
cfg.append("lxc.network.flags = up")
cfg.append("lxc.network.name = eth0")
cfg.append("lxc.network.link = %s" % link)
if ipv4['ipv4']:
# cfg.append('lxc.network.ipv4 = %s/32' % ipv4['ipv4'])
cfg.append('lxc.network.ipv4 = %s' % ipv4['ipv4'])
cfg.append('lxc.network.ipv4.gateway = %s' % ipv4['ipv4_gateway'])
if 'ipv6' in ipv6 and 'ipv6_gateway' in ipv6:
# cfg.append('lxc.network.ipv6 = %s/64' % ipv6['ipv6'])
cfg.append('lxc.network.ipv6 = %s' % ipv6['ipv6'])
cfg.append('lxc.network.ipv6.gateway = %s', ipv6['ipv6_gateway'])
config_file = '/var/lib/gocloud/node/configs/%s.config' % container_id
cfg_file = open(config_file, 'w')
cfg_file.write('\n'.join(cfg))
cfg_file.write('\n')
cfg_file.close()
return True
def container_authkey_create(self, container_id, auth_key):
# create ssh_key.pub
authkey_file = '/var/lib/gocloud/node/auth-keys/%s.pub' % container_id
ak = open(authkey_file, 'w')
ak.write(auth_key)
ak.write('\n')
ak.close()
return True
class Task(Node):
def interface2ip(self):
# intf = open(self.settings['proxy_interface'], 'r').read().split('\n')[0]
interface = "eth0"
intf_ip = commands.getoutput("ip address show dev " + interface).split()
intf_ip = intf_ip[intf_ip.index('inet') + 1].split('/')[0]
return intf_ip
def container_create(self, task):
return True
def container_destroy(self, task):
# check exists container name
if lxc.lxc().exists(task['parameters']['hostname']):
# todo: if hostname already exists then node callback to server to rename container
return False
lxc.lxc().destroy(task['parameters']['hostname'])
# TODO: check status
# delete record from dnsmasq
# dnsmasq.Dnsmasq().delete(task['parameters']['hostname'])
return True
def container_start(self, task):
logging.debug("container_start")
print "================ "
print task
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def container_stop(self, task):
logging.debug("container_stop")
lxc.lxc().stop(task['parameters']['hostname'])
# TODO: check status
return True
def container_restart(self, task):
logging.debug("container_restart")
lxc.lxc().stop(task['parameters']['hostname'])
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def init(self):
task = TCPClient().request(Request().build("task_get", self.request_auth, None))
# check exists element 'version'
if not "version" in task:
logging.error("Response not contain 'version' element")
return False
if task['version'] == "1.0":
if task['status'] == 0:
if task['method'] == "container_create":
# create container
result = Task().container_create(task)
if not result:
return False
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: hold job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_destroy":
# delete hostname from dnsmasq /etc/lxc/dnsmasq.conf
result = self.container_destroy(task)
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: held job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_start":
self.container_start(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_stop":
self.container_stop(task)
# TODO: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_restart":
self.container_restart(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_clone":
logging.debug("container_clone")
# TODO: set clone_IP to DNSMASQ
subprocess.call("/usr/bin/lxc-clone -o %(hostname)s -n %(clone_hostname)s" % task['parameters'], shell=True)
# TODO: check container status
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_mx_add":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_mx_delete":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_web_add":
logging.debug("create container")
# todo: held job if status not 0
nginx.Nginx().vhost_add(task['parameters']['vhost_id'], task['parameters']['vhost'], task['parameters']['container_ip'])
nginx.Service().reload()
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_delete":
logging.debug("service_web_delete")
# todo: held job if status not 0
print
print task
nginx.Nginx().vhost_delete(task['parameters']['container_ip'], task['parameters']['vhost_id'])
nginx.Service().reload()
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_update":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_ssh_allow":
logging.debug("service_ssh_allow")
# todo: held job if status not 0
values = (self.interface2ip(), task['parameters']['port'], task['parameters']['container_ip'], 22)
os.popen("ufw allow %s" % task['parameters']['port'])
os.popen("iptables -t nat -I PREROUTING -p tcp -d %s --dport %s -j DNAT --to %s:%s" % values)
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_ssh_deny":
logging.debug("service_ssh_deny")
# todo: held job if status not 0
logging.debug("------")
logging.debug(task)
os.popen("ufw deny %s" % task['parameters']['port'])
logging.debug("------")
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
else:
if task['status'] == 4:
print "auth fail"
else:
print "structure version not supported"
return None

View file

@ -99,3 +99,9 @@ class Common:
"request": json.dumps(request) "request": json.dumps(request)
} }
return urllib.urlopen(server_url, urllib.urlencode(params)).read() return urllib.urlopen(server_url, urllib.urlencode(params)).read()
@staticmethod
def is_root_user():
if os.getuid() == 0:
return True
return False

View file

View file

@ -0,0 +1 @@
from .lxc import LXC

View file

@ -1,407 +1,407 @@
# coding: utf-8 # coding: utf-8
import subprocess import subprocess
import logging import logging
import threading import threading
import select import select
import pty import pty
import os import os
import signal import signal
class ContainerAlreadyExists(Exception): class ContainerAlreadyExists(Exception):
pass pass
class ContainerAlreadyRunning(Exception): class ContainerAlreadyRunning(Exception):
pass pass
class ContainerNotExists(Exception): class ContainerNotExists(Exception):
pass pass
_logger = logging.getLogger("pylxc") _logger = logging.getLogger("pylxc")
_monitor = None _monitor = None
class _LXCMonitor(threading.Thread): class _LXCMonitor(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self._process = None self._process = None
self._monitors = {} self._monitors = {}
def run(self): def run(self):
master, slave = pty.openpty() master, slave = pty.openpty()
cmd = ['/usr/bin/lxc-monitor', '-n', '.*'] cmd = ['/usr/bin/lxc-monitor', '-n', '.*']
self._process = subprocess.Popen(cmd, stdout=slave, bufsize=1) self._process = subprocess.Popen(cmd, stdout=slave, bufsize=1)
stdout = os.fdopen(master) stdout = os.fdopen(master)
while self._process.poll() is None: while self._process.poll() is None:
ready, _, _ = select.select([stdout], [], [], 0.1) ready, _, _ = select.select([stdout], [], [], 0.1)
if ready: if ready:
logging.debug("Waiting for state change") logging.debug("Waiting for state change")
state = stdout.readline() state = stdout.readline()
inf = state.strip().split() inf = state.strip().split()
container = inf[0].strip("'") container = inf[0].strip("'")
state = inf[-1].strip('[]') state = inf[-1].strip('[]')
if container in self._monitors: if container in self._monitors:
logging.debug("State of container '%s' changed to '%s'", container, state) logging.debug("State of container '%s' changed to '%s'", container, state)
self._monitors[container](state) self._monitors[container](state)
_logger.info("LXC Monitor stopped!") _logger.info("LXC Monitor stopped!")
def add_monitor(self, name, callback): def add_monitor(self, name, callback):
self._monitors[name] = callback self._monitors[name] = callback
def rm_monitor(self, name): def rm_monitor(self, name):
self._monitors.pop(name) self._monitors.pop(name)
def is_monitored(self, name): def is_monitored(self, name):
return name in self._monitors return name in self._monitors
def kill(self): def kill(self):
try: try:
self._process.terminate() self._process.terminate()
self._process.wait() self._process.wait()
except: except:
pass pass
self.join() self.join()
class lxc(): class LXC(object):
def __init__(self): def __init__(self):
logging.debug("") logging.debug("")
def list(self, status=None): def list(self, status=None):
""" """
:return: ['container_first', 'container_second'] :return: ['container_first', 'container_second']
""" """
if status in ['active', 'frozen', 'running', 'stopped', 'nesting']: if status in ['active', 'frozen', 'running', 'stopped', 'nesting']:
path = "--%s" % status path = "--%s" % status
else: else:
path = "" path = ""
cmd = ['/usr/bin/lxc-ls', path] cmd = ['/usr/bin/lxc-ls', path]
out = subprocess.check_output(cmd).splitlines() out = subprocess.check_output(cmd).splitlines()
# print out # print out
return out return out
def exists(self, name): def exists(self, name):
""" """
checks if a given container is defined or not checks if a given container is defined or not
""" """
if name in self.list(): if name in self.list():
return True return True
return False return False
def start(self, name, config_file=None): def start(self, name, config_file=None):
""" """
starts a container in daemon mode starts a container in daemon mode
""" """
if not self.exists(name): if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name) raise ContainerNotExists("The container (%s) does not exist!" % name)
if name in self.list("running"): if name in self.list("running"):
raise ContainerAlreadyRunning('The container %s is already started!' % name) raise ContainerAlreadyRunning('The container %s is already started!' % name)
cmd = ['lxc-start', '-n', name, '-d'] cmd = ['lxc-start', '-n', name, '-d']
if config_file: if config_file:
cmd += ['-f', config_file] cmd += ['-f', config_file]
return subprocess.check_call(cmd) return subprocess.check_call(cmd)
def stop(self, name): def stop(self, name):
""" """
stops a container stops a container
""" """
if not self.exists(name): if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name) raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['/usr/bin/lxc-stop', '-n', name] cmd = ['/usr/bin/lxc-stop', '-n', name]
try: try:
result = subprocess.check_call(cmd) result = subprocess.check_call(cmd)
return True return True
except Exception as e: except Exception as e:
return False return False
def destroy(self, name): def destroy(self, name):
""" """
removes a container [stops a container if it's running and] removes a container [stops a container if it's running and]
raises ContainerNotExists exception if the specified name is not created raises ContainerNotExists exception if the specified name is not created
""" """
if not self.exists(name): if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name) raise ContainerNotExists("The container (%s) does not exist!" % name)
# todo: check status. If status not STOPPED - run method self.stop(name) # todo: check status. If status not STOPPED - run method self.stop(name)
# todo: add condition # todo: add condition
self.stop(name) self.stop(name)
cmd = ['/usr/bin/lxc-destroy', '-f', '-n', name] cmd = ['/usr/bin/lxc-destroy', '-f', '-n', name]
return subprocess.check_call(cmd) return subprocess.check_call(cmd)
def info(self, name): def info(self, name):
""" """
returns info dict about the specified container returns info dict about the specified container
""" """
# #
if not self.exists(name): if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name) raise ContainerNotExists("The container (%s) does not exist!" % name)
# #
cmd = ['/usr/bin/lxc-info', '-n', name, "-H"] cmd = ['/usr/bin/lxc-info', '-n', name, "-H"]
out = subprocess.check_output(cmd).splitlines() out = subprocess.check_output(cmd).splitlines()
clean = [] clean = []
info = {} info = {}
# #
for line in out: for line in out:
if line not in clean: if line not in clean:
clean.append(line) clean.append(line)
# #
for line in clean: for line in clean:
key, value = line.split(":") key, value = line.split(":")
# strip # strip
key = key.lstrip() key = key.lstrip()
value = value.lstrip() value = value.lstrip()
# #
key = key.replace(" ", "_") key = key.replace(" ", "_")
info[key.lower()] = value info[key.lower()] = value
# get container size # get container size
info['size'] = self.__get_container_size(name) info['size'] = self.__get_container_size(name)
return info return info
def __get_container_size(self, name): def __get_container_size(self, name):
cmd = ['/usr/bin/du', '--total', '-s', '/var/lib/lxc/%s' % name] cmd = ['/usr/bin/du', '--total', '-s', '/var/lib/lxc/%s' % name]
out = subprocess.check_output(cmd).splitlines() out = subprocess.check_output(cmd).splitlines()
size = 0 size = 0
for l in out: for l in out:
key, value = l.split('\t') key, value = l.split('\t')
if value == 'total': if value == 'total':
size = key size = key
return int(key) return int(key)
def freeze(self, name): def freeze(self, name):
""" """
freezes the container freezes the container
""" """
if not self.exists(name): if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name) raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['/usr/bin/lxc-freeze', '-n', name] cmd = ['/usr/bin/lxc-freeze', '-n', name]
subprocess.check_call(cmd) subprocess.check_call(cmd)
def unfreeze(self, name): def unfreeze(self, name):
""" """
unfreezes the container unfreezes the container
""" """
if not self.exists(name): if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name) raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['lxc-unfreeze', '-n', name] cmd = ['lxc-unfreeze', '-n', name]
subprocess.check_call(cmd) subprocess.check_call(cmd)
def notify(self, name, states, callback): def notify(self, name, states, callback):
""" """
executes the callback function with no parameters when the container reaches the specified state or states executes the callback function with no parameters when the container reaches the specified state or states
states can be or-ed or and-ed states can be or-ed or and-ed
notify('test', 'STOPPED', letmeknow) notify('test', 'STOPPED', letmeknow)
notify('test', 'STOPPED|RUNNING', letmeknow) notify('test', 'STOPPED|RUNNING', letmeknow)
""" """
if not self.exists(name): if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name) raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['lxc-wait', '-n', name, '-s', states] cmd = ['lxc-wait', '-n', name, '-s', states]
def th(): def th():
subprocess.check_call(cmd) subprocess.check_call(cmd)
callback() callback()
_logger.info("Waiting on states %s for container %s", states, name) _logger.info("Waiting on states %s for container %s", states, name)
threading.Thread(target=th).start() threading.Thread(target=th).start()
def checkconfig(self): def checkconfig(self):
""" """
returns the output of lxc-checkconfig returns the output of lxc-checkconfig
""" """
cmd = ['lxc-checkconfig'] cmd = ['lxc-checkconfig']
return subprocess.check_output(cmd).replace('[1;32m', '').replace('[1;33m', '').replace('[0;39m', '').replace('[1;32m', '').replace(' ', '').split('\n') return subprocess.check_output(cmd).replace('[1;32m', '').replace('[1;33m', '').replace('[0;39m', '').replace('[1;32m', '').replace(' ', '').split('\n')
def create(self, name, config_file=None, template=None, backing_store=None, template_options=None): def create(self, name, config_file=None, template=None, backing_store=None, template_options=None):
""" """
Create a new container Create a new container
raises ContainerAlreadyExists exception if the container name is reserved already. raises ContainerAlreadyExists exception if the container name is reserved already.
:param template_options: Options passed to the specified template :param template_options: Options passed to the specified template
:type template_options: list or None :type template_options: list or None
""" """
if self.exists(name): if self.exists(name):
raise ContainerAlreadyExists("The Container %s is already created!" % name) raise ContainerAlreadyExists("The Container %s is already created!" % name)
command = list() command = list()
command.append("lxc-create -n %s" % name) command.append("lxc-create -n %s" % name)
if config_file: if config_file:
command.append(' -f %s' % config_file) command.append(' -f %s' % config_file)
if template: if template:
command.append(' -t %s' % template) command.append(' -t %s' % template)
if backing_store: if backing_store:
command.append(' -B %s' % backing_store) command.append(' -B %s' % backing_store)
if template_options: if template_options:
command.append(' -- %s' % template_options) command.append(' -- %s' % template_options)
print " ".join(command) print " ".join(command)
print print
# create = subprocess.check_call(command, shell=True) # create = subprocess.check_call(command, shell=True)
create = subprocess.check_call(" ".join(command), shell=True) create = subprocess.check_call(" ".join(command), shell=True)
print print
print create print create
print print
# if create == 0: # if create == 0:
# if not self.exists(name): # if not self.exists(name):
# _logger.critical("The Container %s doesn't seem to be created! (options: %s)", name, command[3:]) # _logger.critical("The Container %s doesn't seem to be created! (options: %s)", name, command[3:])
# raise ContainerNotExists("The container (%s) does not exist!" % name) # raise ContainerNotExists("The container (%s) does not exist!" % name)
# #
# _logger.info("Container %s has been created with options %s", name, command[3:]) # _logger.info("Container %s has been created with options %s", name, command[3:])
# return False # return False
return True return True
def reset_password(self, container_name, username, password): def reset_password(self, container_name, username, password):
call = [ call = [
'echo', 'echo',
'"%s:${PASSWORD:-%s}"' % (username, password), '"%s:${PASSWORD:-%s}"' % (username, password),
"|", "|",
"chroot", "chroot",
"/var/lib/lxc/%s/rootfs/ chpasswd" % container_name "/var/lib/lxc/%s/rootfs/ chpasswd" % container_name
] ]
subprocess.check_call(call, shell=True) subprocess.check_call(call, shell=True)
# subprocess.call("echo \"ubuntu:${PASSWORD:-%(password)s}\" | chroot /var/lib/lxc/%(hostname)s/rootfs/ chpasswd" % task['parameters'], shell=True) # subprocess.call("echo \"ubuntu:${PASSWORD:-%(password)s}\" | chroot /var/lib/lxc/%(hostname)s/rootfs/ chpasswd" % task['parameters'], shell=True)
return True return True
# def running(): # def running():
# ''' # '''
# returns a list of the currently running containers # returns a list of the currently running containers
# ''' # '''
# return all_as_dict()['Running'] # return all_as_dict()['Running']
# def stopped(): # def stopped():
# ''' # '''
# returns a list of the stopped containers # returns a list of the stopped containers
# ''' # '''
# return all_as_dict()['Stopped'] # return all_as_dict()['Stopped']
# def all_as_dict(): # def all_as_dict():
# ''' # '''
# returns a dict {'Running': ['cont1', 'cont2'], # returns a dict {'Running': ['cont1', 'cont2'],
# 'Stopped': ['cont3', 'cont4'] # 'Stopped': ['cont3', 'cont4']
# } # }
# #
# ''' # '''
# cmd = ['lxc-ls'] # cmd = ['lxc-ls']
# out = subprocess.check_output(cmd).splitlines() # out = subprocess.check_output(cmd).splitlines()
# print out # print out
# stopped = [] # stopped = []
# running = [] # running = []
# frozen = [] # frozen = []
# current = None # current = None
# for c in out: # for c in out:
# c = c.strip() # c = c.strip()
# if c == 'RUNNING': # if c == 'RUNNING':
# current = running # current = running
# continue # continue
# if c == 'STOPPED': # if c == 'STOPPED':
# current = stopped # current = stopped
# continue # continue
# if c == 'FROZEN': # if c == 'FROZEN':
# current = frozen # current = frozen
# continue # continue
# if not len(c): # if not len(c):
# continue # continue
# current.append(c) # current.append(c)
# return {'Running': running, # return {'Running': running,
# 'Stopped': stopped, # 'Stopped': stopped,
# 'Frozen': frozen} # 'Frozen': frozen}
# def all_as_list(): # def all_as_list():
# ''' # '''
# returns a list of all defined containers # returns a list of all defined containers
# ''' # '''
# as_dict = all_as_dict() # as_dict = all_as_dict()
# containers = as_dict['Running'] + as_dict['Frozen'] + as_dict['Stopped'] # containers = as_dict['Running'] + as_dict['Frozen'] + as_dict['Stopped']
# containers_list = [] # containers_list = []
# for i in containers: # for i in containers:
# i = i.replace(' (auto)', '') # i = i.replace(' (auto)', '')
# containers_list.append(i) # containers_list.append(i)
# return containers_list # return containers_list
# def kill(name, signal): # def kill(name, signal):
# ''' # '''
# sends a kill signal to process 1 of ths container <name> # sends a kill signal to process 1 of ths container <name>
# :param signal: numeric signal # :param signal: numeric signal
# ''' # '''
# if not exists(name): # if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name) # raise ContainerNotExists("The container (%s) does not exist!" % name)
# cmd = ['lxc-kill', '--name=%s' % name, signal] # cmd = ['lxc-kill', '--name=%s' % name, signal]
# subprocess.check_call(cmd) # subprocess.check_call(cmd)
# def shutdown(name, wait=False, reboot=False): # def shutdown(name, wait=False, reboot=False):
# ''' # '''
# graceful shutdown sent to the container # graceful shutdown sent to the container
# :param wait: should we wait for the shutdown to complete? # :param wait: should we wait for the shutdown to complete?
# :param reboot: reboot a container, ignores wait # :param reboot: reboot a container, ignores wait
# ''' # '''
# if not exists(name): # if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name) # raise ContainerNotExists("The container (%s) does not exist!" % name)
# cmd = ['lxc-shutdown', '-n', name] # cmd = ['lxc-shutdown', '-n', name]
# if wait: # if wait:
# cmd += ['-w'] # cmd += ['-w']
# if reboot: # if reboot:
# cmd += ['-r'] # cmd += ['-r']
# #
# subprocess.check_call(cmd) # subprocess.check_call(cmd)
# def monitor(name, callback): # def monitor(name, callback):
# ''' # '''
# monitors actions on the specified container, # monitors actions on the specified container,
# callback is a function to be called on # callback is a function to be called on
# ''' # '''
# global _monitor # global _monitor
# if not exists(name): # if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name) # raise ContainerNotExists("The container (%s) does not exist!" % name)
# if _monitor: # if _monitor:
# if _monitor.is_monitored(name): # if _monitor.is_monitored(name):
# raise Exception("You are already monitoring this container (%s)" % name) # raise Exception("You are already monitoring this container (%s)" % name)
# else: # else:
# _monitor = _LXCMonitor() # _monitor = _LXCMonitor()
# logging.info("Starting LXC Monitor") # logging.info("Starting LXC Monitor")
# _monitor.start() # _monitor.start()
# def kill_handler(sg, fr): # def kill_handler(sg, fr):
# stop_monitor() # stop_monitor()
# signal.signal(signal.SIGTERM, kill_handler) # signal.signal(signal.SIGTERM, kill_handler)
# signal.signal(signal.SIGINT, kill_handler) # signal.signal(signal.SIGINT, kill_handler)
# _monitor.add_monitor(name, callback) # _monitor.add_monitor(name, callback)
# def unmonitor(name): # def unmonitor(name):
# if not exists(name): # if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name) # raise ContainerNotExists("The container (%s) does not exist!" % name)
# if not _monitor: # if not _monitor:
# raise Exception("LXC Monitor is not started!") # raise Exception("LXC Monitor is not started!")
# if not _monitor.is_monitored(name): # if not _monitor.is_monitored(name):
# raise Exception("This container (%s) is not monitored!" % name) # raise Exception("This container (%s) is not monitored!" % name)
# _monitor.rm_monitor(name) # _monitor.rm_monitor(name)
# def stop_monitor(): # def stop_monitor():
# global _monitor # global _monitor
# if _monitor: # if _monitor:
# logging.info("Killing LXC Monitor") # logging.info("Killing LXC Monitor")
# _monitor.kill() # _monitor.kill()
# _monitor = None # _monitor = None
# signal.signal(signal.SIGTERM, signal.SIG_DFL) # signal.signal(signal.SIGTERM, signal.SIG_DFL)
# signal.signal(signal.SIGINT, signal.SIG_DFL) # signal.signal(signal.SIGINT, signal.SIG_DFL)

View file

@ -0,0 +1,2 @@
from .qemu import Qemu
from .stats import QemuStats

View file

@ -6,7 +6,7 @@ import subprocess
from SWSCloudNode.common import Common from SWSCloudNode.common import Common
class QEMU: class Qemu:
def __init__(self): def __init__(self):
# qemu+ssh://root@laforge.usersys.redhat.com/system # qemu+ssh://root@laforge.usersys.redhat.com/system
self.conn = libvirt.open("qemu:///system") self.conn = libvirt.open("qemu:///system")

View file

@ -0,0 +1,81 @@
# Get the network I/O statistics
# http://libvirt.org/docs/libvirt-appdev-guide-python/en-US/html/libvirt_application_development_guide_using_python-Guest_Domains-Monitoring-IO_stats.html
from __future__ import print_function
import sys
import libvirt
from xml.etree import ElementTree
class QemuStats(object):
def __init__(self):
self.conn = libvirt.open('qemu:///system')
def __del__(self):
self.conn.close()
def network(self, dom):
results = dict(
read_bytes=0, read_packets=0, read_errors=0, read_drops=0,
write_bytes=0, write_packets=0, write_errors=0, write_drops=0)
dom = self.conn.lookupByName(dom)
# dom = conn.lookupByID(5)
if dom == None:
return results
tree = ElementTree.fromstring(dom.XMLDesc())
iface = tree.find('devices/interface/target').get('dev')
stats = dom.interfaceStats(iface)
results['read_bytes'] = str(stats[0])
results['read_packets'] = str(stats[1])
results['read_errors'] = str(stats[2])
results['read_drops'] = str(stats[3])
results['write_drops'] = str(stats[4])
results['write_packets'] = str(stats[5])
results['write_errors'] = str(stats[6])
results['write_drops'] = str(stats[7])
return results
def cpu(self, dom):
# http://libvirt.org/docs/libvirt-appdev-guide-python/en-US/html/libvirt_application_development_guide_using_python-Guest_Domains-Monitoring-vCPU.html
results = dict(cpu_time=0, system_time=0, user_time=0)
if self.conn == None:
print('Failed to open connection to qemu:///system', file=sys.stderr)
return results
# dom = conn.lookupByID(5)
dom = self.conn.lookupByName(dom)
if dom == None:
print('Failed to find the domain '+domName, file=sys.stderr)
return results
stats = dom.getCPUStats(True)
results['cpu_time'] = str(stats[0]['cpu_time'] / 100000)
results['system_time'] = str(stats[0]['system_time'] / 100000)
results['user_time'] = str(stats[0]['user_time'] / 10000)
return results
def memory(self, dom):
# http://libvirt.org/docs/libvirt-appdev-guide-python/en-US/html/libvirt_application_development_guide_using_python-Guest_Domains-Monitoring-Memory.html
results = dict()
if self.conn == None:
print('Failed to open connection to qemu:///system', file=sys.stderr)
return results
# dom = conn.lookupByID(5)
dom = self.conn.lookupByName(dom)
if dom == None:
print('Failed to find the domain '+domName, file=sys.stderr)
return results
stats = dom.memoryStats()
# print('memory used:')
for name in stats:
results[name] = stats[name]
# print(' '+str(stats[name])+' ('+name+')')
return results

118
SWSCloudNode/node.py Normal file
View file

@ -0,0 +1,118 @@
# coding: utf-8
import sys
import json
import requests
from SWSCloudNode.settings import settings
class Node(object):
def tasks_get(self):
try:
response = requests.get(
'%s/server_api/tasks' % settings.get('server', 'endpoint'),
auth=(settings.get('server', 'id'), settings.get('server', 'secret')))
except Exception as e:
sys.exit('no connection with %s' % settings.get('server', 'endpoint'))
else:
return dict(status=response.status_code, results=response.json())
def task_status_update(self, task_id, status):
response = requests.put(
'%s/server_api/tasks/%s' % (
settings.get('server', 'endpoint'),
task_id
),
auth=(
settings.get('server', 'id'),
settings.get('server', 'secret'),
),
data={
"status": status
}
)
return response.json()
def report_container_stats(self, container_id, statistics):
response = requests.post(
'%s/server_api/containers/stats/%s' % (
settings.get('server', 'endpoint'),
container_id
),
auth=(settings.get('server', 'id'), settings.get('server', 'secret')),
data={
'status': json.dumps(statistics)
}
)
if response.status_code == 200:
return response.json()
return False
# TODO: подумать куда переместить
def container_config_create(self, container_id, link, ipv4, ipv6):
"""
:param container_id:
:param link:
:param ipv4:
:param ipv6:
:return:
"""
cfg = list()
cfg.append("lxc.network.type = veth")
cfg.append("lxc.network.flags = up")
cfg.append("lxc.network.name = eth0")
cfg.append("lxc.network.link = %s" % link)
if ipv4['ipv4']:
# cfg.append('lxc.network.ipv4 = %s/32' % ipv4['ipv4'])
cfg.append('lxc.network.ipv4 = %s' % ipv4['ipv4'])
cfg.append('lxc.network.ipv4.gateway = %s' % ipv4['ipv4_gateway'])
if 'ipv6' in ipv6 and 'ipv6_gateway' in ipv6:
# cfg.append('lxc.network.ipv6 = %s/64' % ipv6['ipv6'])
cfg.append('lxc.network.ipv6 = %s' % ipv6['ipv6'])
cfg.append('lxc.network.ipv6.gateway = %s', ipv6['ipv6_gateway'])
config_file = '/var/lib/gocloud/node/configs/%s.config' % container_id
cfg_file = open(config_file, 'w')
cfg_file.write('\n'.join(cfg))
cfg_file.write('\n')
cfg_file.close()
return True
def container_authkey_create(self, container_id, auth_key):
# create ssh_key.pub
authkey_file = '/var/lib/gocloud/node/auth-keys/%s.pub' % container_id
ak = open(authkey_file, 'w')
ak.write(auth_key)
ak.write('\n')
ak.close()
return True
class StatisticsReporter(object):
@staticmethod
def send_vm_statistics(vm_id, data):
response = requests.post(
'%s/stats/v1/compute/vms/%s' % (
settings.get('statistics', 'endpoint'), vm_id),
# TODO: node auth
# auth=(settings.get('server', 'id'), settings.get('server', 'secret')),
json=data)
if response.status_code == 200:
return response.json()
return False
@staticmethod
def send_containers_statistics(vm_id, data):
response = requests.post(
'%s/stats/v1/compute/containers/%s' % (
settings.get('statistics', 'endpoint'), vm_id),
# TODO: node auth
# auth=(settings.get('server', 'id'), settings.get('server', 'secret')),
json=data)
if response.status_code == 200:
return response.json()
return False

243
SWSCloudNode/tasks.py Normal file
View file

@ -0,0 +1,243 @@
# coding: utf-8
import os
import subprocess
import commands
import requests
from SWSCloudNode.logger import logging
from SWSCloudNode.settings import settings
from SWSCloudNode.logger import logging
ALLOWED_TASKS = [
'container_delete', 'container_create', 'container_start', 'container_stop', 'container_restart',
'vm_create', 'vm_delete', 'vm_start', 'vm_stop', 'vm_restart',
]
class Tasks:
def __init__(self):
self.endpoint = settings.get('server', 'endpoint')
self.id = settings.get('server', 'id')
self.secret = settings.get('server', 'secret')
@staticmethod
def is_allowed_task(task):
if task in ALLOWED_TASKS:
return True
return False
def get_item(self):
try:
response = requests.get('%s/server_api/task' % self.endpoint, auth=(self.id, self.secret))
except Exception as e:
logging.error('no connection with %s' % self.endpoint)
return None
else:
if response.status_code == 200:
return response.json()
logging.error("Unexpected status code: %d" % response.status_code)
return None
def interface2ip(self):
# intf = open(self.settings['proxy_interface'], 'r').read().split('\n')[0]
interface = "eth0"
intf_ip = commands.getoutput("ip address show dev " + interface).split()
intf_ip = intf_ip[intf_ip.index('inet') + 1].split('/')[0]
return intf_ip
def container_create(self, task):
return True
def container_destroy(self, task):
# check exists container name
if lxc.lxc().exists(task['parameters']['hostname']):
# todo: if hostname already exists then node callback to server to rename container
return False
lxc.lxc().destroy(task['parameters']['hostname'])
# TODO: check status
# delete record from dnsmasq
# dnsmasq.Dnsmasq().delete(task['parameters']['hostname'])
return True
def container_start(self, task):
logging.debug("container_start")
print "================ "
print task
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def container_stop(self, task):
logging.debug("container_stop")
lxc.lxc().stop(task['parameters']['hostname'])
# TODO: check status
return True
def container_restart(self, task):
logging.debug("container_restart")
lxc.lxc().stop(task['parameters']['hostname'])
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def init(self):
task = TCPClient().request(Request().build("task_get", self.request_auth, None))
# check exists element 'version'
if not "version" in task:
logging.error("Response not contain 'version' element")
return False
if task['version'] == "1.0":
if task['status'] == 0:
if task['method'] == "container_create":
# create container
result = Task().container_create(task)
if not result:
return False
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: hold job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_destroy":
# delete hostname from dnsmasq /etc/lxc/dnsmasq.conf
result = self.container_destroy(task)
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: held job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_start":
self.container_start(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_stop":
self.container_stop(task)
# TODO: held job if status not 0
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_restart":
self.container_restart(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_clone":
logging.debug("container_clone")
# TODO: set clone_IP to DNSMASQ
subprocess.call("/usr/bin/lxc-clone -o %(hostname)s -n %(clone_hostname)s" % task['parameters'],
shell=True)
# TODO: check container status
# todo: held job if status not 0
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_mx_add":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_mx_delete":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_web_add":
logging.debug("create container")
# todo: held job if status not 0
nginx.Nginx().vhost_add(task['parameters']['vhost_id'], task['parameters']['vhost'],
task['parameters']['container_ip'])
nginx.Service().reload()
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_delete":
logging.debug("service_web_delete")
# todo: held job if status not 0
print
print task
nginx.Nginx().vhost_delete(task['parameters']['container_ip'], task['parameters']['vhost_id'])
nginx.Service().reload()
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_update":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_ssh_allow":
logging.debug("service_ssh_allow")
# todo: held job if status not 0
values = (self.interface2ip(), task['parameters']['port'], task['parameters']['container_ip'], 22)
os.popen("ufw allow %s" % task['parameters']['port'])
os.popen("iptables -t nat -I PREROUTING -p tcp -d %s --dport %s -j DNAT --to %s:%s" % values)
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_ssh_deny":
logging.debug("service_ssh_deny")
# todo: held job if status not 0
logging.debug("------")
logging.debug(task)
os.popen("ufw deny %s" % task['parameters']['port'])
logging.debug("------")
task_update_result = TCPClient().request(
Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
else:
if task['status'] == 4:
print "auth fail"
else:
print "structure version not supported"
return None

View file

@ -137,14 +137,19 @@ while True:
# Create new virtual machine # Create new virtual machine
if task.get('task') == 'vm_create': if task.get('task') == 'vm_create':
nodeclient.task_status_update(task['id'], 1) nodeclient.task_status_update(task['id'], 1)
vm_id = task['plain']['vm_id'] vm_id = task['plain']['vm_id']
# TODO: if container doesn't exists then complete task and report about this fact # TODO: if container doesn't exists then complete task and report about this fact
p = task['plain'] p = task['plain']
try:
# автоматически определяем подходящий сетевой интерфейс исходя из имеющегося ipv4
interface = Detect().get_suitable_interface(p['ipv4'])
# interface = settings.get('node', 'interface')
# автоматически определяем подходящий сетевой интерфейс исходя из имеющегося ipv4
interface = Detect().get_suitable_interface(p['ipv4'])
if not interface:
interface = settings.get('node', 'interface')
try:
qemu.QEMU().create( qemu.QEMU().create(
p['cores'], p['memory'], p['storage'], p['swap'], p['hostname'], p['cores'], p['memory'], p['storage'], p['swap'], p['hostname'],
p['ipv4'], p['ipv4_gateway'], p['dns1'], p['dns2'], p['password'], p['ipv4'], p['ipv4_gateway'], p['dns1'], p['dns2'], p['password'],

View file

@ -1,17 +1,35 @@
#!/usr/bin/env python #!/usr/bin/env python
# coding: utf-8 # coding: utf-8
from SWSCloudNode import Node import requests
from SWSCloudNode import lxc from SWSCloudNode import Node, StatisticsReporter
from SWSCloudNode import LXC
from SWSCloudNode import Qemu
from SWSCloudNode import QemuStats
cls_lxc = lxc.lxc() cls_lxc = LXC()
cls_qemu = Qemu()
cls_node = Node() cls_node = Node()
containers = cls_lxc.list() for container in cls_lxc.list():
for container in containers:
# print container # print container
info = cls_lxc.info(container) info = cls_lxc.info(container)
info['container_id'] = info['name'] info['container_id'] = info['name']
print cls_node.report_container_stats(info['container_id'], info) print cls_node.report_container_stats(info['container_id'], info)
# print info # print info
"""
curl localhost:8089/node_stats/v1/compute/vms/04ea5600-89c6-11e6-b1e1-fb8145d56ed7 -X POST --header 'Content-Type: application/json'
"""
vms = cls_qemu.list().get('online')
for vm in vms:
dom = vms.get(vm).get('hostname')
data = dict(
nework=QemuStats().network(dom),
cpu=QemuStats().cpu(dom),
memory=QemuStats().memory(dom))
print StatisticsReporter().send_vm_statistics(dom, data)

View file

@ -4,13 +4,13 @@ from setuptools import setup
setup( setup(
name='SWSCloudNode', name='SWSCloudNode',
version='3.1.7', version='3.3.0',
author='Vyacheslav Anzhiganov', author='Vyacheslav Anzhiganov',
author_email='vanzhiganov@ya.ru', author_email='vanzhiganov@ya.ru',
packages=[ packages=[
'SWSCloudNode', 'SWSCloudNode',
'SWSCloudNode.lxc', 'SWSCloudNode.compute.lxc',
'SWSCloudNode.qemu', 'SWSCloudNode.compute.qemu',
], ],
scripts=[ scripts=[
'cloud_node_agent.py', 'cloud_node_agent.py',
@ -18,6 +18,7 @@ setup(
], ],
install_requires=[ install_requires=[
'requests', 'requests',
'netaddr==0.7.18' 'netaddr==0.7.18',
'libvirt-python==2.2.0',
], ],
) )