netdef.Sources package¶
Sources¶
-
netdef.Sources.Sources.
register
(name, classref=None)[source]¶ A decorator to register sources. Example:
from netdef.Sources import BaseSource, Sources @Sources.register("NewSourceTemplate") class NewSourceTemplate(BaseSource.BaseSource): def __init__(self, name, shared): ...
Can also be called as a normal function:
from netdef.Sources import BaseSource, Sources def setup(shared): Sources.register("NewSourceTemplate", NewSourceTemplate) class NewSourceTemplate(BaseSource.BaseSource): def __init__(self, name, shared): ...
Parameters: - name (str) – Name of the source class
- classref (object) – Should be None if used as a decorator and a class if called as a function
Returns: A callable that returns a class if used as a decorator and a class if called as a normal function
Abstract base¶
BaseSource¶
-
class
netdef.Sources.BaseSource.
BaseSource
(key=None, value=None, controller=None, source=None, rule=None)[source]¶ Bases:
object
-
can_set_value_from_string
()[source]¶ Returns True if the value can be converted from string to its given datatype. Only
builtins.int
,str
andfloat
have built-in support, but aditional types can be implemented by this function andset_value_from_string
-
static
can_unpack_subitems
(value)[source]¶ Function that confirms / decides on input data a known list. If so, then unpack_subitems can be used afterwards.
Example:
def parse_response(self, response): for parser in self.get_parsers(): if parser.can_unpack_subitems(response): yield from parser.unpack_subitems(response)
-
static
can_unpack_value
(value)[source]¶ Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.
Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
get
¶ Get the value that is updated by the controller
-
get_reference
()[source]¶ Used to identify similar sources. if two instances return the same reference this means that one instance is redundant and can be replaced
-
pack_add_source
()[source]¶ Used if source must be added to external system. I.e. a subscription. Can be overridden and customized.
-
pack_value
(value)[source]¶ Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.
Example:
def handle_write_source(self, incoming, value, source_time): data = incoming.pack_value(value, source_time) topic, payload = incoming.make_message(incoming.key, data) self.publish_data_item(topic, payload)
-
register_set_callback
(set_callback)[source]¶ Register the callback that sends WRITE_SOURCE message to the controller queue.
-
set
¶ Get the value that is updated by expressions
-
set_value_from_string
(value, stime=None, status_ok=True, origin='')[source]¶ Converts given value to correct datatype and sends a WRITE_SOURCE message to controller.
This function is called when a value change is triggered from
Parameters: - value – value to be set
- or datetime.datetime) stime ((None) – timestamp when the value was changed
- status_ok (bool) – True if value is good
- origin (str) – who set the value
-
static
unpack_subitems
(value)[source]¶ Function that parses response from source and yield items found in value. This can be overridden and adapted to the controller it is to be used in.
Example:
def parse_response(self, response): for parser in self.get_parsers(): if parser.can_unpack_subitems(response): yield from parser.unpack_subitems(response)
-
static
unpack_value
(key, source_time, value)[source]¶ Function that parses response from source and returns following tuple: (key, source_time, value) Key can then be used to find the right instance and update values.
Can be overridden and adapted to the controller it is to be used in.
Returns: tuple(key, source_time, value) Return type: tuple Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
value_as_string
¶ Is primarily used by web interfaces to display value in table. Can be overridden to limit the display of large data. Example:
@property def value_as_string(self): if self.value and isinstance(self.value, bytes): n = len(self.value) return "<{}...><data len:{}>".format(self.value[:10], n) else: return super().value_as_string
-
-
class
netdef.Sources.BaseSource.
StatusCode
[source]¶ Bases:
enum.Enum
Used to indicate the quality of a value in BaseSource.status_code
NONE: Value is not set yet. INITIAL: First value. you might have to update cashes with this value at application startup. GOOD: A normal value update. INVALID: A value update where the value is not to be trusted.
-
GOOD
= 2¶
-
INITIAL
= 1¶
-
INVALID
= 3¶
-
NONE
= 0¶
-
Built-in Interfaces¶
BytestringSource¶
-
class
netdef.Sources.BytestringSource.
BytestringSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource
-
value_as_string
¶ byte data as string
-
CommTestSource¶
ConcurrentWebRequestSource¶
-
class
netdef.Sources.ConcurrentWebRequestSource.
ConcurrentWebRequestSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource
-
DEFAULT_CLIENT_SESSION_TIMEOUT
= 2¶
-
CrontabSource¶
DictSource¶
FloatSource¶
InfluxDBLoggerSource¶
-
class
netdef.Sources.InfluxDBLoggerSource.
InfluxDBLoggerSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource
A dataholder class to be used with
InfluxDBLoggerController
-
get_points
(data, source_time, status_code)[source]¶ Returns a list suitable as argument for
InfluxDBClient.write_points()
Parameters: - data (InfluxDBLoggerInterface.Value) – an object with data to store in influxdb
- source_time (datetime.datetime) – measurement time
- status_code (BaseSource.StatusCode) – measurement field.status_code
Returns: a list of dicts
-
static
make_points
(interface, measurement, value, source_time, status_code)[source]¶ Make a list suitable as argument for
InfluxDBClient.write_points()
Parameters: - interface (BaseSource, InfluxDBLoggerInterface) – an object with key, rule, source an controller attrs
- measurement (str) – influxdb measurement name
- value – measurement field.value
- source_time (datetime.datetime) – measurement time
- status_code (BaseSource.StatusCode) – measurement field.status_code
Returns: a list of dicts
-
IntegerSource¶
InternalSource¶
-
class
netdef.Sources.InternalSource.
InternalSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.DictSource.DictSource
-
static
can_unpack_value
(value)[source]¶ Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.
Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
pack_value
(value)[source]¶ Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.
Example:
def handle_write_source(self, incoming, value, source_time): data = incoming.pack_value(value, source_time) topic, payload = incoming.make_message(incoming.key, data) self.publish_data_item(topic, payload)
-
static
unpack_value
(value)[source]¶ Function that parses response from source and returns following tuple: (key, source_time, value) Key can then be used to find the right instance and update values.
Can be overridden and adapted to the controller it is to be used in.
Returns: tuple(key, source_time, value) Return type: tuple Example:
def parse_item(self, item): for parser in self.get_parsers(): if parser.can_unpack_value(item): key, source_time, value = parser.unpack_value(item) self.send_datachange(key, source_time, value)
-
static
MQTTDataMessageSource¶
-
class
netdef.Sources.MQTTDataMessageSource.
MQTTDataMessageSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource
-
static
make_message
(topic, datamessage)[source]¶ Wraps given datamessage into a json-payload
Parameters: - topic (str) – mqtt topic
- datamessage (DataMessage) – a datamessage object
Returns: tuple of topic and json payload
Return type: tuple
-
static
SubprocessSource¶
-
class
netdef.Sources.SubprocessSource.
SubprocessSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.BaseSource.BaseSource
-
DEFAULT_INTERVAL
= 10¶
-
SystemMonitorSource¶
-
class
netdef.Sources.SystemMonitorSource.
SystemMonitorByteSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.SystemMonitorSource.SystemMonitorSource
-
class
netdef.Sources.SystemMonitorSource.
SystemMonitorPercentSource
(*args, **kwargs)[source]¶ Bases:
netdef.Sources.SystemMonitorSource.SystemMonitorSource