Welcome to Netdef’s documentation!

Netdef

Summary

An application framework with built-in drivers (Controllers), data holders (Sources) and config parsers (Rules). Also includes a web interface for configuration and troubleshooting.

Features

  • Abstract base classes for creating custom controllers, sources and rules
  • The configuration is done using configparser with extended interpolation
  • Start a new netdef project with cookiecutter or make-project. Templates available at https://gitlab.com/fholmer/netdef-project/
  • Built-in Controllers:
    • OpcUa server / client (freeopcua)
    • TcpModbus server / client (pymodbus)
    • icmp ping / url ping
    • XmlRpc client
    • trigger events by using crontab format (crontab)
    • disk, memory and CPU monitoring (psutil)
    • MQTT client (using a simple messaging format called DataMessage) (paho-mqtt)
    • Simple RESTJson client
    • Simple Influxdb logger (influxdb)
  • Built-in Rules:
    • Generic CSV config parser
    • Generic INI config parser
    • Generic Yaml config parser (PyYAML)
  • Built-in application engines:
    • threaded engine with stdout/stderr only
    • threaded engine with web-interface (webadmin)
    • serve webadmin behind nginx reverse proxy

Use Cases

Netdef is useful if you want to create a middleware that can translate a protocol into a completely different protocol or data format into a completely different data format.

Getting started

First install make-project:

$ python3 -m pip install --user make

Create your first application:

$ python3 -m make project gl:fholmer/netdef-project/minimal-app

When asked for project_name type Test-App:

project_name? [First-App]: Test-App

Setup development environment for your application:

$ cd Test-App
$ python3 -m venv venv
$ source venv/bin/activate
$ pip install wheel
$ pip install -r requirements-dev.txt
$ pip install -r requirements.txt
$ python -m test_app -i .

Run:

$ python -m test_app -r .

CTRL-C to exit

Package your application:

$ python setup.py bdist_wheel

Exit development environment:

$ deactivate

Prepare deployment:

$ sudo mkdir -p /opt/test-app
$ sudo chown $USER:$USER /opt/test-app/
$ python3 -m venv /opt/test-app/

Deploy your application:

$ source /opt/test-app/bin/activate
$ pip install ./dist/Test_App-0.1.0-py3-none-any.whl
$ python -m test_app -i /opt/test-app/

Install as service:

$ sudo /opt/test-app/bin/Test-App-Service -u $USER --install /opt/test-app/

Enable and run:

$ sudo systemctl --system daemon-reload
$ sudo systemctl enable test-app-service.service
$ sudo systemctl start test-app-service.service

Application architecture

When you create an application using the Netdef framework your application consists of:

Glossary

engine

The engine is an instance of netdef.Engines.ThreadedEngine.

_images/classes_engines.png
rule

A rule is an instance derived from netdef.Rules.BaseRule.

_images/classes_rules.png
source

A source is an instance derived from netdef.Sources.BaseSource.

_images/classes_sources.png
controller

A controller is an instance derived from netdef.Controllers.BaseController.

_images/classes_controllers.png
expression

A python callable that is executed by engine when a associated source changes its value. The associated sources are arguments to the callable. See netdef.Engines.expression.Expression.

# Example:
def expression(arg1, arg2):
    print("expression was called")
    print("This is a netdef.Engines.expression.Expression.Argument:", arg1)
    print("This is the associated source instance:", arg1.instance)
    print("The name of the associated controller:", arg1.instance.controller)

Shared queues

All instances have their own incoming queue. This queue is available to the other instances in the shared object. See netdef.Shared.SharedQueues.SharedQueues

_images/shared_queues.png

The instances communicate with each other by registering messages in the recipient’s queue. The example below shows a project with one controller and one rule:

_images/shared_queues_message_example_1.png

The most important message types in your application are APP_STATE, ADD_SOURCE, ADD_PARSER, WRITE_SOURCE and RUN_EXPRESSION. See netdef.Shared.SharedQueues.MessageType

The message flow will in most cases be as follows:

At application initialization:

  • The engine will send APP_STATE to all active controllers.
  • Every rule will send ÀDD_PARSER or/and ADD_SOURCE to a specific controller depending on what is in the configuration files.
  • The engine will send a new APP_STATE to all active controllers.

Repeats until application is terminated:

  • Every controller will send RUN_EXPRESSION back to a specific rule on data changes.
  • The specific rule will then collect the associated expression to be evaluated depending on given data change and send RUN_EXPRESSION to the engine.
  • If the expression generate a new data change then a WRITE_SOURCE message is sent back directly to controller.
blockdiag ThreadedEngine Rule Controller External Parses / Unpacks external data format and updates value of source [n] Value change in source [n] ThreadedEngine Executes expression [n] in ThreadPoolExecutor Looks up the expression for given source and passes it to ThreadedEngine If expression produces a value change Packs value into external data format APP_STATE, AppStateType.SETUP ADD_PARSER, source class ADD_SOURCE, source 1 ADD_SOURCE, source 2 ADD_SOURCE, source [n] APP_STATE, AppStateType.RUNNING [optional] Setup subscription for source [n] [optional] Polling Data received RUN_EXPRESSION, source [n] RUN_EXPRESSION, expression [n] WRITE_SOURCE, source [n], value, timestamp Data sent Initialization Setup Parsing config files Parsing finished Running Begin loop End Loop

Installation

Netdef is implemented in Python and supports Python 3.5.3+.

Prerequisites

  • Debian:

    Python3 requirements can be installed by typing:

    $ sudo apt-get install python3 python3-pip python3-venv
    

    Requirements for building psutil:

    $ sudo apt-get install build-essential python3-dev
    

    Ensure you have installed python 3.5.3 or newer. You can check this by typing:

    $ python3 -V
    Python 3.5.3
    
  • Windows:

    Ensure you have installed Python 3.5.3 or newer. You can check this by opening command prompt and type:

    > py -3 -V
    Python 3.5.3
    

    If py.exe is not found then you have to download and install Python 3.5.3 or newer.

Create an Virtual environment

  • Linux:

    $ python3 -m venv venv
    
  • Windows:

    > py -3 -m venv venv
    

Activate the environment

  • Linux:

    $ source venv/bin/activate
    
  • Windows:

    > venv\Scripts\activate
    

Install Netdef

  • Linux:

    $ pip install netdef
    
  • Windows:

    > pip install netdef
    

Quickstart

Netdef will require a specific project structure:

    /First-App
        /setup.py
        /config
            /default.conf
        /first_app
            /Controllers
            /Engines
                /__init__.py
                /templates
                /webadmin
            /Interfaces
            /Rules
            /Sources
            /__init__.py
            /__main__.py
            /defaultconfig.py
            /main.py

Pre made project templates are available using make-project or cookiecutter

Make-project

First install make-project:

$ python3 -m pip install make

Create your first application:

$ python3 -m make project gl:fholmer/netdef-project/minimal-app

The rest of this documentation assumes that your application is called First-App

Setup your application

Create a virtual environment for your application:

$ cd First-App
$ python3 -m venv venv
$ source venv/bin/activate

Install dependencies:

$ pip install -r requirements-dev.txt
$ pip install -r requirements.txt

Link your application into the virtual environment site-packages:

$ pip install -e .

Create config and log folders for your app:

$ First-App --init .

Launch application

There are several ways to run your application.

You can use the the entrypoint:

$ First-App --run .

Or you can use the package module:

$ python -m first_app --run .

There is also a simple launcher script:

$ python launchApp.py

You don’t have to activate the virtual environment to run your application. You can run it directly by using absolute paths:

$ cd /
$ [insert-abs-path-to-proj]/venv/bin/First-App --run [insert-abs-path-to-proj]

Examples

Create a wheel package:

$ source venv/bin/activate
$ python setup.py bdist_wheel
$ deactivate

Deploy to /opt/first_app

$ mkdir -p /opt/first_app
$ python3 -m venv /opt/first_app
$ /opt/first_app/bin/pip install [path-to-first-app-wheel]
$ /opt/first_app/bin/First-App -i /opt/first_app

Confirm that the application is working:

$ /opt/first_app/bin/First-App -r /opt/first_app

Create a systemd service unit file:

$ sudo /opt/first_app/bin/First-App-Service -u $USER -i /opt/first_app

Confirm that the unit-file looks correct:

$ cat /etc/systemd/system/first_app.service
[Unit]
Description=First-App
After=syslog.target network-online.target

[Service]
Type=simple
User=TODO-INSERT-MY-USERNAME
Group=TODO-INSERT-MY-USERNAME
Environment=PYTHONUNBUFFERED=true

WorkingDirectory=/opt/first_app
ExecStart=/opt/first_app/bin/First-App -r /opt/first_app

StandardOutput=syslog
StandardError=syslog

[Install]
WantedBy=multi-user.target

Webadmin

Webadmin is a simple web interface to configure and debug your application. You can customize basic behavour in default.conf. It is recommended to add these options in its own file webadmin.conf and reference this file in default.conf

Here is a basic example:

default.conf
 [config]
 webadmin_conf = config/webadmin.conf
webadmin.conf
 [webadmin]
 host = 0.0.0.0
 port = 8000
 users.admin.user = admin
 users.admin.password =
 users.admin.password_hash = pbkdf2:sha256:150000$$N2b3ky8d$$51fbf24e48d498bd5543d60a86bd94927fd4d6eb123bf2d81a7401666eeea5c0
 users.admin.roles = admin
 secret_key = 1b50383ec6945aff8993f018feb568fa
 on = 1
 home_on = 1
 config_on = 1
 installationrepo_on = 1
 tools_on = 1
 settings_on = 1
 sources_on = 1
 expressions_on = 1
 statistics_on = 1
 security_webadmin_on = 1
 security_certificates_on = 1
 ssl_certificate =
 ssl_certificate_key =
 ssl_on = 0
Webadmin
Section Key Default Description
webadmin Config Default Description
webadmin host 0.0.0.0 Webserver host address
webadmin port 8000 Webserver tcp port
webadmin users.admin.user admin Username
webadmin users.admin.password   Plain text password. If password_hash is set then this option is ignored.
webadmin users.admin.password_hash   Password hash generated with python -m netdef -ga command
webadmin users.admin.roles admin name of user role.
webadmin secret_key   Secret flask session key. Can be generated with python -m netdef -ga
webadmin on 1

Enable Webadmin.

  • 0 – disabled.
  • 1 – enabled.
webadmin home_on 1 Enable Webadmin‣Home.
webadmin config_on 1 Enable Webadmin‣Config.
webadmin tools_on 1 Enable Webadmin‣Tools.
webadmin installationrepo_on 1 Enable Webadmin‣Tools‣Upgrade.
webadmin security_webadmin_on 1 or 0

Enable Webadmin‣Tools‣Webadmin.

[config]
webadmin_conf=config/webadmin.conf

The default value is 1 if webadmin_conf exists in [config]

webadmin security_certificates_on 1 Enable Webadmin‣Tools‣Certificates.
webadmin settings_on 1 Enable Webadmin‣Settings.
webadmin sources_on 1 Enable Webadmin‣Sources.
webadmin expressions_on 1 Enable Webadmin‣Expressions.
webadmin statistics_on 1 Enable Webadmin‣Statistics.
webadmin ssl_certificate   File path to ssl certificate. Required if ssl_on=1.
webadmin ssl_certificate_key   File path to ssl certificate key. Required if ssl_on=1.
webadmin ssl_on 0 Enable https.
webadmin_views [viewident] 0

[viewident] is the unique name of a MyBaseView

  • 0 – disabled.
  • 1 – enabled.
Example
[webadmin_views]
Home = 1
webadmin_views Home 1 Enable Home view.
webadmin_views FileModel 1 Enable FileModel view.
webadmin_views SettingsModel 1 Enable SettingsModel view.
webadmin_views SourcesModel 1 Enable SourcesModel view.
webadmin_views ExpressionsView 1 Enable ExpressionsView view.
webadmin_views StatisticsModel 1 Enable StatisticsModel view.
webadmin_views Tools 1 Enable Tools view.

Override root endpoint

A common use case is to integrate an existing flask app into the root endpoint (/) of the webserver. The example shows how this is done by retrieving the webadmin WSGI app and register a new endpoint at ‘/’

first_app/main.py:

# function that register my custom flask app
def init_app(app):
    @app.route('/')
    def hello_world():
        return 'Hello, World!'
    return app

def main():
    ...

    engine = ThreadedWebGuiEngine.ThreadedWebGuiEngine(shared)

    # init my custom flask app as soon as the webgui engine is initialized.
    init_app(engine.get_flask_app())

    engine.add_controller_classes(controllers)
    engine.add_source_classes(sources)
    engine.add_rule_classes(rules)
    engine.load([__package__, 'netdef'])
    engine.init()
    engine.start()
    engine.block() # until ctrl-c or SIG_TERM
    engine.stop()
    ...

Override Webadmin‣Home

Copy the default html template.

netdef/Engines/templates/home.html:

{% extends 'home/home.html' %}

{% block home %}

        <p>Application version: {{version}}</p>

{% endblock home %}

Paste it into your application with extended information:

first_app/Engines/templates/home.html:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{% extends 'home/home.html' %}

{% block home %}

        <p>{{app_name}} version: {{app_version}}</p>
        <p>netdef version: {{netdef_version}}</p>
        <p>Python version: {{py_version}}</p>
        <p>Platform version: {{sys_version}}</p>

{% endblock home %}

Now you only have to override the Home View by creating following file:

first_app/Engines/webadmin/Home.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import sys
import datetime
import platform
from flask import current_app
from flask_admin import expose

from netdef.Engines.webadmin import Views, Home

from netdef import __version__ as netdef_version
from ... import __version__ as app_version
from ... import __package__ as app_name

@Views.register("Home")
def setup(admin):
    Home.setup(admin, MyNewHome(name='Home', endpoint='home'))

class MyNewHome(Home.Home):
    @expose("/")
    def index(self):
        return self.render(
            'home.html',
            app_name=app_name,
            app_version=app_version,
            netdef_version=netdef_version,
            py_version=sys.version,
            sys_version=str(platform.version())
        )
  • At line 13 we replace the default Webadmin‣Home with your own
  • At line 17 we override the default Home class with our extended functionality

Override Webadmin‣Tools

Copy the default html template.

netdef/Engines/templates/tools.html:

{% extends 'tools/tools.html' %}

Paste it into your application with extended information:

first_app/Engines/templates/tools.html:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{% extends 'tools/tools.html' %}
{% block system_panel %}
        <div class="panel panel-default">
            <div class="panel-heading">System</div>
            <div class="panel-body">
                <p>Uptime: {{sys_uptime}}</p>
                <div class="container">
                    <div class="row">
                        <a href="./cmd_dir/" class="btn btn-default col-md-2" role="button">
                            <span class="glyphicon glyphicon-list" aria-hidden="true"></span>
                            dir
                        </a>
                    </div>
                </div>
            </div>
        </div>
{% endblock system_panel %}

Now you only have to override the Tools View by creating following file:

first_app/Engines/webadmin/Tools.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from flask import stream_with_context, Response
from flask_admin import expose
from netdef.Engines.webadmin import Views, Tools

@Views.register("Tools")
def setup(admin):
    Tools.setup(admin, MoreTools(name='Tools', endpoint='tools'))

class MoreTools(Tools.Tools):
    @expose("/cmd_dir/")
    def hg_log(self):
        return Response(
            stream_with_context(
                Tools.stdout_from_terminal_as_generator(
                    "dir",
                    pre="Command:\n\n   hg log -r .:\n\nResult:\n\n",
                    post=""
                )
            )
        )
  • At line 5 we replace the default Webadmin‣Tools with your own
  • At line 9 we override the default Tools class with our extended functionality

Configuration

Config files is parsed at startup using the configparser module. Multiple strings and files is read in following order:

  • (str) mypackage.defaultconfig:default_config_string
  • (file) config/default.conf
  • (file) config/default. [osname] .conf where osname is nt on windows and posix on linux.
  • (files) all files found in [config] section in default.conf
  • (file) config/default.conf.lock

Extended interpolation

Extended interpolation is using ${section:option} to denote a value from a foreign section. Example:

default.conf
[OPCUAClientController]
endpoint = opc.tcp://${client:host}:${client:port}/freeopcua/server/
user = ${client:user}
password = ${client:password}

[OPCUAServerController]
endpoint = opc.tcp://${server:host}:${server:port}/freeopcua/server/
user = ${server:user}
password = ${server:password}

[client]
host = 10.10.1.13
port = 4841
user = CommonUser
password = 7T-SECRET_PASS-PhsTh7yVpV9jKTShAXcOdL8KmO4m3MUY3EPu7

[server]
host = 0.0.0.0
port = 4841
user = ${client:user}
password = ${client:password}

By using extended interpolation in combination with [config] section you can move application secrets into its own config file:

default.conf
 [config]
 secrets_conf = config/secrets.conf

 [OPCUAClientController]
 endpoint = opc.tcp://${client:host}:${client:port}/freeopcua/server/
 user = ${client:user}
 password = ${client:password}

 [OPCUAServerController]
 endpoint = opc.tcp://${server:host}:${server:port}/freeopcua/server/
 user = ${server:user}
 password = ${server:password}
secrets.conf
 [client]
 host = 10.10.1.13
 port = 4841
 user = CommonUser
 password = 7T-SECRET_PASS-PhsTh7yVpV9jKTShAXcOdL8KmO4m3MUY3EPu7

 [server]
 host = 0.0.0.0
 port = 4841
 user = ${client:user}
 password = ${client:password}

Default configs

General configs
Section Key Default Description
general identifier [appident]

Name of application.

  • [appident] – is the unique name of your application. The name have to match in order for your application to accept the config file.
