+# See for more information
+# See for more hooks
+- repo:
+ rev: v3.2.0
+ hooks:
+ - id: check-docstring-first
+ - id: check-executables-have-shebangs
+ - id: fix-encoding-pragma
+ - id: trailing-whitespace
+ - id: end-of-file-fixer
+ - id: mixed-line-ending
+ args:
+ - "--fix=lf"
+ - id: check-yaml
+- repo:
+ rev: 19.10b0
+ hooks:
+ - id: black
+- repo:
+ rev: 3.8.3
+ hooks:
+ - id: flake8
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import requests
+class TeamSpeakApi:
+ """
+ class to interact with the teamspeak rest api
+ """
+ def __init__(self, apikey: str, url: str):
+ # variables
+ self._apikey = apikey
+ self._url = url
+ # define api handler
+ self.cmd = self._rest
+ self.session = requests.Session()
+ @property
+ def _auth(self):
+ if self._apikey is not None:
+ return {"x-api-key": self._apikey}
+ def _rest(self, command: str) -> dict:
+ # add authentication header to the session obj
+ auth_header = self._auth
+ # build get request
+ r = self.session.get("/".join([self._url, command]), headers=auth_header, verify=False)
+ # proceed if response is ok
+ if r.ok:
+ return r.json()
+ return {}
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import sys
+from pathlib import Path
+from ruamel.yaml import YAML
+from ruamel.yaml.parser import ParserError
+from ruamel.yaml.scanner import ScannerError
+class Config:
+ def __init__(self):
+ # class variables
+ self.content = None
+ # select config file
+ if Path.exists(Path("teamspeak-influx.yml")):
+ self.conf_file = Path("teamspeak-influx.yml")
+ else:
+ self.conf_file = Path("/etc/teamspeak-influx.yml")
+ # read config file
+ self._read()
+ def _read(self):
+ """init the config object with this method"""
+ self._check()
+ # open file as an iostream
+ with open(self.conf_file, "r", encoding="utf-8") as f:
+ try:
+ self.content = YAML(typ="safe").load(f)
+ # catch json decoding errors
+ except (ParserError, ScannerError) as err:
+ print(err, file=sys.stderr)
+ exit(1)
+ def _check(self):
+ """internal method to check if the config file exists"""
+ try:
+ # if file is present continue
+ if self.conf_file.exists():
+ return
+ # if not create a blank file
+ else:
+ self.conf_file.touch(mode=0o640)
+ # catch permission exceptions as this tries to write to /etc/
+ except PermissionError as err:
+ print(err, file=sys.stderr)
+ sys.exit(err.errno)
+ def get(self, key: str = None, default: (str, int) = None) -> (dict, str, int, None):
+ """
+ method to retrieve all config values, a single value
+ or the optional default value
+ """
+ # if a special key is request, return only that value
+ if key is not None:
+ # safety measure
+ if key in self.content:
+ return self.content[key]
+ # if a default value is given return that
+ if default is not None:
+ return default
+ # if the key isn't part if self.content return None
+ else:
+ return None
+ # else return everything
+ return self.content
+Description=teamspeak influxdb exporter
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import time
+import string
+from influxdb import InfluxDBClient
+from config import Config
+from metrics import TeamSpeakMetrics
+keys = [
+ "connection_bytes_received_control",
+ "connection_bytes_received_keepalive",
+ "connection_bytes_received_speech",
+ "connection_bytes_received_total",
+ "connection_bytes_sent_control",
+ "connection_bytes_sent_keepalive",
+ "connection_bytes_sent_speech",
+ "connection_bytes_sent_total",
+ "connection_filetransfer_bytes_received_total",
+ "connection_filetransfer_bytes_sent_total",
+ "connection_packets_received_control",
+ "connection_packets_received_keepalive",
+ "connection_packets_received_speech",
+ "connection_packets_received_total",
+ "connection_packets_sent_control",
+ "connection_packets_sent_keepalive",
+ "connection_packets_sent_speech",
+ "connection_packets_sent_total",
+ "virtualserver_autostart",
+ "virtualserver_channelsonline",
+ "virtualserver_clientsonline",
+ "virtualserver_maxclients",
+ "virtualserver_queryclientsonline",
+ "virtualserver_reserved_slots",
+ "virtualserver_status",
+ "virtualserver_total_packetloss_control",
+ "virtualserver_total_packetloss_keepalive",
+ "virtualserver_total_packetloss_speech",
+ "virtualserver_total_packetloss_total",
+ "virtualserver_total_ping",
+ "virtualserver_uptime",
+class Influx:
+ def __init__(self, data, cld):
+ self._metrics = data
+ self.client = cld
+ @staticmethod
+ def _timestamp():
+ return int(time.time() * 1000)
+ @staticmethod
+ def _rmspace(key: str = None, value: (str, int) = None):
+ try:
+ key = key.replace(" ", "\ ") # noqa: W605
+ value = value.replace(" ", "\ ") # noqa: W605
+ except (TypeError, AttributeError):
+ pass
+ return key, value
+ def _parse(self, name, key, value, ts, tags=None):
+ output = name
+ # check if tags is a dict
+ if isinstance(tags, dict):
+ # create tag_key=tag_value pairs for all elements and append them to name
+ for k, v in tags.items():
+ output += ",{}={}".format(*self._rmspace(k, v))
+ # append key=value to name
+ if value[0] in string.ascii_letters:
+ output += ' {}="{}" {}'.format(*self._rmspace(key, value), ts)
+ elif value.isdigit():
+ output += " {}={}i {}".format(*self._rmspace(key, value), ts)
+ else:
+ output += " {}={} {}".format(*self._rmspace(key, value), ts)
+ return output
+ def write_metrics(self):
+ data = list()
+ cur_ts = self._timestamp()
+ # serverinfo
+ for id in ids:
+ query = self._metrics.serverinfo(id)
+ name, sid = query["virtualserver_name"], query["virtualserver_id"]
+ for key, value in query.items():
+ if key in keys:
+ data.append(self._parse("teamspeak", key, value, cur_ts, {"name": name, "virtual_server": sid}))
+ self.client.write_points(data, time_precision="ms", batch_size=10000, protocol="line")
+if __name__ == "__main__":
+ # load config
+ config = Config()
+ # credentials and parameters
+ url = config.get("url", default="https://localhost:10443")
+ apikey = config.get("apikey", default="")
+ ids = config.get("ids", default=1)
+ # config influxdb
+ influx_host = config.get("influxdb_host", default="localhost")
+ influx_port = config.get("influxdb_port", default=8086)
+ influx_dbname = config.get("influxdb_db", default="teamspeak")
+ # init handler
+ metrics = TeamSpeakMetrics(url=url, apikey=apikey)
+ client = InfluxDBClient(host=influx_host, port=influx_port, database=influx_dbname, retries=5)
+ # create database only once
+ client.create_database(influx_dbname)
+ # init influx class
+ influx = Influx(metrics, client)
+ while True:
+ influx.write_metrics()
+ time.sleep(10)
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+from api import TeamSpeakApi
+class TeamSpeakMetrics(TeamSpeakApi):
+ """
+ TeamSpeak metrics harvester
+ """
+ def __init__(self, url: str, apikey: str = None):
+ # init teamspeak api
+ super().__init__(apikey=apikey, url=url)
+ # variables
+ self.url = url
+ def serverinfo(self, id: int = 1) -> dict:
+ tmp = self.cmd(f"{id}/serverinfo")
+ if tmp["status"]["code"] != 0:
+ return {}
+ else:
+ return tmp["body"][0]
+line-length = 120
+target-version = ['py37', 'py38']
+include = '\.pyi?$'
+exclude = '''
+ /(
+ \.eggs # exclude a few common directories in the
+ | \.git # root of the project
+ | \.hg
+ | \.mypy_cache
+ | \.tox
+ | \.venv
+ | _build
+ | buck-out
+ | build
+ | dist
+ )/ # the root of the project
+ignore = E501,E203
+exclude = .git,__pycache__,.gitlab
+max-complexity = 15
+max-line-length = 120
+# endpoint url
+url: ""
+# virtual server ids
+ids: [1,2,3]
+# authentication
+apikey: "loreipsumloreipsumloreipsum"
+# influx db configuration
+influxdb_host: "localhost"
+influxdb_port: 8086
+influxdb_db: "example"