Source code for netdef.Controllers.ModbusServerController

import datetime
import logging
import socket
import time

from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server.sync import ModbusSocketFramer, ModbusTcpServer

from ..Shared.Internal import Statistics
from ..Sources.BaseSource import StatusCode
from ..Sources.HoldingRegisterSource import HoldingRegisterSource
from . import BaseController, Controllers


@Controllers.register("ModbusServerController")
class ModbusServerController(BaseController.BaseController):
    """
    Controller that exposes sources as registers on a pymodbus TCP server.

    .. tip:: Development Status :: 5 - Production/Stable

    Sequence diagram:

    .. seqdiag::

        seqdiag app{
            activation = none;
            default_note_color = LemonChiffon;
            span_height = 12;
            edge_length = 200;

            Queue [color=LemonChiffon];
            Controller [label=ModbusServerController,color=LemonChiffon];
            External [label="Pymodbus datablock",color=LemonChiffon];

            === Initialization ===
            Queue -> Controller [label="APP_STATE, SETUP"]

            === Setup ===
            Queue -> Controller [label="ADD_SOURCE, source [n]"]
            Queue -> Controller [label="APP_STATE, RUNNING"]

            === Running ===

            === Begin loop ===
            Controller <- External [
                label="Value change, register [n]",
                leftnote="
                    Update value of
                    source [n]"
            ]
            Controller -> Queue [
                label="RUN_EXPRESSION, source [n]",
                note="
                    Value change
                    in source [n]"
            ]
            ... ...
            Queue -> Controller [
                label="
                    WRITE_SOURCE,
                    source [n], value, timestamp",
                note="
                    Update value of
                    source [n]"
            ]
            Controller -> External [
                label="update value, register [n]",
                note="
                    Value change
                    in nodeid [n]"
            ]
            === End Loop ===
        }

    """

    def __init__(self, name, shared):
        """
        Set up the base controller and a logger named after the controller.

        :param name: controller name; also used as the config section
        :param shared: shared application object passed to the base class
        """
        super().__init__(name, shared)
        self.logger = logging.getLogger(name)
        self.logger.info("init")

    def run(self):
        """
        Main loop. Will exit when receiving interrupt signal.

        Reads the controller settings from the config, builds the modbus
        server context and device identification, creates the server and
        then blocks in ``serve_forever``. The server socket is closed when
        ``serve_forever`` returns or raises.
        """
        config = self.shared.config.config
        self.send_events_internal = config(self.name, "send_events_on_internal", 0)
        self.send_events_external = config(self.name, "send_events_on_external", 1)
        self.oldnew = config(self.name, "oldnew_comparision", 1)
        self.daemon_threads = config(self.name, "daemon_threads", 0)

        self.context = self.get_modbus_server_context()
        framer = self.get_framer()

        self.readfunction = 0x03  # read holding registers
        self.writefunction = 0x10  # write multiple registers

        identity = ModbusDeviceIdentification()
        identity.VendorName = config(self.name, "VendorName", "Pymodbus")
        identity.ProductCode = config(self.name, "ProductCode", "PM")
        identity.VendorUrl = config(
            self.name, "VendorUrl", "http://github.com/bashwork/pymodbus/"
        )
        identity.ProductName = config(self.name, "ProductName", "Pymodbus Server")
        identity.ModelName = config(self.name, "ModelName", "Pymodbus Server")
        identity.MajorMinorRevision = config(self.name, "MajorMinorRevision", "1.0")

        host = config(self.name, "host", "0.0.0.0")
        port = config(self.name, "port", 5020)

        # When we start the modbus serve_forever this thread is blocked,
        # so we would not get to run loop_incoming / loop_outgoing, nor
        # would we be able to signal shutdown. Overriding some functions in
        # the server solves this; that is done in MyController.
        self.server = self.init_server(self.context, framer, identity, host, port)

        if self.daemon_threads:
            self.server.daemon_threads = True

        self.logger.info("listen %s, port %s", host, port)
        self.logger.info("Running")
        try:
            self.server.serve_forever()
        finally:
            # Release the listening socket even if the serve loop raised.
            self.logger.info("Closing connections")
            self.server.server_close()
        self.logger.info("Stopped")

    def init_server(self, context, framer, identity, host, port):
        """
        Create the modbus TCP server, retrying while the socket is unavailable.

        An ``OSError`` from the constructor is logged and the attempt is
        repeated after 10 seconds, up to 20 retries. The error from the last
        attempt is not caught.

        :param context: the ModbusServerContext to serve
        :param framer: framer class handed to the server
        :param identity: ModbusDeviceIdentification instance
        :param host: address to bind to
        :param port: TCP port to bind to
        :return: a MyController instance
        :raises OSError: if the final attempt also fails
        """
        retries = 20
        retry_delay = 10  # seconds; matches the text of the log message below
        for attempt in range(retries + 1):
            if attempt > 0:
                time.sleep(retry_delay)
            try:
                return MyController(
                    context,
                    framer,
                    identity,
                    (host, port),
                    allow_reuse_address=True,
                    controller=self,
                )
            except OSError as error:
                if attempt == retries:
                    raise
                self.logger.error("%s. Retry in 10 sec.", repr(error))

    def _build_datablock(self, section, prefix):
        """
        Build one sequential data block from ``<prefix>_start``,
        ``<prefix>_length`` and ``<prefix>_init_value`` in config section
        *section* (defaults: 0, 100 and 0).
        """
        config = self.shared.config.config
        start = config(section, prefix + "_start", 0)
        length = config(section, prefix + "_length", 100)
        init_value = config(section, prefix + "_init_value", 0)
        return ModbusSequentialDataBlock(start, [init_value] * length)

    def get_modbus_server_context(self):
        """
        Iter the devicelist section in config-file and builds a
        ModbusServerContext object

        Every enabled entry of the device list names a config section that
        describes one device. A later entry with the same ``device_id``
        replaces an earlier one.

        :return: an ModbusServerContext instance
        """
        config = self.shared.config.config
        device_dict = {}

        conf_device_list = config(self.name, "devicelist", self.name + "_devices")
        devices = self.shared.config.get_dict(conf_device_list)

        for deviceconfig, deviceenabled in devices.items():
            if not int(deviceenabled):
                continue

            device_id = config(deviceconfig, "device_id", 0)
            device_name = config(deviceconfig, "device_name", "").strip("\"'")

            store = MyContext(
                di=self._build_datablock(deviceconfig, "di"),  # discrete inputs
                co=self._build_datablock(deviceconfig, "co"),  # coils
                hr=self._build_datablock(deviceconfig, "hr"),  # holding registers
                ir=self._build_datablock(deviceconfig, "ir"),  # input registers
                controller=self,
                device_id=device_id,
                device_name=device_name,
            )
            device_dict[device_id] = store

        return ModbusServerContext(slaves=device_dict, single=False)

    def get_framer(self):
        """
        Returns the framer to be used.
        Override this function to return a custom framer
        """
        return ModbusSocketFramer

    def handle_add_source(self, incoming):
        """
        Register *incoming* with this controller.

        Resets ``get`` to 0 and ``status_code`` to ``StatusCode.NONE`` on the
        source before storing it under its key.
        """
        self.logger.debug("'Add source' event for %s", incoming.key)
        incoming.get = 0
        incoming.status_code = StatusCode.NONE
        self.add_source(incoming.key, incoming)

    def handle_write_source(self, incoming, value, source_time):
        """
        Write *value* into the register addressed by *incoming*.

        Only ``HoldingRegisterSource`` instances are handled; sources of any
        other type are ignored. The write is flagged as internal.

        :param incoming: the source to write to
        :param value: new register value
        :param source_time: timestamp of the value; only logged here
        """
        if isinstance(incoming, HoldingRegisterSource):
            unit, address = incoming.unpack_unit_and_address()
            # Last argument is is_internal=True (see MyContext.setValues).
            self.context[unit].setValues(self.writefunction, address, [value], True)
            self.logger.debug(
                "'Write source' event to %s. value: %s at %s",
                incoming.key,
                value,
                source_time,
            )

    def handle_datachange(self, unit, address, value, is_internal):
        """
        Update the source bound to a register and emit an event if it changed.

        Called by ``MyContext.setValues`` once per written register. Nothing
        happens when no source is registered for the register. An event is
        sent only when ``update_source_instance_value`` returns a true value
        and the matching ``send_events_on_internal`` /
        ``send_events_on_external`` setting is enabled.

        :param unit: device id of the written context
        :param address: register address
        :param value: the value written
        :param is_internal: True when the write came from this controller
        """
        name = HoldingRegisterSource.pack_unit_and_address(unit, address)
        if not self.has_source(name):
            return

        item = self.get_source(name)
        stime = datetime.datetime.utcnow()
        status_ok = True

        if not self.update_source_instance_value(
            item, value, stime, status_ok, self.oldnew
        ):
            return

        if is_internal:
            send_event = self.send_events_internal
        else:
            send_event = self.send_events_external
        if send_event:
            self.send_outgoing(item)
