Source code for netdef.Rules.YAMLRule

import logging
import pathlib

import yaml

from . import BaseRule, Rules

# Module-level aliases for the BaseRule helper types used when building
# expressions in YAMLRule.setup_yaml_rule().
SourceInfo = BaseRule.SourceInfo
ExpressionInfo = BaseRule.ExpressionInfo

# Name this rule is registered under (Rules.register) and the key passed to
# shared.config.get_dict() when YAMLRule.setup() looks up its yaml files.
NAME = "YAMLRule"


@Rules.register(NAME)
class YAMLRule(BaseRule.BaseRule):
    """
    Rule that registers parsers and expressions described in yaml files.

    The yaml files to load are taken from ``shared.config.get_dict(NAME)``;
    each non-empty value is treated as a path relative to the configured
    project path.

    .. danger:: Development Status :: 3 - Alpha

    """

    def __init__(self, name, shared):
        """
        Initialise the base rule and create a logger named after *name*.

        :param name: name of this rule instance, also used as logger name
        :param shared: shared object handed on to the base class; its
            ``config`` attribute is read by :meth:`setup` and
            :meth:`setup_yaml_rule`
        """
        super().__init__(name, shared)
        self.logger = logging.getLogger(name)
        self.logger.info("init")

    def setup(self):
        """
        Parse config files.

        Calls :meth:`setup_yaml_rule` for every entry of
        ``shared.config.get_dict(NAME)`` that has a non-empty value, then
        calls ``setup_done()``.
        """
        self.logger.info("Running setup")
        for name, rel_yaml_file in self.shared.config.get_dict(NAME).items():
            # Entries with an empty value are skipped.
            if rel_yaml_file:
                self.setup_yaml_rule(name, rel_yaml_file)
        self.logger.info("Done parsing")
        self.setup_done()

    def setup_yaml_rule(self, name, rel_yamlfile):
        """
        Parse the given yaml file and register its parsers and expressions.

        The top-level ``parsers`` list holds mappings with a ``source`` key
        and an optional ``controller`` key. The top-level ``expressions``
        list holds mappings with ``module`` and ``arguments`` keys and
        optional ``expression`` and ``setup`` keys; every item of
        ``arguments`` is a mapping with ``source`` and ``key``.

        An empty yaml file, or a ``parsers`` / ``expressions`` section
        without a value, is treated as containing no entries.

        :param name: config entry name; used for logging, module lookup and
            the statistics key
        :param rel_yamlfile: path of the yaml file relative to the value of
            ``shared.config("proj", "path")``; surrounding double quotes
            are stripped
        :raises KeyError: if a required key is missing from an entry
        :raises OSError: if the yaml file cannot be opened
        """
        self.logger.info("loading %s", name)
        abs_root = self.shared.config("proj", "path")
        rel_yamlfile = rel_yamlfile.strip('"')
        self.logger.info(rel_yamlfile)

        # parse yaml
        abs_yamlfile = str(pathlib.Path(abs_root).joinpath(rel_yamlfile))
        encoding = None  # TODO: parse from config

        with open(abs_yamlfile, encoding=encoding) as yamlfile:
            yaml_object = yaml.safe_load(yamlfile)

        # safe_load() returns None for an empty (or comment-only) document;
        # treat that as "nothing to register" instead of failing on .get().
        if yaml_object is None:
            self.logger.warning("%s: %s is empty", name, abs_yamlfile)
            yaml_object = {}

        expression_count = 0
        source_count = 0

        # "or []" also covers a section that is present but has no value
        # (e.g. a bare "parsers:" line), which yaml loads as None.
        for _parser in yaml_object.get("parsers") or []:
            _source = _parser["source"]
            _controller = _parser.get("controller")
            source_name, controller_name = self.source_and_controller_from_key(
                _source, _controller
            )
            self.add_new_parser(source_name, controller_name)

        for _expression in yaml_object.get("expressions") or []:
            _module = _expression["module"]
            _args = _expression["arguments"]

            # Optional keys are only forwarded when present in the yaml.
            _kwargs = {}
            if "expression" in _expression:
                _kwargs["func"] = _expression["expression"]
            if "setup" in _expression:
                _kwargs["setup"] = _expression["setup"]

            expression_count += 1

            source_info_list = [
                SourceInfo(arg["source"], arg["key"]) for arg in _args
            ]
            expression_module = self.get_module_from_string(
                _module, __package__, abs_root, self.name, name
            )
            expr_info = ExpressionInfo(
                expression_module, source_info_list, **_kwargs
            )
            # add_new_expression()'s return value is accumulated as the
            # source count reported below.
            source_count += self.add_new_expression(expr_info)

        self.update_statistics(
            self.name + "." + name, 0, expression_count, source_count
        )

    def run(self):
        """
        Main loop. Will exit when receiving interrupt signal.

        Repeatedly calls ``loop_incoming()`` until ``has_interrupt()``
        returns a true value.
        """
        self.logger.info("Running")
        while not self.has_interrupt():
            self.loop_incoming()  # dispatch handle_* functions
        self.logger.info("Stopped")

    def handle_run_expression(self, incoming, value, source_time, status_code):
        """
        Forward the expressions found for *incoming* to the engine.

        Looks up ``get_expressions(incoming)`` and, when the result is
        non-empty, passes it together with *incoming*, *value*,
        *source_time* and *status_code* to
        ``send_expressions_to_engine()``. Does nothing otherwise.
        """
        expressions = self.get_expressions(incoming)
        if expressions:
            self.send_expressions_to_engine(
                incoming, expressions, value, source_time, status_code
            )