Source code for netdef.Engines.ThreadedEngine

import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Event, Thread

from ..Shared.Internal import Statistics
from . import BaseEngine

log = logging.getLogger("ThreadedEngine")
log.info("Enter ThreadedEngine")


[docs]class ThreadedEngine(BaseEngine.BaseEngine): def __init__(self, shared): super().__init__(shared) self._controller_pool = {} self._rule_pool = {} self._expression_executor = None self._expression_executor_thread = None self._interrupt = Event()
[docs] def init(self): self._controllers.init() self._rules.init() self._sources.init()
[docs] def load(self, base_package): pass
[docs] def start(self): time.sleep(0.1) log.info("Setup rules") for name, obj in self._rules.instances.items(): obj.setup() log.info("start rules") for name, obj in self._rules.instances.items(): obj.add_interrupt(self._interrupt) thr = Thread(target=obj.run, name=name) thr.start() self._rule_pool[name] = thr log.info("Start expression executor") self._expression_executor = ExpressionExecutor( "ExpressionExecutor", self.shared ) self._expression_executor.add_interrupt(self._interrupt) self._expression_executor_thread = Thread( target=self._expression_executor.run, name="ExpressionExecutor" ) self._expression_executor_thread.start() log.info("Start controllers") for name, obj in self._controllers.instances.items(): self.shared.queues.send_running_state_to_controller(name) obj.add_interrupt(self._interrupt) thr = Thread(target=obj.run, name=name) thr.start() self._controller_pool[name] = thr
[docs] @staticmethod def block(): log.info("Wait for interrupt") try: while 1: time.sleep(1) except KeyboardInterrupt: pass
[docs] def stop(self): log.info("Send terminate interrupt") self._interrupt.set()
[docs] def wait(self): ct = threading.current_thread() for thread in threading.enumerate(): if not thread is ct: if not thread.daemon: thread.join()
[docs]class ExpressionExecutor(BaseEngine.BaseExpressionExecutor): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) max_workers = (os.cpu_count() or 1) * 10 max_workers = self.shared.config.config(self.name, "max_workers", max_workers) self.thread_pool = ThreadPoolExecutor(max_workers=max_workers) self.future_pool = [] if Statistics.on: Statistics.set(self.name + ".threadpool.max_workers.count", max_workers)
[docs] def run(self): log.info("Running") while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.loop_futures() self.thread_pool.shutdown(wait=True) log.info("Stopped")
[docs] def handle_run_expression( self, source_item, expressions, value, source_time, status_code ): for expression in expressions: args = expression.get_args(source_item, value) kwargs = expression.get_kwargs() # debuggingtriks: expression.execute(args, kwargs) self.future_pool.append( ( self.thread_pool.submit(expression.execute, args, kwargs), expression.filename, ) )
[docs] def loop_futures(self): # while not self.has_interrupt(): if Statistics.on: Statistics.set( self.name + ".threadpool.workers.count", len(self.future_pool) ) for future, filename in self.future_pool: if future.done(): self.future_pool.remove((future, filename)) returned_exception = future.exception() if returned_exception: log.error("Exception in %s", filename) log.exception(returned_exception)