netdef.Controllers package

Controllers

class netdef.Controllers.Controllers.Controllers(shared=None)[source]

Bases: object

A collection of all loaded controllers

add_shared_object(shared)[source]
init()[source]
load(base_packages)[source]

Imports controller modules. Creates queue instances associated with the given controllers.

Example:

from netdef.Controllers import Controllers
controllers = Controllers.Controllers(shared)
controllers.load([__package__, 'netdef'])
netdef.Controllers.Controllers.register(name, classref=None)[source]

A decorator to register controllers. Example:

from netdef.Controllers import BaseController, Controllers

@Controllers.register("NewControllerTemplate")
class NewControllerTemplate(BaseController.BaseController):
    def __init__(self, name, shared):
        ...

Can also be called as a normal function:

from netdef.Controllers import BaseController, Controllers

def setup(shared):
    Controllers.register("NewControllerTemplate", NewControllerTemplate)

class NewControllerTemplate(BaseController.BaseController):
    def __init__(self, name, shared):
        ...
Parameters:
  • name (str) – Name of the controller class
  • classref (object) – Should be None if used as a decorator and a class if called as a function
Returns:

A callable that returns a class if used as a decorator and a class if called as a normal function

Abstract base controllers

BaseController

This is an abstract baseclass

class netdef.Controllers.BaseController.BaseController(name, shared)[source]

Bases: object

Abstract class for controllers.

Parameters:
  • name (str) – Name to be used in logfiles
  • shared – a reference to the shared object
add_interrupt(interrupt)[source]

Setup the interrupt signal

add_logger(name)[source]

Setup logging module

add_parser(parser)[source]

Add parser if not already exists

add_source(name, init_value)[source]

Add a source to the storage dict. Override if something else is needed.

clear_incoming(until_empty=True, until_messagetype=None)[source]

Delete all messages from incoming queue.

Parameters:
  • until_empty (bool) – If True the function will block until queue is empty. If False it will block forever.
  • until_messagetype (MessageType) – Block until given messagetype is received

Example:

...

while not self.has_interrupt():
    reconnect = False
    try:
        if reconnect:
            self.clear_incoming()
            self.try_reconnect()
        # main loop
        while not self.has_interrupt():
            self.loop_incoming()
            self.loop_outgoing()
    except ConnectionError:
        reconnect = True

...
fetch_one_incoming()[source]

Returns one message from the queue.

Returns:tuple of (messagetype, incoming)
Return type:tuple(MessageType, BaseSource)
get_parsers()[source]

Return parser storage

get_source(name)[source]

Return named source

get_sources()[source]

Return source storage

handle_add_parser(incoming)[source]

Add parser to controller if not already exists

handle_add_source(incoming)[source]
handle_app_state(app_state)[source]

Override if controller need to react to application states

handle_app_state_running()[source]

Override if controller need to react to running state

handle_app_state_setup()[source]

Override if controller need to react to setup state

handle_read_source(incoming)[source]
handle_readall(incoming)[source]
handle_tick(incoming)[source]

Answer the tick message

handle_write_source(incoming, value, source_time)[source]
has_interrupt()[source]

Returns True if the interrupt signal is received

has_source(name)[source]

Return True if source name is found

init_parsers(parsers)[source]

Setup the parser storage as a list. Override if something else is needed.

init_queue()[source]

Setup the message queue and timeout

init_sources(sources)[source]

Setup the source storage as a dict. Override if something else is needed.

loop_incoming(until_empty=True, until_timeout=0.0, until_messagetype=None, until_app_state=None)[source]

Get every message from the queue and dispatch the associated handler function.

Parameters:
  • until_empty (bool) – Blocking until queue is empty
  • until_timeout (float) – Timeout in seconds. 0.0 blocks forever.
  • until_messagetype (MessageType) – Blocking until given messagetype is dispatched
  • until_app_state (AppStateType) – Blocking until given app_state is dispatched
loop_outgoing()[source]

Check every source and call the poll_outgoing_item function

loop_until_app_state_running()[source]

Usefull if you want your controller to block while ADD_SOURCE and ADD_PARSER is dispatched

Example:

def run(self):
    self.loop_until_app_state_running()
    while not self.has_interrupt():
        try:
            self.handle_connection()
            while not self.has_interrupt():
                self.loop_incoming()
                self.loop_outgoing()
        except ConnectionError:
            self.handle_conn_error()
