import datetime
import hashlib
import logging
import os
import re
import time
from ..Sources.BaseSource import StatusCode
from . import BaseController, Controllers
@Controllers.register("InternalController")
class InternalController(BaseController.BaseController):
    """
    .. tip:: Development Status :: 5 - Production/Stable

    Internal variables that work just like any other value from a controller.
    Can trigger events on value changes. State can be cached to disk.

    :param str name: The name is used in logfile and default.ini
    :param Shared shared: Instance of applications shared object.

    **Configuration**

    .. code-block:: ini

        [InternalController]
        send_init_event = 0
        send_events = 0
        persistent_value = 0
        key_in_filename = 0

    **Options**

    * **send_init_event** -- trigger RUN_EXPRESSION with StatusCode.INITIAL
      for every source at startup
    * **send_events** -- trigger a RUN_EXPRESSION message for every
      WRITE_SOURCE message
    * **persistent_value** -- store values to disk
    * **key_in_filename** -- use source key as prefix in filename for
      persistent storage

    **Sequence diagram**

    .. seqdiag::

        seqdiag app{
            activation = none;
            default_note_color = LemonChiffon;
            span_height = 12;
            edge_length = 200;

            Queue [color=LemonChiffon];
            Controller [label=InternalController,color=LemonChiffon];

            === Initialization ===
            Queue -> Controller [label="APP_STATE, SETUP"]

            === Setup ===
            Queue -> Controller [label="ADD_SOURCE, source [n]"]
            Queue -> Controller [label="APP_STATE, RUNNING"]

            === Running ===
            === Begin loop ===
            Queue -> Controller [
                label="
                    WRITE_SOURCE,
                    source [n], value, timestamp",
                note="
                    Update value of
                    source [n]"
            ]
            ... ...
            Controller -> Queue [
                label="RUN_EXPRESSION, source [n]",
                note="
                    Value change
                    in source [n]"
            ]
            === End Loop ===
        }

    """

    # Relative directory holding one cache file per persisted source.
    _cache_dir = os.path.join("db", "internal")

    def __init__(self, name, shared):
        super().__init__(name, shared)
        self.logger = logging.getLogger(self.name)
        self.logger.info("init")

        config = self.shared.config.config
        self.send_events = config(self.name, "send_events", 0)
        self.send_init_event = config(self.name, "send_init_event", 0)
        self.persistent_value = config(self.name, "persistent_value", 0)
        self.key_in_filename = config(self.name, "key_in_filename", 0)

        # cyclic storage: the outgoing poll in run() fires once the wall
        # clock passes next_write, which is then pushed next_write_delta
        # seconds ahead.
        self.next_write_delta = 60
        self.next_write = time.time() + self.next_write_delta
        # Reference time used by poll_outgoing_item(); refreshed by run()
        # after every outgoing poll.
        self.utcnow = self._naive_utcnow()

        # filename prefix: matches every character that is NOT a word
        # character, space, "-" or "."; such characters are replaced in
        # get_cache_filename().
        self.prefix = re.compile(r"[^\w \-\.]")

    @staticmethod
    def _naive_utcnow():
        """
        Return current UTC time as a naive datetime.

        Same value as the deprecated ``datetime.datetime.utcnow()``; kept
        naive so it stays comparable with the naive timestamps this class
        already assigns to sources.

        :return: naive datetime in UTC
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def _apply_initial_value(self, source, value):
        """
        Set status code INITIAL, the given value and a fresh timestamp
        on the given source.

        :param InternalSource source: source instance
        :param value: value to assign to the source
        """
        # Assignment order is kept as status_code, get, source_time.
        source.status_code = StatusCode.INITIAL
        source.get = value
        source.source_time = self._naive_utcnow()

    def run(self):
        """
        Main loop. Will exit when receiving interrupt signal
        """
        self.logger.info("Running")

        while not self.has_interrupt():
            # dispatch handle_* functions
            self.loop_incoming()

            if time.time() > self.next_write:
                self.next_write = time.time() + self.next_write_delta
                # dispatch poll_* functions
                self.loop_outgoing()
                # Sources written from now on compare as newer than this
                # timestamp at the next poll.
                self.utcnow = self._naive_utcnow()

        # Final flush of every source when shutting down.
        if self.persistent_value:
            self.logger.info("Write cache to disk ...")
            self.store_to_disk()

        self.logger.info("Stopped")

    def get_cache_filename(self, key):
        """
        Generate sha256 hash to be used as filename.

        If config key_in_filename=1 then key will be prefixed to the
        hexdigest. In the prefix every character that is not a word
        character, space, ``-`` or ``.`` is replaced with ``_``.

        :param str key: string to encode
        :return: filename
        """
        prefix = self.prefix.sub("_", key) + "." if self.key_in_filename else ""
        # Characters that cannot be encoded as utf8 are dropped before hashing.
        digest = hashlib.sha256(key.encode("utf8", errors="ignore")).hexdigest()
        return prefix + digest + ".db"

    def store_to_disk(self, item=None):
        """
        Store sources into files at [proj-path]/db/internal/

        :param InternalSource item: if given, only this source instance is
            written (and only if it is one of the controller's sources);
            if None, all sources are written.
        """
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(self._cache_dir, exist_ok=True)

        for source in self.get_sources().values():
            if item is not None and item is not source:
                continue
            if not source.can_unpack_value(""):
                continue

            data = source.pack_value(source.value)
            cache = os.path.join(
                self._cache_dir, self.get_cache_filename(source.key)
            )
            self.logger.debug("write %s", cache)
            with open(cache, "wb") as f:
                f.write(data)

    def handle_add_source(self, incoming):
        """
        Add given source instance to internal source list

        If persistent_value is enabled and a cache file exists for the
        source, the cached value is loaded. If send_init_event is enabled
        the source is sent out with StatusCode.INITIAL.

        :param InternalSource incoming: source instance
        """
        self.logger.debug("'Add source' event for %s", incoming.key)
        self.add_source(incoming.key, incoming)

        init_value = {}

        if self.persistent_value:
            cache = os.path.join(
                self._cache_dir, self.get_cache_filename(incoming.key)
            )
            if os.path.isfile(cache) and incoming.can_unpack_value(""):
                with open(cache, "rb") as f:
                    data = f.read()
                init_value = incoming.unpack_value(data)

                # NOTE(review): the nesting of this branch was reconstructed
                # from a listing without indentation -- confirm that the
                # cached value should be applied only when a cache file
                # was actually read.
                if not self.send_events:
                    self._apply_initial_value(incoming, init_value)

        if self.send_init_event:
            self._apply_initial_value(incoming, init_value)
            self.send_outgoing(incoming)

    def handle_write_source(self, incoming, value, source_time):
        """
        Update internal dict with new value.

        :param InternalSource incoming: source instance
        :param value: frozen value of instance
        :param datetime.datetime source_time: value timestamp
        """
        incoming.get = value
        incoming.source_time = source_time

        # First write to a source with no status yet is flagged INITIAL,
        # every other write is GOOD.
        if incoming.status_code == StatusCode.NONE:
            incoming.status_code = StatusCode.INITIAL
        else:
            incoming.status_code = StatusCode.GOOD

        if self.send_events:
            self.send_outgoing(incoming)

    def poll_outgoing_item(self, item):
        """
        Check if given source should be cached to disk.

        The source is written when persistent_value is enabled and its
        source_time is not older than the timestamp recorded in
        ``self.utcnow``.

        :param InternalSource item: source instance
        """
        if not (self.persistent_value and item.source_time):
            return
        if item.source_time >= self.utcnow:
            self.store_to_disk(item)