import concurrent.futures
import logging
import time
import opcua
from ..Sources.BaseSource import StatusCode
from . import BaseController, Controllers
log = logging.getLogger(__name__)
log.debug("Loading module")
[docs]@Controllers.register("OPCUAClientController")
class OPCUAClientController(BaseController.BaseController):
"""
.. caution:: Development Status :: 4 - Beta
"""
def __init__(self, name, shared):
super().__init__(name, shared)
self.logger = logging.getLogger(name)
self.logger.info("init")
self.shared.config.set_hidden_value(self.name, "user")
self.shared.config.set_hidden_value(self.name, "password")
self.oldnew = self.config("oldnew_comparision", 0)
self.disable = self.config("disable", 0)
endpoint = self.config("endpoint", "")
certificate = self.config("certificate", "")
private_key = self.config("private_key", "")
connection_timeout = self.config("connection_timeout", 4)
username = self.config("user", "")
password = self.config("password", "")
name = self.config("name", "Pure Python Client")
description = self.config("description", name)
application_uri = self.config("application_uri", "urn:freeopcua:client")
product_uri = self.config("product_uri", "urn:freeopcua.github.io:client")
secure_channel_timeout = self.config(
"secure_channel_timeout", 3600000
) # 1 hour
session_timeout = self.config("session_timeout", 3600000) # 1 hour
self.security_string = "" # format: Policy,Mode,certificate,private_key
if self.config("basic128rsa15_sign_on", 0):
self.security_string = "Basic128Rsa15,Sign,{},{}".format(
certificate, private_key
)
elif self.config("basic128rsa15_signandencrypt_on", 0):
self.security_string = "Basic128Rsa15,SignAndEncrypt,{},{}".format(
certificate, private_key
)
elif self.config("basic256_sign_on", 0):
self.security_string = "Basic256,Sign,{},{}".format(
certificate, private_key
)
elif self.config("basic256_signandencrypt_on", 0):
self.security_string = "Basic256,SignAndEncrypt,{},{}".format(
certificate, private_key
)
elif self.config("basic256sha256_sign_on", 0):
self.security_string = "Basic256Sha256,Sign,{},{}".format(
certificate, private_key
)
elif self.config("basic256sha256_signandencrypt_on", 0):
self.security_string = "Basic256Sha256,SignAndEncrypt,{},{}".format(
certificate, private_key
)
elif self.config("nosecurity_on", 1):
self.security_string = ""
self.client = opcua.Client(endpoint, connection_timeout)
self.client.description = description
self.client.name = name
self.client.application_uri = application_uri
self.client.product_uri = product_uri
self.client.secure_channel_timeout = secure_channel_timeout
self.client.session_timeout = session_timeout
if username:
self.client.set_user(username)
self.client.set_password(password)
# use this to use x509 cert identification instead of username/password or anonymous
certificate_on = self.config("certificate_basic256sha256_on", 0)
if certificate_on:
self.client.load_client_certificate(certificate)
self.client.load_private_key(private_key)
self.subscription = None
[docs] def config(self, key, default):
return self.shared.config.config(self.name, key, default)
[docs] def run(self):
"Main loop. Will exit when receiving interrupt signal"
self.sleep(1)
reconnect = False
reconnect_timeout = 0
keepalive_timeout = self.config("keepalive_timeout", 600)
last_keepalive = time.time()
while not self.has_interrupt():
if (
self.disable
): # to disable: empty queue by calling self.fetch_one_incoming
self.fetch_one_incoming()
continue
self.sleep(reconnect_timeout)
reconnect_timeout = self.config("reconnect_timeout", 20)
try:
if reconnect:
self.safe_disconnect()
# TODO: sette alle verdier til StatusCode.NONE
if self.security_string:
self.client.set_security_string(self.security_string)
self.client.connect()
intervall = self.config("subscription_interval", 100)
handler = SubHandler(self)
self.subscription = self.client.create_subscription(intervall, handler)
for source_key in self.get_sources():
node_instance = self.client.get_node(source_key)
try:
self.subscription.subscribe_data_change(node_instance)
except opcua.ua.uaerrors.BadNodeIdUnknown as error:
self.logger.exception(error)
reconnect = True
last_keepalive = time.time()
self.logger.info("Running")
while not self.has_interrupt():
self.loop_incoming() # dispatch handle_* functions
if time.time() > (last_keepalive + keepalive_timeout):
# self.logger.debug("Sending keepalive")
self.client.send_hello()
last_keepalive = time.time()
except (ConnectionRefusedError, ConnectionError) as error:
self.logger.debug(error, exc_info=True)
self.logger.error(
"Connection error. Reconnect in %s sec.", reconnect_timeout
)
except (concurrent.futures.TimeoutError, OSError) as error:
self.logger.debug(error, exc_info=True)
self.logger.error(
"Timeout error. Reconnect in %s sec.", reconnect_timeout
)
except opcua.ua.uaerrors.UaStatusCodeError as error:
self.logger.debug(error, exc_info=True)
self.logger.error(
"UaStatusCodeError: %s. Reconnect in %s sec.",
error,
reconnect_timeout,
)
self.safe_disconnect()
self.logger.info("Stopped")
[docs] def safe_disconnect(self):
try:
if self.subscription:
self.subscription.delete()
except Exception as error:
self.logger.warning("Cannot delete subscription: %s", error)
for item in self.get_sources().values():
item.status_code = StatusCode.NONE
try:
self.client.disconnect()
except Exception as error:
self.logger.warning("Cannot disconnect client: %s", error)
[docs] def handle_add_source(self, incoming):
try:
# key should be of format: "ns=2;s=Channel1.Device1.Tag1"
node_instance = self.client.get_node(incoming.key)
self.subscription.subscribe_data_change(node_instance)
self.add_source(incoming.key, incoming)
except opcua.ua.uaerrors.BadNodeIdUnknown as error:
self.logger.error("%s: %s", incoming.key, error)
except opcua.ua.uaerrors.UaStringParsingError as error:
self.logger.exception(error)
# TODO: kanske lagre nodeid-instansene?
[docs] def handle_write_source(self, incoming, value, source_time):
if self.has_source(incoming.key):
node_instance = self.client.get_node(incoming.key)
# print(id(node_instance), source_time)
# TODO: kanske gjøre internt oppslag på nodeid-instansene, i stedet for å hente ny hver gang?
# TODO: hente datatype med v.get_data_type_as_variant_type()
node_instance.set_value(value)
else:
self.logger.error("Write error. Source %s not found", incoming.key)
[docs] def loop_outgoing(self):
for item in self.get_sources().values():
self.poll_outgoing_item(item)
[docs] def send_datachange(self, nodeid, value, stime, status_ok):
if self.has_source(nodeid):
item = self.get_source(nodeid)
if self.update_source_instance_value(
item, value, stime, status_ok, self.oldnew
):
self.send_outgoing(item)
[docs]class SubHandler(object):
"""
Client to subscription. It will receive events from server
"""
def __init__(self, parent):
self.parent = parent
[docs] def datachange_notification(self, node, value, data):
nodeid = node.nodeid.to_string()
item = data.monitored_item.Value
source_value = item.Value.Value
source_time = item.SourceTimestamp
source_status_ok = item.StatusCode.value == 0
# self.logger.debug("nodeid:%s, value:%s, time:%s, ok:%s", )
self.parent.send_datachange(nodeid, source_value, source_time, source_status_ok)
[docs] def event_notification(self, event):
self.parent.logger.info("opcua subscription: New event: %s", event)
[docs] def status_change_notification(self, status):
self.parent.logger.info("opcua subscription: New status: %s", status)