This commit is contained in:
Vyacheslav Anzhiganov 2015-12-01 02:33:55 +03:00
parent 774c5f2485
commit c34df881ae
5 changed files with 740 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
*.pyc

87
cloud_node_agent.py Normal file
View file

@ -0,0 +1,87 @@
# coding: utf-8
import requests
import config
import ConfigParser
import lxc
import node
nodeclient = node.NodeClient()
tasks = nodeclient.tasks_get()
for task in tasks['results']:
# container_create
print task
print '---------------------'
if task['task'] == 'container_create':
# TODO: update task status to 1
nodeclient.task_status_update(task['id'], 1)
print task['id']
container_id = task['plain']['container_id']
ipv4 = dict()
ipv6 = dict()
if task['plain']['ipv4']:
ipv4['ipv4'] = task['plain']['ipv4']
ipv4['ipv4_gateway'] = task['plain']['ipv4_gateway']
if 'ipv6' in task['plain'] and task['plain']['ipv6'] != '' and 'ipv6_gateway' in task['plain']:
ipv6['ipv6'] = task['plain']['ipv6']
ipv6['ipv6_gateway'] = task['plain']['ipv6_gateway']
node.__container_config_create(container_id, config.interface, ipv4, ipv6)
container_config_file = '/var/lib/gocloud/node/configs/%s.config' % container_id
# create ssh_key.pub
param_auth_key = ''
if 'ssh_key' in task['plain'] and task['plain']['ssh_key'] != '':
node.__container_authkey_create(container_id, task['plain']['ssh_key'])
param_auth_key = '--auth-key /var/lib/gocloud/node/auth-keys/%s.pub' % container_id
# create container
lxc.lxc().create(
container_id,
container_config_file,
'ubuntu',
None,
"%s --user %s --password %s" % (
param_auth_key,
task['plain']['username'],
task['plain']['password']
)
)
lxc.lxc().start(container_id)
# TODO: update task status to 2
nodeclient.task_status_update(task['id'], 2)
# container_start
if task['task'] == 'container_start':
nodeclient.task_status_update(task['id'], 1)
container_id = task['plain']['container_id']
lxc.lxc().start(container_id)
nodeclient.task_status_update(task['id'], 2)
# container_restart
if task['task'] == 'container_restart':
nodeclient.task_status_update(task['id'], 1)
container_id = task['plain']['container_id']
lxc.lxc().stop(container_id)
lxc.lxc().start(container_id)
nodeclient.task_status_update(task['id'], 2)
# container_stop
if task['task'] == 'container_stop':
nodeclient.task_status_update(task['id'], 1)
container_id = task['plain']['container_id']
lxc.lxc().stop(container_id)
nodeclient.task_status_update(task['id'], 2)
# container_delete
if task['task'] == 'container_delete':
nodeclient.task_status_update(task['id'], 1)
container_id = task['plain']['container_id']
lxc.lxc().destroy(container_id)
nodeclient.task_status_update(task['id'], 2)

4
config.py Normal file
View file

@ -0,0 +1,4 @@
server = 'localhost:5000'
node_id = "f411b7d6-bf93-4fcd-91ee-03e5343d0187"
node_secret = "b3c9a8b0-95ca-11e5-bec1-28d244e159e9"
interface = 'br0:0'

390
lxc/__init__.py Normal file
View file

