summaryrefslogtreecommitdiffstats
path: root/classes/xep.py
diff options
context:
space:
mode:
Diffstat (limited to 'classes/xep.py')
-rw-r--r--classes/xep.py97
1 files changed, 64 insertions, 33 deletions
diff --git a/classes/xep.py b/classes/xep.py
index 9e4f61f..f5fae61 100644
--- a/classes/xep.py
+++ b/classes/xep.py
@@ -1,19 +1,17 @@
-#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+import os
import requests
-import defusedxml.ElementTree as ET
+import defusedxml.ElementTree as Et
class XEPRequest:
- def __init__(self, msg, xepnumber):
- """
- class which requests the header of the referenced xep
- :param xepnumber: number int or str to request the xep for
- """
- self.message_type = msg['type']
- self.muc_nick = msg['mucnick']
+ """
+ class which requests the header of the referenced xep
+ """
+ def __init__(self):
+ # init all necessary variables
+ self.reqxep, self.opt_arg = None, None
- self.reqxep = str(xepnumber)
self.xeplist = None
self.acceptedxeps = list()
@@ -21,30 +19,40 @@ class XEPRequest:
"""
query and save the current xep list to reduce network bandwidth
"""
- try:
- with open(".etag") as file:
+ # check if etag header is present if not set local_etag to ""
+ if os.path.isfile("./common/.etag"):
+ with open("./common/.etag") as file:
local_etag = file.read()
- except FileNotFoundError:
+ else:
local_etag = ""
with requests.Session() as s:
+ # head request the xeplist.xml
s.headers.update({'Accept': 'application/xml'})
head = s.head("https://xmpp.org/extensions/xeplist.xml")
etag = head.headers['etag']
+ # compare etag with local_etag if they match up no request is made
if local_etag == etag:
- with open("xeplist.xml", "r") as file:
- self.xeplist = ET.fromstring(file.read())
+ with open("./common/xeplist.xml", "r") as file:
+ self.xeplist = Et.fromstring(file.read())
+
+ # if the connection is not possible use cached xml if present
+ elif os.path.isfile("./common/xeplist.xml") and head.status_code != 200:
+ with open("./common/xeplist.xml", "r") as file:
+ self.xeplist = Et.fromstring(file.read())
+
+ # in any other case request the latest xml
else:
r = s.get("https://xmpp.org/extensions/xeplist.xml")
r.encoding = 'utf-8'
local_etag = head.headers['etag']
- with open("xeplist.xml", "w") as file:
+ with open("./common/xeplist.xml", "w") as file:
file.write(r.content.decode())
- self.xeplist = ET.fromstring(r.content.decode())
+ self.xeplist = Et.fromstring(r.content.decode())
- with open('.etag', 'w') as string:
+ with open('./common/.etag', 'w') as string:
string.write(local_etag)
# populate xep comparison list
@@ -54,34 +62,57 @@ class XEPRequest:
def get(self):
"""
function to query the xep entry if xepnumber is present in xeplist
- :return: nicely formatted xep header information
+ :return: formatted xep header information
"""
+ # all possible subtags grouped by location
+ last_revision_tags = ["date", "version", "initials", "remark"]
+ xep_tags = ["number", "title", "abstract", "type", "status", "approver", "shortname", "sig", "lastcall", "date", "version", "initials", "remark"]
+
# check if xeplist is accurate
self.req_xeplist()
result = list()
- # if requested number is inside acceptedxeps continou
- if self.reqxep in self.acceptedxeps:
+ # if requested number is member of acceptedxeps continue
+ if str(self.reqxep) in self.acceptedxeps:
searchstring = ".//*[@accepted='true']/[number='%s']" % self.reqxep
for item in self.xeplist.findall(searchstring):
- for x in range(1,5):
- result.append(item[x].tag + " : " + item[x].text)
-
+ # if the opt_arg references is member of xeptag return only that tag
+ if self.opt_arg in xep_tags:
+
+ # if the opt_arg references is member of last_revision_tags return only that subtag
+ if self.opt_arg in last_revision_tags:
+ query = item.find("last-revision").find(self.opt_arg)
+ else:
+ query = item.find(self.opt_arg)
+
+ # append opt_arg results to the result list
+ if query is not None:
+ result.append("%s : %s" % (query.tag, query.text))
+ else:
+ result.append("%s does not have a %s tag." % (self.reqxep, self.opt_arg))
+
+ # in any other case return the general answer
+ else:
+ result_opts = ["title", "type", "abstract", "status"]
+ for tag in result_opts:
+ result.append(item.find(tag).text)
+
+ # if the requested number is no member of acceptedxeps and/or not accepted return error.
else:
- if self.message_type == "groupchat":
- result.append(self.muc_nick + " : " + "XEP-" + str(self.reqxep) + " : is not available.")
- else:
- result.append("XEP-" + str(self.reqxep) + " : is not available.")
+ result.append("XEP-%s : is not available." % self.reqxep)
return result
- def format(self):
+ def format(self, query, target, opt_arg):
+ """
+ :param target: number int or str to request the xep for
+ :return:
+ """
+ self.reqxep = int(target)
+ self.opt_arg = opt_arg
+
reply = self.get()
- if self.message_type == "groupchat":
- text = "%s: " % self.muc_nick
- reply[0] = text + reply[0]
text = '\n'.join(reply)
-
return text