#!/usr/bin/env python3 # -*- coding: utf-8 -*- import logging import re from packaging import version class EjabberdApi: """ class to interact with the ejabberd rest/ xmlrpc api """ def __init__(self, url, login=None, api: str = "rpc"): # api variables self._login = login self._url = url if api == "rpc": self.cmd = self._rpc else: import requests self.session = requests.Session() self.cmd = self._rest @property def _auth(self) -> (str, None): if self._login is not None: return f"{self._login['user']}@{self._login['server']}", self._login['password'] return None def _rest(self, command: str, data) -> dict: # add authentication header to the session obj if self.session.auth is None: self.session.auth = self._auth # post r = self.session.post('/'.join([self._url, command]), json=data) # proceed if response is ok if r.ok: return r.json() return {} def _rpc(self, command: str, data): from xmlrpc import client with client.ServerProxy(self._url) as server: fn = getattr(server, command) try: if self._login is not None: return fn(self._login, data) return fn(data) except: # this needs to be more specific return {}