Source code for netdef.Controllers.OPCUAServerController

import datetime
import logging

import time
import werkzeug.security

from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode
from netdef.Shared.Internal import Statistics
from opcua import Server, ua
from opcua.common import utils
from opcua.common.callback import CallbackType
from opcua.crypto import security_policies
from opcua.server.internal_server import InternalServer, InternalSession
from opcua.server.user_manager import UserManager


class CustomAnonInternalSession(InternalSession):
    """InternalSession that fills in missing timestamps on value writes."""

    def write(self, params):
        """
        Stamp value writes that lack timestamps, then delegate to the base class.

        Only entries of ``params.NodesToWrite`` that target the ``Value``
        attribute are touched. A missing source timestamp is set to the
        current UTC time, and a missing server timestamp is copied from the
        (possibly just assigned) source timestamp.
        """
        for node_write in params.NodesToWrite:
            if node_write.AttributeId != ua.AttributeIds.Value:
                continue
            data_value = node_write.Value
            if not data_value.SourceTimestamp:
                data_value.SourceTimestamp = datetime.datetime.utcnow()
            if not data_value.ServerTimestamp:
                data_value.ServerTimestamp = data_value.SourceTimestamp
        return super().write(params)
class CustomInternalSession(CustomAnonInternalSession):
    """Session class that refuses anonymous and X509 identity tokens."""

    def activate_session(self, params):
        """
        Reject unsupported identity tokens before activating the session.

        Raises ``utils.ServiceError`` with ``BadUserAccessDenied`` for an
        anonymous token and with ``BadIdentityTokenRejected`` for an X509
        token. Any other token is passed on to the parent implementation.
        """
        token = params.UserIdentityToken
        if isinstance(token, ua.AnonymousIdentityToken):
            raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied)
        if isinstance(token, ua.X509IdentityToken):
            raise utils.ServiceError(ua.StatusCodes.BadIdentityTokenRejected)
        return super().activate_session(params)
class CustomServer(Server):
    """Server subclass that also enables Basic128Rsa15 and Basic256."""

    def _setup_server_nodes(self):
        """
        Run the stock setup, then add the extra policies that were requested.

        Nothing is added when the configured policy list is exactly
        ``[NoSecurity]`` or when either the certificate or the private key
        is missing.
        """
        super()._setup_server_nodes()

        if self._security_policy == [ua.SecurityPolicyType.NoSecurity]:
            return
        if not (self.certificate and self.private_key):
            return

        # (requested policy type, policy class, message security mode),
        # listed in the order the endpoints are registered.
        extra_policies = (
            (
                ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
                security_policies.SecurityPolicyBasic128Rsa15,
                ua.MessageSecurityMode.SignAndEncrypt,
            ),
            (
                ua.SecurityPolicyType.Basic128Rsa15_Sign,
                security_policies.SecurityPolicyBasic128Rsa15,
                ua.MessageSecurityMode.Sign,
            ),
            (
                ua.SecurityPolicyType.Basic256_SignAndEncrypt,
                security_policies.SecurityPolicyBasic256,
                ua.MessageSecurityMode.SignAndEncrypt,
            ),
            (
                ua.SecurityPolicyType.Basic256_Sign,
                security_policies.SecurityPolicyBasic256,
                ua.MessageSecurityMode.Sign,
            ),
        )

        for policy_type, policy_cls, mode in extra_policies:
            if policy_type not in self._security_policy:
                continue
            self._set_endpoints(policy_cls, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(
                    policy_cls, mode, self.certificate, self.private_key
                )
            )