general version 1 Version of your configfile. If you have to break compatibility in the future you can bump the config version to reject outdated config files
config [unique key] [filename]

Name of a configfile to be parsed.

  • [unique key] – is just a unique key
  • [filepath] – is the actual filename. File path relative to project folder.
Example
[config]
my_conf = config/my_configuration.conf
more_things = config/more_configs.conf
logging logglevel 20

Default logging level for the application

  • 1 – All
  • 10 – Debug
  • 20 – Info
  • 30 – Warning
  • 40 – Error
  • 50 – Critical
logging loggformat %(asctime)-15s %(levelname)-9s: %(name)-11s: %(message)s Logging format for the application
logging loggdatefmt %Y-%m-%d %H:%M:%S Date time format
logging to_console 1
  • 0 – Suppress output to stdout
  • 1 – Write output to stdout
logging to_file 0
  • 0 – Disable logfile
  • 1 – Write output to logfile
logging loggfile log/application.log Path to logfile is relative to project folder.
logginglevels [module name] 20
  • [module name] is the name of a python module that is using the logging module

Values:

  • 1 – All
  • 10 – Debug
  • 20 – Info
  • 30 – Warning
  • 40 – Error
  • 50 – Critical
Example
[logginglevels]
werkzeug = 40
InternalController = 10
logginglevels werkzeug 40 Logging level of werkzeug module is set to warning
queues maxsize 0

Default queue size for all shared queues

  • 0 – No limit
  • >0 – Size limit
rules [unique key] 0

[unique key] is the unique name of a BaseRule

  • 0 – disabled
  • 1 – enabled
Example
 [rules]
 CSVRule = 1
controllers [unique key] 0

[unique key] is the unique name of a BaseController

  • 0 – disabled
  • 1 – enabled
Example
[controllers]
InternalController = 1
sources [unique key] 0

[unique key] is the unique name of a BaseSource

  • 0 – disabled
  • 1 – enabled
Example
[sources]
IntegerSource = 1
Aliases
Section Key Default Description
controller_aliases [unique key] [controllername]

Create multiple controller instances of same class

Example
[controllers]
CommTestController = 1

[controller_aliases]
FastPingController=CommTestController
SlowPingController=CommTestController
source_aliases [unique key] [sourcename]

Create multiple sources based on an existing source

Example
[sources]
IntegerSource = 1

[source_aliases]
IntStatusSource = IntegerSource
IntCommandSource = IntegerSource
Thread pool configs
Section Key Default Description
ExpressionExecutor max_workers [cpu_count * 10] Number of thread pool workers to be available in netdef.Engines.ThreadedEngine
Webadmin
Section Key Default Description
webadmin Config Default Description
webadmin host 0.0.0.0 Webserver host address
webadmin port 8000 Webserver tcp port
webadmin users.admin.user admin Username
webadmin users.admin.password   Plain text password. If password_hash is set then this option is ignored.
webadmin users.admin.password_hash   Password hash generated with python -m netdef -ga command
webadmin users.admin.roles admin name of user role.
webadmin secret_key   Secret flask session key. Can be generated with python -m netdef -ga
webadmin on 1

Enable Webadmin.

  • 0 – disabled.
  • 1 – enabled.
webadmin home_on 1 Enable Webadmin‣Home.
webadmin config_on 1 Enable Webadmin‣Config.
webadmin tools_on 1 Enable Webadmin‣Tools.
webadmin installationrepo_on 1 Enable Webadmin‣Tools‣Upgrade.
webadmin security_webadmin_on 1 or 0

Enable Webadmin‣Tools‣Webadmin.

[config]
webadmin_conf=config/webadmin.conf

The default value is 1 if webadmin_conf exists in [config]

webadmin security_certificates_on 1 Enable Webadmin‣Tools‣Certificates.
webadmin settings_on 1 Enable Webadmin‣Settings.
webadmin sources_on 1 Enable Webadmin‣Sources.
webadmin expressions_on 1 Enable Webadmin‣Expressions.
webadmin statistics_on 1 Enable Webadmin‣Statistics.
webadmin ssl_certificate   File path to ssl certificate. Required if ssl_on=1.
webadmin ssl_certificate_key   File path to ssl certificate key. Required if ssl_on=1.
webadmin ssl_on 0 Enable https.
webadmin_views [viewident] 0

[viewident] is the unique name of a MyBaseView

  • 0 – disabled.
  • 1 – enabled.
Example
[webadmin_views]
Home = 1
webadmin_views Home 1 Enable Home view.
webadmin_views FileModel 1 Enable FileModel view.
webadmin_views SettingsModel 1 Enable SettingsModel view.
webadmin_views SourcesModel 1 Enable SourcesModel view.
webadmin_views ExpressionsView 1 Enable ExpressionsView view.
webadmin_views StatisticsModel 1 Enable StatisticsModel view.
webadmin_views Tools 1 Enable Tools view.
Upgrade application
Section Key Default Description
auto_update on 0  
auto_update no_index 0  
auto_update pre_release 0  
auto_update force_reinstall 0  
auto_update find_links    
auto_update trusted_host  

auto_update minimal_timeout 0  
auto_update package [appident]  

Built-in Controllers and Rules

You can look opp the correct Built-in configs in API Reference

Advanced

Project layout

create a Project folder:

$ mkdir First-App
$ cd First-App

$ mkdir config
$ mkdir log
$ mkdir first_app
  • First-App, The Project name.
  • config, applications default configfiles
  • log, application.log is created in this folder
  • first_app, the python package with your applications files
    /First-App
        /setup.py
        /config
            /default.conf
        /first_app
            /Controllers
            /Engines
                /__init__.py
                /templates
                /webadmin
            /Interfaces
            /Rules
            /Sources
            /__init__.py
            /__main__.py
            /defaultconfig.py
            /main.py

setup.py:

  • package_data, make sure to include html templates
  • entry_points, the entry point will make it easy to launch application
from setuptools import setup, find_packages
from first_app import __version__ as app_version

NAME = "First-App"
MAIN_PACKAGE = "first_app"

setup(
    name=NAME,
    version=app_version,
    packages=find_packages(exclude=['contrib', 'docs', 'tests', 'config']),    
    install_requires=[
        'netdef'
    ],
    package_data={
        MAIN_PACKAGE: [
            'Engines/templates/*.html',
            'Engines/templates/*/*.html'
        ]
    },    
    entry_points={
        'console_scripts': [
            '{NAME}={MAIN_PACKAGE}.__main__:cli'.format(NAME=NAME, MAIN_PACKAGE=MAIN_PACKAGE),
        ],
    },
)

first_app/__init__.py:

__version__ = '0.1.0'

first_app/__main__.py:

from netdef.__main__ import entrypoint

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

def cli():
    # entrypoint: console_scripts
    entrypoint(run_app, get_template_config)

if __name__ == '__main__':
    # entrypoint: python -m console_scripts 
    entrypoint(run_app, get_template_config)

first_app/defaultconfig.py:

template_config_string = \
"""[general]
identifier = First-App
version = 1
"""

default_config_string = \
"""[general]
[config]
[ExpressionExecutor]
[webadmin]
host = 0.0.0.0
port = 8000
user = admin
password = admin

[webadmin_views]

[logging]
logglevel = 20
loggformat = %(asctime)-15s %(levelname)-9s: %(name)-11s: %(message)s
loggdatefmt = %Y-%m-%d %H:%M:%S
to_console = 1
to_file = 1

[logginglevels]
werkzeug = 40

[rules]

[controllers]

[controller_aliases]

[sources]

[source_aliases]
"""

first_app/main.py:

import os
from netdef.Controllers import Controllers
from netdef.Sources import Sources
from netdef.Rules import Rules
from netdef.Engines import ThreadedWebGuiEngine
from netdef.Shared import Shared
from netdef.utils import setup_logging, handle_restart
from . import defaultconfig

def main():
    # init shared-module
    try:
        install_path = os.path.dirname(__file__)
        proj_path = os.getcwd()
        config_string = defaultconfig.default_config_string
        shared = Shared.Shared("First-App", install_path, proj_path, config_string)
    except ValueError as error:
        print(error)
        raise SystemExit(1)

    # configure logging
    setup_logging(shared.config)

    controllers = Controllers.Controllers(shared)
    controllers.load([__package__, 'netdef'])

    sources = Sources.Sources(shared)
    sources.load([__package__, 'netdef'])

    rules = Rules.Rules(shared)
    rules.load([__package__, 'netdef'])

    # the engine connects webadmin, controllers, sources and rules.
    engine = ThreadedWebGuiEngine.ThreadedWebGuiEngine(shared)
    engine.add_controller_classes(controllers)
    engine.add_source_classes(sources)
    engine.add_rule_classes(rules)
    engine.load([__package__, 'netdef'])

    engine.init()
    engine.start()
    engine.block() # until ctrl-c or SIG_TERM
    engine.stop()

    # if restart-button in webadmin is pressed:
    handle_restart(shared, engine)

main()

config/default.conf:

[general]
identifier = First-App
version = 1

Add a controller

Built-in controllers can be activated by adding special values to the config file.

You can look opp the correct Built-in configs in API Reference

In this tutorial we will activate the CrontabController and the OPCUAServerController

We will have to merge the two configs into one and add them to config/default.conf

[controllers]
CrontabController = 1
OPCUAServerController = 1

[sources]
CrontabSource = 1
VariantSource = 1

[CrontabSource]
controller = CrontabController

[VariantSource]
controller = OPCUAServerController

[CrontabController]
[OPCUAServerController]

We also have to merge required packages into requirements.txt:

crontab
freeopcua

Next step is to start using the controllers and sources by setting up a Rule.

Add a rule

Built-in rules can be activated by adding special values to the config file, just like the controllers. There is currently only one built-in rule we can use.

Add the config for CSVRule to config/default.conf and replace the example rules with a hello_world rule like this:

[rules]
CSVRule = 1

[CSVRule]
hello_world_rule = 1

[hello_world_rule]
csv = config/hello_world_rule.csv
py = config/hello_world_rule.py

We now have to create the csv and py file:

config/hello_world_rule.csv

CrontabSource,VariantSource
*/2 * * * * * *,ns=2;s=hello_world

config/hello_world_rule.py

def expression(cron, oua):
    if cron.new:
        oua.set = "Hello, world"

    if cron.update:
        oua.set = "Hello, world {}".format(int(cron.value))

Now you can try to launch the application:

$ pip install -r requirements.txt
$ python -m first_app -r .

Create a custom controller

Copy the included template to create a custom controller.

netdef/Controllers/NewControllerTemplate.py:

import datetime
import logging

from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import my supported sources
from netdef.Sources.NewSourceTemplate import NewSourceTemplate


@Controllers.register("NewControllerTemplate")
class NewControllerTemplate(BaseController.BaseController):
    def __init__(self, name, shared):
        super().__init__(name, shared)
        self.logger = logging.getLogger(self.name)
        self.logger.info("init")
        self.one_config_entry = self.shared.config.config(
            self.name, "one_config_entry", "default_value"
        )

    def run(self):
        "Main loop. Will exit when receiving interrupt signal"
        self.logger.info("Running")
        while not self.has_interrupt():
            self.loop_incoming()  # dispatch handle_* functions
            self.loop_outgoing()  # dispatch poll_* functions
        self.logger.info("Stopped")

    def handle_readall(self, incoming):
        raise NotImplementedError

    def handle_add_source(self, incoming):
        self.logger.debug("'Add source' event for %s", incoming.key)
        self.add_source(incoming.key, incoming)

    def handle_read_source(self, incoming):
        raise NotImplementedError

    def handle_write_source(self, incoming, value, source_time):
        self.logger.debug(
            "'Write source' event to %s. value: %s at: %s",
            incoming.key,
            value,
            source_time,
        )

    def poll_outgoing_item(self, item):
        if isinstance(item, NewSourceTemplate):  # My
            # TODO: get new value somehow
            address = item.unpack_address()
            new_val = get_the_new_value_somehow(address)
            stime = datetime.datetime.utcnow()
            status_ok = True  # Why not
            cmp_oldew = False  # compare old and new value?

            if self.update_source_instance_value(
                item, new_val, stime, status_ok, cmp_oldew
            ):
                self.send_outgoing(item)

Paste it into your application with a new name:

first_app/Controllers/CmdController.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import logging
import datetime
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import my supported sources
from netdef.Sources.NewSourceTemplate import NewSourceTemplate

@Controllers.register("CmdController")
class CmdController(BaseController.BaseController):
    def __init__(self, name, shared):
        super().__init__(name, shared)

...

Line 9 and 10 is changed to the same name as the file. Line 7 have to be replaced at a later time to a custom or built-in source

To activate the controller we have to merge following config to default.conf:

[controllers]
CmdController = 1

[CmdController]

Result after merge:

[controllers]
CrontabController = 1
OPCUAServerController = 1
CmdController = 1

[CrontabController]

[OPCUAServerController]

[CmdController]

Create a custom source

Copy the included template to create a custom source for your controller.

netdef/Sources/NewSourceTemplate.py:

from netdef.Interfaces.DefaultInterface import DefaultInterface
from netdef.Sources import BaseSource, Sources


@Sources.register("NewSourceTemplate")
class NewSourceTemplate(BaseSource.BaseSource):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.interface = DefaultInterface

    # TODO: add a address for your new controller
    def unpack_address(self):
        return self.key

Paste it into your application with a new name:

first_app/Sources/CmdSource.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from netdef.Sources import BaseSource, Sources
from netdef.Interfaces.DefaultInterface import DefaultInterface

@Sources.register("CmdSource")
class CmdSource(BaseSource.BaseSource):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.interface = DefaultInterface

    # TODO: add a address for your new controller
    def unpack_address(self):
        return self.key

Line 4 and 5 is changed to the same name as the file.

Change line 7 in your custom controller:

first_app/Controllers/CmdController.py:

1
2
3
4
5
6
7
8
import logging
import datetime
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import my new source
from ..Sources.CmdSource import CmdSource
...

To activate the source we have to merge following config to default.conf:

[sources]
CmdSource = 1

[CmdSource]
controller = CmdController

Result:

[controllers]
CrontabController = 1
OPCUAServerController = 1
CmdController = 1

[sources]
CrontabSource = 1
VariantSource = 1
CmdSource = 1

[CrontabSource]
controller = CrontabController

[VariantSource]
controller = OPCUAServerController

[CmdSource]
controller = CmdController

[CrontabController]

[OPCUAServerController]

[CmdController]

Create a custom rule

Copy the included template to create a custom rule.

netdef/Rules/NewRuleTemplate.py:

import logging
import pathlib

from netdef.Rules import BaseRule, Rules
from netdef.Rules.utils import import_file

SourceInfo = BaseRule.SourceInfo
ExpressionInfo = BaseRule.ExpressionInfo


@Rules.register("NewTemplateRule")
class NewTemplateRule(BaseRule.BaseRule):
    def __init__(self, name, shared):
        super().__init__(name, shared)
        self.logger = logging.getLogger(name)
        self.logger.info("init")

        config = self.shared.config.config
        self.proj_path = pathlib.Path(config("proj", "path", ".")).absolute()

    def setup(self):
        self.logger.info("Running setup")

        # example:
        self.setup_example()

        # sub rule example:
        for name, active in self.shared.config.get_dict(self.name).items():
            if int(active):
                self.setup_sub_rule(name)
        self.logger.info("Done parsing")

    def setup_sub_rule(self, name):
        raise NotImplementedError

    def setup_example(self):
        # example_expression_module = self.import_py_file("config/example_expression.py")

        # config/example_expresion.py:
        # def expression(internal):
        #     if internal.new or internal.update:
        #         print(internal)

        self.add_new_parser("InternalSource")

        source_count = self.add_new_expression(
            ExpressionInfo(
                example_expression_module,
                [SourceInfo("InternalSource", "intern_test_1")],
            )
        )
        self.update_statistics(self.name + ".example", 0, 1, source_count)

    def import_py_file(self, rel_file):
        full_file = pathlib.Path(self.proj_path).joinpath(rel_file)
        nice_name = full_file.name
        return import_file(str(full_file), self.name, nice_name)

    def run(self):
        self.logger.info("Running")
        while not self.has_interrupt():
            self.loop_incoming()  #  dispatch handle_* functions
        self.logger.info("Stopped")

    def handle_run_expression(self, incoming, value, source_time, status_code):
        expressions = self.get_expressions(incoming)
        self.logger.debug(
            "Received %s. Found expressions %s", incoming.key, len(expressions)
        )
        if expressions:
            self.send_expressions_to_engine(
                incoming, expressions, value, source_time, status_code
            )

Paste it into your application with a new name:

first_app/Rules/FirstAppRule.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import logging
import pathlib
from .utils import import_file
from . import BaseRule, Rules

SourceInfo = BaseRule.SourceInfo
ExpressionInfo = BaseRule.ExpressionInfo

@Rules.register("FirstAppRule")
class FirstAppRule(BaseRule.BaseRule):
    def __init__(self, name, shared):
        super().__init__(name, shared)
        self.logger = logging.getLogger(name)
        self.logger.info("init")

Line 9 and 10 is changed to the same name as the file.

To activate the rule we have to merge following config to default.conf:

[rules]
FirstAppRule = 1

[FirstAppRule]

Result:

[rules]
FirstAppRule = 1

[FirstAppRule]

[controllers]
CrontabController = 1
OPCUAServerController = 1
CmdController = 1

[sources]
CrontabSource = 1
VariantSource = 1
CmdSource = 1

[CrontabSource]
controller = CrontabController

[VariantSource]
controller = OPCUAServerController

[CmdSource]
controller = CmdController

[CrontabController]

[OPCUAServerController]

[CmdController]

Putting it all together

In this example we want to pass following commands to the subprocess module:

  • echo hello
  • ls -lah .
  • ./simple_script.sh
  • echo Don’t break the

We could hard code these commands in the controller but it is more flexible to create a source for each command. And we also want to read these commands from a config file so it will be easy to reuse, change or extend the commands.

To achieve this we just implement a method in the source that returns the command. the command can be extracted from the sources key:

first_app/Sources/CmdSource.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from netdef.Sources import BaseSource, Sources
from netdef.Interfaces.DefaultInterface import DefaultInterface

@Sources.register("CmdSource")
class CmdSource(BaseSource.BaseSource):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.interface = DefaultInterface

    def get_command_and_args(self, args=None):
        if args:
            return self.key + " " + args
        else:
            return self.key

The controller can retrieve the command to run by calling get_command_and_args

first_app/Controllers/CmdController.py:

1
2
3
4
5
6
7
8
9
import logging
import datetime
import subprocess
import shlex

from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

from ..Sources.CmdSource import CmdSource

We will use subprocess and shlex from standard library to execute commands. To keep it simple we can create a wrapper function that run a command and return the result from stdout. In case of error the function return the error as text instead. Also, charset decoding errors is replaced with “?”.

10
11
12
13
14
15
16
17
18
19
def stdout_from_terminal(command_as_str, err_msg=None):
    command_args = shlex.split(command_as_str)
    try:
        res = subprocess.run(command_args, stdout=subprocess.PIPE).stdout
        return str(res, errors="replace")
    except Exception as error:
        if err_msg is None:
            return str(error)
        else:
            return err_msg

We create an option value_as_args to use the value from the source to be added as an argument to the command. the option is read from config file.

20
21
22
23
24
25
26
@Controllers.register("CmdController")
class CmdController(BaseController.BaseController):
    def __init__(self, name, shared):
        super().__init__(name, shared)
        self.logger = logging.getLogger(self.name)
        self.logger.info("init")
        self.value_as_args = self.shared.config.config(self.name, "value_as_args", 1)

The run method will be very simple in this tutorial. Normally this is where we create a polling loop or setup subscriptions and await events. In this example we only wait for WRITE_SOURCE messages. So we only have to iterate the message queue:

27
28
29
30
31
32
def run(self):
    "Main loop. Will exit when receiving interrupt signal"
    self.logger.info("Running")
    while not self.has_interrupt():
        self.loop_incoming() # dispatch handle_* functions
    self.logger.info("Stopped")

The rule will always send the source instance at startup as a ADD_SOURCE message. we have to receive the message and keep it in our controller. We can use netdef.Controllers.BaseController.BaseController.add_source

33
34
35
def handle_add_source(self, incoming):
    self.logger.debug("'Add source' event for %s", incoming.key)
    self.add_source(incoming.key, incoming)

When an expression changes the value on one of our sources we will receive a WRITE_SOURCE message. We have to verify that the received source is in our source list and that we know how to handle it.

To check if it is one of ours we use netdef.Controllers.BaseController.BaseController.has_source

To check if we know how to handle it we check if it is an instance of the source we created CmdSource.

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def handle_write_source(self, incoming, value, source_time):
    self.logger.debug("'Write source' event to %s. value: %s at: %s", incoming.key, value, source_time)
    if not self.has_source(incoming.key):
        self.logger.error(
            "%s not found",
            incoming.key
            )
        return

    if not isinstance(incoming, CmdSource):
        self.logger.error(
            "Got write event for %s, but only CmdSource is supported",
            type(incoming)
            )
        return

We have verified that the source is an instance of CmdSource. Knowing this we can safely call CmdSource.get_command_and_args to get the command.

51
52
53
54
55
56
57
58
59
    if self.value_as_args:
        cmd_as_str = incoming.get_command_and_args(value)
    else:
        cmd_as_str = incoming.get_command_and_args()

    new_val = stdout_from_terminal(cmd_as_str)
    stime = datetime.datetime.utcnow()
    status_ok = True # Why not
    cmp_oldew = False # compare old and new value?

At last we create and send a RUN_EXPRESSION message using netdef.Controllers.BaseController.BaseController.update_source_instance_value and netdef.Controllers.BaseController.BaseController.send_outgoing

60
61
    if self.update_source_instance_value(incoming, new_val, stime, status_ok, cmp_oldew):
        self.send_outgoing(incoming)

We now have to create the configfile and expression that is parsed by rule. The command list can be a simple text file:

config/command_rule.txt:

1
2
3
4
echo hello
ls -lah .
./simple_script.sh
echo Don\'t break the

The expression is a python file. The rule expect to find a function called expression()

config/command_rule.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import logging
logger = logging.getLogger(__name__ + ":expression")

def expression(intern, cmd):
    # triggers at startup
    if intern.new:

        if "hello" in cmd.key:
            arg = "world"
        elif "Don\\'t break the" in cmd.key:
            arg = "circle"
        else:
            arg = ""

        logger.info("{}: Send command arg: {}".format(cmd.key, arg))
        cmd.set = arg

    if cmd.new or cmd.update:
        logger.info("{}: Result: {}".format(cmd.key, cmd.value))

Now we are ready to create the rule

first_app/Rules/FirstAppRule.py:

1
2
3
4
5
6
7
import logging
import pathlib
from netdef.Rules.utils import import_file
from netdef.Rules import BaseRule, Rules

SourceInfo = BaseRule.SourceInfo
ExpressionInfo = BaseRule.ExpressionInfo

We will look for the config file and expression file relative to the project folder.

 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Rules.register("FirstAppRule")
class FirstAppRule(BaseRule.BaseRule):
    def __init__(self, name, shared):
        super().__init__(name, shared)
        self.logger = logging.getLogger(name)
        self.logger.info("init")
        self.proj_path = shared.config.config("proj", "path")

    def read_list(self, rel_file):
        full_file = pathlib.Path(self.proj_path).joinpath(rel_file)
        lines = open(str(full_file), "r").readlines()
        return [l.strip() for l in lines]

    def import_py_file(self, rel_file):
        full_file = pathlib.Path(self.proj_path).joinpath(rel_file)
        nice_name = full_file.name
        return import_file(str(full_file), self.name, nice_name)

TODO

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    def setup(self):
        self.logger.info("Running setup")
        self.setup_commands()
        self.logger.info("Done parsing")

    def setup_commands(self):
        command_expression_module = self.import_py_file("config/command_rule.py")
        command_list = self.read_list("config/command_rule.txt")

        source_count = 0
        for command in command_list:
            source_count += self.add_new_expression(
                ExpressionInfo(
                    command_expression_module,
                    [
                        SourceInfo("InternalSource", "generic"),
                        SourceInfo("CmdSource", command)
                    ]
                )
            )
        self.update_statistics(self.name + ".commands", 0, 1, source_count)

TODO

46
47
48
49
50
    def run(self):
        self.logger.info("Running")
        while not self.has_interrupt():
            self.loop_incoming() #  dispatch handle_* functions
        self.logger.info("Stopped")

TODO

51
52
53
54
55
    def handle_run_expression(self, incoming, value, source_time, status_code):
        expressions = self.get_expressions(incoming)
        self.logger.debug("Received %s. Found expressions %s",incoming.key, len(expressions))
        if expressions:
            self.send_expressions_to_engine(incoming, expressions, value, source_time, status_code)

TODO

config/default.ini

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[rules]
FirstAppRule = 1

[FirstAppRule]

[sources]
CmdSource = 1
InternalSource = 1

[CmdSource]
controller = CmdController

[InternalSource]
controller = InternalController

[controllers]
CmdController = 1
InternalController = 1

[InternalController]
send_init_event = 1

[CmdController]
value_as_args = 1

TODO

tests/test_command_rule.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from netdef.testutils import MockExpression
from netdef.Sources.InternalSource import InternalSource
from first_app.Sources.CmdSource import CmdSource

def test_hello():
    mock = MockExpression(
        module="config/command_rule.py",
        intern=InternalSource("generic"),
        cmd=CmdSource("echo hello")
    )
    mock.intern.update_value(None, stat_init=True)
    mock.cmd.assert_called_once_with("world")
    mock.intern.assert_not_called()


def test_circle():
    mock = MockExpression(
        module="config/command_rule.py",
        intern=InternalSource("generic"),
        cmd=CmdSource("echo Don\\'t break the")
    )
    mock.intern.update_value(None, stat_init=True)
    mock.cmd.assert_called_once_with("circle")
    mock.intern.assert_not_called()


def test_ls():
    mock = MockExpression(
        module="config/command_rule.py",
        intern=InternalSource("generic"),
        cmd=CmdSource("ls -lah .")
    )
    mock.intern.update_value(None, stat_init=True)
    mock.cmd.assert_called_once_with("")
    mock.intern.assert_not_called()

TODO

Credits

Contributors

Changelog

1.0.7

2021-07-20

Enhancements

  • SecurityCertificatesView: Added OpcUa certs
  • Controllers: call setup-function in module if found at startup
  • Rules: call setup-function in module if found at startup
  • Sources: call setup-function in module if found at startup
  • BaseSource: call setup-function in source instance if found at startup
  • Added testutils.MockShared
  • OPCUAServerController: insert SourceTimestamp and ServerTimestamp if missing
  • OPCUAServerController: Added option for debug_statistics
  • Added: SubprocessController
  • Webadmin: Added a simple web interface for tracemalloc and gc

Bug fixes

  • Values from controller is frozen before RUN_EXPRESSION is sent.
  • CommTestSource: remove url_path from host
  • Rules.utils: relative import of filenames did not work properly

Incompatible API changes

  • handle_run_expression function in BaseRule now have 4 required arguments. All custom rules have to be updated.

1.0.6

2020-02-28

Enhancements

  • Webadmin: Added role based user table
  • Added testutils.MockExpression to simplify testing of expressions
  • Added: InfluxDBLogger
  • Improved ModbusClientController
  • BaseController: Improved message queue helper function
  • InternalController: Improved persistent storage
  • OPCUAServerController: added config for auto_build_folders
  • ModbusServerController: new option: daemon_threads

Bug fixes

  • ModbusServerController: Attempt to bind to socket for three minutes before throwing exception.

Incompatible API changes

  • MQTTDataMessageController: renamed from MQTTDataAccessController
  • Webadmin: [webadmin]user/password keyword has changed.
  • InternalController: changed persistent storage filenames

1.0.5

2019-11-07

Enhancements

  • OPCUAClientController: Improved configuration
  • OPCUAServerController: Added legacy support for basic128rsa15 and basic256.
  • Webadmin: Added SecurityWebadminView and SecurityCertificatesView
  • ModbusServerController: Attempt to bind to socket for one minute before throwing exception. (Handle CLOSE_WAIT state)

Bug fixes

  • Webadmin: Changed height of file edit textarea to 20 rows
  • Webadmin: Fixed routing.BuildError when you don’t have the permission to access the requested resource.
  • CSVRule: expression can now be a modulename or a python-file
  • Fixed Windows service.
  • OPCUAServerController: Fixed TypeError
  • OPCUAServerController: Only add subscription if exists

Incompatible API changes

  • InternalController: changed persistent storage filenames

1.0.4

2019-08-19

Enhancements

  • ModbusServerController: get modbus framer by calling self.get_framer
  • Added FloatInterface and StringInterface
  • Display a 10 second restart timer in webadmin on restart
  • InternalController: config entry send_init_event trigger event at startup
  • Added an experimental yaml parser
  • Added an experimental ini parser
  • Source value can be changed from webadmin –> Sources –> Edit
  • Added create_interface function to expression arguments
  • Added persistent storage to InternalController
  • Added new message type APP_STATE
  • Added Alpha version of ConcurrentWebRequestController
  • Added simple installer for Systemd services

Bug fixes

  • OPCUAServerController: Fixed a varianttype bug
  • Fixed pyinstaller hook file
  • BaseRule is rewritten to store expression info in shared module. This fixes a problem with multiple rules sharing same sources.
  • Fixed a problem where the name of a controller or rule and module name had to be equal.
  • OPCUAClientController: specify security mode in configfile
  • OPCUAServerController: reject X509IdentityToken
  • OPCUAServerController: force timestamp on values (from clients) where timestamp is none

Incompatible API changes

  • OPCUAServerController: startup statuscode changed from BadNoData to BadWaitingForInitialData
  • BaseRule: rule_name_from_key no longer accept * as a rule name
  • BaseController: fetch_one_incoming returns tuple

1.0.3

2019-06-16

Enhancements

  • SystemMonitorController: monitor disk partition usage
  • Display update options in webadmin –> Tools –> Upgrade
  • BaseRule: call setup-function in expressions if found at startup
  • Added docs
  • OPCUAServerController: OPCUA controller will set statuscode BadNoData on startup.
  • Added BaseAsyncController
  • Webadmin: / redirects to admin page. /admin redirects to /admin/home.
  • Allow for existing flask apps to be integrated in Webadmin

Bug fixes

  • Added requirements and missing interface
  • Added extendable blocks in html templates
  • Tools.setup got a view argument

Incompatible API changes

  • Expression: interface attribute have been removed from expressions arguments

1.0.2

2019-05-25

Enhancements

  • Added support for Windows services. require pywin32 package on windows

1.0.1

2019-05-17

Enhancements

  • Added CrontabController
  • Added MQTTDataAccessController
  • Added RESTJsonController
  • Added SystemMonitorController
  • Added simple user/pass to OPCUAServerController

Bug fixes

  • Fixed netdef entrypoint

1.0.0

2019-04-30

  • First public release

Build from source

Python

Normally you don’t have to compile python. On Windows you can download pre-compiled binaries, and most linux distros have a pre-installed version of python.

Compiling to a relative directory:

mkdir ~/Python-3.8/
$ cd ~/Python-3.8/
$ wget https://www.python.org/ftp/python/3.8.1/Python-3.8.1.tgz
$ tar zxvf Python-3.8.1.tgz
$ Python-3.8.1/configure
make
$ make install DESTDIR=.

Or absolute directory:

mkdir /opt/Python-3.8/
$ cd /opt/Python-3.8/
$ wget https://www.python.org/ftp/python/3.8.1/Python-3.8.1.tgz
$ tar zxvf Python-3.8.1.tgz
$ Python-3.8.1/configure --prefix=/opt/Python-3.8
make
$ make install

psutil

Normally you don’t have to compile yourself. pip install should compile automatically. If automatic compilation fails you can try to specify include dirs and library dirs:

$ pip install --global-option=build_ext \
  --global-option="-I~/Python-3.8/usr/local/include/python3.8" \
  --global-option="-L~/Python-3.8/usr/local/lib" \
  psutil

Netdef package

Debian

Install requirements:

# python 3.5 +
$ sudo apt-get install python3 python3-pip python3-venv

# source control management
$ sudo apt-get mercurial

# requirements for building psutil
$ sudo apt-get install build-essential python3-dev

Get sources:

$ hg clone git+ssh://git@gitlab.com:fholmer/netdef.git
$ cd netdef

Setup virtual environment:

$ python3 -m venv venv
$ source venv/bin/activate

Build sdist and wheel:

$ python setup.py sdist
$ python setup.py bdist_wheel

Windows

Install requirements:

Get Python and Mercurial

Get sources:

> hg clone git+ssh://git@gitlab.com:fholmer/netdef.git
> cd netdef

Setup an virtual environment:

> py -3 -m venv venv
> venv\Scripts\activate

Build sdist and wheel

> python setup.py sdist
> python setup.py bdist_wheel

Docs

Debian

Install requirements

# requirements for building psutil
$ sudo apt-get install build-essential python3-dev

# requirements for pdf
$ sudo apt-get install texlive-latex-recommended texlive-latex-extra texlive-fonts-recommended latexmk

# requirements for pdf multi language
$ sudo apt-get install texlive-lang-european texlive-lang-english

# requirements for UML diagram
$ sudo apt-get install plantuml

Setup virtual environment:

$ python3 -m venv venv
$ source venv/bin/activate

Build docs:

$ cd docs
$ make html
$ make latexpdf

UML diagrams:

Note

This is only needed if UML diagrams is out of date:

$ plantuml -tsvg docs/_static/uml/

Built-in configs

Controller configs

CommTestController

config/default.conf
[controllers]
CommTestController = 1

[sources]
CommTestSource = 1

[CommTestSource]
controller = CommTestController

[CommTestController]

ConcurrentWebRequestController

config/default.conf
[controllers]
ConcurrentWebRequestController = 1

[sources]
ConcurrentWebRequestSource = 1

[ConcurrentWebRequestSource]
controller = ConcurrentWebRequestController

[ConcurrentWebRequestController]
requirements.txt
 aiohttp

CrontabController

config/default.conf
[controllers]
CrontabController = 1

[sources]
CrontabSource = 1

[CrontabSource]
controller = CrontabController

[CrontabController]
requirements.txt
 crontab

InfluxDBLoggerController

config/default.conf
[rules]
InfluxDBLoggerRule = 1

[controllers]
InfluxDBLoggerController = 1

[sources]
InfluxDBLoggerSource = 1

[InfluxDBLoggerSource]
controller = InfluxDBLoggerController

[InfluxDBLoggerRule]
auto_logging_on = 1

[InfluxDBLoggerController]
dsn = influxdb:///Database-Name
requirements.txt
 influxdb

InternalController

config/default.conf
[controllers]
InternalController = 1

[sources]
InternalSource = 1

[InternalSource]
controller = InternalController

[InternalController]

ModbusServerController

config/default.conf
[controllers]
ModbusServerController = 1

[sources]
HoldingRegisterSource = 1

[HoldingRegisterSource]
controller = ModbusServerController

[ModbusServerController]

[ModbusServerController_devices]
ModbusServerController_device0 = 1

[ModbusServerController_device0]
requirements.txt
 pymodbus

MQTTDataMessageController

config/default.conf
[controllers]
MQTTDataMessageController = 1

[sources]
MQTTDataMessageSource = 1

[MQTTDataMessageSource]
controller = MQTTDataMessageController

[MQTTDataMessageController]
requirements.txt
 paho-mqtt

NewControllerTemplate

config/default.conf
[controllers]
NewControllerTemplate = 1

[sources]
NewSourceTemplate = 1

[NewSourceTemplate]
controller = NewControllerTemplate

[NewControllerTemplate]

OPCUAServerController

config/default.conf
[controllers]
OPCUAServerController = 1

[sources]
VariantSource = 1
BytestringSource = 1

[VariantSource]
controller = OPCUAServerController

[BytestringSource]
controller = OPCUAServerController

[OPCUAServerController]
requirements.txt
 freeopcua

SubprocessController

config/default.conf
[controllers]
SubprocessController = 1

[sources]
SubprocessSource = 1

[SubprocessSource]
controller = SubprocessController

[SubprocessController]

SystemMonitorController

config/default.conf
[controllers]
SystemMonitorController = 1

[sources]
SystemMonitorSource = 1

[SystemMonitorSource]
controller = SystemMonitorController

[SystemMonitorByteSource]
controller = SystemMonitorController

[SystemMonitorPercentSource]
controller = SystemMonitorController
requirements.txt
 psutil

XmlRpcController

config/default.conf
[controllers]
XmlRpcController = 1

[sources]
XmlRpcMethodCallSource = 1

[XmlRpcMethodCallSource]
controller = XmlRpcController

[XmlRpcController]

ZmqDataAccessController

config/default.conf
[controllers]
ZmqDataAccessController = 1

[sources]
ZmqDataAccessSource = 1

[ZmqDataAccessSource]
controller = ZmqDataAccessController

[ZmqDataAccessController]
requirements.txt
 pyzmq

Rule configs

CSVRule

config/default.conf
[rules]
CSVRule = 1

[CSVRule]
example_rule_101 = 1
example_rule_102 = 1

[example_rule_101]
csv = config/example_rule_101.csv
py = config/example_rule_101.py

[example_rule_102]
csv = config/example_rule_102.csv
py = config/example_rule_102.py
config/example_rule_101.csv
 IntegerSource,TextSource
 example-data1-as-int,example-data1-as-text
 example-data2-as-int,example-data2-as-text
config/example_rule_101.py
 def setup(shared):
     pass

 def expression(intdata, textdata):
     pass

InfluxDBLoggerRule

config/default.conf
[rules]
InfluxDBLoggerRule = 1

[InfluxDBLoggerRule]
auto_logging_on = 1
requirements.txt
 influxdb

INIRule

config/default.conf
[rules]
INIRule = 1

[INIRule]
example_rule_101 = config/example_rule_101.ini
example_rule_102 = config/example_rule_102.ini
config/example_rule_101.ini
[example_rule_101]
on = 1
parsers = IntegerSource, TextSource
module = config/example_rule_101.py
setup = setup
expression = expression
arguments =
    IntegerSource(example-data1-as-int), TextSource(example-data1-as-text)
    IntegerSource(example-data2-as-int), TextSource(example-data2-as-text)
config/example_rule_101.py
 def setup(shared):
     pass

 def expression(intdata, textdata):
     pass

YAMLRule

config/default.conf
[rules]
YAMLRule = 1

[YAMLRule]
example_rule_101 = config/example_rule_101.yaml
example_rule_102 = config/example_rule_102.yaml
config/example_rule_101.yaml
parsers:
  - source: IntegerSource
  - source: TextSource

expressions:
  - module: config/example_rule_101.py
    setup: setup
    expression: expression
    arguments:
      - source: IntegerSource
        key:    example-data1-as-int

      - source: TextSource
        key:    example-data1-as-text

  - module: config/example_rule_101.py
    setup: setup
    expression: expression
    arguments:
      - source: IntegerSource
        key:    example-data2-as-int

      - source: TextSource
        key:    example-data2-as-text
config/example_rule_101.py
def setup(shared):
    pass

def expression(intdata, textdata):
    pass
requirements.txt
PyYAML

netdef package

netdef.__main__

netdef.__main__.cli()[source]

entrypoint for use in setup.py:

entry_points={
    'console_scripts': [
        '{NAME}={MAIN_PACKAGE}.__main__:cli'.format(NAME=NAME, MAIN_PACKAGE=MAIN_PACKAGE),
    ],
},
netdef.__main__.create_project(proj_path, template_config_callback)[source]

Create project structure in given folder. Add content from template_config_callback into config/default.ini

Parameters:
  • proj_path (str) – project folder
  • template_config_callback (str) – config text
netdef.__main__.entrypoint(run_callback, template_config_callback)[source]

Entrypoint to be used in your application. Parses Command line arguments and dispatch functions.

Example from First-App/first_app/__main__.py:

from netdef.__main__ import entrypoint

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

def cli():
    # entrypoint: console_scripts
    entrypoint(run_app, get_template_config)

if __name__ == '__main__':
    # entrypoint: python -m console_scripts 
    entrypoint(run_app, get_template_config)
netdef.__main__.framework_entrypoint()[source]

The main entrypoint for the netdef package. Used by cli().

Parses command line arguments and dispatch functions

netdef.__main__.generate_certificate(interactive=True)[source]

Generate ssl certificates using openssl. Files is created in project folder.

  • certificate.pem.key
  • certificate.pem
  • certificate.der.key
  • certificate.der

Prints result to stdout.

Parameters:interactive (bool) – ask for CN if True.
netdef.__main__.generate_webadmin_auth(interactive=True)[source]

Generate a user and password in ini-format. Prints result to stdout. Can be copy-pasted into config/default.conf

Parameters:interactive (bool) – ask for user/pass if True. Generate automatically if not.

netdef.service

netdef.service.get_service(*args, **kwargs)[source]

Note

This function is only implemented for Windows and Systemd based linux distributions

Returns the Service-class to use as argument in run_service()

Parameters:
  • svc_name – name of the service
  • exe_name – filename of the service
  • app_callback – a function that will start your application
  • template_callback – a function that returns template config
Returns:

GenericApplicationService

Example:

from netdef.service import get_service, run_service

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

application_service = get_service("First-App", "First-App-Service", run_app, get_template_config)
run_service(application_service)
netdef.service.run_service(*args, **kwargs)[source]

Note

This function is only implemented for Windows and Systemd based linux distributions

Parameters:app_service_class – service class from get_service()

Create an instance of app_service_class and run as service

Example:

from netdef.service import get_service, run_service

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

application_service = get_service("First-App", "First-App-Service", run_app, get_template_config)
run_service(application_service)

netdef.windows_service

class netdef.windows_service.GenericApplicationService(args)[source]

Bases: sphinx.ext.autodoc.importer._MockObject

SvcDoRun()[source]
SvcStop()[source]
application = None
netdef.windows_service.get_service(svc_name, exe_name, app_callback, template_callback=None)[source]

Note

This function is only implemented for Windows and Systemd based linux distributions

Returns the Service-class to use as argument in run_service()

Parameters:
  • svc_name – name of the service
  • exe_name – filename of the service
  • app_callback – a function that will start your application
  • template_callback – a function that returns template config
Returns:

GenericApplicationService

Example:

from netdef.service import get_service, run_service

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

application_service = get_service("First-App", "First-App-Service", run_app, get_template_config)
run_service(application_service)
netdef.windows_service.run_service(app_service_class)[source]

Note

This function is only implemented for Windows and Systemd based linux distributions

Parameters:app_service_class – service class from get_service()

Create an instance of app_service_class and run as service

Example:

from netdef.service import get_service, run_service

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

application_service = get_service("First-App", "First-App-Service", run_app, get_template_config)
run_service(application_service)

netdef.systemd_service

netdef.systemd_service can also be invoked directly using the -m switch of the interpreter with proj_path as argument.

This example installs the project in current directory as a service:

$ python -m netdef.systemd_service -i .
class netdef.systemd_service.ApplicationService(svc_name, exe_name, app_callback, template_callback)

Bases: tuple

app_callback

Alias for field number 2

exe_name

Alias for field number 1

svc_name

Alias for field number 0

template_callback

Alias for field number 3

netdef.systemd_service.get_service(svc_name, exe_name, app_callback, template_callback)[source]

Note

This function is only implemented for Windows and Systemd based linux distributions

Returns the Service-class to use as argument in run_service()

Parameters:
  • svc_name – name of the service
  • exe_name – filename of the service
  • app_callback – a function that will start your application
  • template_callback – a function that returns template config
Returns:

GenericApplicationService

Example:

from netdef.service import get_service, run_service

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

application_service = get_service("First-App", "First-App-Service", run_app, get_template_config)
run_service(application_service)
netdef.systemd_service.install_service(proj_path, service_file, svc_name, user)[source]

Note

This function is only implemented for Systemd based linux distributions

Creates a systemd service file in /etc/systemd/system/

netdef.systemd_service.run_service(app_service_class)[source]

Note

This function is only implemented for Windows and Systemd based linux distributions

Parameters:app_service_class – service class from get_service()

Create an instance of app_service_class and run as service

Example:

from netdef.service import get_service, run_service

def run_app():
    from . import main

def get_template_config():
    from . import defaultconfig
    return defaultconfig.template_config_string

application_service = get_service("First-App", "First-App-Service", run_app, get_template_config)
run_service(application_service)

netdef.utils

netdef.utils.handle_restart(shared, engine)[source]

By calling this function your application will restart on SystemExit if shared.restart_on_exit is True.

Parameters:

Example:

from netdef.utils import handle_restart
...
engine.init()
engine.start()
engine.block() # until ctrl-c or SIG_TERM
engine.stop()
handle_restart(shared, engine)
netdef.utils.setup_logging(config)[source]

Parse the config file for:

[logging]
logglevel
loggformat
loggdatefmt
loggfile
to_console
to_file

Then the logging module is set according to the configs

Parameters:config – instance of netdef.Shared.SharedConfig.Config

Example:

...
from netdef.Shared import Shared
from netdef.utils import setup_logging
shared = Shared.Shared("First-App", install_path, proj_path, config_string)
setup_logging(shared.config)
...

netdef.testutils

class netdef.testutils.MockExpression(**kwargs)[source]

Bases: object

Example:

from netdef.testutils import MockExpression

def test_hello():
    mock = MockExpression(
        module="config/command_rule.py",
        intern=InternalSource("generic"),
        cmd=CmdSource("echo hello")
    )
    mock.intern.update_value(None, stat_init=True)
    mock.cmd.assert_called_once_with("world")
    mock.intern.assert_not_called()
get_callbacks()[source]
get_module()[source]

Returns the expression module

set_init_values(**kwargs)[source]
set_none_values(**kwargs)[source]
class netdef.testutils.MockShared(config_string='')[source]

Bases: netdef.Shared.Shared.Shared

class netdef.testutils.MockSource(expression, source)[source]

Bases: object

assert_any_call(value)[source]
assert_called()[source]
assert_called_once()[source]
assert_called_once_with(value)[source]
assert_called_with(value)[source]
assert_not_called()[source]
assert_value(value)[source]

A helper function to assert value and timestamp

call_args
call_args_list
call_count
update_value(val, stime=None, stat_none=False, stat_init=False, stat_good=False, stat_invalid=False, run_expression=True)[source]

A Helper function to update values in expression

netdef.Controllers package

Controllers

class netdef.Controllers.Controllers.Controllers(shared=None)[source]

Bases: object

A collection of all loaded controllers

add_shared_object(shared)[source]
init()[source]
load(base_packages)[source]

Imports controller modules. Creates queue instances associated with the given controllers.

Example:

from netdef.Controllers import Controllers
controllers = Controllers.Controllers(shared)
controllers.load([__package__, 'netdef'])
netdef.Controllers.Controllers.register(name, classref=None)[source]

A decorator to register controllers. Example:

from netdef.Controllers import BaseController, Controllers

@Controllers.register("NewControllerTemplate")
class NewControllerTemplate(BaseController.BaseController):
    def __init__(self, name, shared):
        ...

Can also be called as a normal function:

from netdef.Controllers import BaseController, Controllers

def setup(shared):
    Controllers.register("NewControllerTemplate", NewControllerTemplate)

class NewControllerTemplate(BaseController.BaseController):
    def __init__(self, name, shared):
        ...
Parameters:
  • name (str) – Name of the controller class
  • classref (object) – Should be None if used as a decorator and a class if called as a function
Returns:

A callable that returns a class if used as a decorator and a class if called as a normal function

Abstract base controllers

BaseController

This is an abstract baseclass

class netdef.Controllers.BaseController.BaseController(name, shared)[source]

Bases: object

Abstract class for controllers.

Parameters:
  • name (str) – Name to be used in logfiles
  • shared – a reference to the shared object
add_interrupt(interrupt)[source]

Setup the interrupt signal

add_logger(name)[source]

Setup logging module

add_parser(parser)[source]

Add parser if not already exists

add_source(name, init_value)[source]

Add a source to the storage dict. Override if something else is needed.

clear_incoming(until_empty=True, until_messagetype=None)[source]

Delete all messages from incoming queue.

Parameters:
  • until_empty (bool) – If True the function will block until queue is empty. If False it will block forever.
  • until_messagetype (MessageType) – Block until given messagetype is received

Example:

...

while not self.has_interrupt():
    reconnect = False
    try:
        if reconnect:
            self.clear_incoming()
            self.try_reconnect()
        # main loop
        while not self.has_interrupt():
            self.loop_incoming()
            self.loop_outgoing()
    except ConnectionError:
        reconnect = True

...
fetch_one_incoming()[source]

Returns one message from the queue.

Returns:tuple of (messagetype, incoming)
Return type:tuple(MessageType, BaseSource)
get_parsers()[source]

Return parser storage

get_source(name)[source]

Return named source

get_sources()[source]

Return source storage

handle_add_parser(incoming)[source]

Add parser to controller if not already exists

handle_add_source(incoming)[source]
handle_app_state(app_state)[source]

Override if controller need to react to application states

handle_app_state_running()[source]

Override if controller need to react to running state

handle_app_state_setup()[source]

Override if controller need to react to setup state

handle_read_source(incoming)[source]
handle_readall(incoming)[source]
handle_tick(incoming)[source]

Answer the tick message

handle_write_source(incoming, value, source_time)[source]
has_interrupt()[source]

Returns True if the interrupt signal is received

has_source(name)[source]

Return True if source name is found

init_parsers(parsers)[source]

Setup the parser storage as a list. Override if something else is needed.

init_queue()[source]

Setup the message queue and timeout

init_sources(sources)[source]

Setup the source storage as a dict. Override if something else is needed.

loop_incoming(until_empty=True, until_timeout=0.0, until_messagetype=None, until_app_state=None)[source]

Get every message from the queue and dispatch the associated handler function.

Parameters:
  • until_empty (bool) – Blocking until queue is empty
  • until_timeout (float) – Timeout in seconds. 0.0 blocks forever.
  • until_messagetype (MessageType) – Blocking until given messagetype is dispatched
  • until_app_state (AppStateType) – Blocking until given app_state is dispatched
loop_outgoing()[source]

Check every source and call the poll_outgoing_item function

loop_until_app_state_running()[source]

Usefull if you want your controller to block while ADD_SOURCE and ADD_PARSER is dispatched

Example:

def run(self):
    self.loop_until_app_state_running()
    while not self.has_interrupt():
        try:
            self.handle_connection()
            while not self.has_interrupt():
                self.loop_incoming()
                self.loop_outgoing()
        except ConnectionError:
            self.handle_conn_error()
poll_outgoing_item(item)[source]
run()[source]

Override this function in controller. Example:

def run(self):
    self.logger.info("Running")

    while not self.has_interrupt():
        self.loop_incoming() # dispatch handle_* functions
        self.loop_outgoing() # dispatch poll_* functions

    self.logger.info("Stopped")
send_outgoing(outgoing)[source]

Send RUN_EXPRESSION message on valuechange

sleep(seconds)[source]

” Sleep by waiting for the interrupt. Should be used instead of time.sleep. Override if sleep should be interrupted by even more signals

statistics_update()[source]
classmethod update_source_instance_status(source_instance, status_ok, oldnew_check)[source]

Updates state on given source_instance Returns True if source_instance have triggered a value change

static update_source_instance_value(source_instance, value, stime, status_ok, oldnew_check)[source]

Updates value, timestamp and state on given source_instance Returns True if source_instance have triggered a value change

BaseAsyncController

class netdef.Controllers.BaseAsyncController.BaseAsyncController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

get_event_loop()[source]
handle_app_state_running()[source]

Override if controller need to react to running state

init_asyncio()[source]
loop_incoming_until_interrupt()[source]
run()[source]

Override this function in controller. Example:

def run(self):
    self.logger.info("Running")

    some_client = SomeAsyncioClient()

    # Start polling of the blocking incoming queue in a thread executor
    self.loop.run_in_executor(None, self.loop_incoming_until_interrupt)

    # TODO: define a coroutine that stops your async client when called.
    async def stop_some_client():
        await some_client.stop()

    # register coroutine to be run at interrupt / shutdown 
    self.loop.create_task(self.run_async_on_interrupt(stop_some_client))

    # TODO: start your client coroutine
    self.loop.run_until_complete(some_client.start())

    self.logger.info("Stopped")
run_async_on_interrupt(callback)[source]

Built-in controller modules:

Built-in controller modules

CommTestController

class netdef.Controllers.CommTestController.CommTestController(name, shared)[source]

Bases: netdef.Controllers.BaseAsyncController.BaseAsyncController

Tip

Development Status :: 5 - Production/Stable

This class will send TCP or ICMP ping requests based on sources received in ADD_SOURCE messages and store the result into given sources. When result is stored into a source this class will send the changed source in a RUN_EXPRESSION message to the source’s rule.

Parameters:
  • name (str) – Name of controller
  • shared (netdef.Shared) – Instance of applications shared object.
Configuration:
  • timeout – Connection timeout in seconds
  • interval – Poll interval in seconds
  • test_type – Available types: [tcpip, ping]
  • max_concurrent_sockets – Max number of simultaneous open sockets.
  • disable – If disabled this controller will enter running state but all messages will be discarded.
Defaults:
[CommTestController]
timeout = 2
interval = 10
test_type = tcpip
max_concurrent_sockets = 1000
disable = 0
loop_outgoing_until_interrupt()[source]

Main coroutine. loops until interrupt is set.

run()[source]

Main thread loop. Will exit when receiving interrupt signal Sets up

ConcurrentWebRequestController

class netdef.Controllers.ConcurrentWebRequestController.ConcurrentWebRequestController(name, shared)[source]

Bases: netdef.Controllers.BaseAsyncController.BaseAsyncController

Danger

Development Status :: 3 - Alpha

Basically just a web scraper. Can scrape multiple web pages simultaneously.

IO is handeled by this controller. The poll interval and program flow is implemented in ConcurrentWebRequestSource

get_client_session(item)[source]

Returns a aiohttp session. Add new session to source if not found. Session will be initialized with basic auth and a default timeout

handle_add_source(incoming)[source]

Add source to controller

handle_write_source(incoming, value, source_time)[source]

execute a command if given value is the name of a command

init_task_limit()[source]

Read configuration

loop_outgoing_until_interrupt()[source]

Main async loop.

proccess_task(item, method)[source]

Retrives data from web site and packs it into the source

proccess_web_request_item(item, method, session)[source]

handle IO by interfacing with the sources data generator

run()[source]

Main sync loop

class netdef.Controllers.ConcurrentWebRequestController.NextInterval(timestamp)[source]

Bases: object

Call next() to retrieve seconds to next interval, and which interval it is

add(interval)[source]
has_interval()[source]
next(now)[source]
spans
start

CrontabController

class netdef.Controllers.CrontabController.CrontabController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

poll_outgoing_item(item)[source]

Check if it is time to trigger event for given source

Parameters:item – source instance to check
run()[source]

Main loop. Will exit when receiving interrupt signal

InfluxDBLoggerController

class netdef.Controllers.InfluxDBLoggerController.InfluxDBLoggerController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

A logging controller. Its purpose is to store every write event into influxdb.

handle_write_source(incoming, value, source_time)[source]

Write given value and timestamp into influxdb

Parameters:
  • incoming (InfluxDBLoggerSource) – source instance
  • value – frozen value if instance
  • source_time (datetime.datetime) – value timestamp
run()[source]

Main loop. Will exit when receiving interrupt signal

InternalController

class netdef.Controllers.InternalController.InternalController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Internal variables that works just like any other value from a controller. Can trigger events on valuechanges. State can be cached to disk.

Parameters:
  • name (str) – The name is used i logfile and default.ini
  • shared (Shared) – Instance of applications shared object.

Configuration

[InternalController]
send_init_event = 0
send_events = 0
persistent_value = 0
key_in_filename = 0
Options
  • send_init_event – trigger RUN_EXPRESSION with StatusCode.INITIAL for every source at startup
  • send_events – trigger a RUN_EXPRESSION message for every WRITE_SOURCE message
  • persistent_value – store values to disk
  • key_in_filename – use source key as prefix in filename for persistent storage

Sequence diagram

blockdiag Queue InternalController Update value of source [n] Value change in source [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING WRITE_SOURCE, source [n], value, timestamp RUN_EXPRESSION, source [n] Initialization Setup Running Begin loop End Loop
get_cache_filename(key)[source]

Generate sha256 hash to be used as filename. If config key_in_filename=1 then key will be prefixed to the hexdigest. Valid characters: a-z A-Z 0-9 _.-

Parameters:key (str) – string to encode
Returns:filename
handle_add_source(incoming)[source]

Add given source instance to internal source list

Parameters:incoming (InternalSource) – source instance
handle_write_source(incoming, value, source_time)[source]

Update internal dict with new value.

Parameters:
  • incoming (InternalSource) – source instance
  • value – frozen value of instance
  • source_time (datetime.datetime) – value timestamp
poll_outgoing_item(item)[source]

Check if given source should be cached to disk.

Parameters:item (InternalSource) – source instance
run()[source]

Main loop. Will exit when receiving interrupt signal

store_to_disk(item=None)[source]

Store sources into files at [proj-path]/db/internal/

ModbusClientController

class netdef.Controllers.ModbusClientController.ModbusClientController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Read and write holding registers of a modbus device.

Parameters:
  • name (str) – The name is used i logfile and default.ini
  • shared (Shared) – reference to the global shared instance

Settings:

[ModbusClientController]

# connection
host = 127.0.0.1
port = 5020

# RUN_EXPRESSION is only sent if value has changed
oldnew_comparision = 1

# cooldown on connection error og write error
reconnect_timeout = 20

# Buffer or clear write requests recieved during cooldown
clear_writes_on_disconnect = 1

# Polling interval
poll_interval = 0.5

Sequence diagram:

blockdiag Queue ModbusClientControlle r Modbus registers Update value of source [n] Value change in source [n] Update value of source [n] Value change in nodeid [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING Value change, register [n] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp update value, register [n] Initialization Setup Running Begin loop End Loop
handle_add_source(incoming)[source]

Add given source instance to internal source list

Parameters:incoming (HoldingRegisterSource) – source instance
handle_write_source(incoming, value, source_time)[source]

Write given value to the connected modbus device.

Parameters:
  • incoming (HoldingRegisterSource) – source instance
  • value – frozen value of instance
  • source_time (datetime.datetime) – value timestamp
poll_outgoing_item(item)[source]

Poll given source for its value in the modbus device

Parameters:item (HoldingRegisterSource) – source instance
run()[source]

Main loop. Will exit when receiving interrupt signal

safe_disconnect()[source]

Close the tcp socket if it is connected

ModbusServerController

class netdef.Controllers.ModbusServerController.ModbusServerController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Sequence diagram:

blockdiag Queue ModbusServerControlle r Pymodbus datablock Update value of source [n] Value change in source [n] Update value of source [n] Value change in nodeid [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING Value change, register [n] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp update value, register [n] Initialization Setup Running Begin loop End Loop
get_framer()[source]

Returns the framer to be used. Override this function to return a custom framer

get_modbus_server_context()[source]

Iter the devicelist section in config-file and builds a ModbusServerContext object

Returns:an ModbusServerContext instance
handle_add_source(incoming)[source]
handle_datachange(unit, address, value, is_internal)[source]
handle_write_source(incoming, value, source_time)[source]
init_server(context, framer, identity, host, port)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

class netdef.Controllers.ModbusServerController.MyContext(*args, **kwargs)[source]

Bases: pymodbus.datastore.context.ModbusSlaveContext

setValues(fx, address, values, is_internal=False)[source]

Sets the datastore with the supplied values

Parameters:
  • fx – The function we are working with
  • address – The starting address
  • values – The new values to be set
class netdef.Controllers.ModbusServerController.MyController(*args, **kwargs)[source]

Bases: pymodbus.server.sync.ModbusTcpServer

daemon_threads = False
service_actions()[source]

Called by the serve_forever() loop.

May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop.

MQTTDataMessageController

class netdef.Controllers.MQTTDataMessageController.MQTTDataMessageController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

get_key(topic)[source]
get_topic(topic)[source]
handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
loop_mqtt()[source]
mqtt_connect()[source]
mqtt_safe_disconnect()[source]
on_connect(client, userdata, flags, rc)[source]
on_disconnect(client, userdata, rc)[source]
on_message(client, userdata, msg)[source]
publish_data_item(topic, payload)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

OPCUAClientController

class netdef.Controllers.OPCUAClientController.OPCUAClientController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Caution

Development Status :: 4 - Beta

config(key, default)[source]
handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
loop_outgoing()[source]

Check every source and call the poll_outgoing_item function

run()[source]

Main loop. Will exit when receiving interrupt signal

safe_disconnect()[source]
send_datachange(nodeid, value, stime, status_ok)[source]
class netdef.Controllers.OPCUAClientController.SubHandler(parent)[source]

Bases: object

Client to subscription. It will receive events from server

datachange_notification(node, value, data)[source]
event_notification(event)[source]
status_change_notification(status)[source]

OPCUAServerController

class netdef.Controllers.OPCUAServerController.CustomAnonInternalSession(internal_server, aspace, submgr, name, user=<sphinx.ext.autodoc.importer._MockObject object>, external=False)[source]

Bases: opcua.server.internal_server.InternalSession

Custom InternalSession will set timestamp when missing

write(params)[source]
class netdef.Controllers.OPCUAServerController.CustomInternalSession(internal_server, aspace, submgr, name, user=<sphinx.ext.autodoc.importer._MockObject object>, external=False)[source]

Bases: netdef.Controllers.OPCUAServerController.CustomAnonInternalSession

This custom InternalSession will block anonymous access

activate_session(params)[source]
class netdef.Controllers.OPCUAServerController.CustomServer(shelffile=None, iserver=None)[source]

Bases: opcua.server.server.Server

Custom Server that enables Basic128Rsa15 and Basic256

class netdef.Controllers.OPCUAServerController.OPCUAServerController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

This Controller will start a freeopcua server instance and will add a nodeid for all sources received in ADD_SOURCE messages.

When a client writes a new value this event will be forwarded to the associated source and a RUN_EXPRESSION message will be sent.

When a WRITE_SOURCE message is received the value for the associated source will be updated in the server and all connected clients will receive a value update

Sequence diagram:

blockdiag Queue OPCUAServerController FreeOpcUA Update value of source [n] Value change in source [n] Update value of source [n] Value change in nodeid [n] APP_STATE, SETUP ADD_SOURCE, source [n] APP_STATE, RUNNING subscribe nodeid [n] Value change, nodeid [n] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp update value, nodeid [n] Initialization Setup Running Begin loop End Loop
add_folder(parent, foldername)[source]

Add a folder in server

add_variablenode(parent, ref, val, varianttype)[source]

Create and add a variable in server and return the variable node

build_folders(parent, ref, sep)[source]
create_datavalue(val, datatype, statuscode, timestamp)[source]

Create a value for the server that keep the correct datatype

create_monitored_items(event, dispatcher)[source]

write a warning to logfile if the client add a nodeid that does not exists

get_default_value(incoming)[source]

Returns the default value of the source value

get_nodeid(incoming)[source]

Returns the nodeid from the source

get_varianttype(incoming)[source]

Returns the varianttype from the source

handle_add_source(incoming)[source]

Add a source to the server

handle_write_source(incoming, value, source_time)[source]

Receive a value change from an expression and update the server

is_writable(incoming)[source]

Returns True if source is writable for the opcua client

modify_monitored_items(event, dispatcher)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(nodeid, value, stime, status_ok, ua_status_code)[source]

Triggers a RUN_EXPRESSION message for given source

class netdef.Controllers.OPCUAServerController.SubHandler(controller)[source]

Bases: object

The subscription handler for the server. Will send value changes i server to the controller.

datachange_notification(node, val, data)[source]
event_notification(event)[source]

RESTJsonController

class netdef.Controllers.RESTJsonController.RESTJsonController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

connect()[source]
handle_add_source(incoming)[source]
handle_read_source(incoming)[source]
handle_readall(incoming)[source]
handle_write_source(incoming, value)[source]
loop_outgoing()[source]

Check every source and call the poll_outgoing_item function

parse_item(item)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(key, source_time, value)[source]
urlerrorhandling()[source]

SubprocessController

class netdef.Controllers.SubprocessController.NextInterval(timestamp)[source]

Bases: object

Call next() to retrieve seconds to next interval, and which interval it is

add(interval)[source]
has_interval()[source]
next(now)[source]
spans
start
class netdef.Controllers.SubprocessController.SubprocessController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
parse_item(item)[source]
parse_response(response)[source]
poll_outgoing_item(item)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(source_key, value, source_time, status_ok)[source]
setup_interval_plan()[source]
netdef.Controllers.SubprocessController.stdout_from_terminal(command_as_str, err_msg=None)[source]

SystemMonitorController

class netdef.Controllers.SystemMonitorController.DataItem(source_type, key, interval, func, args=None)[source]

Bases: object

args

Arguments for self.func callback

func

Callback to retrieve value

get_value()[source]

Returns value of self.func callback

interval

Poll interval

key

Unique identifier

next

Next scheduled call to self.func

ready()[source]

Returns True if interval for this item has elapsed.

source_type

Reference to a SystemMonitorSource class

class netdef.Controllers.SystemMonitorController.SystemMonitorController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
poll_data()[source]

Iter the dict of DataItem and get values.

run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(source_key, value, stime, status_ok)[source]
netdef.Controllers.SystemMonitorController.get_clean_mount_point_name(node)[source]

Replace / or with .

Example:

for disk in psutil.disk_partitions():
    print (get_clean_mount_point_name(disk.mountpoint))
Parameters:node (str) – name of mountpoint
Returns:new node name
netdef.Controllers.SystemMonitorController.get_data_items_dict(mempoll, cpupoll, poll, checkdisk, diskpoll)[source]

Create a dict with items to monitor.

Parameters:
  • mempoll (int) – poll interval for memory callbacks
  • cpupoll (int) – poll interval for cpu callbacks
  • poll (int) – general poll interval
  • checkdisk (bool) – Set True to poll disk drives
  • diskpoll (int) – poll interval for disk drives
Returns:

dict of DataItem

netdef.Controllers.SystemMonitorController.get_proc()[source]

Helperfunction.

Returns:psutil.Process
netdef.Controllers.SystemMonitorController.get_vm()[source]

Helperfunction.

Returns:psutil.virtual_memory
netdef.Controllers.SystemMonitorController.statistics_update(item)[source]

Write internal statistics to the Statistics singleton if activated

XmlRpcController

class netdef.Controllers.XmlRpcController.XmlRpcController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Tip

Development Status :: 5 - Production/Stable

Sequence diagram:

blockdiag Queue XmlRpcController xmlrpc.client For source [n] Update value of source [n] Unpack subitems into source [i] Value change in source [i] Value change in source [n] Update value of source [n] APP_STATE, SETUP ADD_PARSER, class [n] ADD_SOURCE, source [n] ADD_SOURCE, source [i] APP_STATE, RUNNING Rpc call, request [n] Rpc call, response [n] parse subitems RUN_EXPRESSION, source [i] RUN_EXPRESSION, source [n] WRITE_SOURCE, source [n], value, timestamp Rpc call, request [n] Rpc call, response [n] Initialization Setup Running Begin polling loop End Loop
handle_add_source(incoming)[source]
handle_read_source(incoming)[source]
handle_readall(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
parse_item(item)[source]
parse_response(response)[source]
poll_outgoing_item(item)[source]
rpc_call(item, value)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

send_datachange(key, source_time, value)[source]

ZmqDataAccessController

class netdef.Controllers.ZmqDataAccessController.ZmqDataAccessController(name, shared)[source]

Bases: netdef.Controllers.BaseController.BaseController

Danger

Development Status :: 3 - Alpha

connect()[source]
handle_add_source(incoming)[source]
handle_write_source(incoming, value, source_time)[source]
loop_subscribers()[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

netdef.Engines package

Abstract baseclass

BaseEngine

This is an abstract baseclass

class netdef.Engines.BaseEngine.BaseEngine(shared=None)[source]

Bases: object

add_controller_classes(controllers)[source]
add_rule_classes(rules)[source]
add_shared_object(shared)[source]
add_source_classes(sources)[source]
static block()[source]
init()[source]
load(base_package)[source]
start()[source]
stop()[source]
wait()[source]
class netdef.Engines.BaseEngine.BaseExpressionExecutor(name, shared)[source]

Bases: object

add_interrupt(interrupt)[source]
add_name(name)[source]
add_shared(shared)[source]
handle_run_expression(source_item, expressions, value, source_time, status_code)[source]
has_interrupt()[source]
init_queue()[source]
loop_incoming()[source]
run()[source]

Expressions

Expression

class netdef.Engines.expression.Expression.Argument(source_instance, instigator, frozen_value=None)[source]

Bases: object

A wrapper for source instances.

Parameters:
  • source_instance (BaseSource) – An source instance
  • instigator (boolean) – True if given source instance triggered the execution
controller

Returns the controller attribute from source instance

create_interface(value=None)[source]

Wrap given value into the source interface. (See interface attr of netdef.Sources.BaseSource.BaseSource)

Parameters:value (object) – value to be wrapped
Returns:An interface instance
Return type:netdef.Interfaces.DefaultInterface
get

Returns the value from source instance. NB! this is not a frozen copy of the value. It may change if the controller updates the value.

instance

reference to the source instance

key

Returns the key attribute from source instance

new

Returns True if source triggered the expression and this is the first value. (StatusCode.INITIAL)

set

Write a new value to the source. This will trigger a WRITE_SOURCE message to the controller.

status_ok

Returns True if value is StatusCode.GOOD or StatusCode.INITIAL

update

Returns True if source triggered the expression. (StatusCode.GOOD or INVALID)

value

a frozen copy of the value in self.instance.get

class netdef.Engines.expression.Expression.Expression(expression, filename)[source]

Bases: object

A class containing a reference to the expression-function and references to the source-instances that will become arguments to the expression function

Parameters:
  • expression (callable) – A reference to the actual function
  • filename (str) – Filename of the module where the function is found
add_arg(arg)[source]

arg: this should be a source instance

add_kwarg(keyword, arg)[source]

This could be anything. This function exist for you to extend arguments for the expressions. netdef itself do not use this

disable()[source]

If there is problems with the expression it can be automaticly disabled by calling this function

execute(args, kwargs)[source]

Execute the expression-function with given arguments

get_args(source_instance=None, frozen_value=None)[source]

Wrap each source-instance into its own Argument instance Return a tuple of Arguments

get_kwargs()[source]

Collector

class netdef.Engines.expression.Collector.Collector(fn, wait, mode)[source]

Bases: object

Takes a function but does not call it right away. After the given wait time has elapsed the function is called based on the given mode.

Parameters:
  • fn (callable) – a function or callable
  • wait (float) – seconds to wait
  • mode (Mode) – how to call the callable
__call__(*args)[source]

Add arguments to a queue. Only the first call will acquire self.lock and sleep until wait time has elapsed. After sleep the arguments in queue is used to call the function self.fn based on the chosen mode.

class netdef.Engines.expression.Collector.Mode[source]

Bases: enum.Enum

collector modes

FIRST = 1

Use arguments from the first call

FIRST_WITH_EVENT = 4

Use arguments from the first call and an additional argument called event

LAST = 2

Use arguments from the last call

LAST_WITH_EVENT = 5

Use arguments from the last call and an additional argument called event

LIST_ALL = 3

Convert arguments to lists with every call

netdef.Engines.expression.Collector.collect(wait, mode)[source]

A decorator for expressions.

Usage:

from netdef.Engines.expression.Collector import collect, Mode

@collect(wait=0.1, mode=Mode.LIST_ALL)
def expression(c1, c2, c3):
    pass

Built-in engine modules

ThreadedEngine

class netdef.Engines.ThreadedEngine.ExpressionExecutor(*args, **kwargs)[source]

Bases: netdef.Engines.BaseEngine.BaseExpressionExecutor

handle_run_expression(source_item, expressions, value, source_time, status_code)[source]
loop_futures()[source]
run()[source]
class netdef.Engines.ThreadedEngine.ThreadedEngine(shared)[source]

Bases: netdef.Engines.BaseEngine.BaseEngine

static block()[source]
init()[source]
load(base_package)[source]
start()[source]
stop()[source]
wait()[source]

ThreadedWebGuiEngine

class netdef.Engines.ThreadedWebGuiEngine.ThreadedWebGuiEngine(shared)[source]

Bases: netdef.Engines.ThreadedEngine.ThreadedEngine

Integrates a simple werkzeug webserver to serve flask_admin webpages

block()[source]

Run webserver and wait for KeyboardInterrupt

get_flask_app()[source]

Returns the main flask app.

Common use case is to integrate an existing flask app.

main.py Example:

def init_app(app):

    @app.route('/')
    def hello_world():
        return 'Hello, World!'
    
    return app


def main():
    ...

    engine = ThreadedWebGuiEngine.ThreadedWebGuiEngine(shared)

    # here we go
    init_app(engine.get_flask_app())

    engine.add_controller_classes(controllers)
    engine.add_source_classes(sources)
    engine.add_rule_classes(rules)
    engine.load([__package__, 'netdef'])
    engine.init()
    engine.start()
    engine.block() # until ctrl-c or SIG_TERM
    engine.stop()
    ...
init()[source]
load(base_package)[source]
netdef.Engines.ThreadedWebGuiEngine.init_app(app, webadmin_views, shared)[source]

Configure flask. Setup flask_admin and flask_login

netdef.Engines.ThreadedWebGuiEngine.make_admin_users_dict(config, section)[source]

NginxWebGuiReverseProxy

class netdef.Engines.NginxWebGuiReverseProxy.NginxReverseProxy(shared)[source]

Bases: netdef.Engines.ThreadedWebGuiEngine.ThreadedWebGuiEngine

block()[source]

Run webserver and wait for KeyboardInterrupt

Webadmin

AdminIndex

class netdef.Engines.webadmin.AdminIndex.LoginForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: wtforms.form.Form

get_user()[source]
login = <UnboundField(StringField, (), {'validators': [<wtforms.validators.Required object>]})>
password = <UnboundField(PasswordField, (), {'validators': [<wtforms.validators.Required object>]})>
validate_login(field)[source]
class netdef.Engines.webadmin.AdminIndex.MyAdminIndexView(name=None, category=None, endpoint=None, url=None, template='admin/index.html', menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: flask_admin.base.AdminIndexView

command_result_view()[source]
index()[source]
login_view()[source]
logout_view()[source]
restart_view()[source]
restarting = 0
shutdown_view()[source]
shuttingdown = 0
class netdef.Engines.webadmin.AdminIndex.User(userid, roles)[source]

Bases: flask_login.mixins.UserMixin

has_role(roles)[source]
netdef.Engines.webadmin.AdminIndex.shutdown_server()[source]

ExpressionsView

class netdef.Engines.webadmin.ExpressionsView.ExpressionsModel(expression)[source]

Bases: object

function_arguments
function_name
module_filename
class netdef.Engines.webadmin.ExpressionsView.ExpressionsModelForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: wtforms.form.Form

function_arguments = <UnboundField(StringField, ('function_arguments',), {})>
function_name = <UnboundField(StringField, ('function_name',), {})>
module_filename = <UnboundField(StringField, ('module_filename',), {})>
class netdef.Engines.webadmin.ExpressionsView.ExpressionsModelView(model, name=None, category=None, endpoint=None, url=None, static_folder=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView, flask_admin.model.base.BaseModelView

action_view()

Mass-model action view.

ajax_lookup()
ajax_update()

Edits a single column of a record in list view.

can_create = False
can_delete = False
can_edit = False
column_list = ('module_filename', 'function_name', 'function_arguments')
column_searchable_list = ('module_filename', 'function_name', 'function_arguments')
column_sortable_list = ()
create_view()

Create model view

delete_view()

Delete model view. Only POST method is allowed.

details_view()

Details model view

edit_view()

Edit model view

export(export_type)
form

alias of ExpressionsModelForm

get_list(page, sort_field, sort_desc, search, filters, page_size=None)[source]

Return a paginated and sorted list of models from the data source.

Must be implemented in the child class.

Parameters:
  • page – Page number, 0 based. Can be set to None if it is first page.
  • sort_field – Sort column name or None.
  • sort_desc – If set to True, sorting is in descending order.
  • search – Search query
  • filters – List of filter tuples. First value in a tuple is a search index, second value is a search value.
  • page_size – Number of results. Defaults to ModelView’s page_size. Can be overriden to change the page_size limit. Removing the page_size limit requires setting page_size to 0 or False.
get_pk_value(model_)[source]

Return PK value from a model object.

index_view()

List view

Initialize search. If data provider does not support search, init_search will return False.

is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

static sampling(selection, offset=0, limit=None)[source]
netdef.Engines.webadmin.ExpressionsView.setup(admin)[source]

FileModel

class netdef.Engines.webadmin.FileModel.Files(base_path, *args, **kwargs)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView, flask_admin.contrib.fileadmin.FileAdmin

action_view()
allowed_extensions = ('txt', 'conf', 'csv', 'der', 'pam', 'key', 'zip', 'gz', '7z', 'py', 'ini', 'yaml')
can_download = True
delete()

Delete view method

download(path=None)

Download view method.

Parameters:path – File path.
edit()

Edit view method

edit_template = 'admin/fileedit.html'
editable_extensions = ('txt', 'conf', 'csv', 'py', 'ini', 'yaml')
get_edit_form()[source]

Create form class for file editing view.

Override to implement customized behavior.

index(path=None)
index_view(path=None)

Index view method

Parameters:path – Optional directory path. If not provided, will use the base directory
is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

is_accessible_path(path)[source]

Verify if the provided path is accessible for the current user.

Override to customize behavior.

Parameters:path – Relative path to the root
list_template = 'admin/filelist.html'
mkdir(path=None)

Directory creation view method

Parameters:path – Optional directory path. If not provided, will use the base directory
rename()

Rename view method

upload(path=None)

Upload view method

Parameters:path – Optional directory path. If not provided, will use the base directory
class netdef.Engines.webadmin.FileModel.InstallationRepo(base_path, *args, **kwargs)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView, flask_admin.contrib.fileadmin.FileAdmin

action_view()
allowed_extensions = ('zip', 'whl', 'gz')
can_download = True
can_rename = False
delete()

Delete view method

download(path=None)

Download view method.

Parameters:path – File path.
edit()

Edit view method

index(path=None)
index_view(path=None)

Index view method

Parameters:path – Optional directory path. If not provided, will use the base directory
is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

list_template = 'admin/filelist.html'
mkdir(path=None)

Directory creation view method

Parameters:path – Optional directory path. If not provided, will use the base directory
rename()

Rename view method

upload(path=None)

Upload view method

Parameters:path – Optional directory path. If not provided, will use the base directory
netdef.Engines.webadmin.FileModel.setup(admin)[source]

MyBaseView

class netdef.Engines.webadmin.MyBaseView.MyBaseView(name=None, category=None, endpoint=None, url=None, static_folder=None, static_url_path=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: flask_admin.base.BaseView

has_role(roles)[source]
inaccessible_callback(name, **kwargs)[source]

Handle the response to inaccessible views.

By default, it throw HTTP 403 error. Override this method to customize the behaviour.

is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

SettingsModel

class netdef.Engines.webadmin.SettingsModel.SettingsModel(section, key, value)[source]

Bases: object

key
section
value
class netdef.Engines.webadmin.SettingsModel.SettingsModelForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: wtforms.form.Form

key = <UnboundField(StringField, ('key',), {})>
section = <UnboundField(StringField, ('section',), {})>
value = <UnboundField(StringField, ('value',), {})>
class netdef.Engines.webadmin.SettingsModel.SettingsModelView(model, name=None, category=None, endpoint=None, url=None, static_folder=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView, flask_admin.model.base.BaseModelView

action_view()

Mass-model action view.

ajax_lookup()
ajax_update()

Edits a single column of a record in list view.

can_create = False
can_delete = False
can_edit = False
column_list = ('section', 'key', 'value')
column_searchable_list = 'key'
column_sortable_list = ()
create_view()

Create model view

delete_view()

Delete model view. Only POST method is allowed.

details_view()

Details model view

edit_view()

Edit model view

export(export_type)
form

alias of SettingsModelForm

get_list(page, sort_field, sort_desc, search, filters, page_size=None)[source]

Return a paginated and sorted list of models from the data source.

Must be implemented in the child class.

Parameters:
  • page – Page number, 0 based. Can be set to None if it is first page.
  • sort_field – Sort column name or None.
  • sort_desc – If set to True, sorting is in descending order.
  • search – Search query
  • filters – List of filter tuples. First value in a tuple is a search index, second value is a search value.
  • page_size – Number of results. Defaults to ModelView’s page_size. Can be overriden to change the page_size limit. Removing the page_size limit requires setting page_size to 0 or False.
get_pk_value(model_)[source]

Return PK value from a model object.

index_view()

List view

Initialize search. If data provider does not support search, init_search will return False.

is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

static sampling(selection, offset=0, limit=None)[source]
netdef.Engines.webadmin.SettingsModel.setup(admin)[source]

SourcesModel

class netdef.Engines.webadmin.SourcesModel.SourcesModelForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: wtforms.form.Form

key = <UnboundField(StringField, ('key',), {'render_kw': {'readonly': True}})>
process(*args, **kwargs)[source]

Take form, object data, and keyword arg input and have the fields process them.

Parameters:
  • formdata – Used to pass data coming from the enduser, usually request.POST or equivalent.
  • obj – If formdata is empty or not provided, this object is checked for attributes matching form field names, which will be used for field values.
  • data – If provided, must be a dictionary of data. This is only used if formdata is empty or not provided and obj does not contain an attribute named the same as the field.
  • **kwargs – If formdata is empty or not provided and obj does not contain an attribute named the same as a field, form will assign the value of a matching keyword argument to the field, if one exists.
set_origin = <UnboundField(StringField, ('set_origin',), {'render_kw': {'readonly': True}})>
set_source_time = <UnboundField(StringField, ('set_source_time',), {'render_kw': {'readonly': True}})>
set_status_code = <UnboundField(StringField, ('set_status_code',), {'render_kw': {'readonly': True}})>
set_value = <UnboundField(StringField, ('set_value',), {'render_kw': {'readonly': True}})>
source = <UnboundField(StringField, ('source',), {'render_kw': {'readonly': True}})>
source_datatype = <UnboundField(StringField, ('source_datatype',), {'render_kw': {'readonly': True}})>
class netdef.Engines.webadmin.SourcesModel.SourcesModelView(model, name=None, category=None, endpoint=None, url=None, static_folder=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView, flask_admin.model.base.BaseModelView

action_view()

Mass-model action view.

ajax_lookup()
ajax_update()

Edits a single column of a record in list view.

can_create = False
can_delete = False
can_edit = True
can_view_details = True
column_details_list = ('key', 'rule', 'source', 'controller', 'value_as_string', 'status_code', 'source_time', 'source_datatype', 'get_value', 'get_source_time', 'get_status_code', 'get_origin', 'set_value', 'set_source_time', 'set_status_code', 'set_origin')
column_list = ('key', 'rule', 'source', 'controller', 'value_as_string', 'status_code', 'source_time')
column_searchable_list = ('key', 'rule', 'source', 'controller', 'value')
column_sortable_list = ()
create_view()

Create model view

delete_view()

Delete model view. Only POST method is allowed.

details_view()

Details model view

edit_view()

Edit model view

export(export_type)
form

alias of SourcesModelForm

get_list(page, sort_field, sort_desc, search, filters, page_size=None)[source]

Return a paginated and sorted list of models from the data source.

Must be implemented in the child class.

Parameters:
  • page – Page number, 0 based. Can be set to None if it is first page.
  • sort_field – Sort column name or None.
  • sort_desc – If set to True, sorting is in descending order.
  • search – Search query
  • filters – List of filter tuples. First value in a tuple is a search index, second value is a search value.
  • page_size – Number of results. Defaults to ModelView’s page_size. Can be overriden to change the page_size limit. Removing the page_size limit requires setting page_size to 0 or False.
get_one(ref)[source]

Return one model by its id.

Must be implemented in the child class.

Parameters:id – Model id
get_pk_value(model_)[source]

Return PK value from a model object.

index_view()

List view

Initialize search. If data provider does not support search, init_search will return False.

is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

static sampling(selection, offset=0, limit=None)[source]
update_model(form, model)[source]

Update model from the form.

Returns True if operation succeeded.

Must be implemented in the child class.

Parameters:
  • form – Form instance
  • model – Model instance
netdef.Engines.webadmin.SourcesModel.setup(admin)[source]

StatisticsModel

class netdef.Engines.webadmin.StatisticsModel.StatisticsModel(key, value)[source]

Bases: object

key
value
class netdef.Engines.webadmin.StatisticsModel.StatisticsModelForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: wtforms.form.Form

key = <UnboundField(StringField, ('key',), {})>
value = <UnboundField(StringField, ('value',), {})>
class netdef.Engines.webadmin.StatisticsModel.StatisticsModelView(model, name=None, category=None, endpoint=None, url=None, static_folder=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: netdef.Engines.webadmin.SourcesModel.SourcesModelView

action_view()

Mass-model action view.

ajax_lookup()
ajax_update()

Edits a single column of a record in list view.

can_create = False
can_delete = False
can_edit = False
can_view_details = False
column_list = ('key', 'value')
column_searchable_list = 'key'
column_sortable_list = ()
create_view()

Create model view

delete_view()

Delete model view. Only POST method is allowed.

details_view()

Details model view

edit_view()

Edit model view

export(export_type)
form

alias of StatisticsModelForm

get_list(page, sort_field, sort_desc, search, filters, page_size=None)[source]

Return a paginated and sorted list of models from the data source.

Must be implemented in the child class.

Parameters:
  • page – Page number, 0 based. Can be set to None if it is first page.
  • sort_field – Sort column name or None.
  • sort_desc – If set to True, sorting is in descending order.
  • search – Search query
  • filters – List of filter tuples. First value in a tuple is a search index, second value is a search value.
  • page_size – Number of results. Defaults to ModelView’s page_size. Can be overriden to change the page_size limit. Removing the page_size limit requires setting page_size to 0 or False.
get_pk_value(model_)[source]

Return PK value from a model object.

index_view()

List view

netdef.Engines.webadmin.StatisticsModel.setup(admin)[source]

Tools

class netdef.Engines.webadmin.Tools.Tools(name=None, category=None, endpoint=None, url=None, static_folder=None, static_url_path=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView

autoupgrade()[source]
autoupgrade_upgrade()[source]
echo()[source]
index()[source]
is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

logfile()[source]
netdef.Engines.webadmin.Tools.get_update_cmd(executable, no_index, pre, force_reinstall, find_links, trusted_host, minimal_timeout, package)[source]
netdef.Engines.webadmin.Tools.setup(admin, view=None)[source]
netdef.Engines.webadmin.Tools.stdout_from_terminal(*command, err_msg=None)[source]
netdef.Engines.webadmin.Tools.stdout_from_terminal_as_generator(*command, err_msg=None, pre='', post='')[source]

SecurityWebadminView

class netdef.Engines.webadmin.SecurityWebadminView.BasicSecurityForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: netdef.Engines.webadmin.SecurityWebadminView.SecurityForm

new_flask_secret = <UnboundField(SelectField, ('Renew session cookie',), {'choices': [('yes', 'Yes'), ('no', 'No')]})>
old_password = None
validate_password = None
class netdef.Engines.webadmin.SecurityWebadminView.SecurityForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: wtforms.form.Form

confirm = <UnboundField(PasswordField, ('Repeat Password',), {})>
login = <UnboundField(StringField, ('Login', [<wtforms.validators.Required object>]), {'default': functools.partial(<bound method RawConfigParser.get of <configparser.ConfigParser object>>, 'webadmin', 'users.admin.user', fallback='admin')})>
new_flask_secret = <UnboundField(SelectField, ('Renew session cookie',), {'choices': [('no', 'No'), ('yes', 'Yes')]})>
old_password = <UnboundField(PasswordField, ('Current password',), {})>
password = <UnboundField(PasswordField, ('New Password',), {})>
ssl_certificate = <UnboundField(SelectField, ('SSL Certificate',), {'default': functools.partial(<bound method RawConfigParser.get of <configparser.ConfigParser object>>, 'webadmin', 'ssl_certificate', fallback=''), 'choices': []})>
ssl_certificate_key = <UnboundField(SelectField, ('SSL Key',), {'default': functools.partial(<bound method RawConfigParser.get of <configparser.ConfigParser object>>, 'webadmin', 'ssl_certificate_key', fallback=''), 'choices': []})>
ssl_on = <UnboundField(SelectField, ('HTTPS On',), {'default': functools.partial(<bound method RawConfigParser.get of <configparser.ConfigParser object>>, 'webadmin', 'ssl_on', fallback='0'), 'choices': [('0', 'Off'), ('1', 'On')]})>
update_on = <UnboundField(SelectField, ('Package upgrade',), {'default': functools.partial(<bound method RawConfigParser.get of <configparser.ConfigParser object>>, 'auto_update', 'on', fallback=0), 'choices': [('0', 'Disable'), ('1', 'Enable')]})>
update_pre_release = <UnboundField(SelectField, ('Accept pre-releases',), {'default': functools.partial(<bound method RawConfigParser.get of <configparser.ConfigParser object>>, 'auto_update', 'pre_release', fallback=0), 'choices': [('0', 'No'), ('1', 'Yes')]})>
static validate_old_password(form, field)[source]
static validate_password(form, field)[source]
class netdef.Engines.webadmin.SecurityWebadminView.SecurityWebadminView(name=None, category=None, endpoint=None, url=None, static_folder=None, static_url_path=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView

choices_crts = [('', 'None')]
choices_keys = [('', 'None')]
index()[source]
is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

setup_conf_secrets_and_https(webadmin_conf, form)[source]
setup_conf_userdata(webadmin_conf, form)[source]
setup_form_defaults(form)[source]
update_usertable(form)[source]
usertable_is_empty()[source]
netdef.Engines.webadmin.SecurityWebadminView.setup(admin, view=None)[source]

SecurityCertificatesView

class netdef.Engines.webadmin.SecurityCertificatesView.SecurityCertificatesForm(formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs)[source]

Bases: wtforms.form.Form

basicConstraints = <UnboundField(StringField, ('basicConstraints',), {'default': 'CA:TRUE', 'validators': [<wtforms.validators.Regexp object>]})>
cn = <UnboundField(StringField, ('Common name',), {'default': 'build-14281606-project-477358-netdef', 'validators': [<wtforms.validators.Regexp object>], 'render_kw': {'placeholder': 'Hostname, DNS, IP-address or leave it blank'}})>
current_password = <UnboundField(PasswordField, ('Current password',), {})>
days = <UnboundField(IntegerField, ('Days valid',), {'default': 7300})>
dns_1 = <UnboundField(StringField, ('DNS.1',), {'default': 'build-14281606-project-477358-netdef', 'validators': [<wtforms.validators.Regexp object>]})>
dns_2 = <UnboundField(StringField, ('DNS.2',), {'default': '', 'validators': [<wtforms.validators.Regexp object>]})>
dns_3 = <UnboundField(StringField, ('DNS.3',), {'default': '', 'validators': [<wtforms.validators.Regexp object>]})>
extendedKeyUsage = <UnboundField(StringField, ('extendedKeyUsage',), {'default': 'critical, serverAuth', 'validators': [<wtforms.validators.Regexp object>]})>
form_opts = <flask_admin.form.FormOpts object>
gen_opcua = <UnboundField(SelectField, ('OpcUa certificate',), {'default': '1', 'choices': [('0', 'No change'), ('1', 'Generate new')], 'description': 'Following files will be overwritten:<ul><li>ssl/certs/opcua_certificate.pem</li><li>ssl/private/opcua_certificate.pem.key</li><li>ssl/certs/opcua_certificate.der</li><li>ssl/private/opcua_certificate.der.key</li></ul>'})>
gen_webadmin = <UnboundField(SelectField, ('Webadmin certificate',), {'default': '1', 'choices': [('0', 'No change'), ('1', 'Generate new')], 'description': 'Following files will be overwritten:<ul><li>ssl/certs/webadmin_certificate.pem</li><li>ssl/private/webadmin_certificate.pem.key</li><li>ssl/certs/webadmin_certificate.der</li><li>ssl/private/webadmin_certificate.der.key</li></ul>'})>
ip_1 = <UnboundField(StringField, ('IP.1',), {'default': '127.0.0.1', 'validators': [<wtforms.validators.Optional object>, <wtforms.validators.IPAddress object>]})>
ip_2 = <UnboundField(StringField, ('IP.2',), {'default': '172.17.0.2', 'validators': [<wtforms.validators.Optional object>, <wtforms.validators.IPAddress object>]})>
ip_3 = <UnboundField(StringField, ('IP.3',), {'default': '', 'validators': [<wtforms.validators.Optional object>, <wtforms.validators.IPAddress object>]})>
ip_4 = <UnboundField(StringField, ('IP.4',), {'default': '', 'validators': [<wtforms.validators.Optional object>, <wtforms.validators.IPAddress object>]})>
ip_5 = <UnboundField(StringField, ('IP.5',), {'default': '', 'validators': [<wtforms.validators.Optional object>, <wtforms.validators.IPAddress object>]})>
keyUsage = <UnboundField(StringField, ('keyUsage',), {'default': 'critical, cRLSign, digitalSignature, keyCertSign', 'validators': [<wtforms.validators.Regexp object>]})>
subjectAltName = <UnboundField(HiddenField, ('subjectAltName:',), {})>
uri_1 = <UnboundField(StringField, ('URI.1',), {'default': <function get_uri>, 'validators': [<wtforms.validators.Regexp object>]})>
uri_2 = <UnboundField(StringField, ('URI.2',), {'default': '', 'validators': [<wtforms.validators.Regexp object>]})>
uri_3 = <UnboundField(StringField, ('URI.3',), {'default': '', 'validators': [<wtforms.validators.Regexp object>]})>
static validate_current_password(form, field)[source]
class netdef.Engines.webadmin.SecurityCertificatesView.SecurityCertificatesView(name=None, category=None, endpoint=None, url=None, static_folder=None, static_url_path=None, menu_class_name=None, menu_icon_type=None, menu_icon_value=None)[source]

Bases: netdef.Engines.webadmin.MyBaseView.MyBaseView

index()[source]
is_accessible()[source]

Override this method to add permission checks.

Flask-Admin does not make any assumptions about the authentication system used in your application, so it is up to you to implement it.

By default, it will allow access for everyone.

netdef.Engines.webadmin.SecurityCertificatesView.get_uri()[source]
netdef.Engines.webadmin.SecurityCertificatesView.setup(admin, view=None)[source]

Views

class netdef.Engines.webadmin.Views.Views(shared=None)[source]

Bases: object

A collection of all loaded webadmin views

add_shared_object(shared)[source]
load(base_packages)[source]
setup(admin)[source]
netdef.Engines.webadmin.Views.register(name)[source]

A decorator to register webadmin views. Example:

from netdef.Engines.webadmin import Views

@Views.register("NewView")
def setup(admin, view=None):
    if not view:
        view = NewView(name='NewView', endpoint='newview')
    admin.add_view(view)
    ...

netdef.Interfaces package

Abstract base

Default interface

class netdef.Interfaces.DefaultInterface.DefaultInterface(value)[source]

Bases: object

Abstract base class

Internal classes

Datamessage

class netdef.Interfaces.datamessage.DataDefinition(key, default, datatype, access, extension)[source]

Bases: netdef.Interfaces.datamessage.datamessage.AbstractBase

access
datatype
default
extension
classmethod from_uri(uri)[source]
static is_uri(uri)[source]
key
class netdef.Interfaces.datamessage.DataMessage(key, value, source_time, status_code, origin, extension)[source]

Bases: netdef.Interfaces.datamessage.datamessage.AbstractBase

extension
classmethod from_uri(uri)[source]
static is_uri(uri)[source]
key
origin
source_time
status_code
value

Tick

class netdef.Interfaces.internal.tick.Tick(controller)[source]

Bases: object

tick()[source]
timediff()[source]

Built-in Interfaces

BytestringInterface

class netdef.Interfaces.BytestringInterface.ByteStringInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

CommTestInterface

class netdef.Interfaces.CommTestInterface.CommTestInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

available
delay
class netdef.Interfaces.CommTestInterface.Value(value)[source]

Bases: object

available
delay

ConcurrentWebRequestInterface

class netdef.Interfaces.ConcurrentWebRequestInterface.ConcurrentWebRequestInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

available
data
delay
class netdef.Interfaces.ConcurrentWebRequestInterface.Value(value)[source]

Bases: object

available
data
delay

FloatInterface

class netdef.Interfaces.FloatInterface.FloatInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

InfluxDBLoggerInterface

class netdef.Interfaces.InfluxDBLoggerInterface.InfluxDBLoggerInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

class netdef.Interfaces.InfluxDBLoggerInterface.Value(key, source, rule, controller, value, source_time, status_code)

Bases: tuple

controller

Alias for field number 3

key

Alias for field number 0

rule

Alias for field number 2

source

Alias for field number 1

source_time

Alias for field number 5

status_code

Alias for field number 6

value

Alias for field number 4

IntegerInterface

class netdef.Interfaces.IntegerInterface.IntegerInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

Interface that facilitates bit manipulation in an integer

bit(offset)[source]

returns True or False

bits(*offsets)[source]

Returns True or False List

clearbit(offset)[source]

Changes bit in value to False. No return value.

clearbits(*offsets)[source]

Changes bits in value to False. No return value.

setbit(offset, bit=True)[source]

Changing bit in value to True. Can also change to False if bit = False Does not return any value.

setbits(*offsets, bit=True)[source]

Changing bits in value to True. Can also change to False if bit = False Does not return any value.

StringInterface

class netdef.Interfaces.StringInterface.StringInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

UnitOfValueInterface

class netdef.Interfaces.UnitOfValueInterface.ByteUnitInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

get_value_and_unit()[source]
class netdef.Interfaces.UnitOfValueInterface.NoUnitInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

get_value_and_unit()[source]
class netdef.Interfaces.UnitOfValueInterface.PercentUnitInterface(value)[source]

Bases: netdef.Interfaces.DefaultInterface.DefaultInterface

get_value_and_unit()[source]
netdef.Interfaces.UnitOfValueInterface.bytes2human(n)[source]

netdef.Rules package

Rules

Rules

class netdef.Rules.Rules.Rules(shared=None)[source]

Bases: object

add_shared_object(shared)[source]
init()[source]
load(base_packages)[source]
netdef.Rules.Rules.register(name, classref=None)[source]

A decorator to register rules. Example:

from netdef.Rules import BaseRule, Rules

@Rules.register("NewRuleTemplate")
class NewRuleTemplate(BaseRule.BaseRule):
    def __init__(self, name, shared):
        ...

Can also be called as a normal function:

from netdef.Rules import BaseRule, Rules

def setup(shared):
    Rules.register("NewRuleTemplate", NewRuleTemplate)

class NewRuleTemplate(BaseRule.BaseRule):
    def __init__(self, name, shared):
        ...
Parameters:
  • name (str) – Name of the rule class
  • classref (object) – Should be None if used as a decorator and a class if called as a function
Returns:

A callable that returns a class if used as a decorator and a class if called as a normal function

utils

netdef.Rules.utils.get_module_from_string(mod_str, package, abs_root, location_name, mod_name)[source]
netdef.Rules.utils.import_file(abs_pyfile, location_name, mod_name)[source]
netdef.Rules.utils.load_entrypoint(entrypoint, package=None)[source]

Abstract base

This is an abstract baseclass

BaseRule

class netdef.Rules.BaseRule.BaseRule(name, shared)[source]

Bases: object

Abstract class for rules.

Parameters:
  • name (str) – Name to be used in logfiles
  • shared (netdef.Shared.Shared) – a reference to the shared object
add_class_to_controller(source_name, controller_name=None)[source]

Sends ADD_PARSER to controls. Controllers will use static functions defined in these classes to decode / encode values etc.

Parameters:
  • source_name (str) – source name as string
  • controller_name (str) – controller name as string
add_instance_to_controller(item_instance)[source]

Send ADD_SOURCE to controller of given source.

Parameters:item_instance (netdef.Sources.BaseSource) – source instance
add_interrupt(interrupt)[source]

Setup the interrupt signal

add_new_expression(expr_info)[source]

This function does too many things:

  1. Updates shared.expressions.instances (indirectly via self.maintain_searches)
  2. Associate the sources with expressions as arguments
  3. Finds sources and sends them to controllers with ADD_SOURCE message
add_new_parser(source_name, controller_name=None)[source]

It is not always easy for a controller to understand what kind data that a source regards as value. Some controllers do not even know which source to update with data.

Therefore the source classes has static functions that the controller can use to find out these things.

Use this function to add a source class to a controller as a parser.

Parameters:
  • source_name (str) – source as string
  • controller_name (str) – controller as string
convert_to_instance(item_name, source_name, controller_name, rule_name, defaultvalue)[source]

Uses the source name to find the actual source class. Make a instance off the given source class, returns the instance

Parameters:
  • item_name (str) – item as string
  • source_name (str) – source as string
  • controller_name (str) – controller as string
  • rule_name (str) – rule as string
  • defaultvalue – could be anything.
Returns:

instance of source

get_existing_instance(source_instance)[source]
get_expressions(instance)[source]

Returns all expression that is associated with the given instance

Returns:list or None
static get_module_from_string(mod_str, package=None, abs_root=None, location_name=None, mod_name=None)[source]
get_ticks()[source]
handle_run_expression(incoming, value, source_time, status_code)[source]
has_existing_instance(source_instance)[source]

Returns True if the source we are working on already exists. This is important, because we do not want more than one source instance for each value…

has_interrupt()[source]

Returns True if the interrupt signal is received

init_queue()[source]

Setup the message queue and timeout

loop_incoming()[source]

Get every message from the queue and dispatch the associated handler function

maintain_searches(source_instance, expression)[source]

Keeps shared.expressions.instances updated

process_ticks()[source]
rule_name_from_key(key, default_rule_name)[source]

Check if rule name is valid.

Parameters:
  • key (str) – the source key
  • default_rule_name (str) – rule name to use if not found by given key
Returns:

rule name

Return type:

str

Raises:

ValueError – if rule does not exists

run()[source]

Override this function in rule. Example:

def run(self):
    self.logger.info("Running")

    while not self.has_interrupt():
        self.loop_incoming() # dispatch handle_* functions

    self.logger.info("Stopped")
send_expressions_to_engine(item_instance, expressions, value, source_time, status_code)[source]

Send RUN_EXPRESSION to the engine

Parameters:
  • item_instance – the source instance that triggered the expressions
  • expressions (list) – list of expressions
send_ticks()[source]
setup()[source]

Implement the following:

  1. Open and read a configuration file
  2. Create SourceInfo for the sources found in config
  3. Create instance of expression found in config
  4. Create source instances based on data in SourceInfo
  5. Link source instances to expression.
  6. Send ADD_SOURCE and ADD_PARSER to controllers
setup_done()[source]

Update useful statistics

setup_ticks()[source]
sleep(seconds)[source]

” Sleep by waiting for the interrupt. Should be used instead of time.sleep. Override if sleep should be interrupted by even more signals

source_and_controller_from_key(key, controller=None)[source]

Check if controller name is valid. Returns a valid (key, controller) tuple

Parameters:
  • key (str) – the source key
  • controller (str) – controller name to use if not found by given key
Returns:

tuple of key and controller

Return type:

tuple

Raises:

ValueError – if controller does not exists

update_statistics(namespace, error_count, expression_count, source_count)[source]

Write useful info to Statistics-singleton

class netdef.Rules.BaseRule.ExpressionInfo(module, arguments, func='expression', setup='setup')[source]

Bases: object

This is a data class that describes an expression. The rule shall create an expression based on this description

arguments
func
module
setup
class netdef.Rules.BaseRule.SourceInfo(typename, key, controller=None, defaultvalue=None, setup='setup')[source]

Bases: object

This is a data class that describes a source. The rule shall create a source instance based on this description

controller
defaultvalue
get_setup_func(instance)[source]
key
setup
typename

Built-in rule modules

CSVRule

class netdef.Rules.CSVRule.CSVRule(name, shared)[source]

Bases: netdef.Rules.BaseRule.BaseRule

Tip

Development Status :: 5 - Production/Stable

handle_run_expression(incoming, value, source_time, status_code)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

setup()[source]

Parse config files

setup_csv_rule(name)[source]

Parse CSV file.

InfluxDBLoggerRule

class netdef.Rules.InfluxDBLoggerRule.InfluxDBLoggerRule(name, shared)[source]

Bases: netdef.Rules.BaseRule.BaseRule

handle_run_expression(incoming, value, source_time, status_code)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal. Calls setup_auto_logging() once at startup

setup()[source]
setup_auto_logging()[source]

Autogenerate logging expressions and sources for every source that is already created by other rules

INIRule

class netdef.Rules.INIRule.INIRule(name, shared)[source]

Bases: netdef.Rules.BaseRule.BaseRule

Caution

Development Status :: 4 - Beta

handle_run_expression(incoming, value, source_time, status_code)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

setup()[source]

Parse config files

setup_ini_rule(name, rel_inifile)[source]

parse given ini-file

YAMLRule

class netdef.Rules.YAMLRule.YAMLRule(name, shared)[source]

Bases: netdef.Rules.BaseRule.BaseRule

Danger

Development Status :: 3 - Alpha

handle_run_expression(incoming, value, source_time, status_code)[source]
run()[source]

Main loop. Will exit when receiving interrupt signal

setup()[source]

Parse config files

setup_yaml_rule(name, rel_yamlfile)[source]

parse given yaml-file

netdef.Shared package

Internal

class netdef.Shared.Internal.Statistics[source]

Bases: object

A singleton class to store statistics as key-value pair. Can be turned off for performance or security.

Can be imported from Rules, Controllers and Expressions.

Example:

import psutil
from netdef.Shared.Internal import Statistics
from netdef.Sources.SystemMonitorSource import bytes2human

if Statistics.on:
    uss = psutil.Process().memory_full_info().uss
    Statistics.set("process.memory.startup", bytes2human(uss))
static get(key)[source]
static get_dict()[source]
on = True
static set(key, value)[source]
statistics = {}

Shared

class netdef.Shared.Shared.Shared(identifier, install_path, proj_path, default_config_string)[source]

Bases: object

Shared memory for the application. This is the class of the shared instance that is passed to all controllers, rules, engines and expressions. You will use this class to read configs, get message queues etc.

Parameters:
  • identifier (str) – a unique identifier for this app.
  • install_path (str) – Full filepath to application package location
  • proj_path (str) – Full filepath to project location
  • default_config_string (str) – initial config text for SharedConfig.Config

SharedConfig

class netdef.Shared.SharedConfig.Config(identifier, install_path, proj_path, default_config_string, read_from_files=True)[source]

Bases: object

A wrapper class for the configparser module in standard python library.

Parameters:
  • identifier (str) – a unique identifier for this app.
  • install_path (str) – Full filepath to application package location
  • proj_path (str) – Full filepath to project location
  • default_config_string (str) – initial config text for configparser
add_section(section)[source]
config(section, key, defaultvalue=None, add_if_not_exists=True)[source]
get_dict(section)[source]
get_full_list()[source]
is_hidden_value(section, key)[source]
read_default(config_path)[source]
set_config(section, key, value)[source]
set_hidden_value(section, key)[source]
verify(proj_path, config_path)[source]

SharedExpressions

class netdef.Shared.SharedExpressions.ExpressionInstances[source]

Bases: object

add_expression(item)[source]
add_expression_in_source_ref(ref, expression)[source]
get_expressions_by_source_ref(ref)[source]
has_expression_in_source_ref(ref, expression)[source]
has_source_ref(ref)[source]
class netdef.Shared.SharedExpressions.SharedExpressions[source]

Bases: object

instances = <netdef.Shared.SharedExpressions.ExpressionInstances object>

SharedQueues

class netdef.Shared.SharedQueues.MessageType[source]

Bases: enum.Enum

An enumeration.

ADD_PARSER = 6

Instruct the controller to use the given source class as a parser

ADD_SOURCE = 2

Instruct the controller to update the given source’s value from external datasource

APP_STATE = 9

Inform the controller of application state

READ_ALL = 1

warning: Not implemented yet

READ_SOURCE = 3

warning: Not implemented yet

REMOVE_SOURCE = 7

warning: Not implemented yet

RUN_EXPRESSION = 5

Instruct the rule or engine to execute the given expression’s function

TICK = 8

Instruct the controller to send a reply

WRITE_SOURCE = 4

Instruct the controller to update external datasource from the given source’s value

class netdef.Shared.SharedQueues.AppStateType[source]

Bases: enum.Enum

An enumeration.

RUNNING = 2
SETUP = 1
class netdef.Shared.SharedQueues.SharedQueues(maxsize=0)[source]

Bases: object

Message queues for all controllers, rules and the engine

add_controller(name)[source]

Create a incoming queue for given controller’

add_rule(name)[source]

Create a incoming queue for given rule’

get_messages_to_controller(name)[source]

Returns the incoming queue for given controller

get_messages_to_engine()[source]

Returns the incoming queue for the engine

get_messages_to_rule(name)[source]

Returns the incoming queue for given rule

run_expressions_in_engine(source_instance, expressions, value, source_time, status_code)[source]

Send a RUN_EXPRESSION message to the engine.

Parameters:
  • source_instance – the source that triggered given expressions
  • expressions (list) – list of expressions
run_expressions_in_rule(source_instance)[source]

Send a RUN_EXPRESSION message to given rule.

Parameters:source_instance – the source
send_message_to_controller(messagetype, controllername, message_object)[source]

Send a message to given controller

Parameters:
  • messagetype (self.MessageType) –
  • controllername (str) –
  • message_object – usually a source instance. can also be a tuple.
send_message_to_engine(messagetype, message_object)[source]

Send a message to the engine

Parameters:
  • messagetype (self.MessageType) – probably MessageType.RUN_EXPRESSION
  • message_object – usually a source instance.
send_message_to_rule(messagetype, rule_name, message_object)[source]

Send a message to given rule

Parameters:
  • messagetype (self.MessageType) –
  • rule_name (str) –
  • message_object – usually a source instance.
send_running_state_to_controller(controllername)[source]

Send a APP_STATE message to given controller

Parameters:controllername – the controller
send_setup_state_to_controller(controllername)[source]

Send a APP_STATE message to given controller

Parameters:controllername – the controller
write_value_to_controller(source_instance, value, source_time)[source]

Send a WRITE_SOURCE message to given controller

Parameters:
  • source_instance – the source
  • value – new value. datatype have to match the given source
  • source_time (datetime.datetime) – timestamp in utc

SharedSources

class netdef.Shared.SharedSources.SharedSources[source]

Bases: object

classes contain a dict (classes.items) with uninitiated sources classes. (key is name from config, value is class) Used by rules when parsing config files and finding the right source.

instances contains a list of all sources (instances.items) instances created by the rules.

classes = <netdef.Shared.SharedSources.SourceClasses object>
instances = <netdef.Shared.SharedSources.SourceInstances object>
class netdef.Shared.SharedSources.SourceClasses[source]

Bases: object

add_item(source_name, classobj)[source]
get_item(name)[source]
init_items(items)[source]
class netdef.Shared.SharedSources.SourceInstances[source]

Bases: object

add_item(item)[source]
get_item_by_ref(ref)[source]
has_item_ref(ref)[source]

netdef.Sources package

Sources

class netdef.Sources.Sources.Sources(shared=None)[source]

Bases: object

add_shared_object(shared)[source]
init()[source]
load(base_packages)[source]
netdef.Sources.Sources.register(name, classref=None)[source]

A decorator to register sources. Example:

from netdef.Sources import BaseSource, Sources

@Sources.register("NewSourceTemplate")
class NewSourceTemplate(BaseSource.BaseSource):
    def __init__(self, name, shared):
        ...

Can also be called as a normal function:

from netdef.Sources import BaseSource, Sources

def setup(shared):
    Sources.register("NewSourceTemplate", NewSourceTemplate)

class NewSourceTemplate(BaseSource.BaseSource):
    def __init__(self, name, shared):
        ...
Parameters:
  • name (str) – Name of the source class
  • classref (object) – Should be None if used as a decorator and a class if called as a function
Returns:

A callable that returns a class if used as a decorator and a class if called as a normal function

Abstract base

BaseSource

class netdef.Sources.BaseSource.BaseSource(key=None, value=None, controller=None, source=None, rule=None)[source]

Bases: object

can_set_value_from_string()[source]

Returns True if the value can be converted from string to its given datatype. Only builtins.int, str and float have built-in support, but aditional types can be implemented by this function and set_value_from_string

static can_unpack_subitems(value)[source]

Function that confirms / decides on input data a known list. If so, then unpack_subitems can be used afterwards.

Example:

def parse_response(self, response):
    for parser in self.get_parsers():
        if parser.can_unpack_subitems(response):
            yield from parser.unpack_subitems(response)
static can_unpack_value(value)[source]

Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)
copy_get_value()[source]

Shallow copy of the value

copy_value()[source]

Shallow copy of the value

get

Get the value that is updated by the controller

get_reference()[source]

Used to identify similar sources. if two instances return the same reference this means that one instance is redundant and can be replaced

pack_add_source()[source]

Used if source must be added to external system. I.e. a subscription. Can be overridden and customized.

static pack_subitems(value)[source]

Creates output that can be used to query for a list of inputs

pack_value(value)[source]

Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.

Example:

def handle_write_source(self, incoming, value, source_time):
    data = incoming.pack_value(value, source_time)
    topic, payload = incoming.make_message(incoming.key, data)
    self.publish_data_item(topic, payload)
register_set_callback(set_callback)[source]

Register the callback that sends WRITE_SOURCE message to the controller queue.

set

Get the value that is updated by expressions

set_value_from_string(value, stime=None, status_ok=True, origin='')[source]

Converts given value to correct datatype and sends a WRITE_SOURCE message to controller.

This function is called when a value change is triggered from Webadmin ‣ Sources ‣ Edit

Parameters:
  • value – value to be set
  • or datetime.datetime) stime ((None) – timestamp when the value was changed
  • status_ok (bool) – True if value is good
  • origin (str) – who set the value
static unpack_subitems(value)[source]

Function that parses response from source and yield items found in value. This can be overridden and adapted to the controller it is to be used in.

Example:

def parse_response(self, response):
    for parser in self.get_parsers():
        if parser.can_unpack_subitems(response):
            yield from parser.unpack_subitems(response)
static unpack_value(key, source_time, value)[source]

Function that parses response from source and returns following tuple: (key, source_time, value) Key can then be used to find the right instance and update values.

Can be overridden and adapted to the controller it is to be used in.

Returns:tuple(key, source_time, value)
Return type:tuple

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)
value_as_string

Is primarily used by web interfaces to display value in table. Can be overridden to limit the display of large data. Example:

@property
def value_as_string(self):
    if self.value and isinstance(self.value, bytes):
        n = len(self.value)
        return "<{}...><data len:{}>".format(self.value[:10], n)
    else:
        return super().value_as_string
class netdef.Sources.BaseSource.StatusCode[source]

Bases: enum.Enum

Used to indicate the quality of a value in BaseSource.status_code

NONE: Value is not set yet. INITIAL: First value. you might have to update cashes with this value at application startup. GOOD: A normal value update. INVALID: A value update where the value is not to be trusted.

GOOD = 2
INITIAL = 1
INVALID = 3
NONE = 0

Built-in Interfaces

BytestringSource

class netdef.Sources.BytestringSource.BytestringSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

value_as_string

byte data as string

CommTestSource

class netdef.Sources.CommTestSource.CommTestSource(*args, **kwargs)[source]

Bases: netdef.Sources.FloatSource.FloatSource

unpack_host_and_port()[source]

ConcurrentWebRequestSource

class netdef.Sources.ConcurrentWebRequestSource.ConcurrentWebRequestSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

DEFAULT_CLIENT_SESSION_TIMEOUT = 2
add_client_session(session)[source]
build_url(url)[source]
get_basic_auth()[source]
get_client_session()[source]
get_client_session_timeout()[source]
get_commands_list()[source]
get_connect_request()[source]
get_poll_request()[source]
get_poll_request_interval()[source]
get_start_url()[source]
has_basic_auth()[source]
has_connect_request()[source]
has_poll_request()[source]
parse_url(url)[source]
class netdef.Sources.ConcurrentWebRequestSource.Request(method, url, params=None, data=None)[source]

Bases: object

data
method
params
url
class netdef.Sources.ConcurrentWebRequestSource.Result(result)[source]

Bases: object

result
netdef.Sources.ConcurrentWebRequestSource.setup(shared)[source]

CrontabSource

class netdef.Sources.CrontabSource.CrontabSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

DictSource

class netdef.Sources.DictSource.DictSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

FloatSource

class netdef.Sources.FloatSource.FloatSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

HoldingRegisterSource

class netdef.Sources.HoldingRegisterSource.HoldingRegisterSource(*args, **kwargs)[source]

Bases: netdef.Sources.IntegerSource.IntegerSource

static pack_unit_and_address(unit, address)[source]
unpack_unit_and_address()[source]

InfluxDBLoggerSource

class netdef.Sources.InfluxDBLoggerSource.InfluxDBLoggerSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

A dataholder class to be used with InfluxDBLoggerController

get_points(data, source_time, status_code)[source]

Returns a list suitable as argument for InfluxDBClient.write_points()

Parameters:
Returns:

a list of dicts

static make_points(interface, measurement, value, source_time, status_code)[source]

Make a list suitable as argument for InfluxDBClient.write_points()

Parameters:
  • interface (BaseSource, InfluxDBLoggerInterface) – an object with key, rule, source an controller attrs
  • measurement (str) – influxdb measurement name
  • value – measurement field.value
  • source_time (datetime.datetime) – measurement time
  • status_code (BaseSource.StatusCode) – measurement field.status_code
Returns:

a list of dicts

unpack_measurement()[source]

Returns self.key. Override to change measurement name.

IntegerSource

class netdef.Sources.IntegerSource.IntegerSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

InternalSource

class netdef.Sources.InternalSource.InternalSource(*args, **kwargs)[source]

Bases: netdef.Sources.DictSource.DictSource

static can_unpack_value(value)[source]

Function that confirms / determines if the input data is compatible with this class. If so, unpack_value should be used afterwards.

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)
pack_value(value)[source]

Function that converts key and values into a format that the source uses. Can be overridden and adapted to the controller it is to be used in.

Example:

def handle_write_source(self, incoming, value, source_time):
    data = incoming.pack_value(value, source_time)
    topic, payload = incoming.make_message(incoming.key, data)
    self.publish_data_item(topic, payload)
static unpack_value(value)[source]

Function that parses response from source and returns following tuple: (key, source_time, value) Key can then be used to find the right instance and update values.

Can be overridden and adapted to the controller it is to be used in.

Returns:tuple(key, source_time, value)
Return type:tuple

Example:

def parse_item(self, item):
    for parser in self.get_parsers():
        if parser.can_unpack_value(item):
            key, source_time, value = parser.unpack_value(item)
            self.send_datachange(key, source_time, value)

MQTTDataMessageSource

class netdef.Sources.MQTTDataMessageSource.MQTTDataMessageSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

static can_unpack_value(value)[source]

Check if it is possible to extract a value from the payload

static make_message(topic, datamessage)[source]

Wraps given datamessage into a json-payload

Parameters:
  • topic (str) – mqtt topic
  • datamessage (DataMessage) – a datamessage object
Returns:

tuple of topic and json payload

Return type:

tuple

pack_value(value, stime, status_code, origin)[source]

pack the value and stime into a mqtt payload

static parse_message(topic, payload)[source]

Parse given json-payload into a datamessage object

Parameters:
  • topic (str) – mqtt topic
  • payload (str) – json payload
Returns:

a DataMessage object

Return type:

DataMessage

static unpack_value(value)[source]

Return a tuple with key, time and value from the mqtt payload :param DataMessage value: datamessage from mqtt payload :returns: tuple(key, source_time, value, origin) :rtype: tuple

SubprocessSource

class netdef.Sources.SubprocessSource.SubprocessSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

DEFAULT_INTERVAL = 10
static can_unpack_subitems(value)[source]

Returns False, cannot unpack subitems

get_command_and_args(args=None)[source]

Get command and argument to run

get_poll_interval()[source]
has_initial_poll()[source]
has_poll_interval()[source]
parse_stdout_response(value)[source]

Implement parsing function

static unpack_subitems(value)[source]

Yields None, cannot unpack subitems

netdef.Sources.SubprocessSource.setup(shared)[source]

SystemMonitorSource

class netdef.Sources.SystemMonitorSource.SystemMonitorByteSource(*args, **kwargs)[source]

Bases: netdef.Sources.SystemMonitorSource.SystemMonitorSource

static get_interface()[source]
class netdef.Sources.SystemMonitorSource.SystemMonitorPercentSource(*args, **kwargs)[source]

Bases: netdef.Sources.SystemMonitorSource.SystemMonitorSource

static get_interface()[source]
class netdef.Sources.SystemMonitorSource.SystemMonitorSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

static get_interface()[source]
get_value_and_unit()[source]
value_as_string

TextSource

class netdef.Sources.TextSource.TextSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

VariantSource

class netdef.Sources.VariantSource.VariantSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

XmlRpcMethodCallSource

class netdef.Sources.XmlRpcMethodCallSource.XmlRpcMethodCallSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

static can_unpack_subitems(value)[source]

Returns False, cannot unpack subitems

make_rpc_request(value)[source]
parse_rpc_response(value)[source]
poll_request()[source]
static unpack_subitems(value)[source]

Yields None, cannot unpack subitems

ZmqDataAccessSource

class netdef.Sources.ZmqDataAccessSource.ZmqDataAccessSource(*args, **kwargs)[source]

Bases: netdef.Sources.BaseSource.BaseSource

pack_address(addr)[source]
unpack_address()[source]

Indices and tables