poll_outgoing_item(item)[source]
run()[source]

Override this function in controller. Example:

def run(self):
    self.logger.info("Running")

    while not self.has_interrupt():
        self.loop_incoming() # dispatch handle_* functions
        self.loop_outgoing() # dispatch poll_* functions

    self.logger.info("Stopped")
send_outgoing(outgoing)[source]

Send RUN_EXPRESSION message on valuechange

sleep(seconds)[source]

” Sleep by waiting for the interrupt. Should be used instead of time.sleep. Override if sleep should be interrupted by even more signals

statistics_update()[source]
classmethod update_source_instance_status(source_instance, status_ok, oldnew_check)[source]

Updates state on given source_instance Returns True if source_instance have triggered a value change

static update_source_instance_value(source_instance, value, stime, status_ok, oldnew_check)[source]

Updates value, timestamp and state on given source_instance Returns True if source_instance have triggered a value change

BaseAsyncController

class netdef.Controllers.BaseAsyncController.BaseAsyncController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

get_event_loop()[source]
handle_app_state_running()[source]

Override if controller need to react to running state

init_asyncio()[source]
loop_incoming_until_interrupt()[source]
run()[source]

Override this function in controller. Example:

def run(self):
    self.logger.info("Running")

    some_client = SomeAsyncioClient()

    # Start polling of the blocking incoming queue in a thread executor
    self.loop.run_in_executor(None, self.loop_incoming_until_interrupt)

    # TODO: define a coroutine that stops your async client when called.
    async def stop_some_client():
        await some_client.stop()

    # register coroutine to be run at interrupt / shutdown 
    self.loop.create_task(self.run_async_on_interrupt(stop_some_client))

    # TODO: start your client coroutine
    self.loop.run_until_complete(some_client.start())

    self.logger.info("Stopped")
run_async_on_interrupt(callback)[source]

Built-in controller modules:

Built-in controller modules

CommTestController

class netdef.Controllers.CommTestController.CommTestController(name, shared)[source]

Bases: netdef.Controllers.BaseAsyncController.BaseAsyncController

Tip

Development Status :: 5 - Production/Stable

This class will send TCP or ICMP ping requests based on sources received in ADD_SOURCE messages and store the result into given sources. When result is stored into a source this class will send the changed source in a RUN_EXPRESSION message to the source’s rule.

Parameters:
  • name (str) – Name of controller
  • shared (netdef.Shared) – Instance of applications shared object.
Configuration:
  • timeout – Connection timeout in seconds
  • interval – Poll interval in seconds
  • test_type – Available types: [tcpip, ping]
  • max_concurrent_sockets – Max number of simultaneous open sockets.
  • disable – If disabled this controller will enter running state but all messages will be discarded.
Defaults:
[CommTestController]
timeout = 2
interval = 10
test_type = tcpip
max_concurrent_sockets = 1000
disable = 0
loop_outgoing_until_interrupt()[source]

Main coroutine. loops until interrupt is set.

run()[source]

Main thread loop. Will exit when receiving interrupt signal Sets up

ConcurrentWebRequestController

class netdef.Controllers.ConcurrentWebRequestController.ConcurrentWebRequestController(name, shared)[source]

Bases: netdef.Controllers.BaseAsyncController.BaseAsyncController

Danger

Development Status :: 3 - Alpha

Basically just a web scraper. Can scrape multiple web pages simultaneously.

IO is handeled by this controller. The poll interval and program flow is implemented in ConcurrentWebRequestSource

get_client_session(item)[source]

Returns a aiohttp session. Add new session to source if not found. Session will be initialized with basic auth and a default timeout

handle_add_source(incoming)[source]

Add source to controller

handle_write_source(incoming, value, source_time)[source]

execute a command if given value is the name of a command

init_task_limit()[source]

Read configuration

loop_outgoing_until_interrupt()[source]

Main async loop.

proccess_task(item, method)[source]

Retrives data from web site and packs it into the source

proccess_web_request_item(item, method, session)[source]

handle IO by interfacing with the sources data generator

run()[source]

Main sync loop

class netdef.Controllers.ConcurrentWebRequestController.NextInterval(timestamp)[source]

Bases: object

Call next() to retrieve seconds to next interval, and which interval it is

add(interval)[source]
has_interval()[source]
next(now)[source]
spans
start

CrontabController

class netdef.Controllers.CrontabController.CrontabController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

poll_outgoing_item(item)[source]

