aboutsummaryrefslogtreecommitdiffstats
path: root/influx.py
blob: 049bef09f67af0eb7dfc379500b4231e2bc605fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import time
import os

from influxdb import InfluxDBClient

from ejabberdrpc import EjabberdMetrics


class Influx:
    def __init__(self, metrics, client):
        self._metrics = metrics
        self.client = client

    @staticmethod
    def _timestamp():
        return int(time.time() * 1000)

    @staticmethod
    def _rmspace(key: str = None, value=None):
        try:
            key = key.replace(" ", "\ ")
            value = value.replace(" ","\ ")
        except (TypeError, AttributeError):
            pass

        return key, value

    def _parse(self, name, key, value, ts, tags=None):
        output = name

        # check if tags is a dict
        if isinstance(tags, dict):

            # create tag_key=tag_value pairs for all elements and append them to name
            for k, v in tags.items():
                output += ",{}={}".format(*self._rmspace(k, v))

        # append key=value to name
        output += ' {}={}i {}'.format(*self._rmspace(key, value), ts)
        return output

    def writeMetrics(self):
        name = "ejabberd"
        data = list()

        # global values
        cur_ts = self._timestamp()
        data.append("{m} s2s_in={v}i {ts}".format(m=name, v=self._metrics.get_s2s_in(), ts=cur_ts))
        data.append("{m} s2s_out={v}i {ts}".format(m=name, v=self._metrics.get_s2s_out(), ts=cur_ts))

        # vhost values
        for vhost in self._metrics.get_vhosts():
            cur_ts = self._timestamp()
            data.append("{m},vhost={vh} registered={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_registered(vhost),ts=cur_ts))
            data.append("{m},vhost={vh} muc={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_muc(vhost), ts=cur_ts))

            # vhost statistics on their respective node
            for node in self._metrics.get_nodes():
                cur_ts = self._timestamp()
                for k, v in self._metrics.get_online_by_status(node=node, vhost=vhost).items():
                    data.append(self._parse("ejabberd_online_status", k, v, cur_ts, {"node": node, "vhost": vhost}))
                for k, v in self._metrics.get_online_by_client(node=node, vhost=vhost).items():
                    data.append(self._parse("ejabberd_online_client", k, v, cur_ts, {"node": node, "vhost": vhost}))
                for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=vhost).items():
                    data.append(self._parse("ejabberd_online_ipversion", k, v, cur_ts, {"node": node, "vhost": vhost}))
                for k, v in self._metrics.get_online_by_connection(node=node, vhost=vhost).items():
                    data.append(self._parse("ejabberd_online_connection", k, v, cur_ts, {"node": node, "vhost": vhost}))
                for cl, ipv in self._metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items():
                    for k, v in ipv.items():
                        data.append(self._parse("ejabberd_online_client_ipversion", k, v, cur_ts, {"vhost": vhost, "node": node, "ipversion": k, "client": cl}))

        # write output to database
        self.client.write_points(data, time_precision='ms', batch_size=10000, protocol='line')


if __name__ == "__main__":
    # load config
    path = os.path.dirname(__file__)
    with open("/".join([path, "config.json"]), "r", encoding="utf-8") as f:
        config = json.load(f)

    # init global handler
    metrics = EjabberdMetrics("http://localhost:4560", config['login'])
    client = InfluxDBClient(host='localhost', port=8086, database=config['database'], retries=5)

    # create database only once
    client.create_database(config['database'])

    # init influx class
    influx = Influx(metrics, client)

    while True:
        metrics.update()
        influx.writeMetrics()

        time.sleep(10)