Source code for netdef.Controllers.ModbusClientController

import datetime
import logging

from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.exceptions import ConnectionException, ModbusIOException

from ..Sources.BaseSource import StatusCode
from . import BaseController, Controllers

log = logging.getLogger(__name__)
log.debug("Loading module")


[docs]@Controllers.register("ModbusClientController") class ModbusClientController(BaseController.BaseController): """ .. tip:: Development Status :: 5 - Production/Stable Read and write holding registers of a modbus device. :param str name: The name is used i logfile and default.ini :param Shared shared: reference to the global shared instance Settings: .. code-block:: ini [ModbusClientController] # connection host = 127.0.0.1 port = 5020 # RUN_EXPRESSION is only sent if value has changed oldnew_comparision = 1 # cooldown on connection error og write error reconnect_timeout = 20 # Buffer or clear write requests recieved during cooldown clear_writes_on_disconnect = 1 # Polling interval poll_interval = 0.5 Sequence diagram: .. seqdiag:: seqdiag app{ activation = none; default_note_color = LemonChiffon; span_height = 12; edge_length = 200; Queue [color=LemonChiffon]; Controller [label=ModbusClientController,color=LemonChiffon]; External [label="Modbus registers",color=LemonChiffon]; === Initialization === Queue -> Controller [label="APP_STATE, SETUP"] === Setup === Queue -> Controller [label="ADD_SOURCE, source [n]"] Queue -> Controller [label="APP_STATE, RUNNING"] === Running === === Begin loop === Controller <- External [ label="Value change, register [n]", leftnote=" Update value of source [n]" ] Controller -> Queue [ label="RUN_EXPRESSION, source [n]", note=" Value change in source [n]" ] ... ... Queue -> Controller [ label=" WRITE_SOURCE, source [n], value, timestamp", note=" Update value of source [n]" ] Controller -> External [ label="update value, register [n]", note=" Value change in nodeid [n]" ] === End Loop === } """ def __init__(self, name, shared): super().__init__(name, shared) self.logger = logging.getLogger(name) self.logger.info("init") self.oldnew = self.shared.config.config(self.name, "oldnew_comparision", 1) self.clear_writes_on_disconnect = self.shared.config.config( self.name, "clear_writes_on_disconnect", 1 ) self.poll_interval = self.shared.config.config(self.name, "poll_interval", 0.5) host = self.shared.config.config(self.name, "host", "127.0.0.1") port = self.shared.config.config(self.name, "port", 5020) self.client = ModbusClient(host, port=port)
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" reconnect = False reconnect_timeout = 0 self.loop_until_app_state_running() while not self.has_interrupt(): self.sleep(reconnect_timeout) reconnect_timeout = self.shared.config.config( self.name, "reconnect_timeout", 20 ) try: if reconnect: self.safe_disconnect() if self.clear_writes_on_disconnect: self.clear_incoming() reconnect = True self.logger.info("Running") while not self.has_interrupt(): self.loop_incoming( until_empty=False, until_timeout=self.poll_interval ) # dispatch handle_* functions self.loop_outgoing() # dispatch poll_* functions funksjonene except ( ConnectionRefusedError, ConnectionError, ConnectionException, ) as error: self.logger.debug("Exception: %s", error) self.logger.error( "Connection error. Reconnect in %s sec.", reconnect_timeout ) self.safe_disconnect() for item in self.get_sources().values(): if self.update_source_instance_status( item, status_ok=False, oldnew_check=self.oldnew ): self.send_outgoing(item) self.safe_disconnect() self.logger.info("Stopped")
[docs] def safe_disconnect(self): """ Close the tcp socket if it is connected """ try: self.client.close() except Exception as error: self.logger.warning("Cannot disconnect client: %s", error)
[docs] def handle_add_source(self, incoming): """ Add given source instance to internal source list :param HoldingRegisterSource incoming: source instance """ self.add_source(incoming.key, incoming)
[docs] def handle_write_source(self, incoming, value, source_time): """ Write given value to the connected modbus device. :param HoldingRegisterSource incoming: source instance :param value: frozen value of instance :param datetime.datetime source_time: value timestamp """ if hasattr(incoming, "unpack_unit_and_address"): slave_unit, register = incoming.unpack_unit_and_address() try: write_result = self.client.write_register( register, value, unit=slave_unit ) if isinstance(write_result, ModbusIOException): raise ModbusIOException status_ok = write_result.function_code < 0x80 if not status_ok: self.logger.error( "Write error on modbus unit:%s register:%s value:%s", slave_unit, register, value, ) except ModbusIOException as write_error: self.logger.exception(write_error) self.logger.error( "Write error on modbus unit:%s register:%s value:%s time:%s", slave_unit, register, value, source_time, )
[docs] def poll_outgoing_item(self, item): """ Poll given source for its value in the modbus device :param HoldingRegisterSource item: source instance """ if hasattr(item, "unpack_unit_and_address"): slave_unit, register = item.unpack_unit_and_address() try: read_result = self.client.read_holding_registers( register, 1, unit=slave_unit ) if isinstance(read_result, ModbusIOException): raise ModbusIOException status_ok = read_result.function_code < 0x80 value = read_result.registers[0] stime = datetime.datetime.utcnow() if self.update_source_instance_value( item, value, stime, status_ok, self.oldnew ): self.send_outgoing(item) except ModbusIOException as error: self.logger.exception(error)