@Controllers.register("OPCUAServerController")
class OPCUAServerController(BaseController.BaseController):
    """
    .. tip:: Development Status :: 5 - Production/Stable

    This Controller will start a freeopcua server instance and will add a
    nodeid for all sources received in `ADD_SOURCE` messages.

    When a client writes a new value this event will be forwarded to the
    associated source and a `RUN_EXPRESSION` message will be sent.

    When a `WRITE_SOURCE` message is received the value for the associated
    source will be updated in the server and all connected clients will
    receive a value update

    Sequence diagram:

    .. seqdiag::

        seqdiag app{
            activation = none;
            default_note_color = LemonChiffon;
            span_height = 12;
            edge_length = 200;

            Queue [color=LemonChiffon];
            Controller [label=OPCUAServerController,color=LemonChiffon];
            External [label=FreeOpcUA,color=LemonChiffon];

            === Initialization ===
            Queue -> Controller [label="APP_STATE, SETUP"]

            === Setup ===
            Queue -> Controller [label="ADD_SOURCE, source [n]"]
            Queue -> Controller [label="APP_STATE, RUNNING"]

            === Running ===
            Controller -> External [label="subscribe nodeid [n]"]

            === Begin loop ===
            Controller <- External [
                label="Value change, nodeid [n]",
                leftnote=" Update value of source [n]"
            ]
            Controller -> Queue [
                label="RUN_EXPRESSION, source [n]",
                note=" Value change in source [n]"
            ]
            ... ...
            Queue -> Controller [
                label=" WRITE_SOURCE, source [n], value, timestamp",
                note=" Update value of source [n]"
            ]
            Controller -> External [
                label="update value, nodeid [n]",
                note=" Value change in nodeid [n]"
            ]
            === End Loop ===
        }

    """

    def __init__(self, name, shared):
        """
        Read the controller configuration and build the server instance.

        The server is configured here but not started; ``run`` starts it.

        :param name: controller name, also used as logger name and as the
            configuration section
        :param shared: shared application object providing ``config``
        """
        super().__init__(name, shared)
        self.logger = logging.getLogger(self.name)
        self.logger.info("init")

        # credentials are flagged as hidden values in the shared config
        self.shared.config.set_hidden_value(self.name, "user")
        self.shared.config.set_hidden_value(self.name, "password")
        self.shared.config.set_hidden_value(self.name, "password_hash")

        def config(key, val):
            return self.shared.config.config(self.name, key, val)

        endpoint = config("endpoint", "no_endpoint")
        certificate = config("certificate", "")
        private_key = config("private_key", "")
        # kept separate from the ``name`` argument to avoid shadowing it
        server_name = config("name", "FreeOpcUa Python Server")
        uri = config("uri", "http://examples.freeopcua.github.io")
        root_object_name = config("root_object_name", "TEST")
        separator = config("separator", ".")
        namespace = config("namespace", 2)
        auto_build_folders = config("auto_build_folders", 0)
        self.oldnew = config("oldnew_comparision", 0)
        self.debug_statistics = config("debug_statistics", 0)
        admin_username = config("user", "admin")
        admin_password = config("password", "admin")
        # "$$" in the configured hash is collapsed to a single "$"
        admin_password_hash = config("password_hash", "").replace("$$", "$")

        security_ids = []
        anonymous_on = config("anonymous_on", 0)
        username_on = config("username_on", 1)
        certificate_on = config("certificate_basic256sha256_on", 0)

        if anonymous_on:
            security_ids.append("Anonymous")
        if username_on:
            security_ids.append("Username")
        if certificate_on:
            security_ids.append("Basic256Sha256")

        # (config key, default, policy); the config keys are read in this order
        policy_switches = (
            ("nosecurity_on", 1, ua.SecurityPolicyType.NoSecurity),
            ("basic128rsa15_sign_on", 0, ua.SecurityPolicyType.Basic128Rsa15_Sign),
            (
                "basic128rsa15_signandencrypt_on",
                0,
                ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
            ),
            ("basic256_sign_on", 0, ua.SecurityPolicyType.Basic256_Sign),
            (
                "basic256_signandencrypt_on",
                0,
                ua.SecurityPolicyType.Basic256_SignAndEncrypt,
            ),
            ("basic256sha256_sign_on", 1, ua.SecurityPolicyType.Basic256Sha256_Sign),
            (
                "basic256sha256_signandencrypt_on",
                1,
                ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ),
        )
        security_policy = [
            policy for key, default, policy in policy_switches if config(key, default)
        ]

        initial_values_is_quality_good = config("initial_values_is_quality_good", 0)

        # without anonymous access the session class rejects anonymous tokens
        if anonymous_on:
            session_cls = CustomAnonInternalSession
        else:
            session_cls = CustomInternalSession
        server = CustomServer(iserver=InternalServer(session_cls=session_cls))

        server.iserver._parent = server
        server.set_application_uri(uri)
        server.name = server_name
        server.set_endpoint(endpoint)
        server.allow_remote_admin(False)

        if certificate and private_key:
            server.load_certificate(str(certificate))
            server.load_private_key(str(private_key))

        if security_ids:
            server.set_security_IDs(security_ids)

        if security_policy:
            server.set_security_policy(security_policy)

        def custom_user_manager(isession, userName, password):
            "Return True only for the configured user with a matching password"
            if userName != admin_username:
                return False
            if admin_password_hash:
                if werkzeug.security.check_password_hash(
                    admin_password_hash, password
                ):
                    return True
            else:
                # fallback to plaintext, only when no hash is configured
                # NOTE(review): plain ``==`` is not a constant-time comparison
                if password == admin_password:
                    return True
            return False

        if username_on:
            server.user_manager.set_user_manager(custom_user_manager)

        idx = server.register_namespace(uri)
        objects = server.get_objects_node()
        root = objects.add_object(idx, root_object_name)

        self.server = server
        self.objects = objects
        self.root = root
        self.sep = separator
        self.ns = namespace
        self.auto_build_folders = auto_build_folders
        self.items = []
        self.subscription = None

        if initial_values_is_quality_good:
            self.initial_status_code = ua.StatusCodes.Good
        else:
            self.initial_status_code = ua.StatusCodes.BadWaitingForInitialData

        # every variable node created later gets this same initial timestamp
        self.initial_timestamp = datetime.datetime.utcnow()

    def run(self):
        "Main loop. Will exit when receiving interrupt signal"
        self.logger.info("Running")
        self.server.start()
        try:
            self.server.subscribe_server_callback(
                CallbackType.ItemSubscriptionCreated, self.create_monitored_items
            )
            self.server.subscribe_server_callback(
                CallbackType.ItemSubscriptionModified, self.modify_monitored_items
            )

            subhandler = SubHandler(self)
            self.subscription = self.server.create_subscription(100, subhandler)

            next_report = time.time()
            while not self.has_interrupt():
                self.loop_incoming()  # dispatch handle_* functions

                if self.debug_statistics and Statistics.on:
                    try:
                        if time.time() > next_report:
                            # at most one report every 10 seconds
                            next_report = time.time() + 10
                            self._report_debug_statistics()
                    except Exception as err:
                        # statistics are best effort; never break the main loop
                        self.logger.exception(err)
        finally:
            # the server was started above, so stop it even if the loop raised
            self.server.stop()
            self.logger.info("Stopped")

    def _report_debug_statistics(self):
        "Publish subscription and client queue sizes to Statistics"
        subscriptions = self.server.iserver.subscription_service.subscriptions
        lines = ["subs len:{}\n".format(len(subscriptions))]
        for key, sub in subscriptions.items():
            lines.append(
                " {}: (noack len:{} ,pi:{},lc:{},mkac:{})\n".format(
                    key,
                    len(sub._not_acknowledged_results),
                    sub.data.RevisedPublishingInterval,
                    sub.data.RevisedLifetimeCount,
                    sub.data.RevisedMaxKeepAliveCount,
                )
            )
        Statistics.set(self.name + ".debug.subscriptions", "".join(lines))

        clients = self.server.bserver.clients
        lines = ["clients len:{}\n".format(len(clients))]
        for client in clients:
            session = client.processor.session
            queued = len(client.processor._publishdata_queue)
            lines.append(
                " {}: (state:{}, pq:{})\n".format(session.name, session.state, queued)
            )
        Statistics.set(self.name + ".debug.clients", "".join(lines))

    def get_default_value(self, incoming):
        "Returns the default value of the source value"
        return incoming.interface(incoming.value).value

    def handle_add_source(self, incoming):
        "Add a source to the server"
        nodeid = self.get_nodeid(incoming)
        self.logger.debug("'Add source' event for nodeid: %s", nodeid)

        if self.has_source(nodeid):
            self.logger.error("source already exists %s", nodeid)
            return

        if self.auto_build_folders:
            parent = self.build_folders(self.root, nodeid, self.sep)
        else:
            parent = self.root

        defaultvalue = self.get_default_value(incoming)
        varianttype = self.get_varianttype(incoming)
        varnode = self.add_variablenode(parent, nodeid, defaultvalue, varianttype)

        if self.is_writable(incoming):
            varnode.set_writable()

        self.add_source(nodeid, (incoming, varnode))

        # the subscription only exists once run() has created it
        if self.subscription:
            self.subscription.subscribe_data_change(varnode)

    def handle_write_source(self, incoming, value, source_time):
        "Receive a value change from an expression and update the server"
        nodeid = self.get_nodeid(incoming)

        # same guard as the other handlers: never look up an unknown nodeid
        if not self.has_source(nodeid):
            self.logger.error("'Write source' event for unknown nodeid: %s", nodeid)
            return

        incoming, varnode = self.get_source(nodeid)
        varianttype = self.get_varianttype(incoming)

        # Check if datatype is compatible with varianttype
        if isinstance(varianttype, ua.VariantType):
            if (varianttype == ua.VariantType.String) and (
                isinstance(value, str) or value is None
            ):
                pass  # string can be None or str
            elif not isinstance(value, type(ua.get_default_value(varianttype))):
                self.logger.error(
                    "%s: Value %s is not compatible with datatype %r",
                    nodeid,
                    incoming.value_as_string,
                    varianttype,
                )
                # write the value without an explicit variant type instead
                varianttype = None

        datavalue = self.create_datavalue(
            value, varianttype, ua.StatusCodes.Good, source_time
        )
        varnode.set_value(datavalue)

    def add_folder(self, parent, foldername):
        "Add a folder in server"
        if not parent:
            parent = self.root
        return parent.add_folder(self.ns, foldername)

    def build_folders(self, parent, ref, sep):
        """
        Walk or create the folder path encoded in a nodeid string.

        The identifier of ``ref`` is split on ``sep``. Every part except the
        last is looked up below the current parent and created as a folder
        when the lookup fails. Returns the innermost folder node.
        """
        nodeid = ua.NodeId.from_string(ref)
        folders = nodeid.Identifier.split(sep)
        ns = nodeid.NamespaceIndex
        for folder in folders[:-1]:
            try:
                parent = parent.get_child(
                    ua.QualifiedName.from_string("{}:{}".format(ns, folder))
                )
            except Exception:
                # any lookup failure is treated as "folder is missing"
                parent = parent.add_folder(ns, folder)
        return parent

    def add_variablenode(self, parent, ref, val, varianttype):
        "Create and add a variable in server and return the variable node"
        self.logger.debug("ADDING %s AS %s", ref, varianttype)
        if not parent:
            parent = self.root

        nodeid = ua.NodeId.from_string(ref)
        if self.auto_build_folders:
            # folders carry the path, so the browse name is the last part only
            bname = "%d:%s" % (
                nodeid.NamespaceIndex,
                nodeid.Identifier.split(self.sep)[-1],
            )
        else:
            bname = "%d:%s" % (nodeid.NamespaceIndex, nodeid.Identifier)

        datavalue = self.create_datavalue(
            val, varianttype, self.initial_status_code, self.initial_timestamp
        )
        var_node = parent.add_variable(
            nodeid=ref, bname=bname, val=val, varianttype=varianttype
        )
        # overwrite the plain value with initial status code and timestamp
        var_node.set_data_value(datavalue)
        return var_node

    def create_datavalue(self, val, datatype, statuscode, timestamp):
        "Create a value for the server that keep the correct datatype"
        variant = ua.Variant(value=val, varianttype=datatype)
        status = ua.StatusCode(statuscode)
        datavalue = ua.DataValue(variant=variant, status=status)
        datavalue.SourceTimestamp = timestamp
        datavalue.ServerTimestamp = timestamp
        return datavalue

    def get_varianttype(self, incoming):
        "Returns the varianttype from the source"
        if hasattr(incoming, "get_varianttype"):
            return incoming.get_varianttype()
        return None

    def get_nodeid(self, incoming):
        "Returns the nodeid from the source"
        if hasattr(incoming, "get_nodeid"):
            return incoming.get_nodeid()
        return incoming.key

    def is_writable(self, incoming):
        "Returns True if source is writable for the opcua client"
        if hasattr(incoming, "is_writable"):
            return bool(incoming.is_writable())
        # sources without an is_writable method are treated as writable
        return True

    def send_datachange(self, nodeid, value, stime, status_ok, ua_status_code):
        "Triggers a RUN_EXPRESSION message for given source"
        if not self.has_source(nodeid):
            return

        item, _varnode = self.get_source(nodeid)

        if (
            not status_ok
            and item.status_code == StatusCode.NONE
            and ua_status_code == self.initial_status_code
        ):
            # we are actually good
            status_ok = True

        if self.update_source_instance_value(
            item, value, stime, status_ok, self.oldnew
        ):
            self.send_outgoing(item)

    def modify_monitored_items(self, event, dispatcher):
        "Log that a monitored item was modified"
        self.logger.info("modify_monitored_items")

    def create_monitored_items(self, event, dispatcher):
        "write a warning to logfile if the client add a nodeid that does not exists"
        for idx, result in enumerate(event.response_params):
            if result.StatusCode.is_good():
                continue
            node_id = event.request_params.ItemsToCreate[idx].ItemToMonitor.NodeId
            self.logger.warning(
                "create_monitored_items: missing %s", node_id.to_string()
            )
class SubHandler:
    """
    The subscription handler for the server.
    Forwards value changes in the server to the controller.
    """

    def __init__(self, controller):
        """Keep a reference to the controller and reuse its logger."""
        self.controller = controller
        self.logger = controller.logger

    def datachange_notification(self, node, val, data):
        """
        Forward a data change to ``controller.send_datachange``.

        The value, source timestamp and status code are taken from
        ``data.monitored_item.Value``; ``val`` itself is not used. The status
        counts as ok when the numeric status code is 0.
        """
        nodeid = node.nodeid.to_string()
        changed = data.monitored_item.Value
        status_value = changed.StatusCode.value
        self.controller.send_datachange(
            nodeid,
            changed.Value.Value,
            changed.SourceTimestamp,
            status_value == 0,
            status_value,
        )

    def event_notification(self, event):
        """Log an incoming event."""
        self.logger.info("Python: New event %s", event)