Source code for netdef.Controllers.MQTTDataMessageController

import logging
import uuid

import paho.mqtt.client as mqtt
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import my supported sources
from ..Sources.MQTTDataMessageSource import MQTTDataMessageSource


[docs]@Controllers.register("MQTTDataMessageController") class MQTTDataMessageController(BaseController.BaseController): """ .. danger:: Development Status :: 3 - Alpha """ def __init__(self, name, shared): super().__init__(name, shared) self.logger = logging.getLogger(self.name) self.logger.info("init") config = self.shared.config.config self.topic_prefix = config(self.name, "topic_prefix", "NetdefDataMessage/") self.host = config(self.name, "host", "127.0.0.1") self.port = config(self.name, "port", 1883) self.keepalive = config(self.name, "keepalive", 60) self.origin = uuid.uuid1().urn self.origin = config(self.name, "origin", self.origin) self.ignore_origin = config(self.name, "ignore_messages_from_origin", 1) self.publish_qos = config(self.name, "publish_qos", 0) self.publish_retain = bool(config(self.name, "publish_retain", 1)) self.subscribe_list = config( self.name, "subscribe_topics", "{}_subscribe_topics".format(self.name) ) self.subscribe_topics = [ topic for topic in self.shared.config.get_dict(self.subscribe_list).values() ] self.subscribe_to_prefix = config( self.name, "subscribe_to_prefix", self.ignore_origin ) if self.subscribe_to_prefix: self.subscribe_topics.append("#") self.client = mqtt.Client() self.client.on_connect = self.on_connect self.client.on_disconnect = self.on_disconnect self.client.on_message = self.on_message
[docs] def get_topic(self, topic): return "{}{}".format(self.topic_prefix, topic)
[docs] def get_key(self, topic): if topic.find(self.topic_prefix) == 0: return topic[len(self.topic_prefix) :] return topic
[docs] def mqtt_connect(self): self.client.connect(self.host, self.port, self.keepalive)
[docs] def mqtt_safe_disconnect(self): self.client.disconnect()
[docs] def on_connect(self, client, userdata, flags, rc): self.logger.debug("Connected with result code %s", rc) for topic in self.subscribe_topics: client.subscribe(self.get_topic(topic)) self.logger.info("subscribe to %s", self.get_topic(topic))
[docs] def on_disconnect(self, client, userdata, rc): self.logger.debug("Disconnected with result code %s", rc)
[docs] def on_message(self, client, userdata, msg): self.logger.debug("%s %s ", msg.topic, msg.payload) item_key = self.get_key(msg.topic) if self.has_source(item_key): item = self.get_source(item_key) try: item_key, data = item.parse_message(item_key, msg.payload) if item.can_unpack_value(data): key, stime, value, status_code, origin = item.unpack_value(data) assert item_key == key status_ok = isinstance(status_code, int) and status_code > 0 if self.update_source_instance_value( item, value, stime, status_ok, False ): if not self.ignore_origin or (origin != self.origin): self.send_outgoing(item) except Exception as error: self.logger.error("could not parse payload of topic %s", msg.topic)
[docs] def loop_mqtt(self): rc = self.client.loop(timeout=1.0) # if rc != mqtt.MQTT_ERR_SUCCESS: if rc == mqtt.MQTT_ERR_CONN_LOST: raise OSError(mqtt.error_string(rc))
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" self.logger.info("Running") can_reconnect = False reconnect_timeout = 0 while not self.has_interrupt(): self.sleep(reconnect_timeout) reconnect_timeout = self.shared.config.config( self.name, "reconnect_timeout", 20 ) try: if can_reconnect: self.mqtt_safe_disconnect() self.mqtt_connect() can_reconnect = True while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.loop_mqtt() except OSError as error: self.logger.debug("Exception: %s", error) self.logger.error( "Connection error. Reconnect in %s sec.", reconnect_timeout ) self.statistics_update() self.logger.info("Stopped")
[docs] def publish_data_item(self, topic, payload): self.client.publish( self.get_topic(topic), payload=payload, qos=self.publish_qos, retain=self.publish_retain, )
[docs] def handle_add_source(self, incoming): self.logger.debug("'Add source' event for %s", incoming.key) self.add_source(incoming.key, incoming)
[docs] def handle_write_source(self, incoming, value, source_time): self.logger.debug( "'Write source' event to %s. value: %s at: %s", incoming.key, value, source_time, ) data = incoming.pack_value(value, source_time, 1, self.origin) topic, payload = incoming.make_message(incoming.key, data) self.publish_data_item(topic, payload)