import logging
import time
import datetime
import subprocess
import shlex
from netdef.Controllers import BaseController, Controllers
from ..Sources.SubprocessSource import SubprocessSource
[docs]def stdout_from_terminal(command_as_str, err_msg=None):
command_args = shlex.split(command_as_str)
try:
res = subprocess.run(command_args, stdout=subprocess.PIPE).stdout
return str(res, errors="replace")
except Exception as error:
if err_msg is None:
return str(error)
else:
return err_msg
[docs]@Controllers.register("SubprocessController")
class SubprocessController(BaseController.BaseController):
"""
.. danger:: Development Status :: 3 - Alpha
"""
def __init__(self, name, shared):
super().__init__(name, shared)
self.logger = logging.getLogger(self.name)
self.logger.info("init")
self.value_as_args = self.shared.config.config(self.name, "value_as_args", 1)
self.interval_plan = None
self.oldnew = False
[docs] def run(self):
"Main loop. Will exit when receiving interrupt signal"
self.logger.info("Running")
self.loop_until_app_state_running()
self.interval_plan = self.setup_interval_plan()
for item in self.get_sources().values():
if item.has_initial_poll():
self.poll_outgoing_item(item)
while not self.has_interrupt():
if self.interval_plan.has_interval():
timeout, current_interval = self.interval_plan.next(time.time())
self.loop_incoming(until_empty=False, until_timeout=timeout) # dispatch handle_* functions
for item in self.get_sources().values():
if item.has_poll_interval():
if item.get_poll_interval() == current_interval:
self.poll_outgoing_item(item)
else:
self.loop_incoming()
self.logger.info("Stopped")
[docs] def setup_interval_plan(self):
interval_plan = NextInterval(time.time())
for source in self.get_sources().values():
if source.has_poll_interval():
pri = source.get_poll_interval()
if pri > 0:
interval_plan.add(pri)
return interval_plan
[docs] def handle_add_source(self, incoming):
self.logger.debug("'Add source' event for %s", incoming.key)
self.add_source(incoming.key, incoming)
def _verify_incoming(self, incoming):
if not self.has_source(incoming.key):
self.logger.error(
"%s not found",
incoming.key
)
return False
if not isinstance(incoming, SubprocessSource):
self.logger.error(
"Got write event for %s, but only SubprocessSource is supported",
type(incoming)
)
return False
return True
[docs] def handle_write_source(self, incoming, value, source_time):
self.logger.debug("'Write source' event to %s. value: %s at: %s", incoming.key, value, source_time)
if not self._verify_incoming(incoming):
return
if self.value_as_args:
cmd_as_str = incoming.get_command_and_args(value)
else:
cmd_as_str = incoming.get_command_and_args()
new_val = stdout_from_terminal(cmd_as_str)
stime = datetime.datetime.utcnow()
status_ok = True
if self.update_source_instance_value(incoming, new_val, stime, status_ok, self.oldnew):
self.send_outgoing(incoming)
[docs] def poll_outgoing_item(self, item):
self.logger.debug("'Poll source' %s.", item.key)
if not self._verify_incoming(item):
return
cmd_as_str = item.get_command_and_args()
response = stdout_from_terminal(cmd_as_str)
response = item.parse_stdout_response(response)
stime = datetime.datetime.utcnow()
status_ok = True
for sub_item in self.parse_response(response):
self.parse_item(sub_item)
if self.update_source_instance_value(item, response, stime, status_ok, self.oldnew):
self.send_outgoing(item)
[docs] def parse_response(self, response):
for parser in self.get_parsers():
if parser.can_unpack_subitems(response):
yield from parser.unpack_subitems(response)
[docs] def parse_item(self, item):
for parser in self.get_parsers():
if parser.can_unpack_value(item):
key, source_time, value = parser.unpack_value(item)
self.send_datachange(key, value, source_time, True)
[docs] def send_datachange(self, source_key, value, source_time, status_ok):
if not self.has_source(source_key):
return
if not source_time:
source_time = datetime.datetime.utcnow()
item = self.get_source(source_key)
if self.update_source_instance_value(
item, value, source_time, status_ok, self.oldnew
):
self.send_outgoing(item)
[docs]class NextInterval:
"Call next() to retrieve seconds to next interval, and which interval it is"
__slots__ = ["spans", "start"]
def __init__(self, timestamp):
self.spans = []
self.start = timestamp
[docs] def has_interval(self):
return True if self.spans else False
[docs] def add(self, interval):
new_span = [self.start + interval, interval]
if not new_span in self.spans:
self.spans.append(new_span)
[docs] def next(self, now):
okay_then = min(self.spans)
when, what = okay_then
if when < now:
when = now
okay_then[0] = when + what
return when - now, what