import datetime
import logging
import socket
import xmlrpc.client
from ..Sources.BaseSource import StatusCode
# import my supported sources
from ..Sources.XmlRpcMethodCallSource import XmlRpcMethodCallSource
from . import BaseController, Controllers
[docs]@Controllers.register("XmlRpcController")
class XmlRpcController(BaseController.BaseController):
"""
.. tip:: Development Status :: 5 - Production/Stable
Sequence diagram:
.. seqdiag::
seqdiag app{
activation = none;
default_note_color = LemonChiffon;
span_height = 12;
edge_length = 200;
Queue [color=LemonChiffon];
Controller [label=XmlRpcController,color=LemonChiffon];
External [label=xmlrpc.client,color=LemonChiffon];
=== Initialization ===
Queue -> Controller [label="APP_STATE, SETUP"]
=== Setup ===
Queue -> Controller [label="ADD_PARSER, class [n]"]
Queue -> Controller [label="ADD_SOURCE, source [n]"]
Queue -> Controller [label="ADD_SOURCE, source [i]"]
Queue -> Controller [label="APP_STATE, RUNNING"]
=== Running ===
=== Begin polling loop ===
Controller -> External [
label="Rpc call, request [n]",
leftnote="For source [n]"
]
Controller <- External [
label="Rpc call, response [n]",
leftnote="
Update value of
source [n]"
]
Controller -> Controller [
label="parse subitems",
leftnote="
Unpack subitems
into source [i]"
]
Controller -> Queue [
label="RUN_EXPRESSION, source [i]",
note="
Value change
in source [i]"
]
Controller -> Queue [
label="RUN_EXPRESSION, source [n]",
note="
Value change
in source [n]"
]
... ...
Queue -> Controller [
label="
WRITE_SOURCE,
source [n], value, timestamp",
note="
Update value of
source [n]"
]
Controller -> External [label="Rpc call, request [n]"]
Controller <- External [label="Rpc call, response [n]"]
=== End Loop ===
}
"""
def __init__(self, name, shared):
super().__init__(name, shared)
self.logger = logging.getLogger(self.name)
self.logger.info("init")
self.send_events = self.shared.config.config(self.name, "send_events", 1)
self.endpoint_url = self.shared.config.config(self.name, "endpoint_url", "")
self.poll_interval = self.shared.config.config(self.name, "poll_interval", 5)
self.timeout = self.shared.config.config(self.name, "timeout", 5)
self.disable = self.shared.config.config(self.name, "disable", 0)
self.endpoint = None
if self.endpoint_url:
socket.setdefaulttimeout(self.timeout)
self.endpoint = xmlrpc.client.ServerProxy(self.endpoint_url)
[docs] def run(self):
"Main loop. Will exit when receiving interrupt signal"
self.logger.info("Running")
while not self.has_interrupt():
if self.disable: # disble: tøm køen og loop
self.fetch_one_incoming()
continue
self.loop_incoming(
until_empty=False, until_timeout=self.poll_interval
) # dispatch handle_* functions
if not self.has_interrupt():
self.loop_outgoing() # dispatch poll_* functions
self.logger.info("Stopped")
[docs] def handle_readall(self, incoming):
raise NotImplementedError
[docs] def handle_add_source(self, incoming):
self.logger.debug("'Add source' event for %s", incoming.key)
self.add_source(incoming.key, incoming)
if not self.send_events:
incoming.status_code = StatusCode.GOOD
incoming.get = {}
[docs] def handle_read_source(self, incoming):
raise NotImplementedError
[docs] def handle_write_source(self, incoming, value, source_time):
self.logger.info("'Write source' event to %s. value: %s", incoming.key, value)
if self.endpoint:
if isinstance(incoming, XmlRpcMethodCallSource):
incoming.get = self.rpc_call(incoming, value)
incoming.source_time = source_time
if self.send_events:
if incoming.status_code == StatusCode.NONE:
incoming.status_code = StatusCode.INITIAL
else:
incoming.status_code = StatusCode.GOOD
self.send_outgoing(incoming)
else:
incoming.status_code = StatusCode.GOOD
else:
self.logger.error(
"'Write source' class %s not supported", type(incoming)
)
[docs] def poll_outgoing_item(self, item):
if self.endpoint:
if isinstance(item, XmlRpcMethodCallSource):
request = item.poll_request()
response = self.rpc_call(item, request)
for sub_item in self.parse_response(response):
self.parse_item(sub_item)
item.get = response
item.source_time = datetime.datetime.utcnow()
item.status_code = StatusCode.GOOD
self.send_outgoing(item)
[docs] def rpc_call(self, item, value):
try:
method, arguments = item.make_rpc_request(value)
result = getattr(self.endpoint, method)(*arguments)
return item.parse_rpc_response(result)
except Exception as error:
self.logger.error("%s: %s", item.get_reference(), error)
return None
[docs] def parse_response(self, response):
for parser in self.get_parsers():
if parser.can_unpack_subitems(response):
yield from parser.unpack_subitems(response)
[docs] def parse_item(self, item):
for parser in self.get_parsers():
if parser.can_unpack_value(item):
key, source_time, value = parser.unpack_value(item)
self.send_datachange(key, source_time, value)
[docs] def send_datachange(self, key, source_time, value):
if self.has_source(key):
if not source_time:
source_time = datetime.datetime.utcnow()
source_instance = self.get_source(key)
source_instance.get = value
source_instance.source_time = source_time
source_instance.status_code = StatusCode.GOOD
self.send_outgoing(source_instance)