# Source code for netdef.Controllers.ZmqDataAccessController

import datetime
import logging

import zmq
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import my supported sources
from ..Sources.ZmqDataAccessSource import ZmqDataAccessSource

# this controller is in development, do not use it yet.

@Controllers.register("ZmqDataAccessController")
class ZmqDataAccessController(BaseController.BaseController):
    """
    Exchange source values with other processes over ZeroMQ.

    The controller owns two sockets: an ``XPUB`` socket bound to
    ``publish_url`` on which written values are published, and a ``SUB``
    socket connected to every url in the ``subscribe_urls`` config section
    from which remote values are received.

    .. danger:: Development Status :: 3 - Alpha

    NOTE(review): this class was restored from a damaged listing in which
    several tokens were missing (the config section name, the XPUB socket
    attribute and some calls made on it). The restored spots are marked
    with ``NOTE(review)`` comments and should be confirmed against the
    upstream source.
    """

    def __init__(self, name, shared):
        """
        Read the configuration and create the (still unconnected) sockets.

        :param name: controller name; used as logger name and as config
            section name.
        :param shared: shared application object; ``shared.config`` is
            read here.
        """
        super().__init__(name, shared)
        # NOTE(review): the logger name was missing in the listing; the
        # controller name is assumed.
        self.logger = logging.getLogger(name)
        self.logger.info("init")

        config = self.shared.config.config

        # An empty prefix subscribes to every message. A configurable
        # "topic" option (default "dist") is not wired in yet.
        self.topic = b""

        # NOTE(review): the default url is truncated to "tcp://" in the
        # listing (host and port are missing) -- restore the real default.
        self.publish_url = config(name, "publish_url", "tcp://")

        # Name of the config section that lists the urls to subscribe to.
        self.subscribe_list = config(
            name, "subscribe_urls", "{}_subscribe_urls".format(name)
        )
        self.subscribe_urls = list(
            self.shared.config.get_dict(self.subscribe_list).values()
        )

        self.context = zmq.Context()
        # NOTE(review): the attribute name of the XPUB socket was missing
        # in the listing; "pub" is assumed by analogy with "sub".
        self.pub = self.context.socket(zmq.XPUB)
        self.sub = self.context.socket(zmq.SUB)
        self.poller = zmq.Poller()

    def connect(self):
        """
        Bind the publisher, connect the subscriber and register both
        sockets with the poller.
        """
        # NOTE(review): the bind call was missing in the listing; it is
        # implied by the "listening to" log message.
        self.pub.bind(self.publish_url)
        self.logger.info("listening to %s", self.publish_url)

        for url in self.subscribe_urls:
            self.sub.connect(url)
            self.logger.info("connecting to: %s", url)

        self.sub.setsockopt(zmq.SUBSCRIBE, self.topic)
        # NOTE(review): only the option value ``1`` survived in the
        # listing; XPUB_VERBOSE is assumed, so that every subscription
        # message is passed up to loop_subscribers.
        self.pub.setsockopt(zmq.XPUB_VERBOSE, 1)

        self.poller.register(self.pub, zmq.POLLIN)
        self.poller.register(self.sub, zmq.POLLIN)

    def loop_subscribers(self):
        """
        Drain pending socket events.

        Polls with a 10 ms timeout until a poll returns no events or an
        interrupt is signalled. Value lists received on the SUB socket
        update matching local sources; a subscribe event on the XPUB
        socket is answered with a snapshot of all known sources.
        """
        try:
            while not self.has_interrupt():
                events = dict(self.poller.poll(10))
                if not events:
                    break

                if self.sub in events:
                    # NOTE(security): recv_pyobj unpickles whatever the
                    # peer sent; only connect to trusted publishers.
                    vlist = self.sub.recv_pyobj(flags=zmq.NOBLOCK)
                    for item_key, value, stime in vlist:
                        self.logger.debug(
                            "SUB RECV: %s %s %s", item_key, value, stime
                        )
                        if self.has_source(item_key):
                            item = self.get_source(item_key)
                            if self.update_source_instance_value(
                                item, value, stime, True, False
                            ):
                                self.send_outgoing(item)
                elif self.pub in events:
                    # NOTE(review): the receive call was missing in the
                    # listing; a plain recv() is assumed because the
                    # first byte of the frame is inspected below.
                    event = self.pub.recv()
                    self.logger.debug("PUB RECV %s", event)
                    if event[0] == 1:  # subscribe-event
                        rep = tuple(
                            (item.key, item.value, item.source_time)
                            for item in self.get_sources().values()
                        )
                        # NOTE(review): the send call was missing in the
                        # listing; publishing the snapshot is assumed,
                        # otherwise ``rep`` would be unused.
                        self.pub.send_pyobj(rep)
        except zmq.ZMQError as error:
            # EAGAIN only means that nothing was ready on a non-blocking
            # call. Compare with zmq.EAGAIN instead of the literal 11,
            # the numeric value is platform dependent.
            if error.errno != zmq.EAGAIN:
                self.logger.exception(error)

    def run(self):
        "Main loop. Will exit when receiving interrupt signal"
        self.logger.info("Running")
        self.connect()
        while not self.has_interrupt():
            self.loop_incoming()  # dispatch handle_* functions
            self.loop_subscribers()  # poll the ZeroMQ sockets
        self.logger.info("Stopped")

    def handle_add_source(self, incoming):
        """
        Register a new source under its key.

        :param incoming: source instance to add.
        """
        self.logger.debug("'Add source' event for %s", incoming.key)
        self.add_source(incoming.key, incoming)

    def handle_write_source(self, incoming, value, source_time):
        """
        Publish a written value on the XPUB socket.

        The message is a one-element tuple of ``(key, value, source_time)``,
        the same shape that loop_subscribers unpacks on the receiving side.

        :param incoming: source instance that is written to.
        :param value: the value to publish.
        :param source_time: timestamp that belongs to the value.
        """
        self.logger.debug(
            "'Write source' event to %s. value: %s at: %s",
            incoming.key,
            value,
            source_time,
        )
        # NOTE(review): the start of this call was missing in the listing;
        # send_pyobj on the XPUB socket with the source key as first tuple
        # member is assumed.
        self.pub.send_pyobj(
            ((incoming.key, value, source_time),), flags=zmq.NOBLOCK
        )