summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--classes/__init__.py5
-rw-r--r--classes/help.py29
-rw-r--r--classes/servercontact.py26
-rw-r--r--classes/version.py11
-rw-r--r--classes/xep.py2
-rwxr-xr-xcommon/misc.py52
-rw-r--r--common/strings.json108
-rw-r--r--common/strings.py69
-rwxr-xr-xmain.py73
9 files changed, 259 insertions, 116 deletions
diff --git a/classes/__init__.py b/classes/__init__.py
new file mode 100644
index 0000000..838c8c2
--- /dev/null
+++ b/classes/__init__.py
@@ -0,0 +1,5 @@
+from classes.version import Version
+from classes.servercontact import ServerContact
+from classes.uptime import LastActivity
+from classes.xep import XEPRequest
+from classes.help import Helper
diff --git a/classes/help.py b/classes/help.py
new file mode 100644
index 0000000..c268520
--- /dev/null
+++ b/classes/help.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+from common.misc import arg_abbr
+from common.strings import StaticAnswers
+import json
+
+
+class Helper:
+ def __init__(self):
+ self.possible_vars = StaticAnswers().helpfile["help_advanced"].keys()
+
+ def receive(self, target):
+ # optional argument abbreviation
+ target = arg_abbr(target, self.possible_vars)
+
+ if target in self.possible_vars:
+ return StaticAnswers().help_doc(target)
+ else:
+ return StaticAnswers().help_doc()
+
+ def format(self, query, target, opt_arg):
+ doc = self.receive(target)
+
+ if target in self.possible_vars:
+ answer = json.dumps(StaticAnswers().help_doc(target), indent=4)
+
+ else:
+ answer = "\n".join(['%s' % value for (_, value) in doc.items()])
+
+ return answer
diff --git a/classes/servercontact.py b/classes/servercontact.py
index c2f4ad5..e0e871e 100644
--- a/classes/servercontact.py
+++ b/classes/servercontact.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
+from common.misc import arg_abbr
import defusedxml.ElementTree as Et
@@ -19,23 +20,6 @@ class ServerContact:
self.contact = None
self.target, self.opt_arg = None, None
- def opt_arg_abbreviation(self):
- """
- optional argument abbreviation function
- if the provided string > 2 characters the most likely key will be chosen
- :return: completes the opt_arg to the most likely one
- """
- # if opt_argument is smaller then 2 pass to prohibit multiple answers
- if len(self.opt_arg) < 2:
- pass
-
- abbr = str(self.opt_arg)
- possible_abbr = ["abuse-addresses", "admin-addresses", "feedback-addresses", "sales-addresses",
- "security-addresses", "support-addresses"]
-
- # searches the best match in the list of possible_abbr and completes the opt_arg to that
- self.opt_arg = [s for s in possible_abbr if s.startswith(abbr)][0]
-
def process(self):
# get etree from base xml
iq = Et.fromstring(str(self.contact))
@@ -57,11 +41,11 @@ class ServerContact:
# iterate over all child elements in node
for child in node:
- # if one opt_arg is defined return just that one
- if self.opt_arg in self.possible_vars:
- # check for possible abbreviations to the optional argument
- self.opt_arg_abbreviation()
+ # check for possible abbreviations to the optional argument
+ self.opt_arg = arg_abbr(self.opt_arg, self.possible_vars)
+ # if opt_arg is defined and valid return just that one
+ if self.opt_arg in self.possible_vars:
if child.attrib['var'] == self.opt_arg:
# add section to result dict and append info
result[child.attrib['var']] = list()
diff --git a/classes/version.py b/classes/version.py
index 1e9ef7e..42b3e61 100644
--- a/classes/version.py
+++ b/classes/version.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-
+from common.misc import arg_abbr
# XEP-0072: Server Version
class Version:
@@ -10,17 +10,18 @@ class Version:
# init all necessary variables
self.software_version = None
self.target, self.opt_arg = None, None
+ self.possible_vars = ["version", "os", "name"]
def format_result(self):
- # list of all possible opt_arg
- possible_opt_args = ["version", "os", "name"]
-
name = self.software_version['name']
version = self.software_version['version']
os = self.software_version['os']
+ # check for possible abbreviations to the optional argument
+ self.opt_arg = arg_abbr(self.opt_arg, self.possible_vars)
+
# if opt_arg is given member of possible_opt_args list return that element
- if self.opt_arg in possible_opt_args:
+ if self.opt_arg in self.possible_vars:
text = "%s: %s" % (self.opt_arg, self.software_version[self.opt_arg])
# otherwise return full version string
diff --git a/classes/xep.py b/classes/xep.py
index f5fae61..df401a7 100644
--- a/classes/xep.py
+++ b/classes/xep.py
@@ -47,10 +47,10 @@ class XEPRequest:
r = s.get("https://xmpp.org/extensions/xeplist.xml")
r.encoding = 'utf-8'
local_etag = head.headers['etag']
+ self.xeplist = Et.fromstring(r.content.decode())
with open("./common/xeplist.xml", "w") as file:
file.write(r.content.decode())
- self.xeplist = Et.fromstring(r.content.decode())
with open('./common/.etag', 'w') as string:
string.write(local_etag)
diff --git a/common/misc.py b/common/misc.py
index b91c618..edfe3a0 100755
--- a/common/misc.py
+++ b/common/misc.py
@@ -24,28 +24,56 @@ def validate(keyword, target):
:param target: provided target
:return: true if valid
"""
- # if keyword in domain_keywords list
- if keyword in StaticAnswers().keys('domain_keywords'):
+ # if keyword is in noarg list return True
+ if keyword in StaticAnswers().keys("noarg"):
+ return True
+
+ # prevent AttributeError if target is NoneType
+ if target is None:
+ return False
+
+ # if keyword in domain list
+ if keyword in StaticAnswers().keys('domain'):
# if target is a domain / email return True
if validators.domain(target) or validators.email(target):
return True
- # check if keyword is in number_keyword list
- elif keyword in StaticAnswers().keys('number_keywords'):
- # prevent AttributeError if target is NoneType
- if target is not None:
- # if target only consists of digits return True
- return target.isdigit()
+ # check if keyword is in number list
+ elif keyword in StaticAnswers().keys('number'):
+ return target.isdigit()
- # if keyword is in no_arg_keywords list return True
- elif keyword in StaticAnswers().keys("no_arg_keywords"):
+ # if keyword in expand list return True
+ elif keyword in StaticAnswers().keys("expand"):
return True
# if the target could not be validated until this return False
return False
-#
+def arg_abbr(value, possible_values):
+ """
+ optional argument abbreviation
+ if the provided string value > 2 characters the most likely value will be chosen
+ :return: completes the value to the most likely one
+ """
+ # prevent traceback if value is None
+ if value and possible_values is not None:
+ # if opt_argument is smaller then 2 pass to prohibit multiple answers
+ if len(value) < 2:
+ return value
+
+ abbr = str(value)
+
+ # searches the best match in the list of possible_abbr and completes the opt_arg to that
+ new_value = [s for s in list(possible_values) if s.startswith(abbr)]
+
+ # prevent index error if abbreviation has not result
+ if new_value:
+ value = new_value[0]
+
+ return value
+
+
class HandleError:
"""
simple XMPP error / exception class formating the error condition
@@ -59,6 +87,6 @@ class HandleError:
def report(self):
# return the formatted result string to the user
- text = "%s. %s %s resulted in: %s" % (self.text, self.key, self.target, self.condition)
+ text = "%s %s resulted in: %s" % (self.key, self.target, self.condition)
return text
diff --git a/common/strings.json b/common/strings.json
new file mode 100644
index 0000000..eefd73e
--- /dev/null
+++ b/common/strings.json
@@ -0,0 +1,108 @@
+{
+ "help": {
+ "help_basic": {
+ "contact": "!contact domain.tld -- receive XMPP server contact address info",
+ "help": "!help -- display all basic help information",
+ "uptime": "!uptime domain.tld -- receive XMPP server uptime",
+ "version": "!version domain.tld -- receive XMPP server version",
+ "xep": "!xep $xepnumber -- receive xep info abstract"
+ },
+ "help_advanced": {
+ "contact": {
+ "info": "XEP-0157 contact addresses for XMPP Services request",
+ "optional_args": {
+ "info": "it is not required to type the whole optional argument, it is only necessary to type at least the two starting characters.",
+ "possible_vars": [
+ "abuse-addresses",
+ "admin-addresses",
+ "feedback-addresses",
+ "sales-addresses",
+ "security-addresses",
+ "support-addresses"
+ ]
+ },
+ "targets": "valid target: domain.tld",
+ "command": "!contact $target $optional_argument"
+ },
+ "uptime": {
+ "info": "XEP-0012 last activity request",
+ "targets": "valid targets: domain.tld",
+ "command": "!uptime $target $optional_argument"
+ },
+ "version": {
+ "info": "XEP-0072 version query request",
+ "optional_args": {
+ "name": "receive the name of the software used",
+ "os": "receive only the operating system version",
+ "version": "receive only the software version used"
+ },
+ "targets": "valid targets are: domain.tld and jid/resource",
+ "command": "!version $target $optional_argument"
+ },
+ "xep": {
+ "info": "receive abstract of referenced XEP",
+ "optional_args": {
+ "last_revision_tags": [
+ "date",
+ "version",
+ "initials",
+ "remark"
+ ],
+ "tags": [
+ "number",
+ "title",
+ "abstract",
+ "type",
+ "status",
+ "approver",
+ "shortname",
+ "sig",
+ "lastcall",
+ "date",
+ "version",
+ "initials",
+ "remark"
+ ]
+ },
+ "targets": "any valid integer",
+ "command": "!xep $xepnumber $optional_argument"
+ },
+ "info": {
+ "info": "pooled command to accumulate dataset composed of uptime version and contact",
+ "targets": "valid target is: domain.tld",
+ "command": "!info $target"
+ }
+ }
+ },
+ "functions": {
+ "keywords": {
+ "all": [
+ "!help",
+ "!uptime",
+ "!version",
+ "!contact",
+ "!xep",
+ "!info"
+ ],
+ "domain": [
+ "!uptime",
+ "!version",
+ "!contact"
+ ],
+ "expand": {
+ "!info": ["!uptime", "!version", "!contact"]
+ },
+ "noarg": [
+ "!help"
+ ],
+ "number": [
+ "!xep"
+ ]
+ },
+ "answers": [
+ "I heard that %s",
+ "%s I am sorry for that.",
+ "%s did you try turning it off and on again?"
+ ]
+ }
+} \ No newline at end of file
diff --git a/common/strings.py b/common/strings.py
index 6f1b629..6941a6b 100644
--- a/common/strings.py
+++ b/common/strings.py
@@ -1,55 +1,36 @@
# -*- coding: utf-8 -*-
-from random import randint
+from random import choice
+import json
class StaticAnswers:
"""
collection of callable static/ semi-static strings
"""
- def __init__(self, nick=""):
- self.nickname = nick
- self.helpfile = {
- 'help': '!help -- display this text',
- 'version': '!version domain.tld -- receive XMPP server version',
- 'uptime': '!uptime domain.tld -- receive XMPP server uptime',
- 'contact': '!contact domain.tld -- receive XMPP server contact address info',
- 'xep': '!xep XEP Number -- recieve information about the specified XEP'
- }
- self.possible_answers = {
- '1': 'I heard that, %s.',
- '2': 'I am sorry for that %s.',
- '3': '%s did you try turning it off and on again?'
- }
- self.error_messages = {
- '1': 'not reachable',
- '2': 'not a valid target'
- }
- self.keywords = {
- "keywords": ["!help", "!uptime", "!version", "!contact", "!xep"],
- "domain_keywords": ["!uptime", "!version", "!contact"],
- "no_arg_keywords": ["!help"],
- "number_keywords": ["!xep"]
- }
-
- def keys(self, key=""):
+ def __init__(self):
+ with open("./common/strings.json") as basefile:
+ self.strings = json.load(basefile)
+
+ self.helpfile = self.strings["help"]
+ self.keywords = self.strings["functions"]["keywords"]
+ self.replys = self.strings["functions"]["answers"]
+
+ def keys(self, key=None):
# if specific keyword in referenced return that
- if key in self.keywords.keys():
+ if key in self.keywords:
return self.keywords[key]
# in any other case return the whole dict
- return self.keywords["keywords"]
-
- def gen_help(self):
- helpdoc = "\n".join(['%s' % value for (_, value) in self.helpfile.items()])
- return helpdoc
-
- def gen_answer(self):
- possible_answers = self.possible_answers
- return possible_answers[str(randint(1, possible_answers.__len__()))] % self.nickname
-
- def error(self,code):
- try:
- text = self.error_messages[str(code)]
- except KeyError:
- return 'unknown error'
- return text
+ return self.keywords["all"]
+
+ def help_doc(self, key=None):
+ # if specific key is referenced return only that key
+ if key is not None:
+ return self.helpfile["help_advanced"][key]
+
+ # in any other case return basic dict
+ return self.helpfile["help_basic"]
+
+ def answers(self, nick=None):
+ # return pseudo random answer
+ return choice(self.replys) % nick
diff --git a/main.py b/main.py
index 50f9b03..d04f8d3 100755
--- a/main.py
+++ b/main.py
@@ -17,10 +17,7 @@ from slixmpp.exceptions import XMPPError
import common.misc as misc
from common.strings import StaticAnswers
-from classes.servercontact import ServerContact
-from classes.version import Version
-from classes.uptime import LastActivity
-from classes.xep import XEPRequest
+from classes import *
class QueryBot(slixmpp.ClientXMPP):
@@ -35,7 +32,8 @@ class QueryBot(slixmpp.ClientXMPP):
"!uptime": LastActivity(),
"!contact": ServerContact(),
"!version": Version(),
- "!xep": XEPRequest()
+ "!xep": XEPRequest(),
+ "!help": Helper()
}
# session start event, starting point for the presence and roster requests
@@ -72,8 +70,15 @@ class QueryBot(slixmpp.ClientXMPP):
return
elif self.nick in msg['body']:
+ if msg["type"] == "groupchat":
+ # discover role first
+ affiliation = self['xep_0045'].get_jid_property(msg["mucroom"], self.nick, "role")
+ if affiliation in ["moderator", "owner"]:
+ # discover real jid
+ realjid = self['xep_0045'].get_jid_property(msg["mucroom"], msg["mucnick"], "jid")
+
# add pre predefined text to reply list
- data['reply'].append(StaticAnswers(msg['mucnick']).gen_answer())
+ data['reply'].append(StaticAnswers().answers(msg['mucnick']))
data = self.build_queue(data, msg)
@@ -86,10 +91,6 @@ class QueryBot(slixmpp.ClientXMPP):
opt_arg = job[keyword][1]
query = None
- if keyword == '!help':
- data['reply'].append(StaticAnswers().gen_help())
- continue
-
try:
if keyword == "!uptime":
query = await self['xep_0012'].get_last_activity(jid=target)
@@ -101,7 +102,7 @@ class QueryBot(slixmpp.ClientXMPP):
query = await self['xep_0030'].get_info(jid=target, cached=False)
except XMPPError as error:
- logging.info(misc.HandleError(error, keyword, target).report())
+ logging.error(misc.HandleError(error, keyword, target).report())
data['reply'].append(misc.HandleError(error, keyword, target).report())
continue
@@ -110,13 +111,15 @@ class QueryBot(slixmpp.ClientXMPP):
# remove None type from list and send all elements
if list(filter(None.__ne__, data['reply'])) and data['reply']:
- # if msg type is groupchat prepend mucnick
+ # if msg type is groupchat
if msg["type"] == "groupchat":
- data["reply"][0] = "%s: " % msg["mucnick"] + data["reply"][0]
- # reply = misc.deduplicate(data['reply'])
- reply = data["reply"]
- self.send_message(mto=msg['from'].bare, mbody="\n".join(reply), mtype=msg['type'])
+ # check if mucnick is present to tag the user
+ if msg["mucnick"] not in data["reply"][0]:
+ data["reply"][0] = "%s: " % msg["mucnick"] + data["reply"][0]
+
+ # send the answer
+ self.send_message(mto=msg['from'].bare, mbody="\n".join(data["reply"]), mtype=msg['type'])
def build_queue(self, data, msg):
# building the queue
@@ -125,35 +128,39 @@ class QueryBot(slixmpp.ClientXMPP):
wordcount = len(data["words"])
# check all words in side the message for possible hits
- for x in enumerate(data['words']):
- # check for valid keywords
- index = x[0]
- keyword = x[1]
-
- # match all words starting with ! and member of no_arg_keywords
- if keyword.startswith("!") and keyword in StaticAnswers().keys("no_arg_keywords"):
- data['queue'].append({keyword: [None, None]})
-
- # matching all words starting with ! and member of keywords
- elif keyword.startswith("!") and keyword in StaticAnswers().keys("keywords"):
+ for index, keyword in enumerate(data['words']):
+ if keyword.startswith("!") and keyword in StaticAnswers().keys("all"):
# init variables to circumvent IndexErrors
target, opt_arg = None, None
# compare to wordcount if assignment is possible
if index + 1 < wordcount:
- target = data["words"][index + 1]
+ if not data["words"][index + 1].startswith("!"):
+ target = data["words"][index + 1]
if index + 2 < wordcount:
if not data["words"][index + 2].startswith("!"):
opt_arg = data["words"][index + 2]
- # only add job to queue if domain is valid
+ # only add job to queue if target is valid
if misc.validate(keyword, target):
- logging.debug("Item added to queue %s" % {str(keyword): [target, opt_arg]})
- data['queue'].append({str(keyword): [target, opt_arg]})
+
+ # if keyword is a key in expand category expand it
+ if keyword in StaticAnswers().keys("expand").keys():
+ # expand the keyword with the keyword list attached to it
+ for key in StaticAnswers().keys("expand")[keyword]:
+ data['queue'].append({key: [target, None]})
+
+ logging.debug("expanding keyword %s" % keyword)
+
+ else:
+ data['queue'].append({keyword: [target, opt_arg]})
+ logging.warning("Item added to queue %s" % {keyword: [target, opt_arg]})
# deduplicate queue elements
+ logging.warning("before dedup %s" %data["queue"])
data["queue"] = misc.deduplicate(data["queue"])
+ logging.warning("after dedup %s" %data["queue"])
return data
@@ -164,8 +171,8 @@ if __name__ == '__main__':
parser.add_argument('-q', '--quiet', help='set logging to ERROR', action='store_const', dest='loglevel',
const=logging.ERROR, default=logging.INFO)
parser.add_argument('-d', '--debug', help='set logging to DEBUG', action='store_const', dest='loglevel',
- const=logging.DEBUG, default=logging.INFO)
- parser.add_argument('-D', '--dev', help='set logging to console', action='store_const', dest='logfile',
+ const=logging.WARNING, default=logging.INFO)
+ parser.add_argument('-D', '--dev', help='set log output to console', action='store_const', dest='logfile',
const="", default='bot.log')
args = parser.parse_args()