Source code for netdef.Controllers.RESTJsonController

import datetime
import http.client
import json
import logging
import time
import urllib.error
import urllib.parse
import urllib.request

from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode

# import base64


# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Emitted once, when this module is first imported.
log.debug("Loading module")


@Controllers.register("RESTJsonController")
class RESTJsonController(BaseController.BaseController):
    """
    Controller that exchanges JSON documents with HTTP endpoints.

    ``loop_outgoing`` sleeps ``poll_interval`` seconds, GETs ``poll_url`` and
    hands every received item to the registered parsers. Values written to a
    source are packed by the source itself and POSTed to ``post_url``.

    .. tip:: Development Status :: 5 - Production/Stable

    """

    # Failures raised by urllib / http.client while a request is sent or its
    # response is read. urllib.error.URLError (and HTTPError) derive from
    # OSError; http.client.RemoteDisconnected derives from HTTPException.
    _TRANSPORT_ERRORS = (http.client.HTTPException, OSError)

    def __init__(self, name, shared):
        """
        Read this controller's options from the shared configuration.

        Options (section ``name``): ``encoding``, ``poll_url``, ``post_url``,
        ``get_url``, ``poll_interval``, ``connect_url``, ``retry``,
        ``reconnect_timeout``, ``disable``, ``authorization`` and, when
        ``authorization`` is ``"basic"``, ``username`` / ``password``.
        """
        super().__init__(name, shared)
        self.logger = logging.getLogger(name)
        self.logger.info("init")

        config = self.shared.config.config
        # NOTE(review): `encoding` is read here but no method of this class
        # uses it; requests and responses use fixed "ascii" / "utf-8".
        self.encoding = config(self.name, "encoding", "utf-8")
        self.poll_url = config(self.name, "poll_url", "")
        self.post_url = config(self.name, "post_url", "")
        self.get_url = config(self.name, "get_url", "")
        self.poll_interval = config(self.name, "poll_interval", 1.0)
        self.connect_url = config(self.name, "connect_url", "")
        self.retry = config(self.name, "retry", 3)
        self.reconnect_timeout = config(self.name, "reconnect_timeout", 20)

        self.urlerrors = 0
        self.urlopen = urllib.request.urlopen
        self.disable = config(self.name, "disable", 0)

        authorization = config(self.name, "authorization", "")
        if authorization == "basic":
            username = config(self.name, "username", "")
            password = config(self.name, "password", "")
            # Credentials stored under realm=None are only consulted by the
            # *WithDefaultRealm* manager. The plain HTTPPasswordMgr that
            # HTTPBasicAuthHandler() creates by default looks up the realm
            # announced by the server and would never find this entry.
            password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
            password_mgr.add_password(
                realm=None,
                uri=[self.poll_url, self.connect_url, self.post_url],
                user=username,
                passwd=password,
            )
            # NOTE(review): get_url is not registered for authentication;
            # confirm whether _read() needs credentials.
            auth_handler = urllib.request.HTTPBasicAuthHandler(password_mgr)
            self.urlopen = urllib.request.build_opener(auth_handler).open

    def run(self):
        "Main loop. Will exit when receiving interrupt signal"
        self.logger.info("Running")
        self.connect()
        while not self.has_interrupt():
            if self.disable:
                # to disable: empty queue by calling self.fetch_one_incoming
                self.fetch_one_incoming()
            else:
                self.loop_incoming()  # dispatch handle_* functions
                self.loop_outgoing()  # dispatch poll_* functions
            # NOTE(review): nesting reconstructed from a flattened listing;
            # the pause is assumed to apply to both branches.
            time.sleep(0.1)
        self.logger.info("Stopped")

    def handle_readall(self, incoming):
        """Not supported by this controller."""
        raise NotImplementedError

    def handle_add_source(self, incoming):
        """
        Register ``incoming`` under its key.

        If the source produces an add-source message (``pack_add_source()``
        returns something truthy), that message is POSTed first.
        """
        add_source_message = incoming.pack_add_source()
        if add_source_message:
            self._write(add_source_message)
        # NOTE(review): nesting reconstructed from a flattened listing; the
        # source is assumed to be registered whether or not a message is sent.
        self.add_source(incoming.key, incoming)

    def handle_read_source(self, incoming):
        """Not supported by this controller."""
        raise NotImplementedError

    def handle_write_source(self, incoming, value):
        """POST ``value`` as packed by the source; skip empty payloads."""
        data = incoming.pack_value(value)
        if data:
            self._write(data)

    def connect(self):
        """Request ``connect_url`` once; the reply is currently discarded."""
        # TODO: handle the received data with StatusCode.INITIAL
        self._connect()

    def loop_outgoing(self):
        """
        Wait ``poll_interval`` seconds, poll once and parse the reply.

        A dict reply is parsed value by value, a list reply item by item and
        any other truthy reply as a single item.
        """
        time.sleep(self.poll_interval)
        data = self._poll()
        if isinstance(data, dict):
            for tupleitem in data.values():
                self.parse_item(tupleitem)
        elif isinstance(data, list):
            for item in data:
                self.parse_item(item)
        elif data:
            self.parse_item(data)

    def parse_item(self, item):
        """Offer ``item`` to every parser; forward each value it unpacks."""
        for parser in self.get_parsers():
            if parser.can_unpack_value(item):
                key, source_time, value = parser.unpack_value(item)
                self.send_datachange(key, source_time, value)

    def send_datachange(self, key, source_time, value):
        """
        Store ``value`` on the source registered under ``key`` and send it on.

        Unknown keys are ignored. A falsy ``source_time`` is replaced by the
        current UTC time as a naive ``datetime``.
        """
        if self.has_source(key):
            if not source_time:
                # Same naive-UTC value datetime.utcnow() produced, without
                # calling the deprecated function.
                source_time = datetime.datetime.now(
                    datetime.timezone.utc
                ).replace(tzinfo=None)
            source_instance = self.get_source(key)
            source_instance.get = value
            source_instance.source_time = source_time
            source_instance.status_code = StatusCode.GOOD
            self.send_outgoing(source_instance)

    def urlerrorhandling(self):
        """
        Count a failed request; back off once ``retry`` failures are reached.

        When the counter reaches ``retry`` it is reset and the calling thread
        sleeps ``reconnect_timeout`` seconds. Nothing in this class resets the
        counter after a successful request.
        """
        self.urlerrors += 1
        if self.urlerrors >= self.retry:
            self.urlerrors = 0
            self.logger.error(
                "Timeout error. Reconnect in %s sec.", self.reconnect_timeout
            )
            time.sleep(self.reconnect_timeout)

    def _write(self, dict_data):
        """POST ``dict_data`` as JSON to ``post_url`` and log the reply."""
        # json.dumps escapes non-ASCII characters by default, so the ASCII
        # encoding below cannot fail.
        data = json.dumps(dict_data).encode("ascii")
        headers = {"Content-Type": "application/json"}
        request = urllib.request.Request(
            self.post_url, data=data, headers=headers
        )
        try:
            with self.urlopen(request) as f:
                self.logger.debug(f.read().decode("utf-8"))
        except self._TRANSPORT_ERRORS as rem_err:
            self.logger.error("%s", rem_err)

    def _read(self, key):
        """GET ``get_url`` formatted with ``key``; return the JSON or None."""
        try:
            url = self.get_url.format(key)
        except (IndexError, KeyError, ValueError) as template_err:
            # get_url is not a usable str.format template for one argument.
            self.logger.error("%s: %s", key, template_err)
            return None
        return self._get_json(url, label=key)

    def _poll(self):
        """GET ``poll_url``; return the decoded JSON or None."""
        return self._get_json(self.poll_url)

    def _connect(self):
        """GET ``connect_url``; return the decoded JSON or None."""
        return self._get_json(self.connect_url)

    def _get_json(self, url, label=None):
        """
        GET ``url`` and return its body parsed as JSON, or None on failure.

        :param url: address to request; an empty value is skipped silently
        :param label: optional prefix for error log lines
        :return: the decoded JSON document, or None if the request failed or
            the reply was not valid UTF-8 encoded JSON

        Transport errors are logged and counted through ``urlerrorhandling``.
        Malformed URLs and undecodable replies are logged only. Anything else,
        such as ``KeyboardInterrupt``, propagates to the caller.
        """
        if not url:
            return None
        try:
            with self.urlopen(url) as response:
                return json.loads(response.read().decode("utf-8"))
        except self._TRANSPORT_ERRORS as transport_err:
            self._log_request_error(label, transport_err)
            self.urlerrorhandling()
        except ValueError as payload_err:
            # Unknown URL type, invalid UTF-8 or invalid JSON.
            self._log_request_error(label, payload_err)
        return None

    def _log_request_error(self, label, err):
        """Log ``err`` at error level, prefixed with ``label`` if given."""
        if label is None:
            self.logger.error("%s", err)
        else:
            self.logger.error("%s: %s", label, err)