netdef.Sources package¶
Sources¶
-
netdef.Sources.Sources.register(name, classref=None)[source]¶ A decorator to register sources. Example:
from netdef.Sources import BaseSource, Sources @Sources.register("NewSourceTemplate") class NewSourceTemplate(BaseSource.BaseSource): def __init__(self, name, shared): ...
Can also be called as a normal function:
from netdef.Sources import BaseSource, Sources def setup(shared): Sources.register("NewSourceTemplate", NewSourceTemplate) class NewSourceTemplate(BaseSource.BaseSource): def __init__(self, name, shared): ...
Parameters: - name (str) – Name of the source class
- classref (object) – Should be None if used as a decorator and a class if called as a function
Returns: A callable that returns a class if used as a decorator and a class if called as a normal function
Abstract base¶
BaseSource¶
-
class
netdef.Sources.BaseSource.BaseSource(key=None, value=None, controller=None, source=None, rule=None)[source]¶ Bases:
object-
can_set_value_from_string()[source]¶ Returns True if the value can be converted from string to its given datatype. Only
builtins.int,strandfloathave built-in support, but aditional types can be implemented by this function andset_value_from_string
-
static
can_unpack_subitems(value)[source]¶ Function that confirms / decides on input data a known list. If so, then unpack_subitems can be used afterwards.
Example:
def parse_response(self, response): for parser in self.get_parsers(): if parser.can_unpack_subitems(response): yield from parser.unpack_subitems(response)
-
static
can_unpack_value(value)[source]¶ Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.
Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
get¶ Get the value that is updated by the controller
-
get_reference()[source]¶ Used to identify similar sources. if two instances return the same reference this means that one instance is redundant and can be replaced
-
pack_add_source()[source]¶ Used if source must be added to external system. I.e. a subscription. Can be overridden and customized.
-
pack_value(value)[source]¶ Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.
Example:
def handle_write_source(self, incoming, value, source_time): data = incoming.pack_value(value, source_time) topic, payload = incoming.make_message(incoming.key, data) self.publish_data_item(topic, payload)
-
register_set_callback(set_callback)[source]¶ Register the callback that sends WRITE_SOURCE message to the controller queue.
-
set¶ Get the value that is updated by expressions
-
set_value_from_string(value, stime=None, status_ok=True, origin='')[source]¶ Converts given value to correct datatype and sends a WRITE_SOURCE message to controller.
This function is called when a value change is triggered from
Parameters: - value – value to be set
- or datetime.datetime) stime ((None) – timestamp when the value was changed
- status_ok (bool) – True if value is good
- origin (str) – who set the value
-
static
unpack_subitems(value)[source]¶ Function that parses response from source and yield items found in value. This can be overridden and adapted to the controller it is to be used in.
Example:
def parse_response(self, response): for parser in self.get_parsers(): if parser.can_unpack_subitems(response): yield from parser.unpack_subitems(response)
-
static
unpack_value(key, source_time, value)[source]¶ Function that parses response from source and returns following tuple: (key, source_time, value) Key can then be used to find the right instance and update values.
Can be overridden and adapted to the controller it is to be used in.
Returns: tuple(key, source_time, value) Return type: tuple Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
value_as_string¶ Is primarily used by web interfaces to display value in table. Can be overridden to limit the display of large data. Example:
@property def value_as_string(self): if self.value and isinstance(self.value, bytes): n = len(self.value) return "<{}...><data len:{}>".format(self.value[:10], n) else: return super().value_as_string
-
-
class
netdef.Sources.BaseSource.StatusCode[source]¶ Bases:
enum.EnumUsed to indicate the quality of a value in BaseSource.status_code
NONE: Value is not set yet. INITIAL: First value. you might have to update cashes with this value at application startup. GOOD: A normal value update. INVALID: A value update where the value is not to be trusted.
-
GOOD= 2¶
-
INITIAL= 1¶
-
INVALID= 3¶
-
NONE= 0¶
-
Built-in Interfaces¶
BytestringSource¶
-
class
netdef.Sources.BytestringSource.BytestringSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource-
value_as_string¶ byte data as string
-
CommTestSource¶
ConcurrentWebRequestSource¶
-
class
netdef.Sources.ConcurrentWebRequestSource.ConcurrentWebRequestSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource-
DEFAULT_CLIENT_SESSION_TIMEOUT= 2¶
-
CrontabSource¶
DictSource¶
FloatSource¶
InfluxDBLoggerSource¶
-
class
netdef.Sources.InfluxDBLoggerSource.InfluxDBLoggerSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSourceA dataholder class to be used with
InfluxDBLoggerController-
get_points(data, source_time, status_code)[source]¶ Returns a list suitable as argument for
InfluxDBClient.write_points()Parameters: - data (InfluxDBLoggerInterface.Value) – an object with data to store in influxdb
- source_time (datetime.datetime) – measurement time
- status_code (BaseSource.StatusCode) – measurement field.status_code
Returns: a list of dicts
-
static
make_points(interface, measurement, value, source_time, status_code)[source]¶ Make a list suitable as argument for
InfluxDBClient.write_points()Parameters: - interface (BaseSource, InfluxDBLoggerInterface) – an object with key, rule, source an controller attrs
- measurement (str) – influxdb measurement name
- value – measurement field.value
- source_time (datetime.datetime) – measurement time
- status_code (BaseSource.StatusCode) – measurement field.status_code
Returns: a list of dicts
-
IntegerSource¶
InternalSource¶
-
class
netdef.Sources.InternalSource.InternalSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.DictSource.DictSource-
static
can_unpack_value(value)[source]¶ Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.
Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
pack_value(value)[source]¶ Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.
Example:
def handle_write_source(self, incoming, value, source_time): data = incoming.pack_value(value, source_time) topic, payload = incoming.make_message(incoming.key, data) self.publish_data_item(topic, payload)
-
static
unpack_value(value)[source]¶ Function that parses response from source and returns following tuple: (key, source_time, value) Key can then be used to find the right instance and update values.
Can be overridden and adapted to the controller it is to be used in.
Returns: tuple(key, source_time, value) Return type: tuple Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
static
MQTTDataMessageSource¶
-
class
netdef.Sources.MQTTDataMessageSource.MQTTDataMessageSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource-
static
make_message(topic, datamessage)[source]¶ Wraps given datamessage into a json-payload
Parameters: - topic (str) – mqtt topic
- datamessage (DataMessage) – a datamessage object
Returns: tuple of topic and json payload
Return type: tuple
-
static
SubprocessSource¶
-
class
netdef.Sources.SubprocessSource.SubprocessSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource-
DEFAULT_INTERVAL= 10¶
-
SystemMonitorSource¶
-
class
netdef.Sources.SystemMonitorSource.SystemMonitorByteSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.SystemMonitorSource.SystemMonitorSource
-
class
netdef.Sources.SystemMonitorSource.SystemMonitorPercentSource(*args, **kwargs)[source]¶ Bases:
netdef.Sources.SystemMonitorSource.SystemMonitorSource