Source code for netdef.Rules.InfluxDBLoggerRule

import logging
import pathlib

from netdef.Interfaces.InfluxDBLoggerInterface import InfluxDBLoggerInterface
from netdef.Rules import BaseRule, Rules


[docs]@Rules.register("InfluxDBLoggerRule") class InfluxDBLoggerRule(BaseRule.BaseRule): def __init__(self, name, shared): super().__init__(name, shared) self.logger = logging.getLogger(name) self.logger.info("init") config = self.shared.config.config self.auto_logging_on = config(self.name, "auto_logging_on", 1)
[docs] def setup(self): "" self.logger.info("Running setup")
[docs] def setup_auto_logging(self): """ Autogenerate logging expressions and sources for every source that is already created by other rules """ def expression_func(something, dblogger): if something.new or something.update: _val = InfluxDBLoggerInterface.make( something.key, something.instance.source, something.instance.rule, something.instance.controller, something.value, something.instance.source_time, something.instance.status_code, ) dblogger.set = _val source_count = 0 exp_count = 0 items = [item for item in self.shared.sources.instances.items] for item in items: source_count += self.add_new_expression( BaseRule.ExpressionInfo( BaseRule.Expression(expression_func, pathlib.Path(__file__).name), [ BaseRule.SourceInfo(item.source, item.key, item.controller), BaseRule.SourceInfo( "InfluxDBLoggerSource", item.key, "InfluxDBLoggerController" ), ], ) ) exp_count += 1 self.update_statistics(self.name + ".auto_logging", 0, exp_count, source_count)
[docs] def run(self): """ Main loop. Will exit when receiving interrupt signal. Calls :func:`setup_auto_logging` once at startup """ if self.auto_logging_on: self.logger.info("Setup autologging") self.setup_auto_logging() self.logger.info("Running") while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.logger.info("Stopped")
[docs] def handle_run_expression(self, incoming, value, source_time, status_code): expressions = self.get_expressions(incoming) self.logger.debug( "Received %s. Found expressions %s", incoming.key, len(expressions) ) if expressions: self.send_expressions_to_engine( incoming, expressions, value, source_time, status_code )