summaryrefslogtreecommitdiffstats
path: root/main.py
blob: e170d43fd07bd45c14a48f2064816855db5e4b92 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
	Slixmpp: The Slick XMPP Library
	Copyright (C) 2010  Nathanael C. Fritz
	This file is part of Slixmpp.

	See the file LICENSE for copying permission.
"""
import asyncio
import configparser
import logging
import slixmpp
import ssl
import validators
from argparse import ArgumentParser
from datetime import datetime, timedelta
from random import randint
from slixmpp.exceptions import XMPPError


class QueryBot(slixmpp.ClientXMPP):
	"""
	A simple Slixmpp bot answering !help, !version, !uptime and !contact queries.

	Every line of an incoming message is checked for a command. Replies are sent to the bare JID the message
	came from, using the same message type as the request.
	"""

	def __init__(self, jid, password, room, nick):
		"""
		Arguments:
			jid -- JID the bot logs in with
			password -- password belonging to that JID
			room -- MUC room joined once the session has started
			nick -- nickname used in the room, also used to recognise the bot's own messages
		"""
		super().__init__(jid, password)

		self.room = room
		self.nick = nick

		# session start event, starting point for the presence and roster requests
		self.add_event_handler('session_start', self.start)

		# register handler to receive both groupchat and normal message events
		self.add_event_handler('message', self.message)

	def start(self, event):
		"""
		Send the initial presence, request the roster and join the configured room.

		Arguments:
			event -- An empty dictionary. The session_start event does not provide any additional data.
		"""
		self.send_presence()
		self.get_roster()

		# If a room password is needed, use: password=the_room_password
		self.plugin['xep_0045'].join_muc(self.room, self.nick, wait=True)

	@staticmethod
	def precheck(line):
		"""
		pre check function
		- check that the first word is a known keyword
		- check that every keyword except !help is followed by a valid domain
		:param line: list of words of one line from the message body
		:return: True if the line is a well formed command, False otherwise
		"""
		keywords = ("!help", "!uptime", "!version", "!contact")

		# an empty line or an unknown first word is never a command
		if not line or line[0] not in keywords:
			return False

		# the help command takes no argument
		if line[0] == "!help":
			return True

		# every other command needs a domain as its first argument
		if len(line) < 2:
			return False

		# validators.domain() hands back a falsy failure object for invalid input, normalise to bool
		return bool(validators.domain(line[1]))

	@staticmethod
	def _format_uptime(seconds):
		"""
		Render a number of seconds as "<d> days <h> hours <m> minutes".

		The days come straight from the timedelta, so uptimes longer than a month are reported correctly.
		"""
		elapsed = timedelta(seconds=seconds)
		hours, remainder = divmod(elapsed.seconds, 3600)
		return "%d days %d hours %d minutes" % (elapsed.days, hours, remainder // 60)

	async def _version_text(self, domain):
		""" query the software version of the specified domain, defined by XEP-0092 """
		version = await self['xep_0092'].get_version(domain)
		software = version['software_version']
		return "%s is running %s version %s on %s" % (domain, software['name'], software['version'], software['os'])

	async def _uptime_text(self, domain):
		""" query the last activity (uptime) of the specified domain, defined by XEP-0012 """
		last_activity = await self['xep_0012'].get_last_activity(domain)
		return "%s is running since %s" % (domain, self._format_uptime(last_activity['last_activity']['seconds']))

	async def _contact_text(self, domain):
		"""
		query XEP-0030 Service Discovery info of the domain and list the fields of the attached data form

		:return: one "var: value" entry per form field joined by newlines, empty string if there are none
		"""
		result = await self['xep_0030'].get_info(jid=domain, cached=False)
		server_info = []
		for field in result['disco_info']['form']:
			var = field['var']
			# the hidden FORM_TYPE field only names the form, it carries no contact data
			if field['type'] == 'hidden' and var == 'FORM_TYPE':
				continue
			# continuation lines of multi value fields are indented below the first value
			sep = '\n  ' + len(var) * ' '
			field_value = field.get_value(convert=False)
			value = sep.join(field_value) if isinstance(field_value, list) else field_value
			server_info.append('%s: %s' % (var, value))
		return "\n".join(server_info)

	async def message(self, msg):
		"""
		Answer nickname mentions and the supported commands found in a received message.

		Arguments:
			msg -- The received message stanza. See the documentation for stanza objects and the Message stanza to see
					how it may be used.
		"""
		# catch self messages to prevent self flooding
		if msg['mucnick'] == self.nick:
			return

		# never answer error messages, a bounced reply could otherwise trigger the next reply
		if msg['type'] == 'error':
			return

		reply_to = msg['from'].bare
		reply_type = msg['type']

		if self.nick in msg['body']:
			# answer with predefined text when mucnick is used
			self.send_message(mto=reply_to, mbody=notice_answer(msg['mucnick']), mtype=reply_type)

		# split multiline messages into lines to check every line for keywords
		for raw_line in msg['body'].splitlines():
			# split on any run of whitespace so repeated spaces do not produce empty words
			line = raw_line.split()

			# only continue if keyword and domain are valid
			if not self.precheck(line):
				continue

			keyword = line[0]

			# display help when keyword !help is received
			if keyword == '!help':
				self.send_message(mto=reply_to, mbody=help_doc(), mtype=reply_type)
				continue

			domain = line[1]
			try:
				if keyword == '!version':
					# XEP-0092: Software Version
					text = await self._version_text(domain)
				elif keyword == '!uptime':
					# XEP-0012: Last Activity
					text = await self._uptime_text(domain)
				else:
					# precheck() only lets known keywords pass, so this is !contact
					# XEP-0157: Contact Addresses for XMPP Services
					text = await self._contact_text(domain)
			except (NameError, XMPPError) as error:
				# a failed query is still answered with silence, but it is no longer invisible
				logging.getLogger(__name__).warning("%s query for %s failed: %r", keyword, domain, error)
				continue

			# nothing to report, e.g. the domain publishes no form fields
			if not text:
				continue

			# address the requester by nickname when answering into a room
			if reply_type == "groupchat" and keyword in ('!version', '!uptime'):
				text = "%s: %s" % (msg['mucnick'], text)

			self.send_message(mto=reply_to, mbody=text, mtype=reply_type)

			# TODO
			# append all results to single message send just once


def help_doc():
	""" build the help text, one line per supported command, each terminated by a newline """
	usage_lines = (
		'!help -- display this text',
		'!version domain.tld  -- receive XMPP server version',
		'!uptime domain.tld -- receive XMPP server uptime',
		'!contact domain.tld -- receive XMPP server contact address info',
	)
	return "".join(entry + "\n" for entry in usage_lines)


def notice_answer(nickname):
	"""
	pick one of the predefined replies at random and insert the nickname
	:param nickname: nickname of the sender the reply is addressed to
	:return: the reply text
	"""
	templates = (
		'I heard that, %s.',
		'I am sorry for that %s.',
		'%s did you try turning it off and on again?',
	)
	# randint is 1-based and inclusive on both ends, the tuple index is 0-based
	choice = randint(1, len(templates))
	return templates[choice - 1] % nickname

if __name__ == '__main__':
	# command line arguments: every switch stores a constant into its destination
	parser = ArgumentParser()
	for switches, description, destination, constant, fallback in (
		(('-q', '--quiet'), 'set logging to ERROR', 'loglevel', logging.ERROR, logging.INFO),
		(('-d', '--debug'), 'set logging to DEBUG', 'loglevel', logging.DEBUG, logging.INFO),
		(('-D', '--dev'), 'set logging to console', 'logfile', "", 'bot.log'),
	):
		parser.add_argument(*switches, help=description, action='store_const', dest=destination,
							const=constant, default=fallback)
	args = parser.parse_args()

	# logging, an empty logfile name (--dev) makes basicConfig log to the console
	logging.basicConfig(filename=args.logfile, level=args.loglevel, format='%(levelname)-8s %(message)s')
	logger = logging.getLogger(__name__)

	# configfile, the settings are stored on the args namespace next to the command line options
	config = configparser.RawConfigParser()
	config.read('./bot.cfg')
	for attribute, section, option in (
		('jid', 'Account', 'jid'),
		('password', 'Account', 'password'),
		('room', 'MUC', 'rooms'),
		('nick', 'MUC', 'nick'),
		('admins', 'ADMIN', 'admins'),
	):
		setattr(args, attribute, config.get(section, option))

	# init the bot and register used slixmpp plugins
	xmpp = QueryBot(args.jid, args.password, args.room, args.nick)
	xmpp.ssl_version = ssl.PROTOCOL_TLSv1_2
	for plugin_name in (
		'xep_0012',  # Last Activity
		'xep_0030',  # Service Discovery
		'xep_0045',  # Multi-User Chat
		'xep_0060',  # PubSub
		'xep_0085',  # Chat State Notifications
		'xep_0092',  # Software Version
		'xep_0199',  # XMPP Ping
	):
		xmpp.register_plugin(plugin_name)

	# connect and start receiving stanzas
	xmpp.connect()
	xmpp.process()