#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Collect ejabberd metrics and write them to InfluxDB as line-protocol points."""
import logging
import time
from typing import Any, Optional, Tuple

from influxdb import InfluxDBClient

from config import Config
from metrics import EjabberdMetrics

# Line protocol requires commas, equals signs and spaces to be backslash-escaped
# in tag keys, tag values and field keys.
_LINE_PROTOCOL_ESCAPES = str.maketrans({",": "\\,", "=": "\\=", " ": "\\ "})


class Influx:
    """Render metrics as InfluxDB line-protocol strings and submit them."""

    def __init__(self, data, cld):
        """Remember the metrics source and the database client.

        :param data: metrics provider; ``write_metrics`` calls its ``get_*`` methods
        :param cld: client object whose ``write_points`` method receives the points
        """
        self._metrics = data
        self.client = cld

    @staticmethod
    def _timestamp():
        """Return the current Unix time in milliseconds as an ``int``."""
        return int(time.time() * 1000)

    @staticmethod
    def _rmspace(key: Optional[str] = None, value: Any = None) -> Tuple[Any, Any]:
        """Escape line-protocol special characters in ``key`` and ``value``.

        Spaces, commas and equals signs are prefixed with a backslash. Each
        argument is handled independently; anything that is not a ``str``
        (ints, ``None``, ...) is returned unchanged.

        :param key: tag key or field key
        :param value: tag value or field value
        :return: ``(key, value)`` tuple with strings escaped
        """

        def escape(item):
            if isinstance(item, str):
                return item.translate(_LINE_PROTOCOL_ESCAPES)
            return item

        return escape(key), escape(value)

    def _parse(self, name, key, value, ts, tags=None):
        """Build one line-protocol point with a single integer field.

        :param name: measurement name
        :param key: field key
        :param value: field value; emitted with the integer suffix ``i``
        :param ts: timestamp appended to the point
        :param tags: optional dict of tag key/value pairs; ignored unless a dict
        :return: ``name[,tag=value...] key=valuei ts``
        """
        head = [name]
        if isinstance(tags, dict):
            # tag_key=tag_value pairs are comma-joined onto the measurement name
            head.extend("{}={}".format(*self._rmspace(k, v)) for k, v in tags.items())

        field_key, field_value = self._rmspace(key, value)
        return "{} {}={}i {}".format(",".join(head), field_key, field_value, ts)

    def write_metrics(self):
        """Gather all metrics from the provider and write them in one call.

        Emits global counters, per-vhost counters, and per-node/per-vhost
        online statistics, then hands the list to ``client.write_points``.
        """
        metrics = self._metrics

        # global values
        cur_ts = self._timestamp()
        data = [
            self._parse("ejabberd", "s2s_in", metrics.get_s2s_in(), cur_ts),
            self._parse("ejabberd", "s2s_out", metrics.get_s2s_out(), cur_ts),
            self._parse("ejabberd", "uptime", metrics.get_uptime(), cur_ts),
            self._parse("ejabberd", "processes", metrics.get_processes(), cur_ts),
        ]

        # measurements that share the {"node", "vhost"} tag set, in emission order
        online_sources = (
            ("ejabberd_online_status", metrics.get_online_by_status),
            ("ejabberd_online_client", metrics.get_online_by_client),
            ("ejabberd_online_ipversion", metrics.get_online_by_ipversion),
            ("ejabberd_online_connection", metrics.get_online_by_connection),
        )

        # vhost values
        for vhost in metrics.get_vhosts():
            cur_ts = self._timestamp()
            vhost_tag = {"vhost": vhost}
            data.append(self._parse("ejabberd", "registered", metrics.get_registered(vhost), cur_ts, vhost_tag))
            data.append(self._parse("ejabberd", "muc", metrics.get_muc(vhost), cur_ts, vhost_tag))

            # vhost statistics on their respective node
            for node in metrics.get_nodes():
                cur_ts = self._timestamp()
                node_tags = {"node": node, "vhost": vhost}

                for measurement, getter in online_sources:
                    for k, v in getter(node=node, vhost=vhost).items():
                        data.append(self._parse(measurement, k, v, cur_ts, node_tags))

                by_client = metrics.get_online_client_by_ipversion(node=node, vhost=vhost)
                for cl, ipv in by_client.items():
                    for k, v in ipv.items():
                        data.append(
                            self._parse(
                                "ejabberd_online_client_ipversion",
                                k,
                                v,
                                cur_ts,
                                {"vhost": vhost, "node": node, "ipversion": k, "client": cl},
                            )
                        )

        # write output to database
        self.client.write_points(data, time_precision="ms", batch_size=10000, protocol="line")


def main():
    """Load the configuration, then update and write metrics every 10 seconds."""
    # load config
    config = Config()

    if config.get("debug", default=False):
        logging.getLogger().setLevel(logging.DEBUG)

    # credentials and parameters
    url = config.get("url", default="http://localhost:5280/api")
    login = config.get("login", default=None)
    api = config.get("api", default="rest")

    # config influxdb
    influx_host = config.get("influxdb_host", default="localhost")
    influx_port = config.get("influxdb_port", default=8086)
    influx_dbname = config.get("influxdb_db", default="ejabberd")

    # init handler
    metrics = EjabberdMetrics(url, login, api)
    client = InfluxDBClient(host=influx_host, port=influx_port, database=influx_dbname, retries=5)

    # create database only once
    client.create_database(influx_dbname)

    # init influx class
    influx = Influx(metrics, client)

    while True:
        metrics.update()
        influx.write_metrics()
        time.sleep(10)


if __name__ == "__main__":
    main()