class MyController(ModbusTcpServer):
    """
    Modbus TCP server that services its owning controller from inside the
    serve loop.

    Requires a ``controller`` keyword argument; all arguments are forwarded
    unchanged to ``ModbusTcpServer``.
    """

    daemon_threads = False

    def __init__(self, *args, **kwargs):
        """Initialise the server, then keep a reference to the controller."""
        super().__init__(*args, **kwargs)
        self.controller = kwargs["controller"]

    def service_actions(self):
        """
        Hook that socketserver runs from ``serve_forever``.

        Publishes the client thread count when statistics are enabled, then
        either requests shutdown (the controller has an interrupt) or lets
        the controller process its incoming queue.
        """
        owner = self.controller

        if Statistics.on:
            stat_key = owner.name + ".clients.count"
            Statistics.set(stat_key, len(self.threads))

        if not owner.has_interrupt():
            # dispatch handle_* functions
            owner.loop_incoming()
            return

        # Name-mangled private flag of socketserver.BaseServer; it is set
        # directly because this code runs on the serving thread itself.
        self._BaseServer__shutdown_request = True
class MyContext(ModbusSlaveContext):
    """
    Slave context that reports every register write to its controller.

    Requires the keyword arguments ``controller``, ``device_id`` and
    ``device_name``; all arguments are forwarded unchanged to
    ``ModbusSlaveContext``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the context and keep the three extra keyword values."""
        super().__init__(*args, **kwargs)
        for attribute in ("controller", "device_id", "device_name"):
            setattr(self, attribute, kwargs[attribute])

    def setValues(self, fx, address, values, is_internal=False):
        """
        Store *values* and notify the controller once per written register.

        :param fx: modbus function code
        :param address: address of the first register
        :param values: sequence of values to store from *address* onwards
        :param is_internal: passed through to ``handle_datachange``
        """
        super().setValues(fx, address, values)
        for offset, written in enumerate(values):
            register = address + offset
            self.controller.handle_datachange(
                self.device_id, register, written, is_internal
            )