@ -0,0 +1,390 @@
import subprocess
import logging
import threading
import select
import pty
import os
import signal
class ContainerAlreadyExists(Exception):
pass
class ContainerAlreadyRunning(Exception):
pass
class ContainerNotExists(Exception):
pass
_logger = logging.getLogger("pylxc")
_monitor = None
class _LXCMonitor(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self._process = None
self._monitors = {}
def run(self):
master, slave = pty.openpty()
cmd = ['lxc-monitor', '-n', '.*']
self._process = subprocess.Popen(cmd, stdout=slave, bufsize=1)
stdout = os.fdopen(master)
while self._process.poll() is None:
ready, _, _ = select.select([stdout], [], [], 0.1)
if ready:
logging.debug("Waiting for state change")
state = stdout.readline()
inf = state.strip().split()
container = inf[0].strip("'")
state = inf[-1].strip('[]')
if container in self._monitors:
logging.debug("State of container '%s' changed to '%s'", container, state)
self._monitors[container](state)
_logger.info("LXC Monitor stopped!")
def add_monitor(self, name, callback):
self._monitors[name] = callback
def rm_monitor(self, name):
self._monitors.pop(name)
def is_monitored(self, name):
return name in self._monitors
def kill(self):
try:
self._process.terminate()
self._process.wait()
except:
pass
self.join()
class lxc():
def __init__(self):
logging.debug("")
def list(self, status=None):
"""
:return: ['container_first', 'container_second']
"""
if status in ['active', 'frozen', 'running', 'stopped', 'nesting']:
path = "--%s" % status
else:
path = ""
cmd = ['lxc-ls', path]
out = subprocess.check_output(cmd).splitlines()
# print out
return out
def exists(self, name):
"""
checks if a given container is defined or not
"""
if name in self.list():
return True
return False
def start(self, name, config_file=None):
"""
starts a container in daemon mode
"""
if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name)
if name in self.list("running"):
raise ContainerAlreadyRunning('The container %s is already started!' % name)
cmd = ['lxc-start', '-n', name, '-d']
if config_file:
cmd += ['-f', config_file]
return subprocess.check_call(cmd)
def stop(self, name):
"""
stops a container
"""
if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['lxc-stop', '-n', name]
return subprocess.check_call(cmd)
def destroy(self, name):
"""
removes a container [stops a container if it's running and]
raises ContainerNotExists exception if the specified name is not created
"""
if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name)
# todo: check status. If status not STOPPED - run method self.stop(name)
# todo: add condition
self.stop(name)
cmd = ['lxc-destroy', '-f', '-n', name]
return subprocess.check_call(cmd)
def info(self, name):
"""
returns info dict about the specified container
"""
if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['lxc-info', '-n', name, "-H"]
out = subprocess.check_output(cmd).splitlines()
clean = []
info = {}
for line in out:
# print line
if line not in clean:
clean.append(line)
for line in clean:
key, value = line.split(":")
# strip
key = key.lstrip()
value = value.lstrip()
key = key.replace(" ", "_")
info[key.lower()] = value
return info
def freeze(self, name):
"""
freezes the container
"""
if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['lxc-freeze', '-n', name]
subprocess.check_call(cmd)
def unfreeze(self, name):
"""
unfreezes the container
"""
if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['lxc-unfreeze', '-n', name]
subprocess.check_call(cmd)
def notify(self, name, states, callback):
"""
executes the callback function with no parameters when the container reaches the specified state or states
states can be or-ed or and-ed
notify('test', 'STOPPED', letmeknow)
notify('test', 'STOPPED|RUNNING', letmeknow)
"""
if not self.exists(name):
raise ContainerNotExists("The container (%s) does not exist!" % name)
cmd = ['lxc-wait', '-n', name, '-s', states]
def th():
subprocess.check_call(cmd)
callback()
_logger.info("Waiting on states %s for container %s", states, name)
threading.Thread(target=th).start()
def checkconfig(self):
"""
returns the output of lxc-checkconfig
"""
cmd = ['lxc-checkconfig']
return subprocess.check_output(cmd).replace('[1;32m', '').replace('[1;33m', '').replace('[0;39m', '').replace('[1;32m', '').replace(' ', '').split('\n')
def create(self, name, config_file=None, template=None, backing_store=None, template_options=None):
"""
Create a new container
raises ContainerAlreadyExists exception if the container name is reserved already.
:param template_options: Options passed to the specified template
:type template_options: list or None
"""
if self.exists(name):
raise ContainerAlreadyExists("The Container %s is already created!" % name)
command = []
command.append("lxc-create -n %s" % name)
if config_file:
command.append(' -f %s' % config_file)
if template:
command.append(' -t %s' % template)
if backing_store:
command.append(' -B %s' % backing_store)
if template_options:
command.append(' -- %s' % template_options)
print " ".join(command)
print
# create = subprocess.check_call(command, shell=True)
create = subprocess.check_call(" ".join(command), shell=True)
print
print create
print
# if create == 0:
# if not self.exists(name):
# _logger.critical("The Container %s doesn't seem to be created! (options: %s)", name, command[3:])
# raise ContainerNotExists("The container (%s) does not exist!" % name)
#
# _logger.info("Container %s has been created with options %s", name, command[3:])
# return False
return True
def reset_password(self, container_name, username, password):
call = [
'echo',
'"%s:${PASSWORD:-%s}"' % (username, password),
"|",
"chroot",
"/var/lib/lxc/%s/rootfs/ chpasswd" % container_name
]
subprocess.check_call(call, shell=True)
# subprocess.call("echo \"ubuntu:${PASSWORD:-%(password)s}\" | chroot /var/lib/lxc/%(hostname)s/rootfs/ chpasswd" % task['parameters'], shell=True)
return True
# def running():
# '''
# returns a list of the currently running containers
# '''
# return all_as_dict()['Running']
# def stopped():
# '''
# returns a list of the stopped containers
# '''
# return all_as_dict()['Stopped']
# def all_as_dict():
# '''
# returns a dict {'Running': ['cont1', 'cont2'],
# 'Stopped': ['cont3', 'cont4']
# }
#
# '''
# cmd = ['lxc-ls']
# out = subprocess.check_output(cmd).splitlines()
# print out
# stopped = []
# running = []
# frozen = []
# current = None
# for c in out:
# c = c.strip()
# if c == 'RUNNING':
# current = running
# continue
# if c == 'STOPPED':
# current = stopped
# continue
# if c == 'FROZEN':
# current = frozen
# continue
# if not len(c):
# continue
# current.append(c)
# return {'Running': running,
# 'Stopped': stopped,
# 'Frozen': frozen}
# def all_as_list():
# '''
# returns a list of all defined containers
# '''
# as_dict = all_as_dict()
# containers = as_dict['Running'] + as_dict['Frozen'] + as_dict['Stopped']
# containers_list = []
# for i in containers:
# i = i.replace(' (auto)', '')
# containers_list.append(i)
# return containers_list
# def kill(name, signal):
# '''
# sends a kill signal to process 1 of ths container <name>
# :param signal: numeric signal
# '''
# if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name)
# cmd = ['lxc-kill', '--name=%s' % name, signal]
# subprocess.check_call(cmd)
# def shutdown(name, wait=False, reboot=False):
# '''
# graceful shutdown sent to the container
# :param wait: should we wait for the shutdown to complete?
# :param reboot: reboot a container, ignores wait
# '''
# if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name)
# cmd = ['lxc-shutdown', '-n', name]
# if wait:
# cmd += ['-w']
# if reboot:
# cmd += ['-r']
#
# subprocess.check_call(cmd)
# def monitor(name, callback):
# '''
# monitors actions on the specified container,
# callback is a function to be called on
# '''
# global _monitor
# if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name)
# if _monitor:
# if _monitor.is_monitored(name):
# raise Exception("You are already monitoring this container (%s)" % name)
# else:
# _monitor = _LXCMonitor()
# logging.info("Starting LXC Monitor")
# _monitor.start()
# def kill_handler(sg, fr):
# stop_monitor()
# signal.signal(signal.SIGTERM, kill_handler)
# signal.signal(signal.SIGINT, kill_handler)
# _monitor.add_monitor(name, callback)
# def unmonitor(name):
# if not exists(name):
# raise ContainerNotExists("The container (%s) does not exist!" % name)
# if not _monitor:
# raise Exception("LXC Monitor is not started!")
# if not _monitor.is_monitored(name):
# raise Exception("This container (%s) is not monitored!" % name)
# _monitor.rm_monitor(name)
# def stop_monitor():
# global _monitor
# if _monitor:
# logging.info("Killing LXC Monitor")
# _monitor.kill()
# _monitor = None
# signal.signal(signal.SIGTERM, signal.SIG_DFL)
# signal.signal(signal.SIGINT, signal.SIG_DFL)

