diff options
author | nico <nico@magicbroccoli.de> | 2018-11-11 03:12:11 +0100 |
---|---|---|
committer | nico <nico@magicbroccoli.de> | 2018-11-11 03:13:33 +0100 |
commit | 69951bba37a85cf7527d08861ab1f2715576df49 (patch) | |
tree | 3798538b14cc67e7deed4adc44957b4cc4607019 /classes | |
parent | 6bb9f1d5b87537ed6bccf0dd6efb7b80c6a81395 (diff) | |
parent | 1b13bdfd926e394cab2a2edd15ecabf0afcc4cf2 (diff) |
Merge branch 'dev'
+ added more comments to xep requests
+ added opt_arg to version, xep and contact
+ implemented data dictionary to hold all data in main bot
+ added message_ids
* updated gitignore file
* partly reworked servercontact implementation
* complete rework of uptime, version
* part rework of xep requests
* complete rework of validate function
* updated HandleError function
* part rework of StaticStrings function
* complete rework of queue building and deduplication
* logging parameter fix
Diffstat (limited to 'classes')
-rw-r--r-- | classes/functions.py | 108 | ||||
-rw-r--r-- | classes/servercontact.py | 106 | ||||
-rw-r--r-- | classes/strings.py | 54 | ||||
-rw-r--r-- | classes/uptime.py | 50 | ||||
-rw-r--r-- | classes/version.py | 39 | ||||
-rw-r--r-- | classes/xep.py | 97 |
6 files changed, 259 insertions, 195 deletions
diff --git a/classes/functions.py b/classes/functions.py deleted file mode 100644 index a8ed356..0000000 --- a/classes/functions.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# XEP-0072: Server Version -class Version: - def __init__(self, version, msg, target): - self.version = version['software_version']['version'] - self.os = version['software_version']['os'] - self.name = version['software_version']['name'] - self.nick = msg['mucnick'] - self.message_type = msg['type'] - self.target = target - - def format_version(self): - if self.message_type == "groupchat": - text = "%s: %s is running %s version %s on %s" % (self.nick, self.target, self.name, self.version, self.os) - else: - text = "%s is running %s version %s on %s" % (self.target, self.name, self.version, self.os) - - return text - - -# XEP-0012: Last Activity -class LastActivity: - """ query the server uptime of the specified domain, defined by XEP-0012 """ - def __init__(self, last_activity, msg, target): - self.last_activity = last_activity - self.nick = msg['mucnick'] - self.message_type = msg['type'] - self.target = target - - def format_values(self, granularity=4): - seconds = self.last_activity['last_activity']['seconds'] - uptime = [] - intervals = ( - ('years', 31536000), # 60 * 60 * 24 * 365 - ('weeks', 604800), # 60 * 60 * 24 * 7 - ('days', 86400), # 60 * 60 * 24 - ('hours', 3600), # 60 * 60 - ('minutes', 60), - ('seconds', 1) - ) - for name, count in intervals: - value = seconds // count - if value: - seconds -= value * count - if value == 1: - name = name.rstrip('s') - uptime.append("{} {}".format(value, name)) - result = ' '.join(uptime[:granularity]) - - if self.message_type == "groupchat": - text = "%s: %s is running since %s" % (self.nick, self.target, result) - else: - text = "%s is running since %s" % (self.target, result) - - return text - - -# XEP-0157: Contact Addresses for XMPP Services -class ContactInfo: - def __init__(self, contact, msg, target): - self.contact = contact - self.message = msg - self.target = target - - def format_contact(self): - server_info = [] - sep = ' , ' - possible_vars = ['abuse-addresses', - 'admin-addresses', - 'feedback-addresses', - 'sales-addresses', - 'security-addresses', - 'support-addresses'] - - for field in self.contact['disco_info']['form']: - var = field['var'] - if var in possible_vars: - field_value = field.get_value(convert=False) - value = sep.join(field_value) if isinstance(field_value, list) else field_value - server_info.append(' - %s: %s' % (var, value)) - - if server_info: - text = "contact addresses for %s are" % self.target - for count in range(server_info.__len__()): - text += "\n" + server_info[count] - else: - text = "%s has no contact addresses configured." % self.target - - return text - - -# class handeling XMPPError exeptions -class HandleError: - def __init__(self, error, msg, key, target="target missing"): - self.error = error - self.message = msg - self.key = key - self.target = target - - def build_report(self): - condition = self.error.condition - keyword = self.key[1:] - - text = "There was an error requesting " + self.target + '\'s ' + keyword + " : " + condition - - return text diff --git a/classes/servercontact.py b/classes/servercontact.py new file mode 100644 index 0000000..031ef67 --- /dev/null +++ b/classes/servercontact.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +import defusedxml.ElementTree as Et + + +# XEP-0157: Contact Addresses for XMPP Services +class ServerContact: + """ + plugin to process the server contact addresses from a disco query + """ + def __init__(self): + # init all necessary variables + self.possible_vars = ['abuse-addresses', + 'admin-addresses', + 'feedback-addresses', + 'sales-addresses', + 'security-addresses', + 'support-addresses'] + + self.contact = None + self.target, self.opt_arg = None, None + + def opt_arg_abbreviation(self): + """ + optional argument abbreviation function + if the provided string > 2 characters the most likely key will be chosen + :return: completes the opt_arg to the most likely one + """ + # if opt_argument is smaller then 2 pass to prohibit multiple answers + if len(self.opt_arg) < 2: + pass + + abbr = str(self.opt_arg) + possible_abbr = ["abuse-addresses", "admin-addresses", "feedback-addresses", "sales-addresses", + "security-addresses", "support-addresses"] + + # searches the best match in the list of possible_abbr and completes the opt_arg to that + self.opt_arg = [s for s in possible_abbr if s.startswith(abbr)][0] + + def process(self): + # optional argument abbreviation + self.opt_arg_abbreviation() + + # get etree from base xml + iq = Et.fromstring(str(self.contact)) + + # check if query is a valid result query + if iq.find('{http://jabber.org/protocol/disco#info}query'): + # only init result dict if result query is present + result = dict() + + # extract query from iq + query = iq.find('{http://jabber.org/protocol/disco#info}query') + + # extract jabber:x:data from query + xdata = query.findall('{jabber:x:data}x') + + # iterate over all nodes with the xdata tag + for node in xdata: + + # iterate over all child elements in node + for child in node: + + # if one opt_arg is defined return just that one + if self.opt_arg in self.possible_vars: + if child.attrib['var'] == self.opt_arg: + # add section to result dict and append info + result[child.attrib['var']] = list() + for value in child: + result[child.attrib['var']].append(value.text) + + # if node has a var attribute that matches our list process + elif child.attrib['var'] in self.possible_vars: + # add section to result dict and append info + result[child.attrib['var']] = list() + for value in child: + result[child.attrib['var']].append(value.text) + + return result + + def format(self, query, target, opt_arg): + self.contact = query + + self.target = target + self.opt_arg = opt_arg + + result = self.process() + + # if result is present continue + if result: + text = "contact addresses for %s are\n" % self.target + + # if opt_arg is present and member of possible_vars change text line + if self.opt_arg in self.possible_vars: + text = "%s for %s are\n" % (self.opt_arg, self.target) + + for key in result.keys(): + addr = ' , '.join(result[key]) + text += "- %s : %s\n" % (key, addr) + else: + text = "%s has no contact addresses configured." % self.target + + # if opt_arg is present and member of possible_vars but the key is empty change text line + if self.opt_arg in self.possible_vars: + text = "%s for %s are not defined." % (self.opt_arg, self.target) + + return text diff --git a/classes/strings.py b/classes/strings.py deleted file mode 100644 index 6866a31..0000000 --- a/classes/strings.py +++ /dev/null @@ -1,54 +0,0 @@ -# -*- coding: utf-8 -*- -from random import randint - - -class StaticAnswers: - """ - collection of callable static/ semi-static strings - """ - def __init__(self, nick=""): - self.nickname = nick - self.helpfile = { - 'help': '!help -- display this text', - 'version': '!version domain.tld -- receive XMPP server version', - 'uptime': '!uptime domain.tld -- receive XMPP server uptime', - 'contact': '!contact domain.tld -- receive XMPP server contact address info', - 'xep': '!xep XEP Number -- recieve information about the specified XEP'} - self.possible_answers = { - '1': 'I heard that, %s.', - '2': 'I am sorry for that %s.', - '3': '%s did you try turning it off and on again?'} - self.error_messages = { - '1': 'not reachable', - '2': 'not a valid target' - } - self.keywords = { - "keywords": ["!help", "!uptime", "!version", "!contact", "!xep"], - "domain_keywords": ["!uptime", "!version", "!contact"], - "no_arg_keywords": ["!help"], - "number_keywords": ["!xep"] - } - - def keys(self, arg="", keyword='keywords'): - if arg == 'list': - try: - return self.keywords[keyword] - except KeyError: - return self.keywords['keywords'] - else: - return self.keywords - - def gen_help(self): - helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()]) - return helpdoc - - def gen_answer(self): - possible_answers = self.possible_answers - return possible_answers[str(randint(1, possible_answers.__len__()))] % self.nickname - - def error(self,code): - try: - text = self.error_messages[str(code)] - except KeyError: - return 'unknown error' - return text diff --git a/classes/uptime.py b/classes/uptime.py new file mode 100644 index 0000000..6eb15dd --- /dev/null +++ b/classes/uptime.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- + + +# XEP-0012: Last Activity +class LastActivity: + """ + query the server uptime of the specified domain, defined by XEP-0012 + """ + def __init__(self): + # init all necessary variables + self.last_activity = None + self.target, self.opt_arg = None, None + + def process(self, granularity=4): + seconds = self.last_activity['last_activity']['seconds'] + uptime = [] + + # touple with displayable time sections + intervals = ( + ('years', 31536000), # 60 * 60 * 24 * 365 + ('weeks', 604800), # 60 * 60 * 24 * 7 + ('days', 86400), # 60 * 60 * 24 + ('hours', 3600), # 60 * 60 + ('minutes', 60), + ('seconds', 1) + ) + + # for every element in possible time section process the seconds + for name, count in intervals: + value = seconds // count + if value: + seconds -= value * count + if value == 1: + name = name.rstrip('s') + uptime.append("{} {}".format(value, name)) + result = ' '.join(uptime[:granularity]) + + # insert values into result string + text = "%s is running since %s" % (self.target, result) + + return text + + def format(self, query, target, opt_arg): + self.last_activity = query + + self.target = target + self.opt_arg = opt_arg + + reply = self.process() + return reply diff --git a/classes/version.py b/classes/version.py new file mode 100644 index 0000000..1e9ef7e --- /dev/null +++ b/classes/version.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- + + +# XEP-0072: Server Version +class Version: + """ + process and format a version query + """ + def __init__(self): + # init all necessary variables + self.software_version = None + self.target, self.opt_arg = None, None + + def format_result(self): + # list of all possible opt_arg + possible_opt_args = ["version", "os", "name"] + + name = self.software_version['name'] + version = self.software_version['version'] + os = self.software_version['os'] + + # if opt_arg is given member of possible_opt_args list return that element + if self.opt_arg in possible_opt_args: + text = "%s: %s" % (self.opt_arg, self.software_version[self.opt_arg]) + + # otherwise return full version string + else: + text = "%s is running %s version %s on %s" % (self.target, name, version, os) + + return text + + def format(self, query, target, opt_arg): + self.software_version = query['software_version'] + + self.target = target + self.opt_arg = opt_arg + + reply = self.format_result() + return reply diff --git a/classes/xep.py b/classes/xep.py index 9e4f61f..f5fae61 100644 --- a/classes/xep.py +++ b/classes/xep.py @@ -1,19 +1,17 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- +import os import requests -import defusedxml.ElementTree as ET +import defusedxml.ElementTree as Et class XEPRequest: - def __init__(self, msg, xepnumber): - """ - class which requests the header of the referenced xep - :param xepnumber: number int or str to request the xep for - """ - self.message_type = msg['type'] - self.muc_nick = msg['mucnick'] + """ + class which requests the header of the referenced xep + """ + def __init__(self): + # init all necessary variables + self.reqxep, self.opt_arg = None, None - self.reqxep = str(xepnumber) self.xeplist = None self.acceptedxeps = list() @@ -21,30 +19,40 @@ class XEPRequest: """ query and save the current xep list to reduce network bandwidth """ - try: - with open(".etag") as file: + # check if etag header is present if not set local_etag to "" + if os.path.isfile("./common/.etag"): + with open("./common/.etag") as file: local_etag = file.read() - except FileNotFoundError: + else: local_etag = "" with requests.Session() as s: + # head request the xeplist.xml s.headers.update({'Accept': 'application/xml'}) head = s.head("https://xmpp.org/extensions/xeplist.xml") etag = head.headers['etag'] + # compare etag with local_etag if they match up no request is made if local_etag == etag: - with open("xeplist.xml", "r") as file: - self.xeplist = ET.fromstring(file.read()) + with open("./common/xeplist.xml", "r") as file: + self.xeplist = Et.fromstring(file.read()) + + # if the connection is not possible use cached xml if present + elif os.path.isfile("./common/xeplist.xml") and head.status_code != 200: + with open("./common/xeplist.xml", "r") as file: + self.xeplist = Et.fromstring(file.read()) + + # in any other case request the latest xml else: r = s.get("https://xmpp.org/extensions/xeplist.xml") r.encoding = 'utf-8' local_etag = head.headers['etag'] - with open("xeplist.xml", "w") as file: + with open("./common/xeplist.xml", "w") as file: file.write(r.content.decode()) - self.xeplist = ET.fromstring(r.content.decode()) + self.xeplist = Et.fromstring(r.content.decode()) - with open('.etag', 'w') as string: + with open('./common/.etag', 'w') as string: string.write(local_etag) # populate xep comparison list @@ -54,34 +62,57 @@ class XEPRequest: def get(self): """ function to query the xep entry if xepnumber is present in xeplist - :return: nicely formatted xep header information + :return: formatted xep header information """ + # all possible subtags grouped by location + last_revision_tags = ["date", "version", "initials", "remark"] + xep_tags = ["number", "title", "abstract", "type", "status", "approver", "shortname", "sig", "lastcall", "date", "version", "initials", "remark"] + # check if xeplist is accurate self.req_xeplist() result = list() - # if requested number is inside acceptedxeps continou - if self.reqxep in self.acceptedxeps: + # if requested number is member of acceptedxeps continue + if str(self.reqxep) in self.acceptedxeps: searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep for item in self.xeplist.findall(searchstring): - for x in range(1,5): - result.append(item[x].tag + " : " + item[x].text) - + # if the opt_arg references is member of xeptag return only that tag + if self.opt_arg in xep_tags: + + # if the opt_arg references is member of last_revision_tags return only that subtag + if self.opt_arg in last_revision_tags: + query = item.find("last-revision").find(self.opt_arg) + else: + query = item.find(self.opt_arg) + + # append opt_arg results to the result list + if query is not None: + result.append("%s : %s" % (query.tag, query.text)) + else: + result.append("%s does not have a %s tag." % (self.reqxep, self.opt_arg)) + + # in any other case return the general answer + else: + result_opts = ["title", "type", "abstract", "status"] + for tag in result_opts: + result.append(item.find(tag).text) + + # if the requested number is no member of acceptedxeps and/or not accepted return error. else: - if self.message_type == "groupchat": - result.append(self.muc_nick + " : " + "XEP-" + str(self.reqxep) + " : is not available.") - else: - result.append("XEP-" + str(self.reqxep) + " : is not available.") + result.append("XEP-%s : is not available." % self.reqxep) return result - def format(self): + def format(self, query, target, opt_arg): + """ + :param target: number int or str to request the xep for + :return: + """ + self.reqxep = int(target) + self.opt_arg = opt_arg + reply = self.get() - if self.message_type == "groupchat": - text = "%s: " % self.muc_nick - reply[0] = text + reply[0] text = '\n'.join(reply) - return text |