aboutsummaryrefslogtreecommitdiffstats
path: root/influx.py
blob: 36d7a4768fbbc14b2ae59547e426ea0767e9e687 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import os
import time

from influxdb import InfluxDBClient

from ejabberdrpc import EjabberdMetrics


class Influx:
    """Collect ejabberd metrics and write them to InfluxDB as line protocol.

    The metrics source must provide the ``get_*`` accessors called in
    :meth:`write_metrics`; the client must provide ``write_points``.
    """

    # Characters that the InfluxDB line protocol requires to be
    # backslash-escaped inside tag keys, tag values and field keys.
    _ESCAPES = str.maketrans({',': r'\,', '=': r'\=', ' ': r'\ '})

    def __init__(self, data, cld):
        """Store the metrics source (``data``) and the database client (``cld``)."""
        self._metrics = data
        self.client = cld

    @staticmethod
    def _timestamp():
        """Return the current Unix time in milliseconds as an integer."""
        return int(time.time() * 1000)

    @staticmethod
    def _rmspace(key: object = None, value: object = None):
        """Escape line-protocol special characters in ``key`` and ``value``.

        Commas, equals signs and spaces are prefixed with a backslash. Each
        argument is handled independently; anything that is not a string
        (for example an integer field value or ``None``) is returned
        unchanged.

        :return: tuple ``(key, value)`` with string members escaped
        """
        def escape(item):
            if isinstance(item, str):
                return item.translate(Influx._ESCAPES)
            return item

        return escape(key), escape(value)

    def _parse(self, name, key, value, ts, tags=None):
        """Build one line-protocol point with a single integer field.

        :param name: measurement name (used verbatim)
        :param key: field key
        :param value: field value; rendered with the integer suffix ``i``
        :param ts: timestamp appended to the point
        :param tags: optional dict of tag key/value pairs; ignored unless it
            is a dict
        :return: string ``name[,tag=value...] key=valuei ts``
        """
        parts = [name]

        # tag set: one ",tag_key=tag_value" pair per entry
        if isinstance(tags, dict):
            parts.extend(',{}={}'.format(*self._rmspace(k, v)) for k, v in tags.items())

        # field set and timestamp
        parts.append(' {}={}i {}'.format(*self._rmspace(key, value), ts))
        return ''.join(parts)

    def write_metrics(self):
        """Gather all metrics from the source and write them in one call.

        Emits global s2s counters, per-vhost counters and per-node/vhost
        online statistics, then hands the list of line-protocol strings to
        ``self.client.write_points`` with millisecond precision.
        """
        metrics = self._metrics
        data = []

        # global values
        cur_ts = self._timestamp()
        data.append(self._parse('ejabberd', 's2s_in', metrics.get_s2s_in(), cur_ts))
        data.append(self._parse('ejabberd', 's2s_out', metrics.get_s2s_out(), cur_ts))

        # measurements that share the same {node, vhost} tag set, in emit order
        breakdowns = (
            ('ejabberd_online_status', metrics.get_online_by_status),
            ('ejabberd_online_client', metrics.get_online_by_client),
            ('ejabberd_online_ipversion', metrics.get_online_by_ipversion),
            ('ejabberd_online_connection', metrics.get_online_by_connection),
        )

        # vhost values
        for vhost in metrics.get_vhosts():
            cur_ts = self._timestamp()
            vhost_tags = {'vhost': vhost}
            data.append(self._parse('ejabberd', 'registered', metrics.get_registered(vhost), cur_ts, vhost_tags))
            data.append(self._parse('ejabberd', 'muc', metrics.get_muc(vhost), cur_ts, vhost_tags))

            # vhost statistics on their respective node
            for node in metrics.get_nodes():
                cur_ts = self._timestamp()
                node_tags = {'node': node, 'vhost': vhost}

                for measurement, getter in breakdowns:
                    for k, v in getter(node=node, vhost=vhost).items():
                        data.append(self._parse(measurement, k, v, cur_ts, node_tags))

                # nested mapping: client -> {ipversion: count}
                for cl, ipv in metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items():
                    for k, v in ipv.items():
                        data.append(self._parse('ejabberd_online_client_ipversion', k, v, cur_ts,
                                                {'vhost': vhost, 'node': node, 'ipversion': k, 'client': cl}))

        # write output to database
        self.client.write_points(data, time_precision='ms', batch_size=10000, protocol='line')


if __name__ == '__main__':
    # load config.json from the directory containing this script;
    # abspath() keeps the lookup correct even when __file__ is a bare
    # file name (dirname would then be '' and yield '/config.json')
    path = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(path, 'config.json'), 'r', encoding='utf-8') as f:
        config = json.load(f)

    # creds and params (passed on to EjabberdMetrics)
    url = config.get('url', 'http://localhost:5280/api')
    login = config.get('login')
    api = config.get('api', 'rest')

    # config influxdb
    influx_host = config.get('influxdb_host', 'localhost')
    influx_port = config.get('influxdb_port', 8086)
    influx_dbname = config.get('influxdb_db', 'ejabberd')

    # init handler
    metrics = EjabberdMetrics(url, login, api)
    client = InfluxDBClient(host=influx_host, port=influx_port, database=influx_dbname, retries=5)

    # create database only once
    client.create_database(influx_dbname)

    # init influx class
    influx = Influx(metrics, client)

    # poll forever: refresh the metrics, push them, then wait 10 seconds
    while True:
        metrics.update()
        influx.write_metrics()

        time.sleep(10)