mirror of
https://git.centos.org/centos-git-common.git
synced 2025-02-23 16:22:56 +00:00
Provide sample MQTT scripts for the new infrastructure
This commit is contained in:
parent
9023a882e1
commit
0b0750a5f7
7 changed files with 804 additions and 0 deletions
46
mqtt/README.md
Normal file
46
mqtt/README.md
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
# MQTT scripts
|
||||||
|
|
||||||
|
The mqtt.git.centos.org server requires authentication. As a result we've provided some client server scripts that will let you protect your keys.
|
||||||
|
|
||||||
|
These can also be used as a basis for building your own MQTT automation scripts.
|
||||||
|
|
||||||
|
## Scripts:
|
||||||
|
|
||||||
|
* send-mqtt-to-dbus.py - Connects the MQTT messages to a dbus interface.
|
||||||
|
To fully protect your keys you can setup the system bus (a config is provided by --dbus-config)
|
||||||
|
Then you can have this run as a dedicated user that has access to your keys.
|
||||||
|
See the `on_mqtt_connect` and `on_mqtt_message` functions for customizing the behavior.
|
||||||
|
|
||||||
|
* listen-on-dbus-for-mqtt-signals.py - Listens to messages sent to dbus and performs an action.
|
||||||
|
You can set this to run a generic command or customize it to fit your needs.
|
||||||
|
See the `signal_recieved` function for customizing the behavior.
|
||||||
|
|
||||||
|
* example-safe-command.py - It is an example of how to run a command from listen-on-dbus-for-mqtt-signals.py
|
||||||
|
|
||||||
|
* send-mqtt-to-irc.py - An untested IRC bot that will (in theory) chat out the messages.
|
||||||
|
|
||||||
|
## Systemd Unit:
|
||||||
|
|
||||||
|
Some sample systemd unit files are provided to work with the example scripts.
|
||||||
|
|
||||||
|
NOTE: They require customization before use.
|
||||||
|
You must at minimum set the User= to a trusted user.
|
||||||
|
|
||||||
|
* listen-on-dbus-for-mqtt-signals.service
|
||||||
|
You should adjust the path of commands and select a safe command to execute.
|
||||||
|
|
||||||
|
* send-mqtt-to-dbus.service
|
||||||
|
You should setup the system dbus profile with --dbus-config
|
||||||
|
|
||||||
|
## Container notes:
|
||||||
|
|
||||||
|
It is _not_ considered safe to share the host dbus (system or session) with a container. This can permit the container to escape into the host and violate the security of your system.
|
||||||
|
|
||||||
|
For example, here is how you can reboot a host from dbus if you've got rights.
|
||||||
|
```
|
||||||
|
DBUS_SYSTEM_BUS_ADDRESS=unix:path=/run/dbus/system_bus_socket \
|
||||||
|
dbus-send --system --print-reply \
|
||||||
|
--dest=org.freedesktop.systemd1 \
|
||||||
|
/org/freedesktop/systemd1 \
|
||||||
|
org.freedesktop.systemd1.Manager.Reboot
|
||||||
|
```
|
55
mqtt/example-safe-command.py
Executable file
55
mqtt/example-safe-command.py
Executable file
|
@ -0,0 +1,55 @@
|
||||||
|
#!/usr/bin/env python3.6
|
||||||
|
#pylint: disable=line-too-long
|
||||||
|
#
|
||||||
|
# Copyright (2019). Fermi Research Alliance, LLC.
|
||||||
|
# Initial Author: Pat Riehecky <riehecky@fnal.gov>
|
||||||
|
#
|
||||||
|
'''
|
||||||
|
Example command to run from listen-on-dbus-for-mqtt-signals.py
|
||||||
|
'''
|
||||||
|
|
||||||
|
## Uncomment these for python2 support
|
||||||
|
#from __future__ import unicode_literals
|
||||||
|
#from __future__ import absolute_import
|
||||||
|
#from __future__ import print_function
|
||||||
|
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import textwrap
|
||||||
|
|
||||||
|
from pprint import pprint
|
||||||
|
|
||||||
|
try:
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install argparse - rpm: python-argparse", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
def setup_args():
|
||||||
|
'''
|
||||||
|
Setup the argparse object.
|
||||||
|
|
||||||
|
Make sure all fields have defaults so we could use this as an object
|
||||||
|
'''
|
||||||
|
parser = ArgumentParser(description=textwrap.dedent(__doc__))
|
||||||
|
|
||||||
|
parser.add_argument('signal', help='The dbus signal is set here')
|
||||||
|
|
||||||
|
return parser
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
##########################################
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
PARSER = setup_args()
|
||||||
|
ARGS = PARSER.parse_args()
|
||||||
|
|
||||||
|
MESSAGE = json.loads(sys.stdin.read())
|
||||||
|
print("Your dbus-signal was %s" % ARGS.signal)
|
||||||
|
print("Your message was decoded as %s (between the lines)" % type(MESSAGE))
|
||||||
|
print("------------------------------------------------")
|
||||||
|
pprint(MESSAGE)
|
||||||
|
print("------------------------------------------------")
|
128
mqtt/listen-on-dbus-for-mqtt-signals.py
Executable file
128
mqtt/listen-on-dbus-for-mqtt-signals.py
Executable file
|
@ -0,0 +1,128 @@
|
||||||
|
#!/usr/bin/env python3.6
|
||||||
|
#pylint: disable=line-too-long
|
||||||
|
#
|
||||||
|
# Copyright (2019). Fermi Research Alliance, LLC.
|
||||||
|
# Initial Author: Pat Riehecky <riehecky@fnal.gov>
|
||||||
|
#
|
||||||
|
'''
|
||||||
|
Listen to dbus events
|
||||||
|
'''
|
||||||
|
|
||||||
|
## Uncomment these for python2 support
|
||||||
|
#from __future__ import unicode_literals
|
||||||
|
#from __future__ import absolute_import
|
||||||
|
#from __future__ import print_function
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os.path
|
||||||
|
import sys
|
||||||
|
import textwrap
|
||||||
|
|
||||||
|
from subprocess import Popen, PIPE
|
||||||
|
|
||||||
|
DBUS_INTERFACE = 'org.centos.git.mqtt'
|
||||||
|
|
||||||
|
try:
|
||||||
|
from pydbus import SystemBus, SessionBus
|
||||||
|
from pydbus.generic import signal
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install pydbus - rpm: python-pydbus", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
from gi.repository.GLib import MainLoop
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install pygobject - rpm: python-gobject", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install argparse - rpm: python-argparse", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
def setup_args():
|
||||||
|
'''
|
||||||
|
Setup the argparse object.
|
||||||
|
|
||||||
|
Make sure all fields have defaults so we could use this as an object
|
||||||
|
'''
|
||||||
|
parser = ArgumentParser(description=textwrap.dedent(__doc__))
|
||||||
|
|
||||||
|
parser.add_argument('--debug',action='store_true',
|
||||||
|
help='Print out all debugging actions',
|
||||||
|
default=False)
|
||||||
|
parser.add_argument('--dbus-use-system-bus',action='store_true',
|
||||||
|
help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf',
|
||||||
|
default=False)
|
||||||
|
parser.add_argument('--run-command', metavar='<ABSOLUTE_PATH>',
|
||||||
|
help='Command to run with message payload. sys.argv[1] will be the DBUS signal name, STDIN will be the payload as json. If no run command, simply print the results to STDOUT.',
|
||||||
|
default='', type=str)
|
||||||
|
|
||||||
|
return parser
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
##########################################
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
PARSER = setup_args()
|
||||||
|
ARGS = PARSER.parse_args()
|
||||||
|
|
||||||
|
if ARGS.run_command != '':
|
||||||
|
if not os.path.exists(ARGS.run_command):
|
||||||
|
raise ValueError('No such file %s', ARGS.run_command)
|
||||||
|
|
||||||
|
MYLOGGER = logging.getLogger()
|
||||||
|
|
||||||
|
if ARGS.debug:
|
||||||
|
MYLOGGER.setLevel(logging.DEBUG)
|
||||||
|
else:
|
||||||
|
MYLOGGER.setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
handler = logging.StreamHandler(sys.stderr)
|
||||||
|
handler.setLevel(logging.DEBUG)
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
MYLOGGER.addHandler(handler)
|
||||||
|
|
||||||
|
PROGRAM_NAME = os.path.basename(sys.argv[0])
|
||||||
|
MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])
|
||||||
|
|
||||||
|
if ARGS.dbus_use_system_bus:
|
||||||
|
BUS = SystemBus()
|
||||||
|
else:
|
||||||
|
BUS = SessionBus()
|
||||||
|
|
||||||
|
def signal_recieved(sender, obj, iface, signal, params):
|
||||||
|
''' Define in scope so I can read ARGS '''
|
||||||
|
# sanitize all my single quotes
|
||||||
|
signal_msg = json.dumps(json.loads(params[0]))
|
||||||
|
|
||||||
|
logging.debug("sender:%s object:%s iface:%s signal:%s all_params:%s signal_msg=%s", sender, obj, iface, signal, params, signal_msg)
|
||||||
|
|
||||||
|
logging.debug("Running %s %s < %s", ARGS.run_command, signal, signal_msg)
|
||||||
|
if ARGS.run_command == '':
|
||||||
|
print("signal:%s signal_msg:%s" % (signal, signal_msg), file=sys.stderr)
|
||||||
|
else:
|
||||||
|
# Or you can customize this to fit your needs
|
||||||
|
proc = Popen([ARGS.run_command, signal], stdin=PIPE, cwd='/tmp', start_new_session=True, universal_newlines=True)
|
||||||
|
proc.communicate(input=signal_msg)
|
||||||
|
proc.wait(timeout=300)
|
||||||
|
|
||||||
|
if ARGS.dbus_use_system_bus:
|
||||||
|
MYLOGGER.debug('Subscribing to system bus %s', DBUS_INTERFACE)
|
||||||
|
else:
|
||||||
|
MYLOGGER.debug('Subscribing to session bus %s', DBUS_INTERFACE)
|
||||||
|
|
||||||
|
BUS.subscribe(iface=DBUS_INTERFACE, signal_fired=signal_recieved)
|
||||||
|
|
||||||
|
# loop forever, until CTRL+C, or something goes wrong
|
||||||
|
try:
|
||||||
|
MainLoop().run()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.debug('Got CTRL+C, exiting cleanly')
|
||||||
|
raise SystemExit
|
16
mqtt/listen-on-dbus-for-mqtt-signals.service
Normal file
16
mqtt/listen-on-dbus-for-mqtt-signals.service
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
[Unit]
|
||||||
|
Description=Listen to org.centos.git.mqtt for actions
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
PrivateTmp=yes
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=2s
|
||||||
|
|
||||||
|
#User=<TRUSTEDUSERNAME>
|
||||||
|
Group=nobody
|
||||||
|
ExecStart=/usr/local/bin/listen-on-dbus-for-mqtt-signals.py --dbus-use-system-bus --run-command=/usr/local/bin/my_safe_command.py
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
288
mqtt/send-mqtt-to-dbus.py
Executable file
288
mqtt/send-mqtt-to-dbus.py
Executable file
|
@ -0,0 +1,288 @@
|
||||||
|
#!/usr/bin/env python3.6
|
||||||
|
#pylint: disable=line-too-long
|
||||||
|
#
|
||||||
|
# Copyright (2019). Fermi Research Alliance, LLC.
|
||||||
|
# Initial Author: Pat Riehecky <riehecky@fnal.gov>
|
||||||
|
#
|
||||||
|
'''
|
||||||
|
Connect to the MQTT server and convert messages into dbus signals.
|
||||||
|
'''
|
||||||
|
|
||||||
|
## Uncomment these for python2 support
|
||||||
|
#from __future__ import unicode_literals
|
||||||
|
#from __future__ import absolute_import
|
||||||
|
#from __future__ import print_function
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import os.path
|
||||||
|
import sys
|
||||||
|
import random
|
||||||
|
import textwrap
|
||||||
|
|
||||||
|
DBUS_INTERFACE = 'org.centos.git.mqtt'
|
||||||
|
|
||||||
|
try:
|
||||||
|
import paho.mqtt.client
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
from pydbus import SystemBus, SessionBus
|
||||||
|
from pydbus.generic import signal
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install pydbus - rpm: python-pydbus", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
from gi.repository.GLib import MainLoop
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install pygobject - rpm: python-gobject", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install argparse - rpm: python-argparse", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
def setup_args():
|
||||||
|
'''
|
||||||
|
Setup the argparse object.
|
||||||
|
|
||||||
|
Make sure all fields have defaults so we could use this as an object
|
||||||
|
'''
|
||||||
|
ca_cert = str(os.path.expanduser('~/')) + '.centos-server-ca.cert'
|
||||||
|
user_pubkey = str(os.path.expanduser('~/')) + '.centos.cert'
|
||||||
|
user_privkey = str(os.path.expanduser('~/')) + '.centos.cert'
|
||||||
|
|
||||||
|
# use a psudo random number for keepalive to help spread out the load
|
||||||
|
# some time between 1m 30s and 2m 10s
|
||||||
|
keep_alive = random.randint(90, 130)
|
||||||
|
|
||||||
|
parser = ArgumentParser(description=textwrap.dedent(__doc__))
|
||||||
|
|
||||||
|
parser.add_argument('--debug',action='store_true',
|
||||||
|
help='Print out all debugging actions',
|
||||||
|
default=False)
|
||||||
|
parser.add_argument('--client-connection-name', metavar='<UNIQUESTRING>',
|
||||||
|
help='Use this specific name when connecting. Default is a psudo-random string.',
|
||||||
|
default='', type=str)
|
||||||
|
parser.add_argument('--mqtt-server', metavar='<HOSTNAME>',
|
||||||
|
help='Connect to this MQTT server',
|
||||||
|
default='mqtt.git.centos.org', type=str)
|
||||||
|
parser.add_argument('--mqtt-port', metavar='<PORTNUMBER>',
|
||||||
|
help='Connect to MQTT server on this port',
|
||||||
|
default='8883', type=int)
|
||||||
|
parser.add_argument('--mqtt-source-ip', metavar='<SOURCE_IP>',
|
||||||
|
help='Connect to MQTT server from this address. Default is any.',
|
||||||
|
default='', type=str)
|
||||||
|
parser.add_argument('--mqtt-topic', metavar='<TOPIC_ID>',
|
||||||
|
action='append', nargs='+', type=str,
|
||||||
|
help='Which MQTT topic should we watch. You may set multiple times.')
|
||||||
|
parser.add_argument('--mqtt-keepalive', metavar='<SECONDS>',
|
||||||
|
help='Seconds between MQTT keepalive packets.',
|
||||||
|
default=keep_alive, type=int)
|
||||||
|
parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl',
|
||||||
|
help='Should MQTT use SSL? Default is to use SSL (and the SSL port).')
|
||||||
|
parser.add_argument('--mqtt-server-ca', metavar='<ABSOLUTE_PATH>',
|
||||||
|
help='Use this CA cert to validate the MQTT Server.',
|
||||||
|
default=ca_cert, type=str)
|
||||||
|
parser.add_argument('--mqtt-client-cert', metavar='<ABSOLUTE_PATH>',
|
||||||
|
help='Use this public key to identify yourself.',
|
||||||
|
default=user_pubkey, type=str)
|
||||||
|
parser.add_argument('--mqtt-client-key', metavar='<ABSOLUTE_PATH>',
|
||||||
|
help='The private key that matches with --mqtt-client-cert .',
|
||||||
|
default=user_privkey, type=str)
|
||||||
|
parser.add_argument('--dbus-use-system-bus',action='store_true',
|
||||||
|
help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf',
|
||||||
|
default=False)
|
||||||
|
parser.add_argument('--dbus-config',action='store_true',
|
||||||
|
help='Just output the SystemBus permissions file and exit',
|
||||||
|
default=False)
|
||||||
|
|
||||||
|
return parser
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
class BusMessage(object):
|
||||||
|
"""
|
||||||
|
Server_XML definition.
|
||||||
|
"""
|
||||||
|
dbus = """<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<node>
|
||||||
|
<interface name="{}">
|
||||||
|
<signal name="message">
|
||||||
|
<arg type='s'/>
|
||||||
|
</signal>
|
||||||
|
</interface>
|
||||||
|
</node>
|
||||||
|
""".format(DBUS_INTERFACE)
|
||||||
|
|
||||||
|
# Function does all the work already
|
||||||
|
message = signal()
|
||||||
|
|
||||||
|
def DbusPermissionsConf(interface_name):
|
||||||
|
'''
|
||||||
|
For the SystemBus you need permission to create endpoints
|
||||||
|
'''
|
||||||
|
import getpass
|
||||||
|
whoami = getpass.getuser()
|
||||||
|
|
||||||
|
xml = '''<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!-- Copy me into /etc/dbus-1/system.d/{interface_name}.conf and reload dbus -->
|
||||||
|
<!-- You can send the signal with: dbus-send ‐‐system ‐‐type=signal / org.centos.git.mqtt.message 'string:{{"test": ["1", "2"]}}' -->
|
||||||
|
<!-- You can watch bus with dbus-monitor ‐‐system 'interface=org.centos.git.mqtt' -->
|
||||||
|
<!DOCTYPE busconfig PUBLIC
|
||||||
|
"-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
|
||||||
|
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
|
||||||
|
<busconfig>
|
||||||
|
<!-- Always allow root to do anything -->
|
||||||
|
<policy user="root">
|
||||||
|
<allow own="{interface_name}" />
|
||||||
|
<allow send_interface="{interface_name}/" />
|
||||||
|
</policy>
|
||||||
|
<!-- Allow a single non-root user to setup interface and send to endpoint -->
|
||||||
|
<!-- You can change this to group='somegroup' if you desire -->
|
||||||
|
<policy user="{whoami}">
|
||||||
|
<allow own="{interface_name}" />
|
||||||
|
<allow send_interface="{interface_name}" send_destination="{interface_name}.message" />
|
||||||
|
</policy>
|
||||||
|
<!-- Always allow anyone to listen -->
|
||||||
|
<policy context="default">
|
||||||
|
<allow receive_interface="{interface_name}" />
|
||||||
|
<allow receive_sender="{interface_name}.message" />
|
||||||
|
</policy>
|
||||||
|
</busconfig>
|
||||||
|
'''
|
||||||
|
return xml.format(interface_name=interface_name, whoami=whoami)
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
def on_mqtt_message(client, userdata, message):
|
||||||
|
''' What should I do if I get a message? '''
|
||||||
|
logging.debug('Message received topic:%s payload:%s', message.topic, message.payload.decode("utf-8"))
|
||||||
|
|
||||||
|
# Or you can customize this to fit your needs
|
||||||
|
signal = {message.topic: message.payload.decode("utf-8")}
|
||||||
|
userdata['emit'].message(json.dumps((signal)))
|
||||||
|
|
||||||
|
logging.debug('Sending signal: %s', json.dumps(signal))
|
||||||
|
|
||||||
|
def on_mqtt_disconnect(client, userdata, rc):
|
||||||
|
''' If you get a connection error, print it out '''
|
||||||
|
if rc:
|
||||||
|
logging.error('Disconnected with error ErrCode:%s', rc)
|
||||||
|
logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.error_string(rc))
|
||||||
|
logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.connack_string(rc))
|
||||||
|
raise SystemExit
|
||||||
|
|
||||||
|
logging.error('Disconnected from MQTT Server')
|
||||||
|
|
||||||
|
def on_mqtt_connect(client, userdata, flags, rc):
|
||||||
|
''' Automatically subscribe to all topics '''
|
||||||
|
logging.debug('Connected with status code : %s', rc)
|
||||||
|
|
||||||
|
for topic in userdata['topics']:
|
||||||
|
client.subscribe(topic)
|
||||||
|
logging.info('Subscribing to topic %s', topic)
|
||||||
|
signal = {'mqtt.setup': 'Subscribing to topic {} at {}'.format(topic, datetime.datetime.now())}
|
||||||
|
userdata['emit'].message(json.dumps(signal))
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
##########################################
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
PARSER = setup_args()
|
||||||
|
ARGS = PARSER.parse_args()
|
||||||
|
|
||||||
|
if ARGS.dbus_config:
|
||||||
|
print(DbusPermissionsConf(DBUS_INTERFACE))
|
||||||
|
raise SystemExit
|
||||||
|
|
||||||
|
MYLOGGER = logging.getLogger()
|
||||||
|
|
||||||
|
if ARGS.debug:
|
||||||
|
MYLOGGER.setLevel(logging.DEBUG)
|
||||||
|
else:
|
||||||
|
MYLOGGER.setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
handler = logging.StreamHandler(sys.stderr)
|
||||||
|
handler.setLevel(logging.DEBUG)
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
MYLOGGER.addHandler(handler)
|
||||||
|
|
||||||
|
PROGRAM_NAME = os.path.basename(sys.argv[0])
|
||||||
|
MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])
|
||||||
|
|
||||||
|
if ARGS.client_connection_name:
|
||||||
|
MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port)
|
||||||
|
else:
|
||||||
|
MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port)
|
||||||
|
|
||||||
|
CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True)
|
||||||
|
|
||||||
|
if ARGS.mqtt_ssl:
|
||||||
|
ARGS.mqtt_server_ca = os.path.expanduser(ARGS.mqtt_server_ca)
|
||||||
|
if not os.path.exists(ARGS.mqtt_server_ca):
|
||||||
|
raise ValueError('No such file %s', ARGS.mqtt_server_ca)
|
||||||
|
|
||||||
|
ARGS.mqtt_client_cert = os.path.expanduser(ARGS.mqtt_client_cert)
|
||||||
|
if not os.path.exists(ARGS.mqtt_client_cert):
|
||||||
|
raise ValueError('No such file %s', ARGS.mqtt_client_cert)
|
||||||
|
|
||||||
|
ARGS.mqtt_client_key = os.path.expanduser(ARGS.mqtt_client_key)
|
||||||
|
if not os.path.exists(ARGS.mqtt_client_key):
|
||||||
|
raise ValueError('No such file %s', ARGS.mqtt_client_key)
|
||||||
|
|
||||||
|
MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key)
|
||||||
|
CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
CLIENT.enable_logger(logger=MYLOGGER)
|
||||||
|
except AttributeError:
|
||||||
|
# Added in 1.2.x of mqtt library
|
||||||
|
pass
|
||||||
|
|
||||||
|
CLIENT.on_connect = on_mqtt_connect
|
||||||
|
CLIENT.on_message = on_mqtt_message
|
||||||
|
CLIENT.on_disconnect = on_mqtt_disconnect
|
||||||
|
|
||||||
|
CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip)
|
||||||
|
|
||||||
|
DBUS_MESSAGE = BusMessage()
|
||||||
|
|
||||||
|
if not ARGS.mqtt_topic:
|
||||||
|
ARGS.mqtt_topic = ['git.centos.org/#',]
|
||||||
|
|
||||||
|
CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': DBUS_MESSAGE})
|
||||||
|
|
||||||
|
# loop_start will run in background async
|
||||||
|
CLIENT.loop_start()
|
||||||
|
|
||||||
|
if ARGS.dbus_use_system_bus:
|
||||||
|
BUS = SystemBus()
|
||||||
|
else:
|
||||||
|
BUS = SessionBus()
|
||||||
|
|
||||||
|
if ARGS.dbus_use_system_bus:
|
||||||
|
MYLOGGER.debug('Publishing to system bus %s', DBUS_INTERFACE)
|
||||||
|
else:
|
||||||
|
MYLOGGER.debug('Publishing to session bus %s', DBUS_INTERFACE)
|
||||||
|
|
||||||
|
BUS.publish(DBUS_INTERFACE, DBUS_MESSAGE)
|
||||||
|
|
||||||
|
# loop forever, until CTRL+C, or something goes wrong
|
||||||
|
try:
|
||||||
|
MainLoop().run()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
CLIENT.disconnect()
|
||||||
|
logging.debug('Got CTRL+C, exiting cleanly')
|
||||||
|
raise SystemExit
|
||||||
|
except:
|
||||||
|
CLIENT.disconnect()
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
CLIENT.disconnect()
|
25
mqtt/send-mqtt-to-dbus.service
Normal file
25
mqtt/send-mqtt-to-dbus.service
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
[Unit]
|
||||||
|
Description=Bridge mqtt.git.centos.org to dbus
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=dbus
|
||||||
|
BusName=org.centos.git.mqtt
|
||||||
|
NoNewPrivileges=yes
|
||||||
|
PrivateTmp=yes
|
||||||
|
PrivateDevices=yes
|
||||||
|
DevicePolicy=closed
|
||||||
|
ProtectSystem=full
|
||||||
|
ProtectHome=read-only
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=5s
|
||||||
|
|
||||||
|
# NOTE: You should run this as the user (or edit the file)
|
||||||
|
# /usr/local/bin/send-mqtt-to-dbus.py --dbus-config > /etc/dbus-1/system.d/org.centos.git.mqtt.conf
|
||||||
|
|
||||||
|
#User=<TRUSTEDUSERNAME>
|
||||||
|
Group=nobody
|
||||||
|
ExecStart=/usr/local/bin/send-mqtt-to-dbus.py --dbus-use-system-bus --mqtt-server-ca=~/.centos/ca.pem --mqtt-client-cert=~/.centos/client.pem --mqtt-client-key=~/.centos/client.key
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
246
mqtt/send-mqtt-to-irc.py
Executable file
246
mqtt/send-mqtt-to-irc.py
Executable file
|
@ -0,0 +1,246 @@
|
||||||
|
#!/usr/bin/env python3.6
|
||||||
|
#pylint: disable=line-too-long
|
||||||
|
#
|
||||||
|
# Copyright (2019). Fermi Research Alliance, LLC.
|
||||||
|
# Initial Author: Pat Riehecky <riehecky@fnal.gov>
|
||||||
|
#
|
||||||
|
'''
|
||||||
|
Connect to the MQTT server and convert messages into irc messages.
|
||||||
|
'''
|
||||||
|
|
||||||
|
## Uncomment these for python2 support
|
||||||
|
#from __future__ import unicode_literals
|
||||||
|
#from __future__ import absolute_import
|
||||||
|
#from __future__ import print_function
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import os.path
|
||||||
|
import sys
|
||||||
|
import random
|
||||||
|
import textwrap
|
||||||
|
|
||||||
|
try:
|
||||||
|
import paho.mqtt.client
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
import irc.client
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install irc - pip install --user irc", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
from gi.repository.GLib import MainLoop
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install pygobject - rpm: python-gobject", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
print("Please install argparse - rpm: python-argparse", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
def setup_args():
|
||||||
|
'''
|
||||||
|
Setup the argparse object.
|
||||||
|
|
||||||
|
Make sure all fields have defaults so we could use this as an object
|
||||||
|
'''
|
||||||
|
ca_cert = str(os.path.expanduser('~/')) + '.centos-server-ca.cert'
|
||||||
|
user_pubkey = str(os.path.expanduser('~/')) + '.centos.cert'
|
||||||
|
user_privkey = str(os.path.expanduser('~/')) + '.centos.cert'
|
||||||
|
|
||||||
|
# use a psudo random number for keepalive to help spread out the load
|
||||||
|
# some time between 1m 30s and 2m 10s
|
||||||
|
keep_alive = random.randint(90, 130)
|
||||||
|
|
||||||
|
parser = ArgumentParser(description=textwrap.dedent(__doc__))
|
||||||
|
|
||||||
|
parser.add_argument('--debug',action='store_true',
|
||||||
|
help='Print out all debugging actions',
|
||||||
|
default=False)
|
||||||
|
parser.add_argument('--client-connection-name', metavar='<UNIQUESTRING>',
|
||||||
|
help='Use this specific name when connecting. Default is a psudo-random string.',
|
||||||
|
default='', type=str)
|
||||||
|
parser.add_argument('--mqtt-server', metavar='<HOSTNAME>',
|
||||||
|
help='Connect to this MQTT server',
|
||||||
|
default='mqtt.git.centos.org', type=str)
|
||||||
|
parser.add_argument('--mqtt-port', metavar='<PORTNUMBER>',
|
||||||
|
help='Connect to MQTT server on this port',
|
||||||
|
default='8883', type=int)
|
||||||
|
parser.add_argument('--mqtt-source-ip', metavar='<SOURCE_IP>',
|
||||||
|
help='Connect to MQTT server from this address. Default is any.',
|
||||||
|
default='', type=str)
|
||||||
|
parser.add_argument('--mqtt-topic', metavar='<TOPIC_ID>',
|
||||||
|
action='append', nargs='+', type=str,
|
||||||
|
help='Which MQTT topic should we watch. You may set multiple times.')
|
||||||
|
parser.add_argument('--mqtt-keepalive', metavar='<SECONDS>',
|
||||||
|
help='Seconds between MQTT keepalive packets.',
|
||||||
|
default=keep_alive, type=int)
|
||||||
|
parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl',
|
||||||
|
help='Should MQTT use SSL? Default is to use SSL (and the SSL port).')
|
||||||
|
parser.add_argument('--mqtt-server-ca', metavar='<ABSOLUTE_PATH>',
|
||||||
|
help='Use this CA cert to validate the MQTT Server.',
|
||||||
|
default=ca_cert, type=str)
|
||||||
|
parser.add_argument('--mqtt-client-cert', metavar='<ABSOLUTE_PATH>',
|
||||||
|
help='Use this public key to identify yourself.',
|
||||||
|
default=user_pubkey, type=str)
|
||||||
|
parser.add_argument('--mqtt-client-key', metavar='<ABSOLUTE_PATH>',
|
||||||
|
help='The private key that matches with --mqtt-client-cert .',
|
||||||
|
default=user_privkey, type=str)
|
||||||
|
parser.add_argument('--irc-server', metavar='<HOSTNAME>',
|
||||||
|
help='The hostname of your irc server.',
|
||||||
|
default='chat.freenode.net', type=str)
|
||||||
|
parser.add_argument('--irc-port', default=6667, type=int,
|
||||||
|
help='IRC port number.',
|
||||||
|
parser.add_argument('--irc-bot-name', metavar='<BOT_NIC>',
|
||||||
|
help='The name of your IRC bot.',
|
||||||
|
default='testbot__', type=str)
|
||||||
|
parser.add_argument('--irc-bot-password', metavar='<PASSWORD>',
|
||||||
|
help='The password for your IRC bot.',
|
||||||
|
type=str)
|
||||||
|
parser.add_argument('--irc-bot-admin', metavar='<YOUR_NIC>',
|
||||||
|
help="The name of your IRC bot's owner.",
|
||||||
|
default='', type=str)
|
||||||
|
parser.add_argument('--irc-channel', metavar='<WHERE_TO_POST>',
|
||||||
|
help="The name of an IRC channel where your bot will psot.",
|
||||||
|
default='#bot-testing', type=str)
|
||||||
|
|
||||||
|
return parser
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
class IRCBot(irc.client.SimpleIRCClient):
|
||||||
|
''' An IRC bot as an object '''
|
||||||
|
def __init__(self, channel, admin_nic):
|
||||||
|
irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
|
||||||
|
self.target_channel = channel
|
||||||
|
self.admin_nic = admin_nic
|
||||||
|
|
||||||
|
if self.admin_nic == '':
|
||||||
|
raise ValueError("You must set an owner for your bot")
|
||||||
|
|
||||||
|
def on_welcome(self, connection, event):
|
||||||
|
if irc.client.is_channel(self.target_channel):
|
||||||
|
connection.join(self.target_channel)
|
||||||
|
self.connection.notice(self.target_channel, "Hello, I am a bot owned by " + self.admin_nic)
|
||||||
|
|
||||||
|
def on_disconnect(self, connection, event):
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
def send_message(self, message):
|
||||||
|
self.connection.notice(self.target, message)
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
def on_mqtt_message(client, userdata, message):
|
||||||
|
''' What should I do if I get a message? '''
|
||||||
|
logging.debug('Message received topic:%s payload:%s', message.topic, message.payload.decode("utf-8"))
|
||||||
|
|
||||||
|
# Or you can customize this to fit your needs
|
||||||
|
|
||||||
|
logging.debug('Sending signal: %s', signal)
|
||||||
|
|
||||||
|
def on_mqtt_disconnect(client, userdata, rc):
|
||||||
|
''' If you get a connection error, print it out '''
|
||||||
|
if rc:
|
||||||
|
logging.error('Disconnected with error ErrCode:%s', rc)
|
||||||
|
logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.error_string(rc))
|
||||||
|
logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.connack_string(rc))
|
||||||
|
raise SystemExit
|
||||||
|
|
||||||
|
logging.error('Disconnected from MQTT Server')
|
||||||
|
|
||||||
|
def on_mqtt_connect(client, userdata, flags, rc):
|
||||||
|
''' Automatically subscribe to all topics '''
|
||||||
|
logging.debug('Connected with status code : %s', rc)
|
||||||
|
|
||||||
|
for topic in userdata['topics']:
|
||||||
|
client.subscribe(topic)
|
||||||
|
logging.info('Subscribing to topic %s', topic)
|
||||||
|
signal = {'mqtt.setup': 'Subscribing to topic {} at {}'.format(topic, datetime.datetime.now())}
|
||||||
|
userdata['emit'].message(str(signal))
|
||||||
|
|
||||||
|
##########################################
|
||||||
|
##########################################
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
PARSER = setup_args()
|
||||||
|
ARGS = PARSER.parse_args()
|
||||||
|
|
||||||
|
MYLOGGER = logging.getLogger()
|
||||||
|
|
||||||
|
if ARGS.debug:
|
||||||
|
MYLOGGER.setLevel(logging.DEBUG)
|
||||||
|
else:
|
||||||
|
MYLOGGER.setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
handler = logging.StreamHandler(sys.stderr)
|
||||||
|
handler.setLevel(logging.DEBUG)
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
MYLOGGER.addHandler(handler)
|
||||||
|
|
||||||
|
PROGRAM_NAME = os.path.basename(sys.argv[0])
|
||||||
|
MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])
|
||||||
|
|
||||||
|
MYBOT = IRCBot(ARGS.irc_channel, ARGS.irc_bot_name, ARGS.irc_server, ARGS.irc_port, ARGS.irc_channel, ARGS.irc_bot_admin)
|
||||||
|
|
||||||
|
if ARGS.client_connection_name:
|
||||||
|
MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port)
|
||||||
|
else:
|
||||||
|
MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port)
|
||||||
|
|
||||||
|
CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True)
|
||||||
|
|
||||||
|
if ARGS.mqtt_ssl:
|
||||||
|
ARGS.mqtt_server_ca = os.path.expanduser(ARGS.mqtt_server_ca)
|
||||||
|
if not os.path.exists(ARGS.mqtt_server_ca):
|
||||||
|
raise ValueError('No such file %s', ARGS.mqtt_server_ca)
|
||||||
|
|
||||||
|
ARGS.mqtt_client_cert = os.path.expanduser(ARGS.mqtt_client_cert)
|
||||||
|
if not os.path.exists(ARGS.mqtt_client_cert):
|
||||||
|
raise ValueError('No such file %s', ARGS.mqtt_client_cert)
|
||||||
|
|
||||||
|
ARGS.mqtt_client_key = os.path.expanduser(ARGS.mqtt_client_key)
|
||||||
|
if not os.path.exists(ARGS.mqtt_client_key):
|
||||||
|
raise ValueError('No such file %s', ARGS.mqtt_client_key)
|
||||||
|
|
||||||
|
MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key)
|
||||||
|
CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
CLIENT.enable_logger(logger=MYLOGGER)
|
||||||
|
except AttributeError:
|
||||||
|
# Added in 1.2.x of mqtt library
|
||||||
|
pass
|
||||||
|
|
||||||
|
CLIENT.on_connect = on_mqtt_connect
|
||||||
|
CLIENT.on_message = on_mqtt_message
|
||||||
|
CLIENT.on_disconnect = on_mqtt_disconnect
|
||||||
|
|
||||||
|
CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip)
|
||||||
|
|
||||||
|
if not ARGS.mqtt_topic:
|
||||||
|
ARGS.mqtt_topic = ['git.centos.org/#',]
|
||||||
|
|
||||||
|
CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': DBUS_MESSAGE})
|
||||||
|
|
||||||
|
# loop_start will run in background async
|
||||||
|
CLIENT.loop_start()
|
||||||
|
|
||||||
|
# loop forever, until CTRL+C, or something goes wrong
|
||||||
|
try:
|
||||||
|
MainLoop().run()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
CLIENT.disconnect()
|
||||||
|
logging.debug('Got CTRL+C, exiting cleanly')
|
||||||
|
raise SystemExit
|
||||||
|
except:
|
||||||
|
CLIENT.disconnect()
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
CLIENT.disconnect()
|
Loading…
Add table
Reference in a new issue