Source code for netdef.Sources.InfluxDBLoggerSource

from netdef.Interfaces.InfluxDBLoggerInterface import InfluxDBLoggerInterface, Value
from netdef.Sources import BaseSource, Sources


[docs]@Sources.register("InfluxDBLoggerSource") class InfluxDBLoggerSource(BaseSource.BaseSource): """ A dataholder class to be used with :class:`InfluxDBLoggerController` """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.interface = InfluxDBLoggerInterface
[docs] def unpack_measurement(self): """ Returns self.key. Override to change measurement name. """ return self.key
[docs] @staticmethod def make_points(interface, measurement, value, source_time, status_code): """ Make a list suitable as argument for :func:`InfluxDBClient.write_points` :param interface: an object with key, rule, source an controller attrs :type interface: BaseSource, InfluxDBLoggerInterface :param str measurement: influxdb measurement name :param value: measurement field.value :param datetime.datetime source_time: measurement time :param BaseSource.StatusCode status_code: measurement field.status_code :returns: a list of dicts """ points = [ { "measurement": measurement, "time": source_time, "tags": { "key": interface.key, "rule": interface.rule, "source": interface.source, "controller": interface.controller, }, "fields": {"value": value, "status_code": str(status_code)}, } ] return points
[docs] def get_points(self, data, source_time, status_code): """ Returns a list suitable as argument for :func:`InfluxDBClient.write_points` :param InfluxDBLoggerInterface.Value data: an object with data to store in influxdb :param datetime.datetime source_time: measurement time :param BaseSource.StatusCode status_code: measurement field.status_code :returns: a list of dicts """ if isinstance(data, Value): return self.make_points( data, self.key, data.value, data.source_time, data.status_code ) else: return self.make_points(self, self.key, data, source_time, status_code)