netdef.Sources package

Sources

class netdef.Sources.Sources.Sources(shared=None)[source]

Bases: object

add_shared_object(shared)[source]
init()[source]
load(base_packages)[source]
netdef.Sources.Sources.register(name, classref=None)[source]

A decorator to register sources. Example:

from netdef.Sources import BaseSource, Sources

@Sources.register("NewSourceTemplate")
class NewSourceTemplate(BaseSource.BaseSource):
    def __init__(self, name, shared):
        ...

Can also be called as a normal function:

from netdef.Sources import BaseSource, Sources

def setup(shared):
    Sources.register("NewSourceTemplate", NewSourceTemplate)

class NewSourceTemplate(BaseSource.BaseSource):
    def __init__(self, name, shared):
        ...
Parameters:
  • name (str) – Name of the source class
  • classref (object) – Should be None if used as a decorator and a class if called as a function
Returns:

A callable that returns a class if used as a decorator and a class if called as a normal function

Abstract base

BaseSource

class netdef.Sources.BaseSource.BaseSource(key=None, value=None, controller=None, source=None, rule=None)[source]

Bases: object

can_set_value_from_string()[source]

Returns True if the value can be converted from a string to its given datatype. Only builtins.int, str and float have built-in support, but additional types can be supported by overriding this function and set_value_from_string.
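A rough sketch of how the check and the conversion could work together is shown here. `SketchSource` is a hypothetical stand-in rather than BaseSource itself, and the `written` list only imitates the WRITE_SOURCE message that would be sent to the controller.

```python
class SketchSource:
    """Hypothetical stand-in for a source; not netdef's BaseSource."""

    def __init__(self, value=None):
        self.value = value
        self.written = []  # stands in for the WRITE_SOURCE queue

    def can_set_value_from_string(self):
        # Assumption: the datatype is taken from the current value.
        return type(self.value) in (int, str, float)

    def set_value_from_string(self, value, stime=None, status_ok=True, origin=""):
        if not self.can_set_value_from_string():
            raise TypeError("unsupported datatype: %r" % type(self.value))
        converted = type(self.value)(value)  # e.g. int("42") -> 42
        self.written.append((converted, stime, status_ok, origin))
        return converted


source = SketchSource(value=0)
if source.can_set_value_from_string():
    source.set_value_from_string("42", origin="webadmin")
```

Supporting another datatype in this sketch would mean overriding both methods in a subclass, as the description suggests.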

static can_unpack_subitems(value)[source]

Function that confirms whether the input data is a known list. If so, unpack_subitems can be used afterwards.

Example:

def parse_response(self, response):
    for parser in self.get_parsers():
        if parser.can_unpack_subitems(response):
            yield from parser.unpack_subitems(response)
static can_unpack_value(value)[source]

Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)
copy_get_value()[source]

Shallow copy of the value

copy_value()[source]

Shallow copy of the value
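For readers unfamiliar with the term, the snippet below shows what a shallow copy implies in Python. It assumes the methods behave like the standard library's `copy.copy`, which is not confirmed here; the dictionary contents are made up for illustration.

```python
import copy

original = {"samples": [1, 2, 3], "unit": "V"}
shallow = copy.copy(original)

# Rebinding a top-level key only affects the copy.
shallow["unit"] = "mV"

# The nested list is shared, so this change is visible in both.
shallow["samples"].append(4)
```

If nested data must be isolated as well, `copy.deepcopy` is the usual alternative.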

get

Get the value that is updated by the controller

get_reference()[source]

Used to identify similar sources. If two instances return the same reference, one of them is redundant and can be replaced.
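One way such references could be used to drop redundant instances is sketched below. The composition of the reference (class name, controller and key) and the `deduplicate` helper are assumptions for illustration only.

```python
class SketchSource:
    """Hypothetical stand-in; attribute names follow the BaseSource constructor."""

    def __init__(self, key, controller):
        self.key = key
        self.controller = controller

    def get_reference(self):
        # Assumption: these three parts together identify a source.
        return (type(self).__name__, self.controller, self.key)


def deduplicate(sources):
    """Keep the first instance seen for each reference."""
    unique = {}
    for source in sources:
        unique.setdefault(source.get_reference(), source)
    return list(unique.values())


a = SketchSource("temp", "ctrl1")
b = SketchSource("temp", "ctrl1")  # same reference as a, so redundant
c = SketchSource("temp", "ctrl2")
kept = deduplicate([a, b, c])
```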

pack_add_source()[source]

Used if the source must be added to an external system, e.g. as a subscription. Can be overridden and customized.

static pack_subitems(value)[source]

Creates output that can be used to query for a list of inputs

pack_value(value)[source]

Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.

Example:

def handle_write_source(self, incoming, value, source_time):
    data = incoming.pack_value(value, source_time)
    topic, payload = incoming.make_message(incoming.key, data)
    self.publish_data_item(topic, payload)
register_set_callback(set_callback)[source]

Register the callback that sends WRITE_SOURCE message to the controller queue.

set

Get the value that is updated by expressions

set_value_from_string(value, stime=None, status_ok=True, origin='')[source]

Converts given value to correct datatype and sends a WRITE_SOURCE message to controller.

This function is called when a value change is triggered from Webadmin ‣ Sources ‣ Edit

Parameters:
  • value – value to be set
  • stime (None or datetime.datetime) – timestamp when the value was changed
  • status_ok (bool) – True if value is good
  • origin (str) – who set the value
static unpack_subitems(value)[source]

Function that parses the response from the source and yields the items found in value. This can be overridden and adapted to the controller it is to be used in.

Example:

def parse_response(self, response):
    for parser in self.get_parsers():
        if parser.can_unpack_subitems(response):
            yield from parser.unpack_subitems(response)
static unpack_value(key, source_time, value)[source]

Function that parses the response from the source and returns the following tuple: (key, source_time, value). The key can then be used to find the right instance and update values.

Can be overridden and adapted to the controller it is to be used in.

Returns:

tuple(key, source_time, value)

Return type:

tuple

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)
value_as_string

Primarily used by web interfaces to display the value in a table. Can be overridden to limit the display of large data. Example:

@property
def value_as_string(self):
    if self.value and isinstance(self.value, bytes):
        n = len(self.value)
        return "<{}...><data len:{}>".format(self.value[:10], n)
    else:
        return super().value_as_string
class netdef.Sources.BaseSource.StatusCode[source]

Bases: enum.Enum

Used to indicate the quality of a value in BaseSource.status_code

  • NONE: Value is not set yet.
  • INITIAL: First value. You might have to update caches with this value at application startup.
  • GOOD: A normal value update.
  • INVALID: A value update where the value is not to be trusted.

GOOD = 2
INITIAL = 1
INVALID = 3
NONE = 0
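The enum can be mirrored in a few lines to show how consuming code might branch on it. The member values are the ones listed above; the `is_usable` helper is hypothetical and not part of netdef.

```python
from enum import Enum


class StatusCode(Enum):
    # Member values as listed above.
    NONE = 0
    INITIAL = 1
    GOOD = 2
    INVALID = 3


def is_usable(status_code):
    """Hypothetical helper: True when a value exists and can be trusted."""
    return status_code in (StatusCode.INITIAL, StatusCode.GOOD)
```

Treating INITIAL as usable is a design choice of this sketch; an application that rebuilds caches at startup may want to handle it separately.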

Built-in Interfaces

BytestringSource

class netdef.Sources.BytestringSource.BytestringSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

value_as_string

byte data as string

CommTestSource

class netdef.Sources.CommTestSource.CommTestSource(*args, **kwargs)[source]

Bases: netdef.Sources.FloatSource.FloatSource

unpack_host_and_port()[source]

ConcurrentWebRequestSource

class netdef.Sources.ConcurrentWebRequestSource.ConcurrentWebRequestSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

DEFAULT_CLIENT_SESSION_TIMEOUT = 2
add_client_session(session)[source]
build_url(url)[source]
get_basic_auth()[source]
get_client_session()[source]
get_client_session_timeout()[source]
get_commands_list()[source]
get_connect_request()[source]
get_poll_request()[source]
get_poll_request_interval()[source]
get_start_url()[source]
has_basic_auth()[source]
has_connect_request()[source]
has_poll_request()[source]
parse_url(url)[source]
class netdef.Sources.ConcurrentWebRequestSource.Request(method, url, params=None, data=None)[source]

Bases: object

data
method
params
url
class netdef.Sources.ConcurrentWebRequestSource.Result(result)[source]

Bases: object

result
netdef.Sources.ConcurrentWebRequestSource.setup(shared)[source]

CrontabSource

class netdef.Sources.CrontabSource.CrontabSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

DictSource

class netdef.Sources.DictSource.DictSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

FloatSource

class netdef.Sources.FloatSource.FloatSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

HoldingRegisterSource

class netdef.Sources.HoldingRegisterSource.HoldingRegisterSource(*args, **kwargs)[source]

Bases: netdef.Sources.IntegerSource.IntegerSource

static pack_unit_and_address(unit, address)[source]
unpack_unit_and_address()[source]

InfluxDBLoggerSource

class netdef.Sources.InfluxDBLoggerSource.InfluxDBLoggerSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

A dataholder class to be used with InfluxDBLoggerController

get_points(data, source_time, status_code)[source]

Returns a list suitable as argument for InfluxDBClient.write_points()

Parameters:
Returns:

a list of dicts

static make_points(interface, measurement, value, source_time, status_code)[source]

Make a list suitable as argument for InfluxDBClient.write_points()

Parameters:
  • interface (BaseSource, InfluxDBLoggerInterface) – an object with key, rule, source and controller attributes
  • measurement (str) – influxdb measurement name
  • value – measurement field.value
  • source_time (datetime.datetime) – measurement time
  • status_code (BaseSource.StatusCode) – measurement field.status_code
Returns:

a list of dicts
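As an illustration of what such a list could look like, the sketch below builds one point using the `measurement`, `tags`, `time` and `fields` keys that the influxdb Python client accepts. The choice of tags, the field names and the string conversion of the status code are assumptions, not the confirmed layout used by netdef.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def make_points(interface, measurement, value, source_time, status_code):
    """Sketch: build a one-element list of point dicts."""
    return [{
        "measurement": measurement,
        "tags": {  # assumed tag layout
            "key": interface.key,
            "rule": interface.rule,
            "source": interface.source,
            "controller": interface.controller,
        },
        "time": source_time,
        "fields": {"value": value, "status_code": str(status_code)},
    }]


# Any object with key, rule, source and controller attributes will do.
interface = SimpleNamespace(
    key="temp", rule="ExampleRule", source="FloatSource", controller="ExampleController"
)
stamp = datetime(2020, 1, 1, tzinfo=timezone.utc)
points = make_points(interface, "temp", 21.5, stamp, "GOOD")
```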

unpack_measurement()[source]

Returns self.key. Override to change measurement name.

IntegerSource

class netdef.Sources.IntegerSource.IntegerSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

InternalSource

class netdef.Sources.InternalSource.InternalSource(*args, **kwargs)[source]

Bases: netdef.Sources.DictSource.DictSource

static can_unpack_value(value)[source]

Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)
pack_value(value)[source]

Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.

Example:

def handle_write_source(self, incoming, value, source_time):
    data = incoming.pack_value(value, source_time)
    topic, payload = incoming.make_message(incoming.key, data)
    self.publish_data_item(topic, payload)
static unpack_value(value)[source]

Function that parses the response from the source and returns the following tuple: (key, source_time, value). The key can then be used to find the right instance and update values.

Can be overridden and adapted to the controller it is to be used in.

Returns:

tuple(key, source_time, value)

Return type:

tuple

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)

MQTTDataMessageSource

class netdef.Sources.MQTTDataMessageSource.MQTTDataMessageSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

static can_unpack_value(value)[source]

Check if it is possible to extract a value from the payload

static make_message(topic, datamessage)[source]

Wraps given datamessage into a json-payload

Parameters:
  • topic (str) – mqtt topic
  • datamessage (DataMessage) – a datamessage object
Returns:

tuple of topic and json payload

Return type:

tuple

pack_value(value, stime, status_code, origin)[source]

Pack the value and stime into an MQTT payload

static parse_message(topic, payload)[source]

Parse given json-payload into a datamessage object

Parameters:
  • topic (str) – mqtt topic
  • payload (str) – json payload
Returns:

a DataMessage object

Return type:

DataMessage
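The relationship between make_message and parse_message can be pictured as a JSON round trip. The `DataMessage` dataclass below is a hypothetical shape invented for this sketch, since the real class and its fields are not documented in this section.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class DataMessage:
    """Hypothetical message shape; the real fields may differ."""
    key: str
    value: float
    source_time: str
    origin: str = ""


def make_message(topic, datamessage):
    # Wrap the message in a JSON payload and pair it with its topic.
    return topic, json.dumps(asdict(datamessage))


def parse_message(topic, payload):
    # Rebuild the message object from the JSON payload.
    return DataMessage(**json.loads(payload))


message = DataMessage("temp", 21.5, "2020-01-01T00:00:00Z", "sketch")
topic, payload = make_message("plant/temp", message)
restored = parse_message(topic, payload)
```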

static unpack_value(value)[source]

Return a tuple with key, time and value from the mqtt payload

Parameters:
  • value (DataMessage) – datamessage from mqtt payload
Returns:

tuple(key, source_time, value, origin)

Return type:

tuple

SubprocessSource

class netdef.Sources.SubprocessSource.SubprocessSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

DEFAULT_INTERVAL = 10
static can_unpack_subitems(value)[source]

Returns False, cannot unpack subitems

get_command_and_args(args=None)[source]

Get command and argument to run

get_poll_interval()[source]
has_initial_poll()[source]
has_poll_interval()[source]
parse_stdout_response(value)[source]

Implement parsing function
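A subclass is expected to supply the parsing logic. The sketch below shows one possible shape, using a hypothetical stand-in class rather than SubprocessSource itself. It assumes the key holds the command line and that the last non-empty line of stdout is a number.

```python
import shlex


class NumberSketchSource:
    """Hypothetical stand-in for a SubprocessSource subclass."""

    def __init__(self, key):
        self.key = key  # assumption: the key holds the command line

    def get_command_and_args(self, args=None):
        # Split the key into an argv list and append optional extra arguments.
        command = shlex.split(self.key)
        if args:
            command.extend(args)
        return command

    def parse_stdout_response(self, value):
        # Accept bytes or str; return the last non-empty line as a float.
        text = value.decode() if isinstance(value, bytes) else value
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        return float(lines[-1]) if lines else None


source = NumberSketchSource("echo 42.5")
command = source.get_command_and_args()
parsed = source.parse_stdout_response(b"header\n42.5\n")
```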

static unpack_subitems(value)[source]

Yields None, cannot unpack subitems

netdef.Sources.SubprocessSource.setup(shared)[source]

SystemMonitorSource

class netdef.Sources.SystemMonitorSource.SystemMonitorByteSource(*args, **kwargs)[source]

Bases: netdef.Sources.SystemMonitorSource.SystemMonitorSource

static get_interface()[source]
class netdef.Sources.SystemMonitorSource.SystemMonitorPercentSource(*args, **kwargs)[source]

Bases: netdef.Sources.SystemMonitorSource.SystemMonitorSource

static get_interface()[source]
class netdef.Sources.SystemMonitorSource.SystemMonitorSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

static get_interface()[source]
get_value_and_unit()[source]
value_as_string

TextSource

class netdef.Sources.TextSource.TextSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

VariantSource

class netdef.Sources.VariantSource.VariantSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

XmlRpcMethodCallSource

class netdef.Sources.XmlRpcMethodCallSource.XmlRpcMethodCallSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

static can_unpack_subitems(value)[source]

Returns False, cannot unpack subitems

make_rpc_request(value)[source]
parse_rpc_response(value)[source]
poll_request()[source]
static unpack_subitems(value)[source]

Yields None, cannot unpack subitems

ZmqDataAccessSource

class netdef.Sources.ZmqDataAccessSource.ZmqDataAccessSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

pack_address(addr)[source]
unpack_address()[source]