Check if it is time to trigger event for given source

Parameters:item – source instance to check
run()[source]

Main loop. Will exit when receiving interrupt signal

InfluxDBLoggerController

class netdef.Controllers.InfluxDBLoggerController.InfluxDBLoggerController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

A logging controller. Its purpose is to store every write event into influxdb.

handle_write_source(incoming, value, source_time)[source]

Write given value and timestamp into influxdb

Parameters:
  • incoming (InfluxDBLoggerSource) – source instance
  • value – frozen value if instance
  • source_time (datetime.datetime) – value timestamp
run()[source]

Main loop. Will exit when receiving interrupt signal

InternalController

class netdef.Controllers.InternalController.InternalController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Internal variables that works just like any other value from a controller. Can trigger events on valuechanges. State can be cached to disk.

Parameters:
  • name (str) – The name is used i logfile and default.ini
  • shared (Shared) – Instance of applications shared object.

Configuration

[InternalController]
send_init_event = 0
send_events = 0
persistent_value = 0
key_in_filename = 0
Options
  • send_init_event – trigger RUN_EXPRESSION with StatusCode.INITIAL for every source at startup
  • send_events – trigger a RUN_EXPRESSION message for every WRITE_SOURCE message
  • persistent_value – store values to disk
  • key_in_filename – use source key as prefix in filename for persistent storage

Sequence diagram

