diff options
-rw-r--r-- | ingest.py | 93 | ||||
-rwxr-xr-x | main.py | 107 |
2 files changed, 110 insertions, 90 deletions
diff --git a/ingest.py b/ingest.py new file mode 100644 index 0000000..dc8cf21 --- /dev/null +++ b/ingest.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +import gzip +import re +import sqlite3 +import sys + +from defusedxml import ElementTree + + +class Ingest: + """log ingestion class""" + def __init__(self, conn, infile: list = None): + """ + :param conn: sqlite connection object + :param infile: list containing log filenames to be ingested + """ + self.conn = conn + self.infile = infile + + self.jid_pattern = re.compile("^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$") + self.message_pattern = re.compile(r'<message.*?</message>', re.DOTALL) + + self.ingest() + + def ingest(self): + """ + ingest method to split up the ingest file list + if necessary decompression and decoding are applied + """ + magic_number = b"\x1f\x8b\x08" + + # iterate over all infile elements + for element in self.infile: + + try: + # open file in binary mode + with open(element, "rb") as infile: + content = infile.read() + + # in case of a missing file set content to an empty string + except FileNotFoundError as err: + content = "" + print(err, file=sys.stderr) + + # if magic number is present decompress and decode file + if content.startswith(magic_number): + content = gzip.decompress(content).decode("utf-8") + # in any other case read file normally + else: + content = content.decode("utf-8") + + # None catch + if content is not None: + log = re.findall(self.message_pattern, content) + + if log is not None: + self.db_import(log) + + def db_import(self, message_log: list): + """ + import xml stanzas into database + :param message_log: list of xml messages + """ + for message in message_log: + message_parsed = ElementTree.fromstring(message) + + # parse 'from' tag + spam_from = message_parsed.get('from') + match = self.jid_pattern.match(spam_from) + (node, domain, resource) = match.groups() + + # stamp + all_delay_tags = message_parsed.findall('.//{urn:xmpp:delay}delay') + spam_time = None + for tag in all_delay_tags: + if "@" in tag.get("from"): + continue + + spam_time = tag.get('stamp') + + # body + spam_body = message_parsed.find('{jabber:client}body') + if spam_body is not None: + spam_body = spam_body.text + + # format sql + try: + self.conn.execute('''INSERT INTO spam VALUES(:user, :domain, :spam_time, :spam_body);''', + {"user": node, "domain": domain, "spam_time": spam_time, "spam_body": spam_body}) + except sqlite3.IntegrityError: + pass + finally: + self.conn.commit() @@ -2,15 +2,13 @@ # -*- coding: utf-8 -*- import argparse import datetime as dt -import gzip import os -import re import sqlite3 import sys import tabulate -from defusedxml import ElementTree +from ingest import Ingest from report import ReportDomain @@ -25,13 +23,11 @@ class AbuseReport: self.stop = arguments.stop self.path = os.path.dirname(__file__) - self.conn = sqlite3.connect("/".join([self.path, "spam.db"])) - self.Report = ReportDomain(self.conn) - self.jid_pattern = re.compile("^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$") - self.message_pattern = re.compile(r'<message.*?</message>', re.DOTALL) + self.conn = sqlite3.connect("/".join([self.path, "spam.db"])) + self.Report = ReportDomain(self.conn) - def main(self): - """main method guiding the actions to take""" + def main(self): + """main method guiding the actions to take""" # run check method before each execution self.check() @@ -39,11 +35,11 @@ class AbuseReport: # infile unset -> report top10 self.egest() - elif self.infile: - # infile set -> ingest - self.ingest() + elif self.infile: + # infile set -> ingest + Ingest(self.conn, self.infile) - # close sqlite connection + # close sqlite connection self.conn.close() def check(self): @@ -120,86 +116,17 @@ class AbuseReport: GROUP BY domain ORDER BY 1 DESC LIMIT 10;''' result = self.conn.execute(sql, {"start": self.start, "stop": self.stop}).fetchall() - # tabelize data - spam_table = tabulate.tabulate(result, tablefmt="psql", headers=["messages", "bots", "domain","first seen", - "last seen"]) + # tabelize data + spam_table = tabulate.tabulate(result, tablefmt="psql", headers=["messages", "bots", "domain","first seen", + "last seen"]) - # output to stdout + # output to stdout output = "\n\n".join([spam_table]) - print(output, file=sys.stdout) + print(output, file=sys.stdout) - def ingest(self): - """ - ingest method to split up the ingest file list - if necessary decompression and decoding are applied - """ - magic_number = b"\x1f\x8b\x08" - - # iterate over all infile elements - for element in self.infile: - - try: - # open file in binary mode - with open(element, "rb") as infile: - content = infile.read() - - except FileNotFoundError as err: - content = "" - print(err, file=sys.stderr) - - # if magic number is present decompress and decode file - if content.startswith(magic_number): - content = gzip.decompress(content).decode("utf-8") - # in any other case read file normally - else: - content = content.decode("utf-8") - - # automated run None catch - if content is not None: - log = re.findall(self.message_pattern, content) - - if log is not None: - self.db_import(log) - - def db_import(self, message_log: list): - """ - import xml stanzas into database - :param message_log: list of xml messages - """ - for message in message_log: - message_parsed = ElementTree.fromstring(message) - - # parse 'from' tag - spam_from = message_parsed.get('from') - match = self.jid_pattern.match(spam_from) - (node, domain, resource) = match.groups() - - # stamp - all_delay_tags = message_parsed.findall('.//{urn:xmpp:delay}delay') - spam_time = None - for tag in all_delay_tags: - if "@" in tag.get("from"): - continue - - spam_time = tag.get('stamp') - - # body - spam_body = message_parsed.find('{jabber:client}body') - if spam_body is not None: - spam_body = spam_body.text - - # format sql - try: - self.conn.execute('''INSERT INTO spam VALUES(:user, :domain, :spam_time, :spam_body);''', - {"user": node, "domain": domain, "spam_time": spam_time, "spam_body": spam_body}) - except sqlite3.IntegrityError: - pass - finally: - self.conn.commit() - - def gen_report(self, domain: str, query: list): - """ - method generating the report files + def gen_report(self, domain: str, query: list): + """ + method generating the report files :param domain: string containing a domain name :param query: list of tuples containing the query results for the specified domain/s """ |