Source code for netdef.Controllers.CrontabController

import datetime
import logging
import time

import crontab
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

log = logging.getLogger(__name__)
log.debug("Loading module")


[docs]@Controllers.register("CrontabController") class CrontabController(BaseController.BaseController): """ .. tip:: Development Status :: 5 - Production/Stable """ def __init__(self, name, shared): super().__init__(name, shared) log.info("init") self.send_inital_value = self.shared.config.config( self.name, "send_inital_value", 0 )
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" log.info("Running") while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.loop_outgoing() # dispatch poll_* functions time.sleep(0.1) log.info("Stopped")
def handle_add_source(self, incoming): self.add_source(incoming.key, incoming) def handle_write_source(self, incoming, value): pass # log.debug("Write source event %s %s", incoming.key, value)
[docs] def poll_outgoing_item(self, item): """ Check if it is time to trigger event for given source :param item: source instance to check """ if item.source == "CrontabSource": now = datetime.datetime.utcnow() future = crontab.CronTab(item.key).next(default_utc=True, delta=False) future = datetime.datetime.utcfromtimestamp(future) prev_val = item.get prev_st = item.status_code if prev_st == StatusCode.NONE: item.status_code = StatusCode.INITIAL item.get = future item.source_time = now if self.send_inital_value: log.debug("Initial %s %s", item.key, future) self.send_outgoing(item) else: if now >= prev_val: log.debug("next %s at %s now: %s", item.key, future, now) item.status_code = StatusCode.GOOD item.get = future item.source_time = now self.send_outgoing(item)