258
node/__init__.py Normal file
View file

@ -0,0 +1,258 @@
import config
import os
import logging
import subprocess
import json
import shutil
import socket
import dnsmasq
import lxc
# import nginx
import commands
import requests
class NodeClient():
def tasks_get(self):
response = requests.get('http://%s/server_api/tasks?node_id=%s&node_secret=%s' % (config.server, config.node_id, config.node_secret))
return response.json()
def task_status_update(self, task_id, status):
response = requests.get('http://%s/server_api/task_status_update?node_id=%s&node_secret=%s&task_id=%s&status=%s' % (config.server, config.node_id, config.node_secret, task_id, status))
return response.json()
def __container_config_create(container_id, link, ipv4, ipv6):
cfg = []
cfg.append("lxc.network.link = %s" % link)
if ipv4['ipv4']:
cfg.append('lxc.network.ipv4 = %s/32' % ipv4['ipv4'])
cfg.append('lxc.network.ipv4.gateway = %s' % ipv4['ipv4_gateway'])
if 'ipv6' in ipv6 and 'ipv6_gateway' in ipv6:
cfg.append('lxc.network.ipv6 = %s/64' % ipv6['ipv6'])
cfg.append('lxc.network.ipv6.gateway = %s', ipv6['ipv6_gateway'])
config_file = '/var/lib/gocloud/node/configs/%s.config' % container_id
cfgfile = open(config_file, 'w')
cfgfile.write('\n'.join(cfg))
cfgfile.write('\n')
cfgfile.close()
return True
def __container_authkey_create(container_id, auth_key):
# create ssh_key.pub
authkey_file = '/var/lib/gocloud/node/auth-keys/%s.pub' % container_id
ak = open(authkey_file, 'w')
ak.write(auth_key)
ak.write('\n')
ak.close()
return True
class Task(NodeClient):
def interface2ip(self):
# intf = open(self.settings['proxy_interface'], 'r').read().split('\n')[0]
interface = "eth0"
intf_ip = commands.getoutput("ip address show dev " + interface).split()
intf_ip = intf_ip[intf_ip.index('inet') + 1].split('/')[0]
return intf_ip
def container_create(self, task):
return True
def container_destroy(self, task):
# check exists container name
if lxc.lxc().exists(task['parameters']['hostname']):
# todo: if hostname already exists then node callback to server to rename container
return False
lxc.lxc().destroy(task['parameters']['hostname'])
# TODO: check status
# delete record from dnsmasq
dnsmasq.Dnsmasq().delete(task['parameters']['hostname'])
return True
def container_start(self, task):
logging.debug("container_start")
print "================ "
print task
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def container_stop(self, task):
logging.debug("container_stop")
lxc.lxc().stop(task['parameters']['hostname'])
# TODO: check status
return True
def container_restart(self, task):
logging.debug("container_restart")
lxc.lxc().stop(task['parameters']['hostname'])
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def init(self):
task = TCPClient().request(Request().build("task_get", self.request_auth, None))
# check exists element 'version'
if not "version" in task:
logging.error("Response not contain 'version' element")
return False
if task['version'] == "1.0":
if task['status'] == 0:
if task['method'] == "container_create":
# create container
result = Task().container_create(task)
if not result:
return False
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: hold job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_destroy":
# delete hostname from dnsmasq /etc/lxc/dnsmasq.conf
result = self.container_destroy(task)
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: held job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_start":
self.container_start(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_stop":
self.container_stop(task)
# TODO: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_restart":
self.container_restart(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_clone":
logging.debug("container_clone")
# TODO: set clone_IP to DNSMASQ
subprocess.call("/usr/bin/lxc-clone -o %(hostname)s -n %(clone_hostname)s" % task['parameters'], shell=True)
# TODO: check container status
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_mx_add":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_mx_delete":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_web_add":
logging.debug("create container")
# todo: held job if status not 0
nginx.Nginx().vhost_add(task['parameters']['vhost_id'], task['parameters']['vhost'], task['parameters']['container_ip'])
nginx.Service().reload()
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_delete":
logging.debug("service_web_delete")
# todo: held job if status not 0
print
print task
nginx.Nginx().vhost_delete(task['parameters']['container_ip'], task['parameters']['vhost_id'])
nginx.Service().reload()
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_update":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_ssh_allow":
logging.debug("service_ssh_allow")
# todo: held job if status not 0
values = (self.interface2ip(), task['parameters']['port'], task['parameters']['container_ip'], 22)
os.popen("ufw allow %s" % task['parameters']['port'])
os.popen("iptables -t nat -I PREROUTING -p tcp -d %s --dport %s -j DNAT --to %s:%s" % values)
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_ssh_deny":
logging.debug("service_ssh_deny")
# todo: held job if status not 0
logging.debug("------")
logging.debug(task)
os.popen("ufw deny %s" % task['parameters']['port'])
logging.debug("------")
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
else:
if task['status'] == 4:
print "auth fail"
else:
print "structure version not supported"
return None
class Report():
# def __init__(self, auth):
def container_info(self, data):
"""
Send container info to server
:param data:
:return:
"""
response = TCPClient().request(Request().build("report_container_info", config.auth, data))
if response['status'] == 0:
return True
return False