From 134a674228b3d2b42cb7fe2a12f215a225e7544b Mon Sep 17 00:00:00 2001 From: nico Date: Wed, 9 Jan 2019 19:38:04 +0100 Subject: initial rework of the implemented functions + added __init__ import file for all functions + added new helper function + added strings.json file * some logic fixups * validator fixup * arg abbreviation moved to misc --- classes/__init__.py | 5 +++ classes/help.py | 29 +++++++++++++ classes/servercontact.py | 26 +++--------- classes/version.py | 11 ++--- classes/xep.py | 2 +- common/misc.py | 52 +++++++++++++++++------ common/strings.json | 108 +++++++++++++++++++++++++++++++++++++++++++++++ common/strings.py | 69 +++++++++++------------------- main.py | 73 +++++++++++++++++--------------- 9 files changed, 259 insertions(+), 116 deletions(-) create mode 100644 classes/__init__.py create mode 100644 classes/help.py create mode 100644 common/strings.json diff --git a/classes/__init__.py b/classes/__init__.py new file mode 100644 index 0000000..838c8c2 --- /dev/null +++ b/classes/__init__.py @@ -0,0 +1,5 @@ +from classes.version import Version +from classes.servercontact import ServerContact +from classes.uptime import LastActivity +from classes.xep import XEPRequest +from classes.help import Helper diff --git a/classes/help.py b/classes/help.py new file mode 100644 index 0000000..c268520 --- /dev/null +++ b/classes/help.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +from common.misc import arg_abbr +from common.strings import StaticAnswers +import json + + +class Helper: + def __init__(self): + self.possible_vars = StaticAnswers().helpfile["help_advanced"].keys() + + def receive(self, target): + # optional argument abbreviation + target = arg_abbr(target, self.possible_vars) + + if target in self.possible_vars: + return StaticAnswers().help_doc(target) + else: + return StaticAnswers().help_doc() + + def format(self, query, target, opt_arg): + doc = self.receive(target) + + if target in self.possible_vars: + answer = json.dumps(StaticAnswers().help_doc(target), indent=4) + + else: + answer = "\n".join(['%s' % value for (_, value) in doc.items()]) + + return answer diff --git a/classes/servercontact.py b/classes/servercontact.py index c2f4ad5..e0e871e 100644 --- a/classes/servercontact.py +++ b/classes/servercontact.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from common.misc import arg_abbr import defusedxml.ElementTree as Et @@ -19,23 +20,6 @@ class ServerContact: self.contact = None self.target, self.opt_arg = None, None - def opt_arg_abbreviation(self): - """ - optional argument abbreviation function - if the provided string > 2 characters the most likely key will be chosen - :return: completes the opt_arg to the most likely one - """ - # if opt_argument is smaller then 2 pass to prohibit multiple answers - if len(self.opt_arg) < 2: - pass - - abbr = str(self.opt_arg) - possible_abbr = ["abuse-addresses", "admin-addresses", "feedback-addresses", "sales-addresses", - "security-addresses", "support-addresses"] - - # searches the best match in the list of possible_abbr and completes the opt_arg to that - self.opt_arg = [s for s in possible_abbr if s.startswith(abbr)][0] - def process(self): # get etree from base xml iq = Et.fromstring(str(self.contact)) @@ -57,11 +41,11 @@ class ServerContact: # iterate over all child elements in node for child in node: - # if one opt_arg is defined return just that one - if self.opt_arg in self.possible_vars: - # check for possible abbreviations to the optional argument - self.opt_arg_abbreviation() + # check for possible abbreviations to the optional argument + self.opt_arg = arg_abbr(self.opt_arg, self.possible_vars) + # if opt_arg is defined and valid return just that one + if self.opt_arg in self.possible_vars: if child.attrib['var'] == self.opt_arg: # add section to result dict and append info result[child.attrib['var']] = list() diff --git a/classes/version.py b/classes/version.py index 1e9ef7e..42b3e61 100644 --- a/classes/version.py +++ b/classes/version.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- - +from common.misc import arg_abbr # XEP-0072: Server Version class Version: @@ -10,17 +10,18 @@ class Version: # init all necessary variables self.software_version = None self.target, self.opt_arg = None, None + self.possible_vars = ["version", "os", "name"] def format_result(self): - # list of all possible opt_arg - possible_opt_args = ["version", "os", "name"] - name = self.software_version['name'] version = self.software_version['version'] os = self.software_version['os'] + # check for possible abbreviations to the optional argument + self.opt_arg = arg_abbr(self.opt_arg, self.possible_vars) + # if opt_arg is given member of possible_opt_args list return that element - if self.opt_arg in possible_opt_args: + if self.opt_arg in self.possible_vars: text = "%s: %s" % (self.opt_arg, self.software_version[self.opt_arg]) # otherwise return full version string diff --git a/classes/xep.py b/classes/xep.py index f5fae61..df401a7 100644 --- a/classes/xep.py +++ b/classes/xep.py @@ -47,10 +47,10 @@ class XEPRequest: r = s.get("https://xmpp.org/extensions/xeplist.xml") r.encoding = 'utf-8' local_etag = head.headers['etag'] + self.xeplist = Et.fromstring(r.content.decode()) with open("./common/xeplist.xml", "w") as file: file.write(r.content.decode()) - self.xeplist = Et.fromstring(r.content.decode()) with open('./common/.etag', 'w') as string: string.write(local_etag) diff --git a/common/misc.py b/common/misc.py index b91c618..edfe3a0 100755 --- a/common/misc.py +++ b/common/misc.py @@ -24,28 +24,56 @@ def validate(keyword, target): :param target: provided target :return: true if valid """ - # if keyword in domain_keywords list - if keyword in StaticAnswers().keys('domain_keywords'): + # if keyword is in noarg list return True + if keyword in StaticAnswers().keys("noarg"): + return True + + # prevent AttributeError if target is NoneType + if target is None: + return False + + # if keyword in domain list + if keyword in StaticAnswers().keys('domain'): # if target is a domain / email return True if validators.domain(target) or validators.email(target): return True - # check if keyword is in number_keyword list - elif keyword in StaticAnswers().keys('number_keywords'): - # prevent AttributeError if target is NoneType - if target is not None: - # if target only consists of digits return True - return target.isdigit() + # check if keyword is in number list + elif keyword in StaticAnswers().keys('number'): + return target.isdigit() - # if keyword is in no_arg_keywords list return True - elif keyword in StaticAnswers().keys("no_arg_keywords"): + # if keyword in expand list return True + elif keyword in StaticAnswers().keys("expand"): return True # if the target could not be validated until this return False return False -# +def arg_abbr(value, possible_values): + """ + optional argument abbreviation + if the provided string value > 2 characters the most likely value will be chosen + :return: completes the value to the most likely one + """ + # prevent traceback if value is None + if value and possible_values is not None: + # if opt_argument is smaller then 2 pass to prohibit multiple answers + if len(value) < 2: + return value + + abbr = str(value) + + # searches the best match in the list of possible_abbr and completes the opt_arg to that + new_value = [s for s in list(possible_values) if s.startswith(abbr)] + + # prevent index error if abbreviation has not result + if new_value: + value = new_value[0] + + return value + + class HandleError: """ simple XMPP error / exception class formating the error condition @@ -59,6 +87,6 @@ class HandleError: def report(self): # return the formatted result string to the user - text = "%s. %s %s resulted in: %s" % (self.text, self.key, self.target, self.condition) + text = "%s %s resulted in: %s" % (self.key, self.target, self.condition) return text diff --git a/common/strings.json b/common/strings.json new file mode 100644 index 0000000..eefd73e --- /dev/null +++ b/common/strings.json @@ -0,0 +1,108 @@ +{ + "help": { + "help_basic": { + "contact": "!contact domain.tld -- receive XMPP server contact address info", + "help": "!help -- display all basic help information", + "uptime": "!uptime domain.tld -- receive XMPP server uptime", + "version": "!version domain.tld -- receive XMPP server version", + "xep": "!xep $xepnumber -- receive xep info abstract" + }, + "help_advanced": { + "contact": { + "info": "XEP-0157 contact addresses for XMPP Services request", + "optional_args": { + "info": "it is not required to type the whole optional argument, it is only necessary to type at least the two starting characters.", + "possible_vars": [ + "abuse-addresses", + "admin-addresses", + "feedback-addresses", + "sales-addresses", + "security-addresses", + "support-addresses" + ] + }, + "targets": "valid target: domain.tld", + "command": "!contact $target $optional_argument" + }, + "uptime": { + "info": "XEP-0012 last activity request", + "targets": "valid targets: domain.tld", + "command": "!uptime $target $optional_argument" + }, + "version": { + "info": "XEP-0072 version query request", + "optional_args": { + "name": "receive the name of the software used", + "os": "receive only the operating system version", + "version": "receive only the software version used" + }, + "targets": "valid targets are: domain.tld and jid/resource", + "command": "!version $target $optional_argument" + }, + "xep": { + "info": "receive abstract of referenced XEP", + "optional_args": { + "last_revision_tags": [ + "date", + "version", + "initials", + "remark" + ], + "tags": [ + "number", + "title", + "abstract", + "type", + "status", + "approver", + "shortname", + "sig", + "lastcall", + "date", + "version", + "initials", + "remark" + ] + }, + "targets": "any valid integer", + "command": "!xep $xepnumber $optional_argument" + }, + "info": { + "info": "pooled command to accumulate dataset composed of uptime version and contact", + "targets": "valid target is: domain.tld", + "command": "!info $target" + } + } + }, + "functions": { + "keywords": { + "all": [ + "!help", + "!uptime", + "!version", + "!contact", + "!xep", + "!info" + ], + "domain": [ + "!uptime", + "!version", + "!contact" + ], + "expand": { + "!info": ["!uptime", "!version", "!contact"] + }, + "noarg": [ + "!help" + ], + "number": [ + "!xep" + ] + }, + "answers": [ + "I heard that %s", + "%s I am sorry for that.", + "%s did you try turning it off and on again?" + ] + } +} \ No newline at end of file diff --git a/common/strings.py b/common/strings.py index 6f1b629..6941a6b 100644 --- a/common/strings.py +++ b/common/strings.py @@ -1,55 +1,36 @@ # -*- coding: utf-8 -*- -from random import randint +from random import choice +import json class StaticAnswers: """ collection of callable static/ semi-static strings """ - def __init__(self, nick=""): - self.nickname = nick - self.helpfile = { - 'help': '!help -- display this text', - 'version': '!version domain.tld -- receive XMPP server version', - 'uptime': '!uptime domain.tld -- receive XMPP server uptime', - 'contact': '!contact domain.tld -- receive XMPP server contact address info', - 'xep': '!xep XEP Number -- recieve information about the specified XEP' - } - self.possible_answers = { - '1': 'I heard that, %s.', - '2': 'I am sorry for that %s.', - '3': '%s did you try turning it off and on again?' - } - self.error_messages = { - '1': 'not reachable', - '2': 'not a valid target' - } - self.keywords = { - "keywords": ["!help", "!uptime", "!version", "!contact", "!xep"], - "domain_keywords": ["!uptime", "!version", "!contact"], - "no_arg_keywords": ["!help"], - "number_keywords": ["!xep"] - } - - def keys(self, key=""): + def __init__(self): + with open("./common/strings.json") as basefile: + self.strings = json.load(basefile) + + self.helpfile = self.strings["help"] + self.keywords = self.strings["functions"]["keywords"] + self.replys = self.strings["functions"]["answers"] + + def keys(self, key=None): # if specific keyword in referenced return that - if key in self.keywords.keys(): + if key in self.keywords: return self.keywords[key] # in any other case return the whole dict - return self.keywords["keywords"] - - def gen_help(self): - helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()]) - return helpdoc - - def gen_answer(self): - possible_answers = self.possible_answers - return possible_answers[str(randint(1, possible_answers.__len__()))] % self.nickname - - def error(self,code): - try: - text = self.error_messages[str(code)] - except KeyError: - return 'unknown error' - return text + return self.keywords["all"] + + def help_doc(self, key=None): + # if specific key is referenced return only that key + if key is not None: + return self.helpfile["help_advanced"][key] + + # in any other case return basic dict + return self.helpfile["help_basic"] + + def answers(self, nick=None): + # return pseudo random answer + return choice(self.replys) % nick diff --git a/main.py b/main.py index 50f9b03..d04f8d3 100755 --- a/main.py +++ b/main.py @@ -17,10 +17,7 @@ from slixmpp.exceptions import XMPPError import common.misc as misc from common.strings import StaticAnswers -from classes.servercontact import ServerContact -from classes.version import Version -from classes.uptime import LastActivity -from classes.xep import XEPRequest +from classes import * class QueryBot(slixmpp.ClientXMPP): @@ -35,7 +32,8 @@ class QueryBot(slixmpp.ClientXMPP): "!uptime": LastActivity(), "!contact": ServerContact(), "!version": Version(), - "!xep": XEPRequest() + "!xep": XEPRequest(), + "!help": Helper() } # session start event, starting point for the presence and roster requests @@ -72,8 +70,15 @@ class QueryBot(slixmpp.ClientXMPP): return elif self.nick in msg['body']: + if msg["type"] == "groupchat": + # discover role first + affiliation = self['xep_0045'].get_jid_property(msg["mucroom"], self.nick, "role") + if affiliation in ["moderator", "owner"]: + # discover real jid + realjid = self['xep_0045'].get_jid_property(msg["mucroom"], msg["mucnick"], "jid") + # add pre predefined text to reply list - data['reply'].append(StaticAnswers(msg['mucnick']).gen_answer()) + data['reply'].append(StaticAnswers().answers(msg['mucnick'])) data = self.build_queue(data, msg) @@ -86,10 +91,6 @@ class QueryBot(slixmpp.ClientXMPP): opt_arg = job[keyword][1] query = None - if keyword == '!help': - data['reply'].append(StaticAnswers().gen_help()) - continue - try: if keyword == "!uptime": query = await self['xep_0012'].get_last_activity(jid=target) @@ -101,7 +102,7 @@ class QueryBot(slixmpp.ClientXMPP): query = await self['xep_0030'].get_info(jid=target, cached=False) except XMPPError as error: - logging.info(misc.HandleError(error, keyword, target).report()) + logging.error(misc.HandleError(error, keyword, target).report()) data['reply'].append(misc.HandleError(error, keyword, target).report()) continue @@ -110,13 +111,15 @@ class QueryBot(slixmpp.ClientXMPP): # remove None type from list and send all elements if list(filter(None.__ne__, data['reply'])) and data['reply']: - # if msg type is groupchat prepend mucnick + # if msg type is groupchat if msg["type"] == "groupchat": - data["reply"][0] = "%s: " % msg["mucnick"] + data["reply"][0] - # reply = misc.deduplicate(data['reply']) - reply = data["reply"] - self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type']) + # check if mucnick is present to tag the user + if msg["mucnick"] not in data["reply"][0]: + data["reply"][0] = "%s: " % msg["mucnick"] + data["reply"][0] + + # send the answer + self.send_message(mto=msg['from'].bare, mbody="\n".join(data["reply"]), mtype=msg['type']) def build_queue(self, data, msg): # building the queue @@ -125,35 +128,39 @@ class QueryBot(slixmpp.ClientXMPP): wordcount = len(data["words"]) # check all words in side the message for possible hits - for x in enumerate(data['words']): - # check for valid keywords - index = x[0] - keyword = x[1] - - # match all words starting with ! and member of no_arg_keywords - if keyword.startswith("!") and keyword in StaticAnswers().keys("no_arg_keywords"): - data['queue'].append({keyword: [None, None]}) - - # matching all words starting with ! and member of keywords - elif keyword.startswith("!") and keyword in StaticAnswers().keys("keywords"): + for index, keyword in enumerate(data['words']): + if keyword.startswith("!") and keyword in StaticAnswers().keys("all"): # init variables to circumvent IndexErrors target, opt_arg = None, None # compare to wordcount if assignment is possible if index + 1 < wordcount: - target = data["words"][index + 1] + if not data["words"][index + 1].startswith("!"): + target = data["words"][index + 1] if index + 2 < wordcount: if not data["words"][index + 2].startswith("!"): opt_arg = data["words"][index + 2] - # only add job to queue if domain is valid + # only add job to queue if target is valid if misc.validate(keyword, target): - logging.debug("Item added to queue %s" % {str(keyword): [target, opt_arg]}) - data['queue'].append({str(keyword): [target, opt_arg]}) + + # if keyword is a key in expand category expand it + if keyword in StaticAnswers().keys("expand").keys(): + # expand the keyword with the keyword list attached to it + for key in StaticAnswers().keys("expand")[keyword]: + data['queue'].append({key: [target, None]}) + + logging.debug("expanding keyword %s" % keyword) + + else: + data['queue'].append({keyword: [target, opt_arg]}) + logging.warning("Item added to queue %s" % {keyword: [target, opt_arg]}) # deduplicate queue elements + logging.warning("before dedup %s" %data["queue"]) data["queue"] = misc.deduplicate(data["queue"]) + logging.warning("after dedup %s" %data["queue"]) return data @@ -164,8 +171,8 @@ if __name__ == '__main__': parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel', const=logging.ERROR, default=logging.INFO) parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel', - const=logging.DEBUG, default=logging.INFO) - parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile', + const=logging.WARNING, default=logging.INFO) + parser.add_argument('-D', '--dev', help='set log output to console', action='store_const', dest='logfile', const="", default='bot.log') args = parser.parse_args() -- cgit v1.2.3-18-g5258