#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
    James the MagicXMPP Bot
    build with Slick XMPP Library
    Copyright (C) 2018 Nico Wellpott

    See the file LICENSE for copying permission.
"""
import slixmpp
import ssl
import configparser
import logging

from argparse import ArgumentParser
from slixmpp.exceptions import XMPPError

import common.misc as misc
from common.strings import StaticAnswers
from classes import *


class QueryBot(slixmpp.ClientXMPP):
    """XMPP client that joins the configured rooms and answers ``!keyword`` queries."""

    def __init__(self, jid, password, room, nick):
        """
        :param jid: account JID used to log in
        :param password: account password
        :param room: comma separated list of MUC rooms to join (may be empty)
        :param nick: nickname used inside the rooms
        """
        slixmpp.ClientXMPP.__init__(self, jid, password)
        self.ssl_version = ssl.PROTOCOL_TLSv1_2
        self.room = room
        self.nick = nick
        self.use_message_ids = True

        # keyword -> object whose format() renders the reply line for that keyword
        self.functions = {
            "!uptime": LastActivity(),
            "!contact": ServerContact(),
            "!version": Version(),
            "!xep": XEPRequest(),
            "!help": Helper()
        }

        # session start event, starting point for the presence and roster requests
        self.add_event_handler('session_start', self.start)

        # register receive handler for both groupchat and normal message events
        self.add_event_handler('message', self.message)

    def start(self, event):
        """
        Send presence, fetch the roster and join every configured room.

        :param event -- An empty dictionary. The session_start event does not provide any additional data.
        """
        self.send_presence()
        self.get_roster()

        # If a room password is needed, use: password=the_room_password
        if self.room:
            for room in self.room.split(sep=","):
                # tolerate "a@x, b@x" style lists and stray commas in the config
                room = room.strip()
                if not room:
                    continue
                logging.debug("joining: %s", room)
                self.plugin['xep_0045'].join_muc(room, self.nick, wait=True)

    async def message(self, msg):
        """
        Handle an incoming message: build the job queue from its body, run the
        queries and send all collected reply lines back as one message.

        :param msg: received message stanza
        """
        data = {
            'words': list(),
            'reply': list(),
            'queue': list()
        }

        # catch self messages to prevent self flooding
        if msg['mucnick'] == self.nick:
            return

        elif self.nick in msg['body']:
            if msg["type"] == "groupchat":
                # discover role first
                affiliation = self['xep_0045'].get_jid_property(msg["mucroom"], self.nick, "role")
                if affiliation in ["moderator", "owner"]:
                    # discover real jid
                    # NOTE(review): the looked up jid is not used anywhere below yet
                    realjid = self['xep_0045'].get_jid_property(msg["mucroom"], msg["mucnick"], "jid")

            # add pre predefined text to reply list
            data['reply'].append(StaticAnswers().answers(msg['mucnick']))

        data = self.build_queue(data, msg)

        # queue
        for job in data['queue']:
            # every job is a single entry dict: {keyword: [target, opt_arg]}
            keyword = list(job.keys())[0]
            target, opt_arg = job[keyword][0], job[keyword][1]
            query = None

            try:
                if keyword == "!uptime":
                    query = await self['xep_0012'].get_last_activity(jid=target)

                elif keyword == "!version":
                    query = await self['xep_0092'].get_version(jid=target)

                elif keyword == "!contact":
                    query = await self['xep_0030'].get_info(jid=target, cached=False)

            except XMPPError as error:
                # build the report once, log it and hand it to the user as well
                report = misc.HandleError(error, keyword, target).report()
                logging.error(report)
                data['reply'].append(report)
                continue

            data["reply"].append(self.functions[keyword].format(query=query, target=target, opt_arg=opt_arg))

        # remove None type from list and send all remaining elements;
        # the filtered list has to be the one that is used, str.join() and the
        # nick check below both fail on None entries
        replies = [reply for reply in data['reply'] if reply is not None]
        if not replies:
            return

        # if msg type is groupchat check if mucnick is present to tag the user
        if msg["type"] == "groupchat" and msg["mucnick"] not in replies[0]:
            replies[0] = "%s: " % msg["mucnick"] + replies[0]

        # send the answer
        self.send_message(mto=msg['from'].bare, mbody="\n".join(replies), mtype=msg['type'])

    def build_queue(self, data, msg):
        """
        Scan the message body for known ``!keywords`` and queue one job per hit.

        :param data: dict with the 'words', 'reply' and 'queue' lists
        :param msg: received message stanza
        :return: data with 'words' filled and 'queue' holding the deduplicated
                 jobs, each shaped like {keyword: [target, opt_arg]}
        """
        # one instance per call instead of one per inspected word
        answers = StaticAnswers()
        all_keywords = answers.keys("all")
        expandable = answers.keys("expand")

        # double splitting to exclude whitespaces
        data['words'] = " ".join(msg['body'].split()).split(sep=" ")
        words = data['words']
        wordcount = len(words)

        # check all words in side the message for possible hits
        for index, keyword in enumerate(words):
            if not (keyword.startswith("!") and keyword in all_keywords):
                continue

            # init variables to circumvent IndexErrors
            target, opt_arg = None, None

            # compare to wordcount if assignment is possible; the two following
            # words are checked independently and a following keyword is never
            # taken as an argument
            if index + 1 < wordcount and not words[index + 1].startswith("!"):
                target = words[index + 1]

            if index + 2 < wordcount and not words[index + 2].startswith("!"):
                opt_arg = words[index + 2]

            # only add job to queue if target is valid
            if not misc.validate(keyword, target):
                continue

            # if keyword is a key in expand category expand it
            if keyword in expandable:
                # expand the keyword with the keyword list attached to it
                for key in expandable[keyword]:
                    data['queue'].append({key: [target, None]})
                logging.debug("expanding keyword %s", keyword)
            else:
                data['queue'].append({keyword: [target, opt_arg]})
                logging.debug("Item added to queue %s", {keyword: [target, opt_arg]})

        # deduplicate queue elements
        logging.debug("before dedup %s", data["queue"])
        data["queue"] = misc.deduplicate(data["queue"])
        logging.debug("after dedup %s", data["queue"])

        return data


if __name__ == '__main__':
    # command line arguments.
    parser = ArgumentParser()
    parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel',
                        const=logging.ERROR, default=logging.INFO)
    # const has to be DEBUG to match the help text, WARNING would lower the verbosity
    parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
                        const=logging.DEBUG, default=logging.INFO)
    # an empty filename makes logging.basicConfig fall back to the console
    parser.add_argument('-D', '--dev', help='set log output to console', action='store_const', dest='logfile',
                        const="", default='bot.log')
    args = parser.parse_args()

    # logging
    logging.basicConfig(filename=args.logfile, level=args.loglevel,
                        format='%(levelname)s: %(asctime)s: %(message)s')
    logger = logging.getLogger(__name__)

    # configfile
    config = configparser.RawConfigParser()
    config.read('./bot.cfg')
    args.jid = config.get('Account', 'jid')
    args.password = config.get('Account', 'password')
    args.room = config.get('MUC', 'rooms')
    args.nick = config.get('MUC', 'nick')

    # init the bot and register used slixmpp plugins
    xmpp = QueryBot(args.jid, args.password, args.room, args.nick)
    xmpp.register_plugin('xep_0012')  # Last Activity
    xmpp.register_plugin('xep_0030')  # Service Discovery
    xmpp.register_plugin('xep_0045')  # Multi-User Chat
    xmpp.register_plugin('xep_0060')  # PubSub
    xmpp.register_plugin('xep_0085')  # Chat State Notifications
    xmpp.register_plugin('xep_0092')  # Software Version
    xmpp.register_plugin('xep_0128')  # Service Discovery Extensions
    xmpp.register_plugin('xep_0199')  # XMPP Ping

    # connect and start receiving stanzas
    xmpp.connect()
    xmpp.process()