import logging
import queue
import time
from ..Shared.Internal import Statistics
from ..Sources.BaseSource import StatusCode
[docs]class BaseController:
"""
Abstract class for controllers.
:param str name: Name to be used in logfiles
:param shared: a reference to the shared object
"""
def __init__(self, name, shared):
self.name = name
self.shared = shared
self.add_logger(name)
self.init_queue()
self.init_sources(None)
self.init_parsers(None)
self.add_interrupt(None)
self._statistics_counters = {
"last_minute": 0,
"last_minute_time": ((time.time() // 60) * 60),
}
def _statistics_update_last_minute(self, increment):
"""
Write internal statistics to the Statistics singleton if activated
:param int increment: set to ``1`` every time a queue item is processed.
Set to ``0`` to just refresh statistics.
"""
if Statistics.on:
Statistics.set(self.name + ".incoming.queue.size", self.incoming.qsize())
counters = self._statistics_counters
if not counters["last_minute_time"] == ((time.time() // 60) * 60):
Statistics.set(
self.name + ".incoming.last_minute.count", counters["last_minute"]
)
Statistics.set(
self.name + ".incoming.last_minute.time",
time.strftime(
"%y.%m.%d %H:%M", time.localtime(counters["last_minute_time"])
),
)
counters["last_minute_time"] = (time.time() // 60) * 60
counters["last_minute"] = 1 if increment else 0
elif increment:
counters["last_minute"] += 1
[docs] def add_logger(self, name):
"Setup logging module"
self.logger = logging.getLogger(name)
[docs] def init_queue(self):
"Setup the message queue and timeout"
self.incoming = self.shared.queues.get_messages_to_controller(self.name)
self.messagetypes = self.shared.queues.MessageType
self.appstatetypes = self.shared.queues.AppStateType
self.queue_timeout = 0.1
[docs] def add_interrupt(self, interrupt):
"Setup the interrupt signal"
self._interrupt = interrupt
[docs] def has_interrupt(self):
"Returns True if the interrupt signal is received"
return self._interrupt.is_set()
[docs] def sleep(self, seconds):
""""
Sleep by waiting for the interrupt.
Should be used instead of time.sleep.
Override if sleep should be interrupted by even more signals
"""
self._interrupt.wait(seconds)
[docs] def init_sources(self, sources):
"""
Setup the source storage as a dict.
Override if something else is needed.
"""
if sources:
self._sources = sources
else:
self._sources = {}
[docs] def init_parsers(self, parsers):
"""
Setup the parser storage as a list.
Override if something else is needed.
"""
if parsers:
self._parsers = parsers
else:
self._parsers = []
[docs] def has_source(self, name):
"""
Return True if source name is found
"""
return name in self._sources
[docs] def add_source(self, name, init_value):
"""
Add a source to the storage dict.
Override if something else is needed.
"""
if not self.has_source(name):
self._sources[name] = init_value
if Statistics.on:
Statistics.set(self.name + ".sources.count", len(self._sources))
[docs] def get_sources(self):
"Return source storage"
return self._sources
[docs] def get_source(self, name):
"Return named source"
return self._sources[name]
[docs] def get_parsers(self):
"Return parser storage"
return self._parsers
[docs] def add_parser(self, parser):
"Add parser if not already exists"
if not parser in self._parsers:
self._parsers.append(parser)
[docs] def run(self):
"""
Override this function in controller. Example:
.. code-block:: python
def run(self):
self.logger.info("Running")
while not self.has_interrupt():
self.loop_incoming() # dispatch handle_* functions
self.loop_outgoing() # dispatch poll_* functions
self.logger.info("Stopped")
"""
raise NotImplementedError
[docs] def clear_incoming(self, until_empty=True, until_messagetype=None):
"""
Delete all messages from incoming queue.
:param bool until_empty: If True the function will block until queue is empty.
If False it will block forever.
:param MessageType until_messagetype: Block
until given messagetype is received
Example:
.. code-block:: python
...
while not self.has_interrupt():
reconnect = False
try:
if reconnect:
self.clear_incoming()
self.try_reconnect()
# main loop
while not self.has_interrupt():
self.loop_incoming()
self.loop_outgoing()
except ConnectionError:
reconnect = True
...
"""
while not self.has_interrupt():
try:
messagetype, incoming = self.incoming.get(block=False)
self.logger.debug("Discarding message of type %s", messagetype)
self.incoming.task_done()
if until_messagetype and until_messagetype == messagetype:
self._statistics_update_last_minute(1)
return
except queue.Empty:
if until_empty:
self._statistics_update_last_minute(0)
return
else:
self.sleep(self.queue_timeout)
[docs] def fetch_one_incoming(self):
"""
Returns one message from the queue.
:returns: tuple of (messagetype, incoming)
:rtype: tuple(MessageType, BaseSource)
"""
try:
if not self.has_interrupt():
messagetype, incoming = self.incoming.get(
block=True, timeout=self.queue_timeout
)
self._statistics_update_last_minute(1)
return messagetype, incoming
except queue.Empty:
self._statistics_update_last_minute(0)
return None, None
[docs] def loop_until_app_state_running(self):
"""
Usefull if you want your controller to block while ADD_SOURCE and ADD_PARSER is
dispatched
Example:
.. code-block:: python
def run(self):
self.loop_until_app_state_running()
while not self.has_interrupt():
try:
self.handle_connection()
while not self.has_interrupt():
self.loop_incoming()
self.loop_outgoing()
except ConnectionError:
self.handle_conn_error()
"""
self.loop_incoming(
until_empty=False, until_app_state=self.appstatetypes.RUNNING
)
[docs] def loop_incoming(
self,
until_empty=True,
until_timeout=0.0,
until_messagetype=None,
until_app_state=None,
):
"""
Get every message from the queue and dispatch the associated handler function.
:param bool until_empty: Blocking until queue is empty
:param float until_timeout: Timeout in seconds. ``0.0`` blocks forever.
:param MessageType until_messagetype: Blocking until given messagetype is dispatched
:param AppStateType until_app_state: Blocking until given app_state is dispatched
"""
loop_timeout = time.time() + until_timeout
while not self.has_interrupt():
if until_timeout > 0.0:
if loop_timeout < time.time():
return
try:
messagetype, incoming = self.incoming.get(
block=True, timeout=self.queue_timeout
)
self._statistics_update_last_minute(1)
if messagetype == self.messagetypes.READ_ALL:
self.handle_readall(incoming)
elif messagetype == self.messagetypes.ADD_SOURCE:
self.handle_add_source(incoming)
elif messagetype == self.messagetypes.READ_SOURCE:
self.handle_read_source(incoming)
elif messagetype == self.messagetypes.WRITE_SOURCE:
self.handle_write_source(incoming[0], incoming[1], incoming[2])
elif messagetype == self.messagetypes.ADD_PARSER:
self.handle_add_parser(incoming)
elif messagetype == self.messagetypes.TICK:
self.handle_tick(incoming)
elif messagetype == self.messagetypes.APP_STATE:
self.handle_app_state(incoming)
if until_app_state and incoming == until_app_state:
self.incoming.task_done()
return
else:
raise NotImplementedError
self.incoming.task_done()
if until_messagetype and until_messagetype == messagetype:
return
except queue.Empty:
self._statistics_update_last_minute(0)
if until_empty:
return
[docs] def handle_tick(self, incoming):
"""
Answer the tick message
"""
incoming.tick()
[docs] def handle_app_state(self, app_state):
"""
Override if controller need to react to application states
"""
if app_state == self.appstatetypes.RUNNING:
self.handle_app_state_running()
elif app_state == self.appstatetypes.SETUP:
self.handle_app_state_setup()
[docs] def handle_app_state_running(self):
"""
Override if controller need to react to running state
"""
self.logger.debug("Entering running state")
[docs] def handle_app_state_setup(self):
"""
Override if controller need to react to setup state
"""
self.logger.debug("Entering setup state")
[docs] def handle_add_parser(self, incoming):
"Add parser to controller if not already exists"
self.add_parser(incoming)
[docs] def handle_readall(self, incoming):
raise NotImplementedError
[docs] def handle_add_source(self, incoming):
raise NotImplementedError
[docs] def handle_read_source(self, incoming):
raise NotImplementedError
[docs] def handle_write_source(self, incoming, value, source_time):
raise NotImplementedError
[docs] def loop_outgoing(self):
"""
Check every source and call the poll_outgoing_item function
"""
for item in self.get_sources().values():
self.poll_outgoing_item(item)
[docs] def poll_outgoing_item(self, item):
raise NotImplementedError
[docs] def send_outgoing(self, outgoing):
"Send RUN_EXPRESSION message on valuechange"
self.shared.queues.run_expressions_in_rule(outgoing)
# self.shared.queues.send_message_to_rule(
# self.shared.queues.MessageType.RUN_EXPRESSION, outgoing.rule, outgoing
# )
[docs] def statistics_update(self):
self._statistics_update_last_minute(0)
[docs] @classmethod
def update_source_instance_status(cls, source_instance, status_ok, oldnew_check):
""" Updates state on given source_instance
Returns True if source_instance have triggered a value change
"""
value = source_instance.get
stime = source_instance.source_time
return cls.update_source_instance_value(
source_instance, value, stime, status_ok, oldnew_check
)
[docs] @staticmethod
def update_source_instance_value(
source_instance, value, stime, status_ok, oldnew_check
):
""" Updates value, timestamp and state on given source_instance
Returns True if source_instance have triggered a value change
"""
prev_val = source_instance.get
prev_st = source_instance.status_code
if status_ok:
if oldnew_check:
if prev_val == value and prev_st in (
StatusCode.GOOD,
StatusCode.INITIAL,
):
return False
source_instance.get = value
source_instance.source_time = stime
if prev_st == StatusCode.NONE:
source_instance.status_code = StatusCode.INITIAL
else:
source_instance.status_code = StatusCode.GOOD
return True
else:
if oldnew_check:
if prev_val == value and prev_st == StatusCode.INVALID:
return False
source_instance.get = value
source_instance.source_time = stime
if prev_st != StatusCode.NONE:
source_instance.status_code = StatusCode.INVALID
return True