Source code for netdef.Controllers.ZmqDataAccessController

import datetime
import logging

import zmq
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import my supported sources
from ..Sources.ZmqDataAccessSource import ZmqDataAccessSource

# this controller is in development, do not use it yet.


[docs]@Controllers.register("ZmqDataAccessController") class ZmqDataAccessController(BaseController.BaseController): """ .. danger:: Development Status :: 3 - Alpha """ def __init__(self, name, shared): super().__init__(name, shared) self.logger = logging.getLogger(self.name) self.logger.info("init") config = self.shared.config.config self.topic = b"" # config(self.name, "topic", "dist") self.publish_url = config(self.name, "publish_url", "tcp://127.0.0.1:5556") self.subscribe_list = config( self.name, "subscribe_urls", "{}_subscribe_urls".format(self.name) ) self.subscribe_urls = [ url for url in self.shared.config.get_dict(self.subscribe_list).values() ] self.context = zmq.Context() self.pub = self.context.socket(zmq.XPUB) self.sub = self.context.socket(zmq.SUB) self.poller = zmq.Poller()
[docs] def connect(self): self.pub.bind(self.publish_url) self.logger.info("listening to %s", self.publish_url) for url in self.subscribe_urls: self.sub.connect(url) self.logger.info("connecting to: %s", url) self.sub.setsockopt(zmq.SUBSCRIBE, self.topic) self.pub.setsockopt(zmq.XPUB_VERBOSE, 1) self.poller.register(self.pub, zmq.POLLIN) self.poller.register(self.sub, zmq.POLLIN)
[docs] def loop_subscribers(self): try: events = {"first": 1} while events and not self.has_interrupt(): events = dict(self.poller.poll(10)) if self.sub in events: vlist = self.sub.recv_pyobj(flags=zmq.NOBLOCK) for val in vlist: item_key, value, stime = val self.logger.debug("SUB RECV: %s %s %s", item_key, value, stime) if self.has_source(item_key): item = self.get_source(item_key) if self.update_source_instance_value( item, value, stime, True, False ): self.send_outgoing(item) elif self.pub in events: event = self.pub.recv(flags=zmq.NOBLOCK) self.logger.debug("PUB RECV %s", event) if event[0] == 1: # subscribe-event rep = tuple( (item.key, item.value, item.source_time) for item in self.get_sources().values() ) self.pub.send_pyobj(rep) except zmq.ZMQError as error: if error.errno == 11: pass else: self.logger.exception(error)
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" self.logger.info("Running") self.connect() while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.loop_subscribers() # dispatch poll_* functions self.logger.info("Stopped")
[docs] def handle_add_source(self, incoming): self.logger.debug("'Add source' event for %s", incoming.key) self.add_source(incoming.key, incoming)
[docs] def handle_write_source(self, incoming, value, source_time): self.logger.debug( "'Write source' event to %s. value: %s at: %s", incoming.key, value, source_time, ) self.pub.send_pyobj(((incoming.key, value, source_time),), flags=zmq.NOBLOCK)