diff options
-rw-r--r-- | .gitignore | 126 | ||||
-rw-r--r-- | README.MD | 84 | ||||
-rw-r--r-- | classes/functions.py | 108 | ||||
-rw-r--r-- | classes/strings.py | 51 | ||||
-rw-r--r-- | magicbot.service.dummy | 26 | ||||
-rw-r--r-- | main.py | 416 |
6 files changed, 460 insertions, 351 deletions
@@ -1,63 +1,63 @@ -# Byte-compiled / optimized / DLL files
-__pycache__/
-*.py[cod]
-*$py.class
-
-# C extensions
-*.so
-
-# Distribution / packaging
-.Python
-env/
-venv/
-build/
-develop-eggs/
-dist/
-downloads/
-eggs/
-.eggs/
-lib/
-lib64/
-parts/
-sdist/
-var/
-*.egg-info/
-.installed.cfg
-*.egg
-
-# PyInstaller
-# Usually these files are written by a python script from a template
-# before PyInstaller builds the exe, so as to inject date/other infos into it.
-*.manifest
-*.spec
-
-# Installer logs
-pip-log.txt
-pip-delete-this-directory.txt
-
-# Unit test / coverage reports
-htmlcov/
-.tox/
-.coverage
-.coverage.*
-.cache
-nosetests.xml
-coverage.xml
-*,cover
-
-# Translations
-*.mo
-*.pot
-
-# Django stuff:
-*.log
-
-# Sphinx documentation
-docs/_build/
-
-# PyBuilder
-target/
-
-# .idea
-.idea
-bot\.cfg
+# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +venv/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# .idea +.idea +bot\.cfg @@ -1,43 +1,43 @@ -# Python XMPP Chatbot
-
-## beforehand
-Do not opperate this bot on foreign servers.
-
-### functions
-- query xmpp server software and version [XEP-0092](https://xmpp.org/extensions/xep-0092.html)
-- query xmpp server uptime [XEP-0012](https://xmpp.org/extensions/xep-0012.html)
-- query xmpp server contact addresses [XEP-0157](https://xmpp.org/extensions/xep-0157.html)
-- display help output
-- respond to username being mentioned
-
-### todo
-- [ ] Github Webhook
-
-### install
-#### requirements
-- slixmpp
-- configparser
-- datetime
-- random
-- validators
-
-#### configuration
-`bot.cfg` replace dummy file with correct credentials/ parameters
-````cfg
-[Account]
-jid=nick@domain.tld/querybot-0.1
-password=super_secret_password
-[MUC]
-rooms=room_to_connect_to@conference.domain.tld
-nick=mucnickname
-[ADMIN]
-admins=admins ( ! muc nick and not the jid nickname)
-````
-
-##### systemd
-Copy the systemd dummy file into systemd service folder.
-`systemdctl daemon-reload` and `systemctl start magicbot.service` to start the bot.
-If it is neccecary to start the bot automatically when the system boots do `systemctl enable magicbot.service`.
-
-#### starting the bot without systemd
+# Python XMPP Chatbot + +## beforehand +Do not opperate this bot on foreign servers. + +### functions +- query xmpp server software and version [XEP-0092](https://xmpp.org/extensions/xep-0092.html) +- query xmpp server uptime [XEP-0012](https://xmpp.org/extensions/xep-0012.html) +- query xmpp server contact addresses [XEP-0157](https://xmpp.org/extensions/xep-0157.html) +- display help output +- respond to username being mentioned + +### todo +- [ ] Github Webhook + +### install +#### requirements +- slixmpp +- configparser +- datetime +- random +- validators + +#### configuration +`bot.cfg` replace dummy file with correct credentials/ parameters +````cfg +[Account] +jid=nick@domain.tld/querybot-0.1 +password=super_secret_password +[MUC] +rooms=room_to_connect_to@conference.domain.tld,another_room@conference.domain.tld +nick=mucnickname +[ADMIN] +admins=admins ( ! muc nick and not the jid nickname) +```` + +##### systemd +Copy the systemd dummy file into systemd service folder. +`systemdctl daemon-reload` and `systemctl start magicbot.service` to start the bot. +If it is neccecary to start the bot automatically when the system boots do `systemctl enable magicbot.service`. + +#### starting the bot without systemd Got to the bots directory and run `./main.py &`.
\ No newline at end of file diff --git a/classes/functions.py b/classes/functions.py new file mode 100644 index 0000000..08d6146 --- /dev/null +++ b/classes/functions.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# XEP-0072: Server Version +class Version: + def __init__(self, version, msg, target): + self.version = version['software_version']['version'] + self.os = version['software_version']['os'] + self.name = version['software_version']['name'] + self.nick = msg['mucnick'] + self.message_type = msg['type'] + self.target = target + + def format_version(self): + if self.message_type == "groupchat": + text = "%s: %s is running %s version %s on %s" % (self.nick, self.target, self.name, self.version, self.os) + else: + text = "%s is running %s version %s on %s" % (self.target, self.name, self.version, self.os) + + return text + + +# XEP-0012: Last Activity +class LastActivity: + """ query the server uptime of the specified domain, defined by XEP-0012 """ + def __init__(self, last_activity, msg, target): + self.last_activity = last_activity + self.nick = msg['mucnick'] + self.message_type = msg['type'] + self.target = target + + def format_values(self, granularity=4): + seconds = self.last_activity['last_activity']['seconds'] + uptime = [] + intervals = ( + ('years', 31536000), # 60 * 60 * 24 * 365 + ('weeks', 604800), # 60 * 60 * 24 * 7 + ('days', 86400), # 60 * 60 * 24 + ('hours', 3600), # 60 * 60 + ('minutes', 60), + ('seconds', 1) + ) + for name, count in intervals: + value = seconds // count + if value: + seconds -= value * count + if value == 1: + name = name.rstrip('s') + uptime.append("{} {}".format(value, name)) + result = ' '.join(uptime[:granularity]) + + if self.message_type == "groupchat": + text = "%s: %s is running since %s" % (self.nick, self.target, result) + else: + text = "%s is running since %s" % (self.target, result) + + return text + + +# XEP-0157: Contact Addresses for XMPP Services +class ContactInfo: + def __init__(self, contact, msg, target): + self.contact = contact + self.message = msg + self.target = target + + def format_contact(self): + server_info = [] + sep = ' , ' + possible_vars = ['abuse-addresses', + 'admin-addresses', + 'feedback-addresses', + 'sales-addresses', + 'security-addresses', + 'support-addresses'] + + for field in self.contact['disco_info']['form']: + var = field['var'] + if var in possible_vars: + field_value = field.get_value(convert=False) + value = sep.join(field_value) if isinstance(field_value, list) else field_value + server_info.append(' - %s: %s' % (var, value)) + + if server_info: + text = "contact addresses for %s are" % self.target + for count in range(server_info.__len__()): + text += "\n" + server_info[count] + else: + text = "%s has no contact addresses configured." % self.target + + return text + + +# class handeling XMPPError exeptions +class HandleError: + def __init__(self, error, msg, key, target): + self.error = error + self.message = msg + self.key = key + self.target = target + + def build_report(self): + condition = self.error.condition + keyword = self.key[1:] + + text = "There was an error requesting " + self.target + '\'s ' + keyword + " : " + condition + + return text diff --git a/classes/strings.py b/classes/strings.py new file mode 100644 index 0000000..ade0520 --- /dev/null +++ b/classes/strings.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +from random import randint + + +class StaticAnswers: + """ + collection of callable static/ semi-static strings + """ + def __init__(self, nick=""): + self.nickname = nick + self.helpfile = { + 'help': '!help -- display this text', + 'version': '!version domain.tld -- receive XMPP server version', + 'uptime': '!uptime domain.tld -- receive XMPP server uptime', + 'contact': '!contact domain.tld -- receive XMPP server contact address info'} + self.possible_answers = { + '1': 'I heard that, %s.', + '2': 'I am sorry for that %s.', + '3': '%s did you try turning it off and on again?'} + self.error_messages = { + '1': 'not reachable', + '2': 'not a valid target' + } + self.keywords = { + "keywords": ["!help", "!uptime", "!version", "!contact"], + "no_arg_keywords": ["!help"] + } + + def keys(self, arg="", keyword='keywords'): + if arg == 'list': + try: + return self.keywords[keyword] + except KeyError: + return self.keywords['keywords'] + else: + return self.keywords + + def gen_help(self): + helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()]) + return helpdoc + + def gen_answer(self): + possible_answers = self.possible_answers + return possible_answers[str(randint(1, possible_answers.__len__()))] % self.nickname + + def error(self,code): + try: + text = self.error_messages[str(code)] + except KeyError: + return 'unknown error' + return text diff --git a/magicbot.service.dummy b/magicbot.service.dummy index 44ae41f..5d3dd53 100644 --- a/magicbot.service.dummy +++ b/magicbot.service.dummy @@ -1,13 +1,13 @@ - [Unit]
- 2 Description=SlixXMPP service bot
- 3 After=network.target ejabberd.service
- 4
- 5 [Service]
- 6 Type=simple
- 7 ExecStart=/usr/bin/python3 /path/to/main.py
- 8 Restart=on-failure
- 9 RestartSec=60s
-10 User=nico
-11
-12 [Install]
-13 WantedBy=multi-user.target
+ [Unit] + 2 Description=SlixXMPP service bot + 3 After=network.target ejabberd.service + 4 + 5 [Service] + 6 Type=simple + 7 ExecStart=/usr/bin/python3 /path/to/main.py + 8 Restart=on-failure + 9 RestartSec=60s +10 User=nico +11 +12 [Install] +13 WantedBy=multi-user.target @@ -1,233 +1,183 @@ -#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
- Slixmpp: The Slick XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz
- This file is part of Slixmpp.
-
- See the file LICENSE for copying permission.
-"""
-import asyncio
-import configparser
-import logging
-import slixmpp
-import ssl
-import validators
-from argparse import ArgumentParser
-from datetime import datetime, timedelta
-from random import randint
-from slixmpp.exceptions import XMPPError
-
-
-class QueryBot(slixmpp.ClientXMPP):
- """ A simple Slixmpp bot with some features """
- def __init__(self, jid, password, room, nick):
- slixmpp.ClientXMPP.__init__(self, jid, password)
-
- self.room = room
- self.nick = nick
-
- # session start event, starting point for the presence and roster requests
- self.add_event_handler('session_start', self.start)
-
- # register handler to recieve both groupchat and normal message events
- self.add_event_handler('message', self.message)
-
- def start(self, event):
- """
- Arguments:
- event -- An empty dictionary. The session_start event does not provide any additional data.
- """
- self.send_presence()
- self.get_roster()
-
- # If a room password is needed, use: password=the_room_password
- for rooms in self.room.split(sep=","):
- self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True)
-
-
- @staticmethod
- def precheck(line):
- """
- pre check function
- - check that keywords are used properly
- - check that following a keyword a proper jid is following
- :param line: line from message body
- :return: true if correct
- """
- keywords = ["!help", "!uptime", "!version", "!contact"]
- proper_domain, proper_key = False, False
-
- try:
- # check for valid keyword in position 0
- if line[0] in keywords:
- proper_key = True
- else:
- return
-
- # help command is used
- if line[0] == "!help":
- proper_domain = True
- # check if domain is valid
- elif validators.domain(line[1]):
- proper_domain = True
- else:
- return
- except IndexError:
- pass
-
- return proper_key and proper_domain
-
- @asyncio.coroutine
- def message(self, msg):
- """
- Arguments:
- msg -- The received message stanza. See the documentation for stanza objects and the Message stanza to see
- how it may be used.
- """
-
- # catch self messages to prevent self flooding
- if msg['mucnick'] == self.nick:
- return
-
- if self.nick in msg['body']:
- # answer with predefined text when mucnick is used
- self.send_message(mto=msg['from'].bare, mbody=notice_answer(msg['mucnick']), mtype=msg['type'])
-
- for line in msg['body'].splitlines():
- """ split multiline messages into lines to check every line for keywords """
- line = line.split(sep= " ")
-
- if self.precheck(line):
- """ true if keyword and domain are valid """
- # Display help
- if line[0] == '!help':
- """ display help when keyword !help is recieved """
- self.send_message(mto=msg['from'].bare, mbody=help_doc(), mtype=msg['type'])
-
- # XEP-0072: Server Version
- if line[0] == '!version':
- """ query the server software version of the specified domain, defined by XEP-0092 """
- try:
- version = yield from self['xep_0092'].get_version(line[1])
-
- if msg['type'] == "groupchat":
- text = "%s: %s is running %s version %s on %s" % (msg['mucnick'], line[1], version[
- 'software_version']['name'], version['software_version']['version'], version[
- 'software_version']['os'])
- else:
- text = "%s is running %s version %s on %s" % (line[1], version['software_version'][
- 'name'], version['software_version']['version'], version['software_version']['os'])
-
- self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type'])
- except NameError:
- pass
- except XMPPError:
- pass
-
- # XEP-0012: Last Activity
- if line[0] == '!uptime':
- """ query the server uptime of the specified domain, defined by XEP-0012 """
- try:
- # try if domain[0] is set if not just pass
- last_activity = yield from self['xep_0012'].get_last_activity(line[1])
- uptime = datetime(1, 1, 1) + timedelta(seconds=last_activity['last_activity']['seconds'])
-
- if msg['type'] == "groupchat":
- text = "%s: %s is running since %d days %d hours %d minutes" % (msg['mucnick'], line[1],
- uptime.day - 1, uptime.hour,
- uptime.minute)
- else:
- text = "%s is running since %d days %d hours %d minutes" % (line[1], uptime.day - 1,
- uptime.hour, uptime.minute)
- self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type'])
- except NameError:
- pass
- except XMPPError:
- pass
-
- # XEP-0157: Contact Addresses for XMPP Services
- if line[0] == "!contact":
- """ query the XEP-0030: Service Discovery and extract contact information """
- try:
- result = yield from self['xep_0030'].get_info(jid=line[1], cached=False)
- server_info = []
- for field in result['disco_info']['form']:
- var = field['var']
- if field['type'] == 'hidden' and var == 'FORM_TYPE':
- title = field['value'][0]
- continue
- sep = ', '
- field_value = field.get_value(convert=False)
- value = sep.join(field_value) if isinstance(field_value, list) else field_value
- server_info.append('%s: %s' % (var, value))
-
- text = "contact addresses for %s are" % (line[1])
- for count in range(len(server_info)):
- text += "\n" + server_info[count]
-
- self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type'])
- except NameError:
- pass
- except XMPPError:
- pass
-
- # TODO
- # append all results to single message send just once
- else:
- pass
-
-
-def help_doc():
- helpfile = {'help': '!help -- display this text',
- 'version': '!version domain.tld -- receive XMPP server version',
- 'uptime':'!uptime domain.tld -- receive XMPP server uptime',
- 'contact': '!contact domain.tld -- receive XMPP server contact address info'}
- return "".join(['%s\n' % (value) for (_, value) in helpfile.items()])
-
-
-def notice_answer(nickname):
- possible_answers = {'1': 'I heard that, %s.',
- '2': 'I am sorry for that %s.',
- '3': '%s did you try turning it off and on again?'}
- return possible_answers[str(randint(1, len(possible_answers)))] % nickname
-
-if __name__ == '__main__':
- # command line arguments.
- parser = ArgumentParser()
- parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel',
- const=logging.ERROR, default=logging.INFO)
- parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
- const=logging.DEBUG, default=logging.INFO)
- parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile',
- const="", default='bot.log')
- args = parser.parse_args()
-
- # logging
- logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s')
- logger = logging.getLogger(__name__)
-
- # configfile
- config = configparser.RawConfigParser()
- config.read('./bot.cfg')
- args.jid = config.get('Account', 'jid')
- args.password = config.get('Account', 'password')
- args.room = config.get('MUC', 'rooms')
- args.nick = config.get('MUC', 'nick')
- args.admins = config.get('ADMIN', 'admins')
-
- # init the bot and register used slixmpp plugins
- xmpp = QueryBot(args.jid, args.password, args.room, args.nick)
- xmpp.ssl_version = ssl.PROTOCOL_TLSv1_2
- xmpp.register_plugin('xep_0012') # Last Activity
- xmpp.register_plugin('xep_0030') # Service Discovery
- xmpp.register_plugin('xep_0045') # Multi-User Chat
- xmpp.register_plugin('xep_0060') # PubSub
- xmpp.register_plugin('xep_0085') # Chat State Notifications
- xmpp.register_plugin('xep_0092') # Software Version
- xmpp.register_plugin('xep_0128') # Service Discovery Extensions
- xmpp.register_plugin('xep_0199') # XMPP Ping
-
- # connect and start receiving stanzas
- xmpp.connect()
- xmpp.process()
+#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" + Slixmpp: The Slick XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of Slixmpp. + + See the file LICENSE for copying permission. +""" +import asyncio +import slixmpp +import ssl +import validators +import configparser +import logging + +from argparse import ArgumentParser +from slixmpp.exceptions import XMPPError + +from classes.strings import StaticAnswers +from classes.functions import Version, LastActivity, ContactInfo, HandleError + + +class QueryBot(slixmpp.ClientXMPP): + def __init__(self, jid, password, room, nick): + slixmpp.ClientXMPP.__init__(self, jid, password) + self.ssl_version = ssl.PROTOCOL_TLSv1_2 + self.room = room + self.nick = nick + + # session start event, starting point for the presence and roster requests + self.add_event_handler('session_start', self.start) + + # register handler to recieve both groupchat and normal message events + self.add_event_handler('message', self.message) + + def start(self, event): + """ + :param str event -- An empty dictionary. The session_start event does not provide any additional data. + """ + self.send_presence() + self.get_roster() + + # If a room password is needed, use: password=the_room_password + for rooms in self.room.split(sep=","): + self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True) + + def validate_domain(self, wordlist, index): + """ + validation method to reduce connection attemps to unvalid domains + :param wordlist: words seperated by " " from the message + :param index: keyword index inside the message + :return: true if valid + """ + # keyword inside the message + argument = wordlist[index] + + # if the argument is not inside the no_arg_keywords target is index + 1 + if argument not in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"): + try: + target = wordlist[index + 1] + if validators.domain(target): + return True + except IndexError: + # except an IndexError if a keywords is the last word in the message + return False + elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"): + return True + else: + return + + def deduplicate(self, reply): + """ + deduplication method for the result list + :param list reply: list containing strings + :return: list containing unique strings + """ + reply_dedup = list() + for item in reply: + if item not in reply_dedup: + reply_dedup.append(item) + + return reply_dedup + + @asyncio.coroutine + def message(self, msg): + """ + :param msg: received message stanza + """ + # init empty reply list + reply = list() + + # catch self messages to prevent self flooding + if msg['mucnick'] == self.nick: + return + elif self.nick in msg['body']: + # add pre predefined text to reply list + reply.append(StaticAnswers(msg['mucnick']).gen_answer()) + + # building the queue + # double splitting to exclude whitespaces + words = " ".join(msg['body'].split()).split(sep=" ") + queue = list() + + # check all words in side the message for possible hits + for x in enumerate(words): + # check word for match in keywords list + for y in StaticAnswers().keys(arg='list'): + # if so queue the keyword and the postion in the string + if x[1] == y: + # only add job to queue if domain is valid + if self.validate_domain(words, x[0]): + queue.append({str(y): x[0]}) + + # queue + for job in queue: + for key in job: + keyword = key + index = job[key] + + if keyword == '!help': + reply.append(StaticAnswers().gen_help()) + + try: + target = words[index + 1] + if keyword == '!uptime': + last_activity = yield from self['xep_0012'].get_last_activity(target) + reply.append(LastActivity(last_activity, msg, target).format_values()) + + elif keyword == "!version": + version = yield from self['xep_0092'].get_version(target) + reply.append(Version(version, msg, target).format_version()) + + elif keyword == "!contact": + contact = yield from self['xep_0030'].get_info(jid=target, cached=False) + reply.append(ContactInfo(contact, msg, target).format_contact()) + + except XMPPError as error: + reply.append(HandleError(error, msg, key, target).build_report()) + + # remove None type from list and send all elements + if list(filter(None.__ne__, reply)) and reply: + reply = self.deduplicate(reply) + self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type']) + + +if __name__ == '__main__': + # command line arguments. + parser = ArgumentParser() + parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel', + const=logging.ERROR, default=logging.INFO) + parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel', + const=logging.DEBUG, default=logging.INFO) + parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', const="", + default='bot.log') + args = parser.parse_args() + + # logging + logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s') + logger = logging.getLogger(__name__) + + # configfile + config = configparser.RawConfigParser() + config.read('./bot.cfg') + args.jid = config.get('Account', 'jid') + args.password = config.get('Account', 'password') + args.room = config.get('MUC', 'rooms') + args.nick = config.get('MUC', 'nick') + + # init the bot and register used slixmpp plugins + xmpp = QueryBot(args.jid, args.password, args.room, args.nick) + xmpp.register_plugin('xep_0012') # Last Activity + xmpp.register_plugin('xep_0030') # Service Discovery + xmpp.register_plugin('xep_0045') # Multi-User Chat + xmpp.register_plugin('xep_0060') # PubSub + xmpp.register_plugin('xep_0085') # Chat State Notifications + xmpp.register_plugin('xep_0092') # Software Version + xmpp.register_plugin('xep_0128') # Service Discovery Extensions + xmpp.register_plugin('xep_0199') # XMPP Ping + + # connect and start receiving stanzas + xmpp.connect() + xmpp.process() |