import logging
import queue
import threading
from ..Shared.Internal import Statistics
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Debug trace emitted once, when the module body is executed on import.
log.debug("load module")
class BaseEngine:
    """Abstract base class for engines.

    Stores the shared object together with the controller, rule and source
    classes handed in through the ``add_*`` methods. The lifecycle methods
    (``start``, ``stop``, ``wait``, ``init``, ``load`` and ``block``) all
    raise ``NotImplementedError`` here, so a subclass has to provide them.
    """

    def __init__(self, shared=None):
        """Store *shared* and reset the registered class collections.

        :param shared: the shared object to attach; defaults to ``None``.
        """
        # Goes through the public setter so a subclass override is honoured.
        self.add_shared_object(shared)
        self._controllers = None
        self._rules = None
        self._sources = None

    def add_shared_object(self, shared):
        """Attach *shared* to the engine as ``self.shared``."""
        self.shared = shared

    def add_controller_classes(self, controllers):
        """Remember *controllers* as the engine's controller classes."""
        self._controllers = controllers

    def add_rule_classes(self, rules):
        """Remember *rules* as the engine's rule classes."""
        self._rules = rules

    def add_source_classes(self, sources):
        """Remember *sources* as the engine's source classes."""
        self._sources = sources

    def start(self):
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def stop(self):
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def wait(self):
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def init(self):
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def load(self, base_package):
        """Not implemented in the base class.

        :param base_package: unused here; its meaning is defined by the
            subclass that implements this method.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    @staticmethod
    def block():
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError
class BaseExpressionExecutor:
    """Abstract base class for workers that consume engine messages.

    The executor reads ``(messagetype, payload)`` tuples from the incoming
    queue obtained from the shared object and forwards every
    ``RUN_EXPRESSION`` message to :meth:`handle_run_expression`.
    Subclasses must implement :meth:`run` and
    :meth:`handle_run_expression`.
    """

    def __init__(self, name, shared):
        """Set up name, shared object and incoming queue.

        No interrupt object is attached yet; use :meth:`add_interrupt` to
        install one.

        :param name: executor name, used as prefix for statistics keys.
        :param shared: shared object; must expose ``queues`` (see
            :meth:`init_queue`).
        """
        self.add_name(name)
        self.add_shared(shared)
        # init_queue() reads self.shared, so it has to follow add_shared().
        self.init_queue()
        self.add_interrupt(None)

    def add_name(self, name):
        """Store *name* as ``self.name``."""
        self.name = name

    def add_shared(self, shared):
        """Store *shared* as ``self.shared``."""
        self.shared = shared

    def init_queue(self):
        """Bind the incoming queue and message types from the shared object.

        Reads ``self.shared.queues.get_messages_to_engine()`` and
        ``self.shared.queues.MessageType`` and sets the polling timeout
        that :meth:`loop_incoming` passes to ``incoming.get``.
        """
        self.incoming = self.shared.queues.get_messages_to_engine()
        self.messagetypes = self.shared.queues.MessageType
        # Passed as ``timeout`` to ``incoming.get``; seconds, assuming a
        # ``queue.Queue``-compatible queue.
        self.queue_timeout = 0.1

    def add_interrupt(self, interrupt):
        """Install *interrupt* as the stop signal.

        :param interrupt: an object with an ``is_set()`` method (for example
            ``threading.Event``), or ``None`` for "no interrupt attached".
        """
        self._interrupt = interrupt

    def has_interrupt(self):
        """Return whether the installed interrupt has been set.

        ``__init__`` installs ``None`` as placeholder; in that state no
        interrupt can have been requested, so ``False`` is returned instead
        of failing on ``None.is_set()``.
        """
        return self._interrupt is not None and self._interrupt.is_set()

    def run(self):
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def loop_incoming(self):
        """Process incoming messages until interrupted or the queue is idle.

        Each iteration optionally publishes queue-size and thread-count
        statistics, then waits up to ``self.queue_timeout`` for the next
        message. The method returns when the interrupt is set or when the
        wait times out with ``queue.Empty``.

        :raises NotImplementedError: if a message of any type other than
            ``RUN_EXPRESSION`` is received.
        """
        # The try block deliberately wraps the whole loop: the first
        # queue.Empty ends this polling pass and hands control back to the
        # caller.
        try:
            while not self.has_interrupt():
                if Statistics.on:
                    Statistics.set(
                        self.name + ".incoming.queue.size", self.incoming.qsize()
                    )
                    Statistics.set(
                        self.name + ".threading.total.count",
                        threading.active_count(),
                    )

                messagetype, incoming = self.incoming.get(
                    block=True, timeout=self.queue_timeout
                )

                if messagetype == self.messagetypes.RUN_EXPRESSION:
                    source_item, expressions, value, source_time, status_code = (
                        incoming
                    )
                    self.handle_run_expression(
                        source_item, expressions, value, source_time, status_code
                    )
                else:
                    raise NotImplementedError
        except queue.Empty:
            pass

    def handle_run_expression(
        self, source_item, expressions, value, source_time, status_code
    ):
        """Handle one ``RUN_EXPRESSION`` message; not implemented here.

        The five parameters are the unpacked payload of the message, in the
        order in which :meth:`loop_incoming` receives them.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError