netdef.Controllers package¶
- Controllers
- Abstract base controllers
- Built-in controller modules
- CommTestController
- ConcurrentWebRequestController
- CrontabController
- InfluxDBLoggerController
- InternalController
- ModbusClientController
- ModbusServerController
- MQTTDataMessageController
- OPCUAClientController
- OPCUAServerController
- RESTJsonController
- SubprocessController
- SystemMonitorController
- XmlRpcController
- ZmqDataAccessController
Controllers¶
-
class
netdef.Controllers.Controllers.
Controllers
(shared=None)[source]¶ Bases:
object
A collection of all loaded controllers
-
netdef.Controllers.Controllers.
register
(name, classref=None)[source]¶ A decorator to register controllers. Example:
from netdef.Controllers import BaseController, Controllers @Controllers.register("NewControllerTemplate") class NewControllerTemplate(BaseController.BaseController): def __init__(self, name, shared): ...
Can also be called as a normal function:
from netdef.Controllers import BaseController, Controllers def setup(shared): Controllers.register("NewControllerTemplate", NewControllerTemplate) class NewControllerTemplate(BaseController.BaseController): def __init__(self, name, shared): ...
Parameters: - name (str) – Name of the controller class
- classref (object) – Should be None if used as a decorator and a class if called as a function
Returns: A callable that returns a class if used as a decorator and a class if called as a normal function
Abstract base controllers¶
BaseController¶
This is an abstract baseclass
-
class
netdef.Controllers.BaseController.
BaseController
(name, shared)[source]¶ Bases:
object
Abstract class for controllers.
Parameters: - name (str) – Name to be used in logfiles
- shared – a reference to the shared object
-
add_source
(name, init_value)[source]¶ Add a source to the storage dict. Override if something else is needed.
-
clear_incoming
(until_empty=True, until_messagetype=None)[source]¶ Delete all messages from incoming queue.
Parameters: - until_empty (bool) – If True the function will block until queue is empty. If False it will block forever.
- until_messagetype (MessageType) – Block until given messagetype is received
Example:
... while not self.has_interrupt(): reconnect = False try: if reconnect: self.clear_incoming() self.try_reconnect() # main loop while not self.has_interrupt(): self.loop_incoming() self.loop_outgoing() except ConnectionError: reconnect = True ...
-
fetch_one_incoming
()[source]¶ Returns one message from the queue.
Returns: tuple of (messagetype, incoming) Return type: tuple(MessageType, BaseSource)
-
init_parsers
(parsers)[source]¶ Setup the parser storage as a list. Override if something else is needed.
-
init_sources
(sources)[source]¶ Setup the source storage as a dict. Override if something else is needed.
-
loop_incoming
(until_empty=True, until_timeout=0.0, until_messagetype=None, until_app_state=None)[source]¶ Get every message from the queue and dispatch the associated handler function.
Parameters: - until_empty (bool) – Blocking until queue is empty
- until_timeout (float) – Timeout in seconds.
0.0
blocks forever. - until_messagetype (MessageType) – Blocking until given messagetype is dispatched
- until_app_state (AppStateType) – Blocking until given app_state is dispatched
-
loop_until_app_state_running
()[source]¶ Usefull if you want your controller to block while ADD_SOURCE and ADD_PARSER is dispatched
Example:
def run(self): self.loop_until_app_state_running() while not self.has_interrupt(): try: self.handle_connection() while not self.has_interrupt(): self.loop_incoming() self.loop_outgoing() except ConnectionError: self.handle_conn_error()
-
run
()[source]¶ Override this function in controller. Example:
def run(self): self.logger.info("Running") while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.loop_outgoing() # dispatch poll_* functions self.logger.info("Stopped")
-
sleep
(seconds)[source]¶ ” Sleep by waiting for the interrupt. Should be used instead of time.sleep. Override if sleep should be interrupted by even more signals
BaseAsyncController¶
-
class
netdef.Controllers.BaseAsyncController.
BaseAsyncController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
-
run
()[source]¶ Override this function in controller. Example:
def run(self): self.logger.info("Running") some_client = SomeAsyncioClient() # Start polling of the blocking incoming queue in a thread executor self.loop.run_in_executor(None, self.loop_incoming_until_interrupt) # TODO: define a coroutine that stops your async client when called. async def stop_some_client(): await some_client.stop() # register coroutine to be run at interrupt / shutdown self.loop.create_task(self.run_async_on_interrupt(stop_some_client)) # TODO: start your client coroutine self.loop.run_until_complete(some_client.start()) self.logger.info("Stopped")
-
Built-in controller modules:
Built-in controller modules¶
CommTestController¶
-
class
netdef.Controllers.CommTestController.
CommTestController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseAsyncController.BaseAsyncController
Tip
Development Status :: 5 - Production/Stable
This class will send TCP or ICMP ping requests based on sources received in ADD_SOURCE messages and store the result into given sources. When result is stored into a source this class will send the changed source in a RUN_EXPRESSION message to the source’s rule.
Parameters: - name (str) – Name of controller
- shared (netdef.Shared) – Instance of applications shared object.
- Configuration:
- timeout – Connection timeout in seconds
- interval – Poll interval in seconds
- test_type – Available types: [tcpip, ping]
- max_concurrent_sockets – Max number of simultaneous open sockets.
- disable – If disabled this controller will enter running state but all messages will be discarded.
- Defaults:
[CommTestController] timeout = 2 interval = 10 test_type = tcpip max_concurrent_sockets = 1000 disable = 0
ConcurrentWebRequestController¶
-
class
netdef.Controllers.ConcurrentWebRequestController.
ConcurrentWebRequestController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseAsyncController.BaseAsyncController
Danger
Development Status :: 3 - Alpha
Basically just a web scraper. Can scrape multiple web pages simultaneously.
IO is handeled by this controller. The poll interval and program flow is implemented in
ConcurrentWebRequestSource
-
get_client_session
(item)[source]¶ Returns a aiohttp session. Add new session to source if not found. Session will be initialized with basic auth and a default timeout
-
handle_write_source
(incoming, value, source_time)[source]¶ execute a command if given value is the name of a command
-
CrontabController¶
-
class
netdef.Controllers.CrontabController.
CrontabController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
InfluxDBLoggerController¶
-
class
netdef.Controllers.InfluxDBLoggerController.
InfluxDBLoggerController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Danger
Development Status :: 3 - Alpha
A logging controller. Its purpose is to store every write event into influxdb.
-
handle_write_source
(incoming, value, source_time)[source]¶ Write given value and timestamp into influxdb
Parameters: - incoming (InfluxDBLoggerSource) – source instance
- value – frozen value if instance
- source_time (datetime.datetime) – value timestamp
-
InternalController¶
-
class
netdef.Controllers.InternalController.
InternalController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
Internal variables that works just like any other value from a controller. Can trigger events on valuechanges. State can be cached to disk.
Parameters: - name (str) – The name is used i logfile and default.ini
- shared (Shared) – Instance of applications shared object.
Configuration
[InternalController] send_init_event = 0 send_events = 0 persistent_value = 0 key_in_filename = 0
- Options
- send_init_event – trigger RUN_EXPRESSION with StatusCode.INITIAL for every source at startup
- send_events – trigger a RUN_EXPRESSION message for every WRITE_SOURCE message
- persistent_value – store values to disk
- key_in_filename – use source key as prefix in filename for persistent storage
Sequence diagram
-
get_cache_filename
(key)[source]¶ Generate sha256 hash to be used as filename. If config key_in_filename=1 then key will be prefixed to the hexdigest. Valid characters: a-z A-Z 0-9 _.-
Parameters: key (str) – string to encode Returns: filename
-
handle_add_source
(incoming)[source]¶ Add given source instance to internal source list
Parameters: incoming (InternalSource) – source instance
-
handle_write_source
(incoming, value, source_time)[source]¶ Update internal dict with new value.
Parameters: - incoming (InternalSource) – source instance
- value – frozen value of instance
- source_time (datetime.datetime) – value timestamp
-
poll_outgoing_item
(item)[source]¶ Check if given source should be cached to disk.
Parameters: item (InternalSource) – source instance
ModbusClientController¶
-
class
netdef.Controllers.ModbusClientController.
ModbusClientController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
Read and write holding registers of a modbus device.
Parameters: - name (str) – The name is used i logfile and default.ini
- shared (Shared) – reference to the global shared instance
Settings:
[ModbusClientController] # connection host = 127.0.0.1 port = 5020 # RUN_EXPRESSION is only sent if value has changed oldnew_comparision = 1 # cooldown on connection error og write error reconnect_timeout = 20 # Buffer or clear write requests recieved during cooldown clear_writes_on_disconnect = 1 # Polling interval poll_interval = 0.5
Sequence diagram:
-
handle_add_source
(incoming)[source]¶ Add given source instance to internal source list
Parameters: incoming (HoldingRegisterSource) – source instance
-
handle_write_source
(incoming, value, source_time)[source]¶ Write given value to the connected modbus device.
Parameters: - incoming (HoldingRegisterSource) – source instance
- value – frozen value of instance
- source_time (datetime.datetime) – value timestamp
-
poll_outgoing_item
(item)[source]¶ Poll given source for its value in the modbus device
Parameters: item (HoldingRegisterSource) – source instance
ModbusServerController¶
-
class
netdef.Controllers.ModbusServerController.
ModbusServerController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
Sequence diagram:
-
get_framer
()[source]¶ Returns the framer to be used. Override this function to return a custom framer
-
-
class
netdef.Controllers.ModbusServerController.
MyContext
(*args, **kwargs)[source]¶ Bases:
pymodbus.datastore.context.ModbusSlaveContext
MQTTDataMessageController¶
-
class
netdef.Controllers.MQTTDataMessageController.
MQTTDataMessageController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Danger
Development Status :: 3 - Alpha
OPCUAClientController¶
-
class
netdef.Controllers.OPCUAClientController.
OPCUAClientController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Caution
Development Status :: 4 - Beta
OPCUAServerController¶
-
class
netdef.Controllers.OPCUAServerController.
CustomAnonInternalSession
(internal_server, aspace, submgr, name, user=<sphinx.ext.autodoc.importer._MockObject object>, external=False)[source]¶ Bases:
opcua.server.internal_server.InternalSession
Custom InternalSession will set timestamp when missing
-
class
netdef.Controllers.OPCUAServerController.
CustomInternalSession
(internal_server, aspace, submgr, name, user=<sphinx.ext.autodoc.importer._MockObject object>, external=False)[source]¶ Bases:
netdef.Controllers.OPCUAServerController.CustomAnonInternalSession
This custom InternalSession will block anonymous access
-
class
netdef.Controllers.OPCUAServerController.
CustomServer
(shelffile=None, iserver=None)[source]¶ Bases:
opcua.server.server.Server
Custom Server that enables Basic128Rsa15 and Basic256
-
class
netdef.Controllers.OPCUAServerController.
OPCUAServerController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
This Controller will start a freeopcua server instance and will add a nodeid for all sources received in
ADD_SOURCE
messages.When a client writes a new value this event will be forwarded to the associated source and a
RUN_EXPRESSION
message will be sent.When a
WRITE_SOURCE
message is received the value for the associated source will be updated in the server and all connected clients will receive a value updateSequence diagram:
-
add_variablenode
(parent, ref, val, varianttype)[source]¶ Create and add a variable in server and return the variable node
-
create_datavalue
(val, datatype, statuscode, timestamp)[source]¶ Create a value for the server that keep the correct datatype
-
create_monitored_items
(event, dispatcher)[source]¶ write a warning to logfile if the client add a nodeid that does not exists
-
RESTJsonController¶
-
class
netdef.Controllers.RESTJsonController.
RESTJsonController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
SubprocessController¶
-
class
netdef.Controllers.SubprocessController.
NextInterval
(timestamp)[source]¶ Bases:
object
Call next() to retrieve seconds to next interval, and which interval it is
-
spans
¶
-
start
¶
-
-
class
netdef.Controllers.SubprocessController.
SubprocessController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Danger
Development Status :: 3 - Alpha
SystemMonitorController¶
-
class
netdef.Controllers.SystemMonitorController.
DataItem
(source_type, key, interval, func, args=None)[source]¶ Bases:
object
-
args
¶ Arguments for
self.func
callback
-
func
¶ Callback to retrieve value
-
interval
¶ Poll interval
-
key
¶ Unique identifier
-
next
¶ Next scheduled call to
self.func
-
source_type
¶ Reference to a SystemMonitorSource class
-
-
class
netdef.Controllers.SystemMonitorController.
SystemMonitorController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
-
netdef.Controllers.SystemMonitorController.
get_clean_mount_point_name
(node)[source]¶ Replace / or with .
Example:
for disk in psutil.disk_partitions(): print (get_clean_mount_point_name(disk.mountpoint))
Parameters: node (str) – name of mountpoint Returns: new node name
-
netdef.Controllers.SystemMonitorController.
get_data_items_dict
(mempoll, cpupoll, poll, checkdisk, diskpoll)[source]¶ Create a dict with items to monitor.
Parameters: - mempoll (int) – poll interval for memory callbacks
- cpupoll (int) – poll interval for cpu callbacks
- poll (int) – general poll interval
- checkdisk (bool) – Set True to poll disk drives
- diskpoll (int) – poll interval for disk drives
Returns: dict of
DataItem
-
netdef.Controllers.SystemMonitorController.
get_proc
()[source]¶ Helperfunction.
Returns: psutil.Process
XmlRpcController¶
-
class
netdef.Controllers.XmlRpcController.
XmlRpcController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Tip
Development Status :: 5 - Production/Stable
Sequence diagram:
ZmqDataAccessController¶
-
class
netdef.Controllers.ZmqDataAccessController.
ZmqDataAccessController
(name, shared)[source]¶ Bases:
netdef.Controllers.BaseController.BaseController
Danger
Development Status :: 3 - Alpha