import datetime
import json
import logging
from netdef.Interfaces.datamessage import DataDefinition, DataMessage
from netdef.Interfaces.DefaultInterface import DefaultInterface
from netdef.Sources import BaseSource, Sources
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
# Runs at import time, so this debug record marks the module being loaded.
log.debug("Loading module")
@Sources.register("MQTTDataMessageSource")
class MQTTDataMessageSource(BaseSource.BaseSource):
    """
    Source that exchanges its value over mqtt as a json-encoded
    :class:`DataMessage`.

    Registered with :class:`Sources` under the name
    ``"MQTTDataMessageSource"``.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base source and set the interface."""
        super().__init__(*args, **kwargs)
        log.debug("init %s", self.key)
        self.interface = DefaultInterface

    @staticmethod
    def make_message(topic, datamessage):
        """
        Wraps given datamessage into a json-payload

        :param str topic: mqtt topic
        :param DataMessage datamessage: a datamessage object
        :return: tuple of topic and json payload
        :rtype: tuple
        """
        payload = datamessage.to_json()
        return topic, payload

    @staticmethod
    def parse_message(topic, payload):
        """
        Parse given json-payload into a datamessage object

        :param str topic: mqtt topic, returned unchanged
        :param payload: json payload; ``bytes`` are decoded as utf8 first
        :type payload: str or bytes
        :return: tuple of topic and the parsed :class:`DataMessage`
        :rtype: tuple
        """
        if isinstance(payload, bytes):
            payload = str(payload, "utf8")
        datamessage = DataMessage.from_json(payload)
        return topic, datamessage

    def pack_value(self, value, stime, status_code, origin):
        """
        Pack the value and stime into a :class:`DataMessage` for mqtt

        :param value: value to store in the message
        :param datetime.datetime stime: source time of the value. A naive
            datetime is interpreted as UTC; an aware datetime is used as is.
        :param status_code: status code to store in the message
        :param origin: origin to store in the message
        :return: datamessage keyed by ``self.key`` with an empty extension
        :rtype: DataMessage
        """
        if stime.utcoffset() is None:
            # datetime.timestamp() reads a naive datetime as *local* time,
            # while unpack_value() returns naive UTC. Pin naive values to UTC
            # so a pack/unpack round trip does not shift by the host's offset.
            # NOTE(review): assumes naive source times are UTC, matching
            # unpack_value() -- confirm against callers.
            stime = stime.replace(tzinfo=datetime.timezone.utc)
        payload = DataMessage(
            key=self.key,
            value=value,
            source_time=stime.timestamp(),
            status_code=status_code,
            origin=origin,
            extension={},
        )
        return payload

    @staticmethod
    def can_unpack_value(value):
        "Check if it is possible to extract a value from the payload"
        return isinstance(value, DataMessage)

    @staticmethod
    def unpack_value(value):
        """
        Return a tuple with key, time, value, status code and origin from
        the mqtt payload

        :param DataMessage value: datamessage from mqtt payload
        :returns: tuple(key, source_time, value, status_code, origin), where
            source_time is a naive datetime expressed in UTC
        :rtype: tuple
        """
        # Equivalent to the deprecated datetime.utcfromtimestamp(): convert
        # in UTC, then drop tzinfo so the result stays a naive datetime.
        source_time = datetime.datetime.fromtimestamp(
            value.source_time, tz=datetime.timezone.utc
        ).replace(tzinfo=None)
        return value.key, source_time, value.value, value.status_code, value.origin