Source code for netdef.Rules.INIRule

import configparser
import logging
import pathlib
import re

from . import BaseRule, Rules

SourceInfo = BaseRule.SourceInfo
ExpressionInfo = BaseRule.ExpressionInfo

compile_parsers = re.compile(r"(?P<source>\w+)")
compile_arguments = re.compile(r"(?P<source>\w+)\((?P<key>([^()])*)\)")

NAME = "INIRule"


[docs]@Rules.register(NAME) class INIRule(BaseRule.BaseRule): """ .. caution:: Development Status :: 4 - Beta """ def __init__(self, name, shared): super().__init__(name, shared) self.logger = logging.getLogger(name) self.logger.info("init")
[docs] def setup(self): "Parse config files" self.logger.info("Running setup") for name, rel_ini_file in self.shared.config.get_dict(NAME).items(): if rel_ini_file: self.setup_ini_rule(name, rel_ini_file) self.logger.info("Done parsing") self.setup_done()
[docs] def setup_ini_rule(self, name, rel_inifile): "parse given ini-file" self.logger.info("loading %s", name) abs_root = self.shared.config("proj", "path") rel_inifile = rel_inifile.strip('"') # parse ini abs_inifile = str(pathlib.Path(abs_root).joinpath(rel_inifile)) encoding = None # TODO: parse from config ini_object = configparser.ConfigParser() ini_object.read(abs_inifile) expression_count = 0 source_count = 0 for section in ini_object.sections(): if not ini_object.getint(section, "on", fallback=1): continue self.logger.info("loading section: %s", section) parser_string = ini_object.get(section, "parsers", fallback="") parsers = [] for match in compile_parsers.finditer(parser_string): parsers.append(match.group("source")) for _source in parsers: source_name, controller_name = self.source_and_controller_from_key( _source ) self.add_new_parser(source_name, controller_name) _module = ini_object.get(section, "module", fallback=None) _kwargs = {} if not _module: self.logger.error("module= is missing in [%s]", section) continue _expression = ini_object.get(section, "expression", fallback=None) if _expression: _kwargs["func"] = _expression _setup = ini_object.get(section, "setup", fallback=None) if _setup: _kwargs["setup"] = _setup arguments_string = ini_object.get(section, "arguments", fallback="") arguments = arguments_string.splitlines() for args_line in arguments: if args_line: source_info_list = [] for match in compile_arguments.finditer(args_line): source_info_list.append( SourceInfo(match.group("source"), match.group("key")) ) expression_count += 1 expression_module = self.get_module_from_string( _module, __package__, abs_root, self.name, name ) expr_info = ExpressionInfo( expression_module, source_info_list, **_kwargs ) source_count += self.add_new_expression(expr_info) self.update_statistics( self.name + "." + name, 0, expression_count, source_count )
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" self.logger.info("Running") while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.logger.info("Stopped")
[docs] def handle_run_expression(self, incoming, value, source_time, status_code): expressions = self.get_expressions(incoming) if expressions: self.send_expressions_to_engine( incoming, expressions, value, source_time, status_code )