agent/node/__init__.py

284 lines
11 KiB
Python

# coding: utf-8
import config
import os
import logging
import subprocess
import json
import shutil
import socket
# import dnsmasq
import lxc
# import nginx
import commands
import requests
class NodeClient():
def tasks_get(self):
response = requests.get('http://%s/server_api/tasks?node_id=%s&node_secret=%s' % (config.server, config.node_id, config.node_secret))
return response.json()
def task_status_update(self, task_id, status):
response = requests.get('http://%s/server_api/task_status_update?node_id=%s&node_secret=%s&task_id=%s&status=%s' % (config.server, config.node_id, config.node_secret, task_id, status))
return response.json()
def report_container_status(self, container_id, statistics):
data = {
'node_id': config.node_id,
'node_secret': config.node_secret,
'status': json.dumps(statistics)
}
print statistics
# for i in statistics:
# data[i] = statistics[i]
response = requests.post(
'http://%s/server_api/report/container_status' % config.server,
data=data
)
if response.status_code == 200:
return response.json()
# print response.text
return False
def __container_config_create(container_id, link, ipv4, ipv6):
cfg = []
cfg.append("lxc.network.type = veth")
cfg.append("lxc.network.flags = up")
cfg.append("lxc.network.name = eth0")
cfg.append("lxc.network.link = %s" % link)
if ipv4['ipv4']:
# cfg.append('lxc.network.ipv4 = %s/32' % ipv4['ipv4'])
cfg.append('lxc.network.ipv4 = %s' % ipv4['ipv4'])
cfg.append('lxc.network.ipv4.gateway = %s' % ipv4['ipv4_gateway'])
if 'ipv6' in ipv6 and 'ipv6_gateway' in ipv6:
# cfg.append('lxc.network.ipv6 = %s/64' % ipv6['ipv6'])
cfg.append('lxc.network.ipv6 = %s' % ipv6['ipv6'])
cfg.append('lxc.network.ipv6.gateway = %s', ipv6['ipv6_gateway'])
config_file = '/var/lib/gocloud/node/configs/%s.config' % container_id
cfgfile = open(config_file, 'w')
cfgfile.write('\n'.join(cfg))
cfgfile.write('\n')
cfgfile.close()
return True
def __container_authkey_create(container_id, auth_key):
# create ssh_key.pub
authkey_file = '/var/lib/gocloud/node/auth-keys/%s.pub' % container_id
ak = open(authkey_file, 'w')
ak.write(auth_key)
ak.write('\n')
ak.close()
return True
class Task(NodeClient):
def interface2ip(self):
# intf = open(self.settings['proxy_interface'], 'r').read().split('\n')[0]
interface = "eth0"
intf_ip = commands.getoutput("ip address show dev " + interface).split()
intf_ip = intf_ip[intf_ip.index('inet') + 1].split('/')[0]
return intf_ip
def container_create(self, task):
return True
def container_destroy(self, task):
# check exists container name
if lxc.lxc().exists(task['parameters']['hostname']):
# todo: if hostname already exists then node callback to server to rename container
return False
lxc.lxc().destroy(task['parameters']['hostname'])
# TODO: check status
# delete record from dnsmasq
# dnsmasq.Dnsmasq().delete(task['parameters']['hostname'])
return True
def container_start(self, task):
logging.debug("container_start")
print "================ "
print task
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def container_stop(self, task):
logging.debug("container_stop")
lxc.lxc().stop(task['parameters']['hostname'])
# TODO: check status
return True
def container_restart(self, task):
logging.debug("container_restart")
lxc.lxc().stop(task['parameters']['hostname'])
lxc.lxc().start(task['parameters']['hostname'])
# TODO: check status
return True
def init(self):
task = TCPClient().request(Request().build("task_get", self.request_auth, None))
# check exists element 'version'
if not "version" in task:
logging.error("Response not contain 'version' element")
return False
if task['version'] == "1.0":
if task['status'] == 0:
if task['method'] == "container_create":
# create container
result = Task().container_create(task)
if not result:
return False
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: hold job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_destroy":
# delete hostname from dnsmasq /etc/lxc/dnsmasq.conf
result = self.container_destroy(task)
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
# todo: held job if status not 0
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_start":
self.container_start(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_stop":
self.container_stop(task)
# TODO: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_restart":
self.container_restart(task)
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "container_clone":
logging.debug("container_clone")
# TODO: set clone_IP to DNSMASQ
subprocess.call("/usr/bin/lxc-clone -o %(hostname)s -n %(clone_hostname)s" % task['parameters'], shell=True)
# TODO: check container status
# todo: held job if status not 0
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_mx_add":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_mx_delete":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_web_add":
logging.debug("create container")
# todo: held job if status not 0
nginx.Nginx().vhost_add(task['parameters']['vhost_id'], task['parameters']['vhost'], task['parameters']['container_ip'])
nginx.Service().reload()
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_delete":
logging.debug("service_web_delete")
# todo: held job if status not 0
print
print task
nginx.Nginx().vhost_delete(task['parameters']['container_ip'], task['parameters']['vhost_id'])
nginx.Service().reload()
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_web_update":
logging.debug("create container")
# todo: held job if status not 0
if self.Request_tast_update(task['task_id'])['status'] == 0:
return True
return False
if task['method'] == "service_ssh_allow":
logging.debug("service_ssh_allow")
# todo: held job if status not 0
values = (self.interface2ip(), task['parameters']['port'], task['parameters']['container_ip'], 22)
os.popen("ufw allow %s" % task['parameters']['port'])
os.popen("iptables -t nat -I PREROUTING -p tcp -d %s --dport %s -j DNAT --to %s:%s" % values)
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
if task['method'] == "service_ssh_deny":
logging.debug("service_ssh_deny")
# todo: held job if status not 0
logging.debug("------")
logging.debug(task)
os.popen("ufw deny %s" % task['parameters']['port'])
logging.debug("------")
task_update_result = TCPClient().request(Request().build("task_update", self.request_auth, {"task_id": task['task_id'], "status": 0}))
if task_update_result['status'] == 0:
return True
return False
else:
if task['status'] == 4:
print "auth fail"
else:
print "structure version not supported"
return None
class Report():
# def __init__(self, auth):
def container_info(self, data):
"""
Send container info to server
:param data:
:return:
"""
response = TCPClient().request(Request().build("report_container_info", config.auth, data))
if response['status'] == 0:
return True
return False