summaryrefslogtreecommitdiffstats
path: root/classes
diff options
context:
space:
mode:
authornico <nico@magicbroccoli.de>2018-11-06 23:43:11 +0100
committernico <nico@magicbroccoli.de>2018-11-06 23:43:11 +0100
commit0c313565f2b649366f7382dc1b3f28a3e80f4ffc (patch)
tree953d5ccff11b3954794d3ed713239e16b3be6b19 /classes
parent559ab280ca705bca200823a0493308b10aba1dd4 (diff)
simplification and major rework
* updated gitignore file * partly reworked servercontact implementation * complete rework of uptime, version * part rework of xep requests + added more comments to xep requests + added opt_arg to version, xep and contact * complete rework of validate function * updated HandleError function * part rework of StaticStrings function + implemented data dictionary to hold all data in main bot + added message_ids * complete rework of queue building and deduplication
Diffstat (limited to 'classes')
-rw-r--r--classes/functions.py108
-rw-r--r--classes/servercontact.py45
-rw-r--r--classes/uptime.py43
-rw-r--r--classes/version.py39
-rw-r--r--classes/xep.py85
5 files changed, 154 insertions, 166 deletions
diff --git a/classes/functions.py b/classes/functions.py
deleted file mode 100644
index a8ed356..0000000
--- a/classes/functions.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-# XEP-0072: Server Version
-class Version:
- def __init__(self, version, msg, target):
- self.version = version['software_version']['version']
- self.os = version['software_version']['os']
- self.name = version['software_version']['name']
- self.nick = msg['mucnick']
- self.message_type = msg['type']
- self.target = target
-
- def format_version(self):
- if self.message_type == "groupchat":
- text = "%s: %s is running %s version %s on %s" % (self.nick, self.target, self.name, self.version, self.os)
- else:
- text = "%s is running %s version %s on %s" % (self.target, self.name, self.version, self.os)
-
- return text
-
-
-# XEP-0012: Last Activity
-class LastActivity:
- """ query the server uptime of the specified domain, defined by XEP-0012 """
- def __init__(self, last_activity, msg, target):
- self.last_activity = last_activity
- self.nick = msg['mucnick']
- self.message_type = msg['type']
- self.target = target
-
- def format_values(self, granularity=4):
- seconds = self.last_activity['last_activity']['seconds']
- uptime = []
- intervals = (
- ('years', 31536000), # 60 * 60 * 24 * 365
- ('weeks', 604800), # 60 * 60 * 24 * 7
- ('days', 86400), # 60 * 60 * 24
- ('hours', 3600), # 60 * 60
- ('minutes', 60),
- ('seconds', 1)
- )
- for name, count in intervals:
- value = seconds // count
- if value:
- seconds -= value * count
- if value == 1:
- name = name.rstrip('s')
- uptime.append("{} {}".format(value, name))
- result = ' '.join(uptime[:granularity])
-
- if self.message_type == "groupchat":
- text = "%s: %s is running since %s" % (self.nick, self.target, result)
- else:
- text = "%s is running since %s" % (self.target, result)
-
- return text
-
-
-# XEP-0157: Contact Addresses for XMPP Services
-class ContactInfo:
- def __init__(self, contact, msg, target):
- self.contact = contact
- self.message = msg
- self.target = target
-
- def format_contact(self):
- server_info = []
- sep = ' , '
- possible_vars = ['abuse-addresses',
- 'admin-addresses',
- 'feedback-addresses',
- 'sales-addresses',
- 'security-addresses',
- 'support-addresses']
-
- for field in self.contact['disco_info']['form']:
- var = field['var']
- if var in possible_vars:
- field_value = field.get_value(convert=False)
- value = sep.join(field_value) if isinstance(field_value, list) else field_value
- server_info.append(' - %s: %s' % (var, value))
-
- if server_info:
- text = "contact addresses for %s are" % self.target
- for count in range(server_info.__len__()):
- text += "\n" + server_info[count]
- else:
- text = "%s has no contact addresses configured." % self.target
-
- return text
-
-
-# class handeling XMPPError exeptions
-class HandleError:
- def __init__(self, error, msg, key, target="target missing"):
- self.error = error
- self.message = msg
- self.key = key
- self.target = target
-
- def build_report(self):
- condition = self.error.condition
- keyword = self.key[1:]
-
- text = "There was an error requesting " + self.target + '\'s ' + keyword + " : " + condition
-
- return text
diff --git a/classes/servercontact.py b/classes/servercontact.py
index 27b72f0..ea7216d 100644
--- a/classes/servercontact.py
+++ b/classes/servercontact.py
@@ -2,12 +2,13 @@
import defusedxml.ElementTree as Et
+# XEP-0157: Contact Addresses for XMPP Services
class ServerContact:
- def __init__(self, contact, msg, target):
- self.contact = contact
- self.message = msg
- self.target = target
-
+ """
+ plugin to process the server contact addresses from a disco query
+ """
+ def __init__(self):
+ # init all necessary variables
self.possible_vars = ['abuse-addresses',
'admin-addresses',
'feedback-addresses',
@@ -15,6 +16,9 @@ class ServerContact:
'security-addresses',
'support-addresses']
+ self.contact = None
+ self.target, self.opt_arg = None, None
+
def process(self):
# get etree from base xml
iq = Et.fromstring(str(self.contact))
@@ -36,8 +40,16 @@ class ServerContact:
# iterate over all x nodes
for child in xdata[x]:
+ # if one opt_arg is defined return just that one
+ if self.opt_arg in self.possible_vars:
+ if child.attrib['var'] == self.opt_arg:
+ # add section to result dict and append info
+ result[child.attrib['var']] = list()
+ for value in child:
+ result[child.attrib['var']].append(value.text)
+
# if node has a var attribute that matches our list process
- if child.attrib['var'] in self.possible_vars:
+ elif child.attrib['var'] in self.possible_vars:
# add section to result dict and append info
result[child.attrib['var']] = list()
for value in child:
@@ -45,17 +57,30 @@ class ServerContact:
return result
- def format_contact(self):
+ def format(self, query, target, opt_arg):
+ self.contact = query
+
+ self.target = target
+ self.opt_arg = opt_arg
+
result = self.process()
+ # if result is present continue
if result:
text = "contact addresses for %s are\n" % self.target
+ # if opt_arg is present and member of possible_vars change text line
+ if opt_arg in self.possible_vars:
+ text = "%s for %s are\n" % (self.opt_arg, self.target)
+
for key in result.keys():
- if result[key]:
- addr = ' , '.join(result[key])
- text += "- %s : %s\n" % (key, addr)
+ addr = ' , '.join(result[key])
+ text += "- %s : %s\n" % (key, addr)
else:
text = "%s has no contact addresses configured." % self.target
+ # if opt_arg is present and member of possible_vars but the key is empty change text line
+ if opt_arg in self.possible_vars:
+ text = "%s for %s are not defined." % (self.opt_arg, self.target)
+
return text
diff --git a/classes/uptime.py b/classes/uptime.py
index 96c8685..6eb15dd 100644
--- a/classes/uptime.py
+++ b/classes/uptime.py
@@ -3,23 +3,19 @@
# XEP-0012: Last Activity
class LastActivity:
- """ query the server uptime of the specified domain, defined by XEP-0012 """
- def __init__(self, session, msg, target):
- self.session = session
- self.nick = msg['mucnick']
- self.message_type = msg['type']
- self.target = target
-
- async def query(self):
- last_activity = await self.session['xep_0012'].get_last_activity(jid=self.target)
- seconds = await last_activity['last_activity']['seconds']
-
- return seconds
+ """
+ query the server uptime of the specified domain, defined by XEP-0012
+ """
+ def __init__(self):
+ # init all necessary variables
+ self.last_activity = None
+ self.target, self.opt_arg = None, None
- async def format_values(self, granularity=4):
- seconds = await self.query()
- #seconds = last_activity['last_activity']['seconds']
+ def process(self, granularity=4):
+ seconds = self.last_activity['last_activity']['seconds']
uptime = []
+
+ # touple with displayable time sections
intervals = (
('years', 31536000), # 60 * 60 * 24 * 365
('weeks', 604800), # 60 * 60 * 24 * 7
@@ -28,6 +24,8 @@ class LastActivity:
('minutes', 60),
('seconds', 1)
)
+
+ # for every element in possible time section process the seconds
for name, count in intervals:
value = seconds // count
if value:
@@ -37,9 +35,16 @@ class LastActivity:
uptime.append("{} {}".format(value, name))
result = ' '.join(uptime[:granularity])
- if self.message_type == "groupchat":
- text = "%s: %s is running since %s" % (self.nick, self.target, result)
- else:
- text = "%s is running since %s" % (self.target, result)
+ # insert values into result string
+ text = "%s is running since %s" % (self.target, result)
return text
+
+ def format(self, query, target, opt_arg):
+ self.last_activity = query
+
+ self.target = target
+ self.opt_arg = opt_arg
+
+ reply = self.process()
+ return reply
diff --git a/classes/version.py b/classes/version.py
new file mode 100644
index 0000000..1e9ef7e
--- /dev/null
+++ b/classes/version.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+
+
+# XEP-0072: Server Version
+class Version:
+ """
+ process and format a version query
+ """
+ def __init__(self):
+ # init all necessary variables
+ self.software_version = None
+ self.target, self.opt_arg = None, None
+
+ def format_result(self):
+ # list of all possible opt_arg
+ possible_opt_args = ["version", "os", "name"]
+
+ name = self.software_version['name']
+ version = self.software_version['version']
+ os = self.software_version['os']
+
+ # if opt_arg is given member of possible_opt_args list return that element
+ if self.opt_arg in possible_opt_args:
+ text = "%s: %s" % (self.opt_arg, self.software_version[self.opt_arg])
+
+ # otherwise return full version string
+ else:
+ text = "%s is running %s version %s on %s" % (self.target, name, version, os)
+
+ return text
+
+ def format(self, query, target, opt_arg):
+ self.software_version = query['software_version']
+
+ self.target = target
+ self.opt_arg = opt_arg
+
+ reply = self.format_result()
+ return reply
diff --git a/classes/xep.py b/classes/xep.py
index 9e8414c..98f4b78 100644
--- a/classes/xep.py
+++ b/classes/xep.py
@@ -1,19 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+import os
import requests
-import defusedxml.ElementTree as ET
+import defusedxml.ElementTree as et
class XEPRequest:
- def __init__(self, msg, xepnumber):
- """
- class which requests the header of the referenced xep
- :param xepnumber: number int or str to request the xep for
- """
- self.message_type = msg['type']
- self.muc_nick = msg['mucnick']
+ """
+ class which requests the header of the referenced xep
+ """
+ def __init__(self):
+ # init all necessary variables
+ self.reqxep, self.opt_arg = None, None
- self.reqxep = int(xepnumber)
self.xeplist = None
self.acceptedxeps = list()
@@ -21,20 +20,30 @@ class XEPRequest:
"""
query and save the current xep list to reduce network bandwidth
"""
- try:
- with open(".etag") as file:
+ # check if etag header is present if not set local_etag to ""
+ if os.path.isfile("./common/.etag"):
+ with open("./common/.etag") as file:
local_etag = file.read()
- except FileNotFoundError:
+ else:
local_etag = ""
with requests.Session() as s:
+ # head request the xeplist.xml
s.headers.update({'Accept': 'application/xml'})
head = s.head("https://xmpp.org/extensions/xeplist.xml")
etag = head.headers['etag']
+ # compare etag with local_etag if they match up no request is made
if local_etag == etag:
with open("./common/xeplist.xml", "r") as file:
- self.xeplist = ET.fromstring(file.read())
+ self.xeplist = et.fromstring(file.read())
+
+ # if the connection is not possible use cached xml if present
+ elif os.path.isfile("./common/xeplist.xml") and head.status_code != 200:
+ with open("./common/xeplist.xml", "r") as file:
+ self.xeplist = et.fromstring(file.read())
+
+ # in any other case request the latest xml
else:
r = s.get("https://xmpp.org/extensions/xeplist.xml")
r.encoding = 'utf-8'
@@ -42,9 +51,9 @@ class XEPRequest:
with open("./common/xeplist.xml", "w") as file:
file.write(r.content.decode())
- self.xeplist = ET.fromstring(r.content.decode())
+ self.xeplist = et.fromstring(r.content.decode())
- with open('.etag', 'w') as string:
+ with open('./common/.etag', 'w') as string:
string.write(local_etag)
# populate xep comparison list
@@ -54,34 +63,52 @@ class XEPRequest:
def get(self):
"""
function to query the xep entry if xepnumber is present in xeplist
- :return: nicely formatted xep header information
+ :return: formatted xep header information
"""
+ # all possible subtags grouped by location
+ last_revision_tags = ["date", "version", "initials", "remark"]
+ xep_tags = ["number", "title", "abstract", "type", "status", "approver", "shortname", "sig", "lastcall"]
+
# check if xeplist is accurate
self.req_xeplist()
result = list()
- # if requested number is inside acceptedxeps continou
+ # if requested number is member of acceptedxeps continue
if str(self.reqxep) in self.acceptedxeps:
searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep
for item in self.xeplist.findall(searchstring):
- for x in range(1,5):
- result.append(item[x].tag + " : " + item[x].text)
-
+ # if the opt_arg references is member of xeptag return only that tag
+ if self.opt_arg in xep_tags:
+ query = item.find(self.opt_arg)
+ result.append("%s : %s" % (query.tag, query.text))
+
+ # if the opt_arg references is member of last-revision_tags return only that tag
+ elif self.opt_arg in last_revision_tags:
+ query = item.find("last-revision").find(self.opt_arg)
+ result.append("%s : %s" % (query.tag, query.text))
+
+ # in any other case return the general answer
+ else:
+ result_opts = ["title", "type", "abstract", "status"]
+ for tag in result_opts:
+ result.append(item.find(tag).text)
+
+ # if the requested number is no member of acceptedxeps and/or not accepted return error.
else:
- if self.message_type == "groupchat":
- result.append(self.muc_nick + " : " + "XEP-" + str(self.reqxep) + " : is not available.")
- else:
- result.append("XEP-" + str(self.reqxep) + " : is not available.")
+ result.append("XEP-%s : is not available." % self.reqxep)
return result
- def format(self):
+ def format(self, query, target, opt_arg):
+ """
+ :param target: number int or str to request the xep for
+ :return:
+ """
+ self.reqxep = int(target)
+ self.opt_arg = opt_arg
+
reply = self.get()
- if self.message_type == "groupchat":
- text = "%s: " % self.muc_nick
- reply[0] = text + reply[0]
text = '\n'.join(reply)
-
return text