Source code for netdef.Shared.SharedQueues

import logging
import queue
from enum import Enum

# mesage types


[docs]class MessageType(Enum): READ_ALL = 1 # not implemented yet "**warning:** Not implemented yet" ADD_SOURCE = 2 "Instruct the controller to update the given source's value from external datasource" READ_SOURCE = 3 # not implemented yet "**warning:** Not implemented yet" WRITE_SOURCE = 4 "Instruct the controller to update external datasource from the given source's value" RUN_EXPRESSION = 5 "Instruct the rule or engine to execute the given expression's function" ADD_PARSER = 6 "Instruct the controller to use the given source class as a parser" REMOVE_SOURCE = 7 # not implemented yet "**warning:** Not implemented yet" TICK = 8 "Instruct the controller to send a reply" APP_STATE = 9 "Inform the controller of application state"
[docs]class AppStateType(Enum): SETUP = 1 RUNNING = 2
[docs]class SharedQueues: """ Message queues for all controllers, rules and the engine """ MessageType = MessageType AppStateType = AppStateType def __init__(self, maxsize=0): self.maxsize = maxsize self.logger = logging.getLogger(__name__) # dette er en dict med inncoming-køene til controllerene self.messages_to_controller = {} # inncoming-køene til regelmotorene self.messages_to_rule = {} # en liste over kontrollerene som er aktivert self.available_controllers = [] # en liste over regelmotorer som er aktivert self.available_rules = [] # den finnes bare én motor. dette er incoming-køen self.messages_to_engine = queue.Queue(maxsize)
[docs] def add_controller(self, name): """ Create a *incoming* queue for given controller' """ self.messages_to_controller[name] = queue.Queue(self.maxsize) self.available_controllers.append(name)
[docs] def add_rule(self, name): """ Create a *incoming* queue for given rule' """ self.messages_to_rule[name] = queue.Queue(self.maxsize) self.available_rules.append(name)
[docs] def get_messages_to_controller(self, name): """ Returns the *incoming* queue for given controller """ return self.messages_to_controller[name]
[docs] def get_messages_to_rule(self, name): """ Returns the *incoming* queue for given rule """ return self.messages_to_rule[name]
[docs] def get_messages_to_engine(self): """ Returns the *incoming* queue for the engine """ return self.messages_to_engine
[docs] def send_message_to_controller(self, messagetype, controllername, message_object): """ Send a message to given controller :param self.MessageType messagetype: :param str controllername: :param message_object: usually a source instance. can also be a tuple. """ try: self.messages_to_controller[controllername].put_nowait( (messagetype, message_object) ) except KeyError: self.logger.error( "Cannot send message %s. %s not enabled.", message_object, controllername, )
[docs] def send_message_to_rule(self, messagetype, rule_name, message_object): """ Send a message to given rule :param self.MessageType messagetype: :param str rule_name: :param message_object: usually a source instance. """ if rule_name == "*": for name in self.available_rules: self.messages_to_rule[name].put_nowait((messagetype, message_object)) else: self.messages_to_rule[rule_name].put_nowait((messagetype, message_object))
[docs] def send_message_to_engine(self, messagetype, message_object): """ Send a message to the engine :param self.MessageType messagetype: probably MessageType.RUN_EXPRESSION :param message_object: usually a source instance. """ self.messages_to_engine.put_nowait((messagetype, message_object))
[docs] def run_expressions_in_engine( self, source_instance, expressions, value, source_time, status_code ): """ Send a RUN_EXPRESSION message to the engine. :param source_instance: the source that triggered given expressions :param list expressions: list of expressions """ self.send_message_to_engine( MessageType.RUN_EXPRESSION, (source_instance, expressions, value, source_time, status_code), )
[docs] def write_value_to_controller(self, source_instance, value, source_time): """ Send a WRITE_SOURCE message to given controller :param source_instance: the source :param value: new value. datatype have to match the given source :param datetime.datetime source_time: timestamp in utc """ controllername = source_instance.controller try: self.messages_to_controller[controllername].put_nowait( (MessageType.WRITE_SOURCE, (source_instance, value, source_time)) ) except queue.Full: self.logger.error( "Cannot send message %s. Queue %s is full.", source_instance, controllername, )
[docs] def send_setup_state_to_controller(self, controllername): """ Send a APP_STATE message to given controller :param controllername: the controller """ self.messages_to_controller[controllername].put_nowait( (MessageType.APP_STATE, AppStateType.SETUP) )
[docs] def send_running_state_to_controller(self, controllername): """ Send a APP_STATE message to given controller :param controllername: the controller """ self.messages_to_controller[controllername].put_nowait( (MessageType.APP_STATE, AppStateType.RUNNING) )
[docs] def run_expressions_in_rule(self, source_instance): """ Send a RUN_EXPRESSION message to given rule. :param source_instance: the source """ rulename = source_instance.rule try: self.send_message_to_rule( MessageType.RUN_EXPRESSION, rulename, ( source_instance, source_instance.copy_get_value(), source_instance.source_time, source_instance.status_code, ), ) except queue.Full: self.logger.error( "Cannot send message %s. Queue %s is full.", source_instance, rulename )