blockdiag Queue InternalController Update value of source [n] Value change in source [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING WRITE_SOURCE, source [n], value, timestamp RUN_EXPRESSION, source [n] Initialization Setup Running Begin loop End Loop
get_cache_filename(key)[source]

Generate sha256 hash to be used as filename. If config key_in_filename=1 then key will be prefixed to the hexdigest. Valid characters: a-z A-Z 0-9 _.-

Parameters:key (str) – string to encode
Returns:filename
handle_add_source(incoming)[source]

Add given source instance to internal source list

Parameters:incoming (InternalSource) – source instance
handle_write_source(incoming, value, source_time)[source]

Update internal dict with new value.

Parameters:
  • incoming (InternalSource) – source instance
  • value – frozen value of instance
  • source_time (datetime.datetime) – value timestamp
poll_outgoing_item(item)[source]

Check if given source should be cached to disk.

Parameters:item (InternalSource) – source instance
run()[source]

Main loop. Will exit when receiving interrupt signal

store_to_disk(item=None)[source]

Store sources into files at [proj-path]/db/internal/

ModbusClientController

class netdef.Controllers.ModbusClientController.ModbusClientController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Read and write holding registers of a modbus device.

Parameters:
  • name (str) – The name is used i logfile and default.ini
  • shared (Shared) – reference to the global shared instance

Settings:

[ModbusClientController]

# connection
host = 127.0.0.1
port = 5020

# RUN_EXPRESSION is only sent if value has changed
oldnew_comparision = 1

# cooldown on connection error og write error
reconnect_timeout = 20

# Buffer or clear write requests recieved during cooldown
clear_writes_on_disconnect = 1

# Polling interval
poll_interval = 0.5

Sequence diagram:

blockdiag Queue ModbusClientControlle r Modbus registers Update value of source [n] Value change in source [n] Update value of source [n] Value change in nodeid [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING Value change, register [n] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp update value, register [n] Initialization Setup Running Begin loop End Loop
handle_add_source(incoming)[source]

Add given source instance to internal source list

Parameters:incoming (HoldingRegisterSource) – source instance
handle_write_source(incoming, value, source_time)[source]

Write given value to the connected modbus device.

Parameters:
  • incoming (HoldingRegisterSource) – source instance
  • value – frozen value of instance
  • source_time (datetime.datetime) – value timestamp
poll_outgoing_item(item)[source]

Poll given source for its value in the modbus device

Parameters:item (HoldingRegisterSource) – source instance
run()[source]

Main loop. Will exit when receiving interrupt signal

safe_disconnect()[source]

Close the tcp socket if it is connected

ModbusServerController

class netdef.Controllers.ModbusServerController.ModbusServerController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Sequence diagram:

blockdiag Queue ModbusServerControlle r Pymodbus datablock Update value of source [n] Value change in source [n] Update value of source [n] Value change in nodeid [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING Value change, register [n] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp update value, register [n] Initialization Setup Running Begin loop End Loop
get_framer()[source]

Returns the framer to be used. Override this function to return a custom framer

get_modbus_server_context()[source]

Iter the devicelist section in config-file and builds a ModbusServerContext object

Returns:an ModbusServerContext instance
handle_add_source(incoming)[source]
handle_datachange(unit, address, value, is_internal)[source]
handle_write_source(incoming, value, source_time)[source]
init_server(context, framer, identity, host, port)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

class netdef.Controllers.ModbusServerController.MyContext(*args, **kwargs)[source]

Bases: pymodbus.datastore.context.ModbusSlaveContext

setValues(fx, address, values, is_internal=False)[source]

Sets the datastore with the supplied values

Parameters:
  • fx – The function we are working with
  • address – The starting address
  • values – The new values to be set
class netdef.Controllers.ModbusServerController.MyController(*args, **kwargs)[source]

Bases: pymodbus.server.sync.ModbusTcpServer

daemon_threads = False
service_actions()[source]

Called by the serve_forever() loop.

May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop.

MQTTDataMessageController

class netdef.Controllers.MQTTDataMessageController.MQTTDataMessageController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

get_key(topic)[source]
get_topic(topic)[source]
handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
loop_mqtt()[source]
mqtt_connect()[source]
mqtt_safe_disconnect()[source]
on_connect(client, userdata, flags, rc)[source]
on_disconnect(client, userdata, rc)[source]
on_message(client, userdata, msg)[source]
publish_data_item(topic, payload)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

OPCUAClientController

class netdef.Controllers.OPCUAClientController.OPCUAClientController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Caution

Development Status :: 4 - Beta

config(key, default)[source]
handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
loop_outgoing()[source]

Check every source and call the poll_outgoing_item function

run()[source]

Main loop. Will exit when receiving interrupt signal

safe_disconnect()[source]
send_datachange(nodeid, value, stime, status_ok)[source]
class netdef.Controllers.OPCUAClientController.SubHandler(parent)[source]

Bases: object

Client to subscription. It will receive events from server

datachange_notification(node, value, data)[source]
event_notification(event)[source]
status_change_notification(status)[source]

OPCUAServerController

class netdef.Controllers.OPCUAServerController.CustomAnonInternalSession(internal_server, aspace, submgr, name, user=<sphinx.ext.autodoc.importer._MockObject object>, external=False)[source]

Bases: opcua.server.internal_server.InternalSession

Custom InternalSession will set timestamp when missing

write(params)[source]
class netdef.Controllers.OPCUAServerController.CustomInternalSession(internal_server, aspace, submgr, name, user=<sphinx.ext.autodoc.importer._MockObject object>, external=False)[source]

Bases: netdef.Controllers.OPCUAServerController.CustomAnonInternalSession

This custom InternalSession will block anonymous access

activate_session(params)[source]
class netdef.Controllers.OPCUAServerController.CustomServer(shelffile=None, iserver=None)[source]

Bases: opcua.server.server.Server

Custom Server that enables Basic128Rsa15 and Basic256

class netdef.Controllers.OPCUAServerController.OPCUAServerController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

This Controller will start a freeopcua server instance and will add a nodeid for all sources received in ADD_SOURCE messages.

When a client writes a new value this event will be forwarded to the associated source and a RUN_EXPRESSION message will be sent.

When a WRITE_SOURCE message is received the value for the associated source will be updated in the server and all connected clients will receive a value update

Sequence diagram:

blockdiag Queue OPCUAServerController FreeOpcUA Update value of source [n] Value change in source [n] Update value of source [n] Value change in nodeid [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING subscribe nodeid [n] Value change, nodeid [n] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp update value, nodeid [n] Initialization Setup Running Begin loop End Loop
add_folder(parent, foldername)[source]

Add a folder in server

add_variablenode(parent, ref, val, varianttype)[source]

Create and add a variable in server and return the variable node

build_folders(parent, ref, sep)[source]
create_datavalue(val, datatype, statuscode, timestamp)[source]

Create a value for the server that keep the correct datatype

create_monitored_items(event, dispatcher)[source]

write a warning to logfile if the client add a nodeid that does not exists

get_default_value(incoming)[source]

Returns the default value of the source value

get_nodeid(incoming)[source]

Returns the nodeid from the source

get_varianttype(incoming)[source]

Returns the varianttype from the source

handle_add_source(incoming)[source]

Add a source to the server

handle_write_source(incoming, value, source_time)[source]

Receive a value change from an expression and update the server

is_writable(incoming)[source]

Returns True if source is writable for the opcua client

modify_monitored_items(event, dispatcher)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(nodeid, value, stime, status_ok, ua_status_code)[source]

Triggers a RUN_EXPRESSION message for given source

class netdef.Controllers.OPCUAServerController.SubHandler(controller)[source]

Bases: object

The subscription handler for the server. Will send value changes i server to the controller.

datachange_notification(node, val, data)[source]
event_notification(event)[source]

RESTJsonController

class netdef.Controllers.RESTJsonController.RESTJsonController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

connect()[source]
handle_add_source(incoming)[source]
handle_read_source(incoming)[source]
handle_readall(incoming)[source]
handle_write_source(incoming, value)[source]
loop_outgoing()[source]

Check every source and call the poll_outgoing_item function

parse_item(item)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(key, source_time, value)[source]
urlerrorhandling()[source]

SubprocessController

class netdef.Controllers.SubprocessController.NextInterval(timestamp)[source]

Bases: object

Call next() to retrieve seconds to next interval, and which interval it is

add(interval)[source]
has_interval()[source]
next(now)[source]
spans
start
class netdef.Controllers.SubprocessController.SubprocessController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
parse_item(item)[source]
parse_response(response)[source]
poll_outgoing_item(item)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(source_key, value, source_time, status_ok)[source]
setup_interval_plan()[source]
netdef.Controllers.SubprocessController.stdout_from_terminal(command_as_str, err_msg=None)[source]

SystemMonitorController

class netdef.Controllers.SystemMonitorController.DataItem(source_type, key, interval, func, args=None)[source]

Bases: object

args

Arguments for self.func callback

func

Callback to retrieve value

get_value()[source]

Returns value of self.func callback

interval

Poll interval

key

Unique identifier

next

Next scheduled call to self.func

ready()[source]

Returns True if interval for this item has elapsed.

source_type

Reference to a SystemMonitorSource class

class netdef.Controllers.SystemMonitorController.SystemMonitorController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
poll_data()[source]

Iter the dict of DataItem and get values.

run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(source_key, value, stime, status_ok)[source]
netdef.Controllers.SystemMonitorController.get_clean_mount_point_name(node)[source]

Replace / or with .

Example:

for disk in psutil.disk_partitions():
    print (get_clean_mount_point_name(disk.mountpoint))
Parameters:node (str) – name of mountpoint
Returns:new node name
netdef.Controllers.SystemMonitorController.get_data_items_dict(mempoll, cpupoll, poll, checkdisk, diskpoll)[source]

Create a dict with items to monitor.

Parameters:
  • mempoll (int) – poll interval for memory callbacks
  • cpupoll (int) – poll interval for cpu callbacks
  • poll (int) – general poll interval
  • checkdisk (bool) – Set True to poll disk drives
  • diskpoll (int) – poll interval for disk drives
Returns:

dict of DataItem

netdef.Controllers.SystemMonitorController.get_proc()[source]

Helperfunction.

Returns:psutil.Process
netdef.Controllers.SystemMonitorController.get_vm()[source]

Helperfunction.

Returns:psutil.virtual_memory
netdef.Controllers.SystemMonitorController.statistics_update(item)[source]

Write internal statistics to the Statistics singleton if activated

XmlRpcController

class netdef.Controllers.XmlRpcController.XmlRpcController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Sequence diagram:

blockdiag Queue XmlRpcController xmlrpc.client For source [n] Update value of source [n] Unpack subitems into source [i] Value change in source [i] Value change in source [n] Update value of source [n] APP_STATE, SETUP ADD_PARSER, class [n] ADD_SOURCE, source [n] ADD_SOURCE, source [i] APP_STATE, RUNNING Rpc call, request [n] Rpc call, response [n] parse subitems RUN_EXPRESSION, source [i] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp Rpc call, request [n] Rpc call, response [n] Initialization Setup Running Begin polling loop End Loop
handle_add_source(incoming)[source]
handle_read_source(incoming)[source]
handle_readall(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
parse_item(item)[source]
parse_response(response)[source]
poll_outgoing_item(item)[source]
rpc_call(item, value)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(key, source_time, value)[source]

ZmqDataAccessController

class netdef.Controllers.ZmqDataAccessController.ZmqDataAccessController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

connect()[source]
handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
loop_subscribers()[source]
run()[source]

Main loop. Will exit when receiving interrupt signal