#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json import time import os from influxdb import InfluxDBClient from ejabberdrpc import EjabberdMetrics class Influx: def __init__(self, metrics, client): self._metrics = metrics self.client = client @staticmethod def _timestamp(): return int(time.time() * 1000) @staticmethod def _rmspace(key: str = None, value=None): try: key = key.replace(" ", "\ ") value = value.replace(" ","\ ") except (TypeError, AttributeError): pass return key, value def _parse(self, name, key, value, ts, tags=None): output = name # check if tags is a dict if isinstance(tags, dict): # create tag_key=tag_value pairs for all elements and append them to name for k, v in tags.items(): output += ",{}={}".format(*self._rmspace(k, v)) # append key=value to name output += ' {}={}i {}'.format(*self._rmspace(key, value), ts) return output def writeMetrics(self): name = "ejabberd" data = list() # global values cur_ts = self._timestamp() data.append("{m} s2s_in={v}i {ts}".format(m=name, v=self._metrics.get_s2s_in(), ts=cur_ts)) data.append("{m} s2s_out={v}i {ts}".format(m=name, v=self._metrics.get_s2s_out(), ts=cur_ts)) # vhost values for vhost in self._metrics.get_vhosts(): cur_ts = self._timestamp() data.append("{m},vhost={vh} registered={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_registered(vhost),ts=cur_ts)) data.append("{m},vhost={vh} muc={v}i {ts}".format(m=name, vh= vhost, v=self._metrics.get_muc(vhost), ts=cur_ts)) # vhost statistics on their respective node for node in self._metrics.get_nodes(): cur_ts = self._timestamp() for k, v in self._metrics.get_online_by_status(node=node, vhost=vhost).items(): data.append(self._parse("ejabberd_online_status", k, v, cur_ts, {"node": node, "vhost": vhost})) for k, v in self._metrics.get_online_by_client(node=node, vhost=vhost).items(): data.append(self._parse("ejabberd_online_client", k, v, cur_ts, {"node": node, "vhost": vhost})) for k, v in self._metrics.get_online_by_ipversion(node=node, vhost=vhost).items(): data.append(self._parse("ejabberd_online_ipversion", k, v, cur_ts, {"node": node, "vhost": vhost})) for k, v in self._metrics.get_online_by_connection(node=node, vhost=vhost).items(): data.append(self._parse("ejabberd_online_connection", k, v, cur_ts, {"node": node, "vhost": vhost})) for cl, ipv in self._metrics.get_online_client_by_ipversion(node=node, vhost=vhost).items(): for k, v in ipv.items(): data.append(self._parse("ejabberd_online_client_ipversion", k, v, cur_ts, {"vhost": vhost, "node": node, "ipversion": k, "client": cl})) # write output to database self.client.write_points(data, time_precision='ms', batch_size=10000, protocol='line') if __name__ == "__main__": # load config path = os.path.dirname(__file__) with open("/".join([path, "config.json"]), "r", encoding="utf-8") as f: config = json.load(f) url = config['url'] if "url" in config else "http://localhost:4560" login = config['login'] if "login" in config else None api = config['api'] if "api" in config else "rpc" # config influxdb influxdb_host = config['influxdb_host'] if "influxdb_host" in config else "localhost" influxdb_port = config['influxdb_port'] if "influxdb_port" in config else 8086 influxdb_database = config['database'] if "database" in config else "ejabberd" # init global handler metrics = EjabberdMetrics(url, login, api) client = InfluxDBClient(host=influxdb_host, port=influxdb_port, database=influxdb_database, retries=5) # create database only once client.create_database(influxdb_database) # init influx class influx = Influx(metrics, client) while True: metrics.update() influx.writeMetrics() time.sleep(10)