Source code for netdef.Rules.CSVRule

import csv
import logging
import pathlib

from . import BaseRule, Rules

SourceInfo = BaseRule.SourceInfo
ExpressionInfo = BaseRule.ExpressionInfo

log = logging.getLogger("CSVRule")
NAME = "CSVRule"

log.debug("Loading module")


[docs]@Rules.register(NAME) class CSVRule(BaseRule.BaseRule): """ .. tip:: Development Status :: 5 - Production/Stable """ def __init__(self, name, shared): super().__init__(name, shared) log.info("init")
[docs] def setup(self): "Parse config files" log.info("Running setup") for name, active in self.shared.config.get_dict(NAME).items(): if int(active): self.setup_csv_rule(name) log.info("Done parsing") self.setup_done()
[docs] def setup_csv_rule(self, name): """ Parse CSV file. """ log.info("loading %s", name) abs_root = self.shared.config("proj", "path") rel_pyfile = self.shared.config(name, "py").strip('"') rel_csvfile = self.shared.config(name, "csv").strip('"') encoding = self.shared.config(name, "encoding", "").strip('"') or None log.info(rel_pyfile) log.info(rel_csvfile) expression_module = self.get_module_from_string( rel_pyfile, __package__, abs_root, self.name, name ) abs_csvfile = str(pathlib.Path(abs_root).joinpath(rel_csvfile)) start_of_csv = 0 with open(abs_csvfile, encoding=encoding) as csvfile: # support the excel spesific sep= header firstline = csvfile.readline() if firstline.startswith("sep="): start_of_csv = csvfile.tell() csvfile.seek(start_of_csv) dialect = csv.Sniffer().sniff(csvfile.read(1024)) csvfile.seek(start_of_csv) reader = csv.reader(csvfile, dialect) headers = next(reader) headers = list(h for h in headers if h) expression_count = 0 source_count = 0 for header in headers: source_name, controller_name = self.source_and_controller_from_key( header ) self.add_new_parser(source_name, controller_name) for row in reader: expression_count += 1 source_info_list = [ SourceInfo(header, column) for header, column in zip(headers, row) ] expr_info = ExpressionInfo(expression_module, source_info_list) source_count += self.add_new_expression(expr_info) self.update_statistics( self.name + "." + name, 0, expression_count, source_count )
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" log.info("Running") while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions log.info("Stopped")
[docs] def handle_run_expression(self, incoming, value, source_time, status_code): expressions = self.get_expressions(incoming) # log.debug("Received %s. Found expressions %s",incoming.key, len(expressions)) if expressions: self.send_expressions_to_engine( incoming, expressions, value, source_time, status_code )
# for expression in expressions: # expression.execute()