From 294a728b0f3cf4c335923b7fbe6bd7e137a3fee3 Mon Sep 17 00:00:00 2001 From: nico Date: Mon, 1 Oct 2018 23:17:09 +0200 Subject: refactorization (#1) * changed all lineendings to lf * cleaned up main class * refactor bot * refactor bot functions * moved functions.py to classes dir * code comment changes * changed code comment style - removed unnecessary pass statement + added missing newline * simplified function and return statements --- main.py | 416 ++++++++++++++++++++++++++++------------------------------------ 1 file changed, 183 insertions(+), 233 deletions(-) (limited to 'main.py') diff --git a/main.py b/main.py index 84d7a72..7445d9a 100644 --- a/main.py +++ b/main.py @@ -1,233 +1,183 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" - Slixmpp: The Slick XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of Slixmpp. - - See the file LICENSE for copying permission. -""" -import asyncio -import configparser -import logging -import slixmpp -import ssl -import validators -from argparse import ArgumentParser -from datetime import datetime, timedelta -from random import randint -from slixmpp.exceptions import XMPPError - - -class QueryBot(slixmpp.ClientXMPP): - """ A simple Slixmpp bot with some features """ - def __init__(self, jid, password, room, nick): - slixmpp.ClientXMPP.__init__(self, jid, password) - - self.room = room - self.nick = nick - - # session start event, starting point for the presence and roster requests - self.add_event_handler('session_start', self.start) - - # register handler to recieve both groupchat and normal message events - self.add_event_handler('message', self.message) - - def start(self, event): - """ - Arguments: - event -- An empty dictionary. The session_start event does not provide any additional data. - """ - self.send_presence() - self.get_roster() - - # If a room password is needed, use: password=the_room_password - for rooms in self.room.split(sep=","): - self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True) - - - @staticmethod - def precheck(line): - """ - pre check function - - check that keywords are used properly - - check that following a keyword a proper jid is following - :param line: line from message body - :return: true if correct - """ - keywords = ["!help", "!uptime", "!version", "!contact"] - proper_domain, proper_key = False, False - - try: - # check for valid keyword in position 0 - if line[0] in keywords: - proper_key = True - else: - return - - # help command is used - if line[0] == "!help": - proper_domain = True - # check if domain is valid - elif validators.domain(line[1]): - proper_domain = True - else: - return - except IndexError: - pass - - return proper_key and proper_domain - - @asyncio.coroutine - def message(self, msg): - """ - Arguments: - msg -- The received message stanza. See the documentation for stanza objects and the Message stanza to see - how it may be used. - """ - - # catch self messages to prevent self flooding - if msg['mucnick'] == self.nick: - return - - if self.nick in msg['body']: - # answer with predefined text when mucnick is used - self.send_message(mto=msg['from'].bare, mbody=notice_answer(msg['mucnick']), mtype=msg['type']) - - for line in msg['body'].splitlines(): - """ split multiline messages into lines to check every line for keywords """ - line = line.split(sep= " ") - - if self.precheck(line): - """ true if keyword and domain are valid """ - # Display help - if line[0] == '!help': - """ display help when keyword !help is recieved """ - self.send_message(mto=msg['from'].bare, mbody=help_doc(), mtype=msg['type']) - - # XEP-0072: Server Version - if line[0] == '!version': - """ query the server software version of the specified domain, defined by XEP-0092 """ - try: - version = yield from self['xep_0092'].get_version(line[1]) - - if msg['type'] == "groupchat": - text = "%s: %s is running %s version %s on %s" % (msg['mucnick'], line[1], version[ - 'software_version']['name'], version['software_version']['version'], version[ - 'software_version']['os']) - else: - text = "%s is running %s version %s on %s" % (line[1], version['software_version'][ - 'name'], version['software_version']['version'], version['software_version']['os']) - - self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type']) - except NameError: - pass - except XMPPError: - pass - - # XEP-0012: Last Activity - if line[0] == '!uptime': - """ query the server uptime of the specified domain, defined by XEP-0012 """ - try: - # try if domain[0] is set if not just pass - last_activity = yield from self['xep_0012'].get_last_activity(line[1]) - uptime = datetime(1, 1, 1) + timedelta(seconds=last_activity['last_activity']['seconds']) - - if msg['type'] == "groupchat": - text = "%s: %s is running since %d days %d hours %d minutes" % (msg['mucnick'], line[1], - uptime.day - 1, uptime.hour, - uptime.minute) - else: - text = "%s is running since %d days %d hours %d minutes" % (line[1], uptime.day - 1, - uptime.hour, uptime.minute) - self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type']) - except NameError: - pass - except XMPPError: - pass - - # XEP-0157: Contact Addresses for XMPP Services - if line[0] == "!contact": - """ query the XEP-0030: Service Discovery and extract contact information """ - try: - result = yield from self['xep_0030'].get_info(jid=line[1], cached=False) - server_info = [] - for field in result['disco_info']['form']: - var = field['var'] - if field['type'] == 'hidden' and var == 'FORM_TYPE': - title = field['value'][0] - continue - sep = ', ' - field_value = field.get_value(convert=False) - value = sep.join(field_value) if isinstance(field_value, list) else field_value - server_info.append('%s: %s' % (var, value)) - - text = "contact addresses for %s are" % (line[1]) - for count in range(len(server_info)): - text += "\n" + server_info[count] - - self.send_message(mto=msg['from'].bare, mbody=text, mtype=msg['type']) - except NameError: - pass - except XMPPError: - pass - - # TODO - # append all results to single message send just once - else: - pass - - -def help_doc(): - helpfile = {'help': '!help -- display this text', - 'version': '!version domain.tld -- receive XMPP server version', - 'uptime':'!uptime domain.tld -- receive XMPP server uptime', - 'contact': '!contact domain.tld -- receive XMPP server contact address info'} - return "".join(['%s\n' % (value) for (_, value) in helpfile.items()]) - - -def notice_answer(nickname): - possible_answers = {'1': 'I heard that, %s.', - '2': 'I am sorry for that %s.', - '3': '%s did you try turning it off and on again?'} - return possible_answers[str(randint(1, len(possible_answers)))] % nickname - -if __name__ == '__main__': - # command line arguments. - parser = ArgumentParser() - parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel', - const=logging.ERROR, default=logging.INFO) - parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel', - const=logging.DEBUG, default=logging.INFO) - parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', - const="", default='bot.log') - args = parser.parse_args() - - # logging - logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s') - logger = logging.getLogger(__name__) - - # configfile - config = configparser.RawConfigParser() - config.read('./bot.cfg') - args.jid = config.get('Account', 'jid') - args.password = config.get('Account', 'password') - args.room = config.get('MUC', 'rooms') - args.nick = config.get('MUC', 'nick') - args.admins = config.get('ADMIN', 'admins') - - # init the bot and register used slixmpp plugins - xmpp = QueryBot(args.jid, args.password, args.room, args.nick) - xmpp.ssl_version = ssl.PROTOCOL_TLSv1_2 - xmpp.register_plugin('xep_0012') # Last Activity - xmpp.register_plugin('xep_0030') # Service Discovery - xmpp.register_plugin('xep_0045') # Multi-User Chat - xmpp.register_plugin('xep_0060') # PubSub - xmpp.register_plugin('xep_0085') # Chat State Notifications - xmpp.register_plugin('xep_0092') # Software Version - xmpp.register_plugin('xep_0128') # Service Discovery Extensions - xmpp.register_plugin('xep_0199') # XMPP Ping - - # connect and start receiving stanzas - xmpp.connect() - xmpp.process() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" + Slixmpp: The Slick XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of Slixmpp. + + See the file LICENSE for copying permission. +""" +import asyncio +import slixmpp +import ssl +import validators +import configparser +import logging + +from argparse import ArgumentParser +from slixmpp.exceptions import XMPPError + +from classes.strings import StaticAnswers +from classes.functions import Version, LastActivity, ContactInfo, HandleError + + +class QueryBot(slixmpp.ClientXMPP): + def __init__(self, jid, password, room, nick): + slixmpp.ClientXMPP.__init__(self, jid, password) + self.ssl_version = ssl.PROTOCOL_TLSv1_2 + self.room = room + self.nick = nick + + # session start event, starting point for the presence and roster requests + self.add_event_handler('session_start', self.start) + + # register handler to recieve both groupchat and normal message events + self.add_event_handler('message', self.message) + + def start(self, event): + """ + :param str event -- An empty dictionary. The session_start event does not provide any additional data. + """ + self.send_presence() + self.get_roster() + + # If a room password is needed, use: password=the_room_password + for rooms in self.room.split(sep=","): + self.plugin['xep_0045'].join_muc(rooms, self.nick, wait=True) + + def validate_domain(self, wordlist, index): + """ + validation method to reduce connection attemps to unvalid domains + :param wordlist: words seperated by " " from the message + :param index: keyword index inside the message + :return: true if valid + """ + # keyword inside the message + argument = wordlist[index] + + # if the argument is not inside the no_arg_keywords target is index + 1 + if argument not in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"): + try: + target = wordlist[index + 1] + if validators.domain(target): + return True + except IndexError: + # except an IndexError if a keywords is the last word in the message + return False + elif argument in StaticAnswers().keys(arg='list', keyword="no_arg_keywords"): + return True + else: + return + + def deduplicate(self, reply): + """ + deduplication method for the result list + :param list reply: list containing strings + :return: list containing unique strings + """ + reply_dedup = list() + for item in reply: + if item not in reply_dedup: + reply_dedup.append(item) + + return reply_dedup + + @asyncio.coroutine + def message(self, msg): + """ + :param msg: received message stanza + """ + # init empty reply list + reply = list() + + # catch self messages to prevent self flooding + if msg['mucnick'] == self.nick: + return + elif self.nick in msg['body']: + # add pre predefined text to reply list + reply.append(StaticAnswers(msg['mucnick']).gen_answer()) + + # building the queue + # double splitting to exclude whitespaces + words = " ".join(msg['body'].split()).split(sep=" ") + queue = list() + + # check all words in side the message for possible hits + for x in enumerate(words): + # check word for match in keywords list + for y in StaticAnswers().keys(arg='list'): + # if so queue the keyword and the postion in the string + if x[1] == y: + # only add job to queue if domain is valid + if self.validate_domain(words, x[0]): + queue.append({str(y): x[0]}) + + # queue + for job in queue: + for key in job: + keyword = key + index = job[key] + + if keyword == '!help': + reply.append(StaticAnswers().gen_help()) + + try: + target = words[index + 1] + if keyword == '!uptime': + last_activity = yield from self['xep_0012'].get_last_activity(target) + reply.append(LastActivity(last_activity, msg, target).format_values()) + + elif keyword == "!version": + version = yield from self['xep_0092'].get_version(target) + reply.append(Version(version, msg, target).format_version()) + + elif keyword == "!contact": + contact = yield from self['xep_0030'].get_info(jid=target, cached=False) + reply.append(ContactInfo(contact, msg, target).format_contact()) + + except XMPPError as error: + reply.append(HandleError(error, msg, key, target).build_report()) + + # remove None type from list and send all elements + if list(filter(None.__ne__, reply)) and reply: + reply = self.deduplicate(reply) + self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type']) + + +if __name__ == '__main__': + # command line arguments. + parser = ArgumentParser() + parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel', + const=logging.ERROR, default=logging.INFO) + parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel', + const=logging.DEBUG, default=logging.INFO) + parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', const="", + default='bot.log') + args = parser.parse_args() + + # logging + logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s') + logger = logging.getLogger(__name__) + + # configfile + config = configparser.RawConfigParser() + config.read('./bot.cfg') + args.jid = config.get('Account', 'jid') + args.password = config.get('Account', 'password') + args.room = config.get('MUC', 'rooms') + args.nick = config.get('MUC', 'nick') + + # init the bot and register used slixmpp plugins + xmpp = QueryBot(args.jid, args.password, args.room, args.nick) + xmpp.register_plugin('xep_0012') # Last Activity + xmpp.register_plugin('xep_0030') # Service Discovery + xmpp.register_plugin('xep_0045') # Multi-User Chat + xmpp.register_plugin('xep_0060') # PubSub + xmpp.register_plugin('xep_0085') # Chat State Notifications + xmpp.register_plugin('xep_0092') # Software Version + xmpp.register_plugin('xep_0128') # Service Discovery Extensions + xmpp.register_plugin('xep_0199') # XMPP Ping + + # connect and start receiving stanzas + xmpp.connect() + xmpp.process() -- cgit v1.2.3-18-g5258