aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornico <nico@magicbroccoli.de>2019-11-08 17:27:11 +0100
committernico <nico@magicbroccoli.de>2019-11-08 17:27:11 +0100
commitac62403ce9a6a78c93d5e5756588e49cd35f6edd (patch)
tree81a30bed3ee82c8830ca92c7a327996a522a8f99
parent505da62f2ad7f6839c07f22c5cc413d6ecd49600 (diff)
parent5590714f941ab0a762821a43f55f5fa81e899b79 (diff)
Merge branch 'ingest_class'
transfer ingest methods (#5) * transfer ingest method to own class * adjusted in main method
-rw-r--r--ingest.py89
-rwxr-xr-xmain.py388
2 files changed, 247 insertions, 230 deletions
diff --git a/ingest.py b/ingest.py
new file mode 100644
index 0000000..0c762c8
--- /dev/null
+++ b/ingest.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+import gzip
+import re
+import sqlite3
+import sys
+
+from defusedxml import ElementTree
+
+
+class IngestLogfile:
+ """log ingestion class"""
+ def __init__(self, conn):
+ """
+ :param conn: sqlite connection object
+ """
+ self.conn = conn
+
+ self.jid_pattern = re.compile("^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$")
+ self.message_pattern = re.compile(r'<message.*?</message>', re.DOTALL)
+
+ def read(self, infile: list = None):
+ """
+ ingest method to split up the ingest file list, if necessary decompression and decoding are applied
+ :param infile: list containing log filenames to be ingested
+ """
+ magic_number = b"\x1f\x8b\x08"
+
+ # iterate over all infile elements
+ for element in infile:
+
+ try:
+ # open file in binary mode
+ with open(element, "rb") as infile:
+ content = infile.read()
+
+ # in case of a missing file set content to an empty string
+ except FileNotFoundError as err:
+ content = ""
+ print(err, file=sys.stderr)
+
+ # if magic number is present decompress and decode file
+ if content.startswith(magic_number):
+ content = gzip.decompress(content).decode("utf-8")
+ # in any other case read file normally
+ else:
+ content = content.decode("utf-8")
+
+ # None catch
+ if content is not None:
+ log = re.findall(self.message_pattern, content)
+
+ if log is not None:
+ self.db_import(log)
+
+ def db_import(self, message_log: list):
+ """
+ import xml stanzas into database
+ :param message_log: list of xml messages
+ """
+ for message in message_log:
+ message_parsed = ElementTree.fromstring(message)
+
+ # parse 'from' tag
+ spam_from = message_parsed.get('from')
+ match = self.jid_pattern.match(spam_from)
+ (node, domain, resource) = match.groups()
+
+ # stamp
+ all_delay_tags = message_parsed.findall('.//{urn:xmpp:delay}delay')
+ spam_time = None
+ for tag in all_delay_tags:
+ if "@" in tag.get("from"):
+ continue
+
+ spam_time = tag.get('stamp')
+
+ # body
+ spam_body = message_parsed.find('{jabber:client}body')
+ if spam_body is not None:
+ spam_body = spam_body.text
+
+ # format sql
+ try:
+ self.conn.execute('''INSERT INTO spam VALUES(:user, :domain, :spam_time, :spam_body);''',
+ {"user": node, "domain": domain, "spam_time": spam_time, "spam_body": spam_body})
+ except sqlite3.IntegrityError:
+ pass
+ finally:
+ self.conn.commit()
diff --git a/main.py b/main.py
index 60e65f5..5b644b0 100755
--- a/main.py
+++ b/main.py
@@ -2,246 +2,174 @@
# -*- coding: utf-8 -*-
import argparse
import datetime as dt
-import gzip
import os
-import re
import sqlite3
import sys
import tabulate
-from defusedxml import ElementTree
+from ingest import IngestLogfile
from report import ReportDomain
class AbuseReport:
- """ingestion script for ejabberd spam logs"""
-
- def __init__(self, arguments):
- self.infile = arguments.infile
- self.domain = arguments.domain
- self.report = arguments.report
- self.start = arguments.start
- self.stop = arguments.stop
- self.path = os.path.dirname(__file__)
-
- self.conn = sqlite3.connect("/".join([self.path, "spam.db"]))
- self.Report = ReportDomain(self.conn)
- self.jid_pattern = re.compile("^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$")
- self.message_pattern = re.compile(r'<message.*?</message>', re.DOTALL)
-
- def main(self):
- """main method guiding the actions to take"""
- # run check method before each execution
- self.check()
-
- if self.infile is None:
- # infile unset -> report top10
- self.egest()
-
- elif self.infile:
- # infile set -> ingest
- self.ingest()
-
- # close sqlite connection
- self.conn.close()
-
- def check(self):
- # check if the minimum requirements are met
- table = ('table', 'spam')
- master = self.conn.execute('''SELECT type, name from sqlite_master;''').fetchall()
-
- # if not run create method
- if table not in master:
- self.create()
-
- def create(self):
- # open and execute base schema file
- script = "/".join([self.path, "schema.sql"])
- with open(script) as file:
- schema = file.read()
-
- self.conn.executescript(schema)
-
- def egest(self):
- """egest method returning the database results"""
- # init result list
- result = list()
-
- # parse time values
- if self.start is None:
- # default timeperiod are 31 days calculated via the timedelta
- default = dt.datetime.combine(dt.date.today(), dt.time()) - dt.timedelta(days=31)
- self.start = dt.datetime.strftime(default, "%Y-%m-%dT%H:%M:%S")
-
- if self.stop is None:
- # set stop value to now
- self.stop = dt.datetime.strftime(dt.datetime.now(), '%Y-%m-%dT%H:%M:%S')
-
- # add validated timestamps to report class
- self.Report.addtime(self.start, self.stop)
-
- # if one or more domains are specified return only their info
- if self.domain is not None:
-
- # iterate over all domains supplied
- for domain in self.domain:
-
- # build and execute
- sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain, MIN(ts) AS first, MAX(ts) AS last
- FROM spam
- WHERE domain = :domain
- AND ts > :start AND ts < :stop;'''
- parameter = {
- "domain": domain,
- "start": self.start,
- "stop": self.stop
- }
- query = self.conn.execute(sql, parameter).fetchall()
-
- # if specified domain is not listed yet, the resulting table will not show the domain name
- # this ugly tuple 2 list swap prevents this
- temp = list(query[0])
- if temp[2] is None:
- temp[2] = domain
- query[0] = tuple(temp)
-
- # extend result tables
- result.extend(query)
-
- # generate report if enabled
- if self.report:
- self.gen_report(domain, query)
-
- else:
- # build and execute
- sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain AS domain from spam
- WHERE ts > :start AND ts < :stop
- GROUP BY domain ORDER BY 1 DESC LIMIT 10;'''
- result = self.conn.execute(sql, {"start": self.start, "stop": self.stop}).fetchall()
-
- # tabelize data
- spam_table = tabulate.tabulate(result, tablefmt="psql", headers=["messages", "bots", "domain","first seen",
- "last seen"])
-
- # output to stdout
- output = "\n\n".join([spam_table])
- print(output, file=sys.stdout)
-
- def ingest(self):
- """
- ingest method to split up the ingest file list
- if necessary decompression and decoding are applied
- """
- magic_number = b"\x1f\x8b\x08"
-
- # iterate over all infile elements
- for element in self.infile:
-
- try:
- # open file in binary mode
- with open(element, "rb") as infile:
- content = infile.read()
-
- except FileNotFoundError as err:
- content = ""
- print(err, file=sys.stderr)
-
- # if magic number is present decompress and decode file
- if content.startswith(magic_number):
- content = gzip.decompress(content).decode("utf-8")
- # in any other case read file normally
- else:
- content = content.decode("utf-8")
-
- # automated run None catch
- if content is not None:
- log = re.findall(self.message_pattern, content)
-
- if log is not None:
- self.db_import(log)
-
- def db_import(self, message_log: list):
- """
- import xml stanzas into database
- :param message_log: list of xml messages
- """
- for message in message_log:
- message_parsed = ElementTree.fromstring(message)
-
- # parse 'from' tag
- spam_from = message_parsed.get('from')
- match = self.jid_pattern.match(spam_from)
- (node, domain, resource) = match.groups()
-
- # stamp
- all_delay_tags = message_parsed.findall('.//{urn:xmpp:delay}delay')
- spam_time = None
- for tag in all_delay_tags:
- if "@" in tag.get("from"):
- continue
-
- spam_time = tag.get('stamp')
-
- # body
- spam_body = message_parsed.find('{jabber:client}body')
- if spam_body is not None:
- spam_body = spam_body.text
-
- # format sql
- try:
- self.conn.execute('''INSERT INTO spam VALUES(:user, :domain, :spam_time, :spam_body);''',
- {"user": node, "domain": domain, "spam_time": spam_time, "spam_body": spam_body})
- except sqlite3.IntegrityError:
- pass
- finally:
- self.conn.commit()
-
- def gen_report(self, domain: str, query: list):
- """
- method generating the report files
- :param domain: string containing a domain name
- :param query: list of tuples containing the query results for the specified domain/s
- """
- try:
- # open abuse report template file
- with open("/".join([self.path, "template/abuse-template.txt"]), "r", encoding="utf-8") as template:
- report_template = template.read()
-
- except FileNotFoundError as err:
- print(err, file=sys.stderr)
- exit(1)
-
- # current date
- now = dt.datetime.strftime(dt.datetime.now(), "%Y-%m-%d")
-
- # output to report directory
- report_filename = "abuse-{domain}-{date}.txt".format(date=now, domain=domain)
- jids_filename = "abuse-{domain}-{date}-jids.txt".format(date=now, domain=domain)
- logs_filename = "abuse-{domain}-{date}-logs.txt".format(date=now, domain=domain)
-
- # write report files
- with open("/".join([self.path, "report", report_filename]), "w", encoding="utf-8") as report_out:
- content = self.Report.template(report_template, domain, query)
- report_out.write(content)
-
- with open("/".join([self.path, "report", jids_filename]), "w", encoding="utf-8") as report_out:
- content = self.Report.jids(domain)
- report_out.write(content)
-
- with open("/".join([self.path, "report", logs_filename]), "w", encoding="utf-8") as report_out:
- content = self.Report.logs(domain)
- report_out.write(content)
+ """ingestion script for ejabberd spam logs"""
+
+ def __init__(self, arguments):
+ self.infile = arguments.infile
+ self.domain = arguments.domain
+ self.report = arguments.report
+ self.start = arguments.start
+ self.stop = arguments.stop
+ self.path = os.path.dirname(__file__)
+
+ self.conn = sqlite3.connect("/".join([self.path, "spam.db"]))
+ self.Report = ReportDomain(self.conn)
+ self.Ingest = IngestLogfile(self.conn)
+
+ def main(self):
+ """main method guiding the actions to take"""
+ # run check method before each execution
+ self.check()
+
+ if self.infile is None:
+ # infile unset -> report top10
+ self.egest()
+
+ elif self.infile:
+ # infile set -> ingest
+ self.Ingest.read(self.infile)
+
+ # close sqlite connection
+ self.conn.close()
+
+ def check(self):
+ # check if the minimum requirements are met
+ table = ('table', 'spam')
+ master = self.conn.execute('''SELECT type, name from sqlite_master;''').fetchall()
+
+ # if not run create method
+ if table not in master:
+ self.create()
+
+ def create(self):
+ # open and execute base schema file
+ script = "/".join([self.path, "schema.sql"])
+ with open(script) as file:
+ schema = file.read()
+
+ self.conn.executescript(schema)
+
+ def egest(self):
+ """egest method returning the database results"""
+ # init result list
+ result = list()
+
+ # parse time values
+ if self.start is None:
+ # default timeperiod are 31 days calculated via the timedelta
+ default = dt.datetime.combine(dt.date.today(), dt.time()) - dt.timedelta(days=31)
+ self.start = dt.datetime.strftime(default, "%Y-%m-%dT%H:%M:%S")
+
+ if self.stop is None:
+ # set stop value to now
+ self.stop = dt.datetime.strftime(dt.datetime.now(), '%Y-%m-%dT%H:%M:%S')
+
+ # add validated timestamps to report class
+ self.Report.addtime(self.start, self.stop)
+
+ # if one or more domains are specified return only their info
+ if self.domain is not None:
+
+ # iterate over all domains supplied
+ for domain in self.domain:
+
+ # build and execute
+ sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain, MIN(ts) AS first, MAX(ts) AS last
+ FROM spam
+ WHERE domain = :domain
+ AND ts > :start AND ts < :stop;'''
+ parameter = {
+ "domain": domain,
+ "start": self.start,
+ "stop": self.stop
+ }
+ query = self.conn.execute(sql, parameter).fetchall()
+
+ # if specified domain is not listed yet, the resulting table will not show the domain name
+ # this ugly tuple 2 list swap prevents this
+ temp = list(query[0])
+ if temp[2] is None:
+ temp[2] = domain
+ query[0] = tuple(temp)
+
+ # extend result tables
+ result.extend(query)
+
+ # generate report if enabled
+ if self.report:
+ self.gen_report(domain, query)
+
+ else:
+ # build and execute
+ sql = '''SELECT COUNT(*) AS messages, COUNT(DISTINCT user) AS bots, domain AS domain from spam
+ WHERE ts > :start AND ts < :stop
+ GROUP BY domain ORDER BY 1 DESC LIMIT 10;'''
+ result = self.conn.execute(sql, {"start": self.start, "stop": self.stop}).fetchall()
+
+ # tabelize data
+ spam_table = tabulate.tabulate(result, tablefmt="psql", headers=["messages", "bots", "domain","first seen",
+ "last seen"])
+
+ # output to stdout
+ output = "\n\n".join([spam_table])
+ print(output, file=sys.stdout)
+
+ def gen_report(self, domain: str, query: list):
+ """
+ method generating the report files
+ :param domain: string containing a domain name
+ :param query: list of tuples containing the query results for the specified domain/s
+ """
+ try:
+ # open abuse report template file
+ with open("/".join([self.path, "template/abuse-template.txt"]), "r", encoding="utf-8") as template:
+ report_template = template.read()
+
+ except FileNotFoundError as err:
+ print(err, file=sys.stderr)
+ exit(1)
+
+ # current date
+ now = dt.datetime.strftime(dt.datetime.now(), "%Y-%m-%d")
+
+ # output to report directory
+ report_filename = "abuse-{domain}-{date}.txt".format(date=now, domain=domain)
+ jids_filename = "abuse-{domain}-{date}-jids.txt".format(date=now, domain=domain)
+ logs_filename = "abuse-{domain}-{date}-logs.txt".format(date=now, domain=domain)
+
+ # write report files
+ with open("/".join([self.path, "report", report_filename]), "w", encoding="utf-8") as report_out:
+ content = self.Report.template(report_template, domain, query)
+ report_out.write(content)
+
+ with open("/".join([self.path, "report", jids_filename]), "w", encoding="utf-8") as report_out:
+ content = self.Report.jids(domain)
+ report_out.write(content)
+
+ with open("/".join([self.path, "report", logs_filename]), "w", encoding="utf-8") as report_out:
+ content = self.Report.logs(domain)
+ report_out.write(content)
if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument('-in', '--infile', nargs='+', help='set path to input file', dest='infile')
- parser.add_argument('-d', '--domain', action='append', help='specify report domain', dest='domain')
- parser.add_argument('-r', '--report', action='store_true', help='toggle report output to file', dest='report')
- parser.add_argument('-f', '--from', help='ISO-8601 timestamp from where to search', dest='start')
- parser.add_argument('-t', '--to', help='ISO-8601 timestamp up until where to search', dest='stop')
- args = parser.parse_args()
-
- # run
- AbuseReport(args).main()
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-in', '--infile', nargs='+', help='set path to input file', dest='infile')
+ parser.add_argument('-d', '--domain', action='append', help='specify report domain', dest='domain')
+ parser.add_argument('-r', '--report', action='store_true', help='toggle report output to file', dest='report')
+ parser.add_argument('-f', '--from', help='ISO-8601 timestamp from where to search', dest='start')
+ parser.add_argument('-t', '--to', help='ISO-8601 timestamp up until where to search', dest='stop')
+ args = parser.parse_args()
+
+ # run
+ AbuseReport(args).main()