Source code for netdef.Engines.BaseEngine

import logging
import queue
import threading

from ..Shared.Internal import Statistics

log = logging.getLogger(__name__)
log.debug("load module")


[docs]class BaseEngine: def __init__(self, shared=None): self.add_shared_object(shared) self._controllers = None self._rules = None self._sources = None
[docs] def add_shared_object(self, shared): self.shared = shared
[docs] def add_controller_classes(self, controllers): self._controllers = controllers
[docs] def add_rule_classes(self, rules): self._rules = rules
[docs] def add_source_classes(self, sources): self._sources = sources
[docs] def start(self): raise NotImplementedError
[docs] def stop(self): raise NotImplementedError
[docs] def wait(self): raise NotImplementedError
[docs] def init(self): raise NotImplementedError
[docs] def load(self, base_package): raise NotImplementedError
[docs] @staticmethod def block(): raise NotImplementedError
[docs]class BaseExpressionExecutor: def __init__(self, name, shared): self.add_name(name) self.add_shared(shared) self.init_queue() self.add_interrupt(None)
[docs] def add_name(self, name): self.name = name
[docs] def add_shared(self, shared): self.shared = shared
[docs] def init_queue(self): self.incoming = self.shared.queues.get_messages_to_engine() self.messagetypes = self.shared.queues.MessageType self.queue_timeout = 0.1
[docs] def add_interrupt(self, interrupt): self._interrupt = interrupt
[docs] def has_interrupt(self): return self._interrupt.is_set()
[docs] def run(self): raise NotImplementedError
[docs] def loop_incoming(self): try: while not self.has_interrupt(): if Statistics.on: Statistics.set( self.name + ".incoming.queue.size", self.incoming.qsize() ) Statistics.set( self.name + ".threading.total.count", threading.active_count() ) messagetype, incoming = self.incoming.get( block=True, timeout=self.queue_timeout ) if messagetype == self.messagetypes.RUN_EXPRESSION: source_item, expressions, value, source_time, status_code = incoming self.handle_run_expression( source_item, expressions, value, source_time, status_code ) else: raise NotImplementedError except queue.Empty: pass
[docs] def handle_run_expression( self, source_item, expressions, value, source_time, status_code ): raise NotImplementedError