From 0a4d8af9a317511982af7bba7e07ae341366afa1 Mon Sep 17 00:00:00 2001 From: nico Date: Fri, 8 Nov 2019 15:55:22 +0100 Subject: transfer ingest methods * transfer ingest method to own class * adjusted in main method --- ingest.py | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 107 ++++++++++---------------------------------------------------- 2 files changed, 110 insertions(+), 90 deletions(-) create mode 100644 ingest.py diff --git a/ingest.py b/ingest.py new file mode 100644 index 0000000..dc8cf21 --- /dev/null +++ b/ingest.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +import gzip +import re +import sqlite3 +import sys + +from defusedxml import ElementTree + + +class Ingest: + """log ingestion class""" + def __init__(self, conn, infile: list = None): + """ + :param conn: sqlite connection object + :param infile: list containing log filenames to be ingested + """ + self.conn = conn + self.infile = infile + + self.jid_pattern = re.compile("^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$") + self.message_pattern = re.compile(r'', re.DOTALL) + + self.ingest() + + def ingest(self): + """ + ingest method to split up the ingest file list + if necessary decompression and decoding are applied + """ + magic_number = b"\x1f\x8b\x08" + + # iterate over all infile elements + for element in self.infile: + + try: + # open file in binary mode + with open(element, "rb") as infile: + content = infile.read() + + # in case of a missing file set content to an empty string + except FileNotFoundError as err: + content = "" + print(err, file=sys.stderr) + + # if magic number is present decompress and decode file + if content.startswith(magic_number): + content = gzip.decompress(content).decode("utf-8") + # in any other case read file normally + else: + content = content.decode("utf-8") + + # None catch + if content is not None: + log = re.findall(self.message_pattern, content) + + if log is not None: + self.db_import(log) + + def db_import(self, message_log: list): + """ + import xml stanzas into database + :param message_log: list of xml messages + """ + for message in message_log: + message_parsed = ElementTree.fromstring(message) + + # parse 'from' tag + spam_from = message_parsed.get('from') + match = self.jid_pattern.match(spam_from) + (node, domain, resource) = match.groups() + + # stamp + all_delay_tags = message_parsed.findall('.//{urn:xmpp:delay}delay') + spam_time = None + for tag in all_delay_tags: + if "@" in tag.get("from"): + continue + + spam_time = tag.get('stamp') + + # body + spam_body = message_parsed.find('{jabber:client}body') + if spam_body is not None: + spam_body = spam_body.text + + # format sql + try: + self.conn.execute('''INSERT INTO spam VALUES(:user, :domain, :spam_time, :spam_body);''', + {"user": node, "domain": domain, "spam_time": spam_time, "spam_body": spam_body}) + except sqlite3.IntegrityError: + pass + finally: + self.conn.commit() diff --git a/main.py b/main.py index 60e65f5..cf417d5 100755 --- a/main.py +++ b/main.py @@ -2,15 +2,13 @@ # -*- coding: utf-8 -*- import argparse import datetime as dt -import gzip import os -import re import sqlite3 import sys import tabulate -from defusedxml import ElementTree +from ingest import Ingest from report import ReportDomain @@ -25,13 +23,11 @@ class AbuseReport: self.stop = arguments.stop self.path = os.path.dirname(__file__) - self.conn = sqlite3.connect("/".join([self.path, "spam.db"])) - self.Report = ReportDomain(self.conn) - self.jid_pattern = re.compile("^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$") - self.message_pattern = re.compile(r'', re.DOTALL) + self.conn = sqlite3.connect("/".join([self.path, "spam.db"])) + self.Report = ReportDomain(self.conn) - def main(self): - """main method guiding the actions to take""" + def main(self): + """main method guiding the actions to take""" # run check method before each execution self.check() @@ -39,11 +35,11 @@ class AbuseReport: # infile unset -> report top10 self.egest() - elif self.infile: - # infile set -> ingest - self.ingest() + elif self.infile: + # infile set -> ingest + Ingest(self.conn, self.infile) - # close sqlite connection + # close sqlite connection self.conn.close() def check(self): @@ -120,86 +116,17 @@ class AbuseReport: GROUP BY domain ORDER BY 1 DESC LIMIT 10;''' result = self.conn.execute(sql, {"start": self.start, "stop": self.stop}).fetchall() - # tabelize data - spam_table = tabulate.tabulate(result, tablefmt="psql", headers=["messages", "bots", "domain","first seen", - "last seen"]) + # tabelize data + spam_table = tabulate.tabulate(result, tablefmt="psql", headers=["messages", "bots", "domain","first seen", + "last seen"]) - # output to stdout + # output to stdout output = "\n\n".join([spam_table]) - print(output, file=sys.stdout) + print(output, file=sys.stdout) - def ingest(self): - """ - ingest method to split up the ingest file list - if necessary decompression and decoding are applied - """ - magic_number = b"\x1f\x8b\x08" - - # iterate over all infile elements - for element in self.infile: - - try: - # open file in binary mode - with open(element, "rb") as infile: - content = infile.read() - - except FileNotFoundError as err: - content = "" - print(err, file=sys.stderr) - - # if magic number is present decompress and decode file - if content.startswith(magic_number): - content = gzip.decompress(content).decode("utf-8") - # in any other case read file normally - else: - content = content.decode("utf-8") - - # automated run None catch - if content is not None: - log = re.findall(self.message_pattern, content) - - if log is not None: - self.db_import(log) - - def db_import(self, message_log: list): - """ - import xml stanzas into database - :param message_log: list of xml messages - """ - for message in message_log: - message_parsed = ElementTree.fromstring(message) - - # parse 'from' tag - spam_from = message_parsed.get('from') - match = self.jid_pattern.match(spam_from) - (node, domain, resource) = match.groups() - - # stamp - all_delay_tags = message_parsed.findall('.//{urn:xmpp:delay}delay') - spam_time = None - for tag in all_delay_tags: - if "@" in tag.get("from"): - continue - - spam_time = tag.get('stamp') - - # body - spam_body = message_parsed.find('{jabber:client}body') - if spam_body is not None: - spam_body = spam_body.text - - # format sql - try: - self.conn.execute('''INSERT INTO spam VALUES(:user, :domain, :spam_time, :spam_body);''', - {"user": node, "domain": domain, "spam_time": spam_time, "spam_body": spam_body}) - except sqlite3.IntegrityError: - pass - finally: - self.conn.commit() - - def gen_report(self, domain: str, query: list): - """ - method generating the report files + def gen_report(self, domain: str, query: list): + """ + method generating the report files :param domain: string containing a domain name :param query: list of tuples containing the query results for the specified domain/s """ -- cgit v1.2.3-54-g00ecf From 2b4b95a42ffabeb650484b1319ae599831dbd899 Mon Sep 17 00:00:00 2001 From: nico Date: Fri, 8 Nov 2019 16:28:57 +0100 Subject: * minor restructuring --- ingest.py | 16 ++++++---------- main.py | 7 ++++--- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/ingest.py b/ingest.py index dc8cf21..0c762c8 100644 --- a/ingest.py +++ b/ingest.py @@ -7,30 +7,26 @@ import sys from defusedxml import ElementTree -class Ingest: +class IngestLogfile: """log ingestion class""" - def __init__(self, conn, infile: list = None): + def __init__(self, conn): """ :param conn: sqlite connection object - :param infile: list containing log filenames to be ingested """ self.conn = conn - self.infile = infile self.jid_pattern = re.compile("^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$") self.message_pattern = re.compile(r'', re.DOTALL) - self.ingest() - - def ingest(self): + def read(self, infile: list = None): """ - ingest method to split up the ingest file list - if necessary decompression and decoding are applied + ingest method to split up the ingest file list, if necessary decompression and decoding are applied + :param infile: list containing log filenames to be ingested """ magic_number = b"\x1f\x8b\x08" # iterate over all infile elements - for element in self.infile: + for element in infile: try: # open file in binary mode diff --git a/main.py b/main.py index cf417d5..e528d3b 100755 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ import sys import tabulate -from ingest import Ingest +from ingest import IngestLogfile from report import ReportDomain @@ -25,6 +25,7 @@ class AbuseReport: self.conn = sqlite3.connect("/".join([self.path, "spam.db"])) self.Report = ReportDomain(self.conn) + self.Ingest = IngestLogfile(self.conn) def main(self): """main method guiding the actions to take""" @@ -37,10 +38,10 @@ class AbuseReport: elif self.infile: # infile set -> ingest - Ingest(self.conn, self.infile) + self.Ingest.read(self.infile) # close sqlite connection - self.conn.close() + self.conn.close() def check(self): # check if the minimum requirements are met -- cgit v1.2.3-54-g00ecf From 5590714f941ab0a762821a43f55f5fa81e899b79 Mon Sep 17 00:00:00 2001 From: nico Date: Fri, 8 Nov 2019 17:23:41 +0100 Subject: * pep8 issues --- main.py | 246 ++++++++++++++++++++++++++++++++-------------------------------- 1 file changed, 123 insertions(+), 123 deletions(-) diff --git a/main.py b/main.py index e528d3b..5b644b0 100755 --- a/main.py +++ b/main.py @@ -13,15 +13,15 @@ from report import ReportDomain class AbuseReport: - """ingestion script for ejabberd spam logs""" + """ingestion script for ejabberd spam logs""" - def __init__(self, arguments): - self.infile = arguments.infile - self.domain = arguments.domain - self.report = arguments.report - self.start = arguments.start - self.stop = arguments.stop - self.path = os.path.dirname(__file__) + def __init__(self, arguments): + self.infile = arguments.infile + self.domain = arguments.domain + self.report = arguments.report + self.start = arguments.start + self.stop = arguments.stop + self.path = os.path.dirname(__file__) self.conn = sqlite3.connect("/".join([self.path, "spam.db"])) self.Report = ReportDomain(self.conn) @@ -29,12 +29,12 @@ class AbuseReport: def main(self): """main method guiding the actions to take""" - # run check method before each execution - self.check() + # run check method before each execution + self.check() - if self.infile is None: - # infile unset -> report top10 - self.egest() + if self.infile is None: + # infile unset -> report top10 + self.egest() elif self.infile: # infile set -> ingest @@ -43,133 +43,133 @@ class AbuseReport: # close sqlite connection self.conn.close() - def check(self): - # check if the minimum requirements are met - table = ('table', 'spam') - master = self.conn.execute('''SELECT type, name from sqlite_master;''').fetchall() - - # if not run create method - if table not in master: - self.create() - - def create(self): - # open and execute base schema file - script = "/".join([self.path, "schema.sql"]) - with open(script) as file: - schema = file.read() - - self.conn.executescript(schema) - - def egest(self): - """egest method returning the database results""" - # init result list - result = list() - - # parse time values - if self.start is None: - # default timeperiod are 31 days calculated via the timedelta - default = dt.datetime.combine(dt.date.today(), dt.time()) - dt.timedelta(days=31) - self.start = dt.datetime.strftime(default, "%Y-%m-%dT%H:%M:%S") - - if self.stop is None: - # set stop value to now - self.stop = dt.datetime.strftime(dt.datetime.now(), '%Y-%m-%dT%H:%M:%S') - - # add validated timestamps to report class - self.Report.addtime(self.start, self.stop) - - # if one or more domains are specified return only their info - if self.domain is not None: - - # iterate over all domains supplied - for domain in self.domain: - - # build and execute - sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain, MIN(ts) AS first, MAX(ts) AS last - FROM spam - WHERE domain = :domain - AND ts > :start AND ts < :stop;''' - parameter = { - "domain": domain, - "start": self.start, - "stop": self.stop - } - query = self.conn.execute(sql, parameter).fetchall() - - # if specified domain is not listed yet, the resulting table will not show the domain name - # this ugly tuple 2 list swap prevents this - temp = list(query[0]) - if temp[2] is None: - temp[2] = domain - query[0] = tuple(temp) - - # extend result tables - result.extend(query) - - # generate report if enabled - if self.report: - self.gen_report(domain, query) - - else: - # build and execute - sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain AS domain from spam - WHERE ts > :start AND ts < :stop - GROUP BY domain ORDER BY 1 DESC LIMIT 10;''' - result = self.conn.execute(sql, {"start": self.start, "stop": self.stop}).fetchall() + def check(self): + # check if the minimum requirements are met + table = ('table', 'spam') + master = self.conn.execute('''SELECT type, name from sqlite_master;''').fetchall() + + # if not run create method + if table not in master: + self.create() + + def create(self): + # open and execute base schema file + script = "/".join([self.path, "schema.sql"]) + with open(script) as file: + schema = file.read() + + self.conn.executescript(schema) + + def egest(self): + """egest method returning the database results""" + # init result list + result = list() + + # parse time values + if self.start is None: + # default timeperiod are 31 days calculated via the timedelta + default = dt.datetime.combine(dt.date.today(), dt.time()) - dt.timedelta(days=31) + self.start = dt.datetime.strftime(default, "%Y-%m-%dT%H:%M:%S") + + if self.stop is None: + # set stop value to now + self.stop = dt.datetime.strftime(dt.datetime.now(), '%Y-%m-%dT%H:%M:%S') + + # add validated timestamps to report class + self.Report.addtime(self.start, self.stop) + + # if one or more domains are specified return only their info + if self.domain is not None: + + # iterate over all domains supplied + for domain in self.domain: + + # build and execute + sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain, MIN(ts) AS first, MAX(ts) AS last + FROM spam + WHERE domain = :domain + AND ts > :start AND ts < :stop;''' + parameter = { + "domain": domain, + "start": self.start, + "stop": self.stop + } + query = self.conn.execute(sql, parameter).fetchall() + + # if specified domain is not listed yet, the resulting table will not show the domain name + # this ugly tuple 2 list swap prevents this + temp = list(query[0]) + if temp[2] is None: + temp[2] = domain + query[0] = tuple(temp) + + # extend result tables + result.extend(query) + + # generate report if enabled + if self.report: + self.gen_report(domain, query) + + else: + # build and execute + sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain AS domain from spam + WHERE ts > :start AND ts < :stop + GROUP BY domain ORDER BY 1 DESC LIMIT 10;''' + result = self.conn.execute(sql, {"start": self.start, "stop": self.stop}).fetchall() # tabelize data spam_table = tabulate.tabulate(result, tablefmt="psql", headers=["messages", "bots", "domain","first seen", "last seen"]) # output to stdout - output = "\n\n".join([spam_table]) + output = "\n\n".join([spam_table]) print(output, file=sys.stdout) def gen_report(self, domain: str, query: list): """ method generating the report files - :param domain: string containing a domain name - :param query: list of tuples containing the query results for the specified domain/s - """ - try: - # open abuse report template file - with open("/".join([self.path, "template/abuse-template.txt"]), "r", encoding="utf-8") as template: - report_template = template.read() + :param domain: string containing a domain name + :param query: list of tuples containing the query results for the specified domain/s + """ + try: + # open abuse report template file + with open("/".join([self.path, "template/abuse-template.txt"]), "r", encoding="utf-8") as template: + report_template = template.read() - except FileNotFoundError as err: - print(err, file=sys.stderr) - exit(1) + except FileNotFoundError as err: + print(err, file=sys.stderr) + exit(1) - # current date - now = dt.datetime.strftime(dt.datetime.now(), "%Y-%m-%d") + # current date + now = dt.datetime.strftime(dt.datetime.now(), "%Y-%m-%d") - # output to report directory - report_filename = "abuse-{domain}-{date}.txt".format(date=now, domain=domain) - jids_filename = "abuse-{domain}-{date}-jids.txt".format(date=now, domain=domain) - logs_filename = "abuse-{domain}-{date}-logs.txt".format(date=now, domain=domain) + # output to report directory + report_filename = "abuse-{domain}-{date}.txt".format(date=now, domain=domain) + jids_filename = "abuse-{domain}-{date}-jids.txt".format(date=now, domain=domain) + logs_filename = "abuse-{domain}-{date}-logs.txt".format(date=now, domain=domain) - # write report files - with open("/".join([self.path, "report", report_filename]), "w", encoding="utf-8") as report_out: - content = self.Report.template(report_template, domain, query) - report_out.write(content) + # write report files + with open("/".join([self.path, "report", report_filename]), "w", encoding="utf-8") as report_out: + content = self.Report.template(report_template, domain, query) + report_out.write(content) - with open("/".join([self.path, "report", jids_filename]), "w", encoding="utf-8") as report_out: - content = self.Report.jids(domain) - report_out.write(content) + with open("/".join([self.path, "report", jids_filename]), "w", encoding="utf-8") as report_out: + content = self.Report.jids(domain) + report_out.write(content) - with open("/".join([self.path, "report", logs_filename]), "w", encoding="utf-8") as report_out: - content = self.Report.logs(domain) - report_out.write(content) + with open("/".join([self.path, "report", logs_filename]), "w", encoding="utf-8") as report_out: + content = self.Report.logs(domain) + report_out.write(content) if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument('-in', '--infile', nargs='+', help='set path to input file', dest='infile') - parser.add_argument('-d', '--domain', action='append', help='specify report domain', dest='domain') - parser.add_argument('-r', '--report', action='store_true', help='toggle report output to file', dest='report') - parser.add_argument('-f', '--from', help='ISO-8601 timestamp from where to search', dest='start') - parser.add_argument('-t', '--to', help='ISO-8601 timestamp up until where to search', dest='stop') - args = parser.parse_args() - - # run - AbuseReport(args).main() + parser = argparse.ArgumentParser() + parser.add_argument('-in', '--infile', nargs='+', help='set path to input file', dest='infile') + parser.add_argument('-d', '--domain', action='append', help='specify report domain', dest='domain') + parser.add_argument('-r', '--report', action='store_true', help='toggle report output to file', dest='report') + parser.add_argument('-f', '--from', help='ISO-8601 timestamp from where to search', dest='start') + parser.add_argument('-t', '--to', help='ISO-8601 timestamp up until where to search', dest='stop') + args = parser.parse_args() + + # run + AbuseReport(args).main() -- cgit v1.2.3-54-g00ecf