Source code for netdef.Controllers.InfluxDBLoggerController

import datetime
import logging

from requests.exceptions import RequestException

from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import my supported sources
from netdef.Sources.InfluxDBLoggerSource import InfluxDBLoggerSource


[docs]@Controllers.register("InfluxDBLoggerController") class InfluxDBLoggerController(BaseController.BaseController): """ .. danger:: Development Status :: 3 - Alpha A logging controller. Its purpose is to store every write event into influxdb. """ def __init__(self, name, shared): super().__init__(name, shared) self.logger = logging.getLogger(self.name) self.logger.info("init") self.dsn = self.shared.config.config( self.name, "dsn", "influxdb:///netdef_generic_db" ) self.client = InfluxDBClient.from_dsn(self.dsn)
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" reconnect_timeout = 0 while not self.has_interrupt(): self.sleep(reconnect_timeout) reconnect_timeout = self.shared.config.config( self.name, "reconnect_timeout", 20 ) try: self.logger.info("Running") self.client.create_database(self.client._database) while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions # self.loop_outgoing() # dispatch poll_* functions except ( InfluxDBClientError, InfluxDBServerError, RequestException, ) as error: self.logger.debug("Exception: %s", error) self.logger.error( "Connection error. Reconnect in %s sec.", reconnect_timeout ) self.statistics_update() self.logger.info("Stopped")
def handle_add_source(self, incoming): self.logger.debug("'Add source' event for %s", incoming.key) if isinstance(incoming, InfluxDBLoggerSource): self.add_source(incoming.unpack_measurement(), incoming)
[docs] def handle_write_source(self, incoming, value, source_time): """ Write given value and timestamp into influxdb :param InfluxDBLoggerSource incoming: source instance :param value: frozen value if instance :param datetime.datetime source_time: value timestamp """ if isinstance(incoming, InfluxDBLoggerSource): points = incoming.get_points(value, source_time, incoming.status_code) else: points = InfluxDBLoggerSource.make_points( incoming, incoming.key, value, source_time, incoming.status_code ) try: if not self.client.write_points(points): self.logger.error( "Write error on influxdb db:%s measurement:%s value:%s time:%s", self.client._database, incoming.key, value, source_time, ) except (InfluxDBServerError, RequestException) as write_error: self.logger.exception(write_error) self.logger.error( "Server error on influxdb db:%s measurement:%s value:%s time:%s", self.client._database, incoming.key, value, source_time, ) self.logger.debug( "'Write source' event to %s. value: %s at: %s", incoming.key, value, source_time, )