# Source code for netdef.Controllers.ConcurrentWebRequestController

import asyncio
import datetime
import time
from itertools import repeat

import aiohttp
from netdef.Controllers import BaseAsyncController, Controllers
from netdef.Sources.BaseSource import StatusCode
from netdef.Sources.ConcurrentWebRequestSource import (
    ConcurrentWebRequestSource,
    Request,
    Result,
)

# this controller is in development, do not use it yet.


@Controllers.register("ConcurrentWebRequestController")
class ConcurrentWebRequestController(BaseAsyncController.BaseAsyncController):
    """
    .. danger:: Development Status :: 3 - Alpha

    Basically just a web scraper. Can scrape multiple web pages simultaneously.

    IO is handled by this controller. The poll interval and program flow are
    implemented in `ConcurrentWebRequestSource`.
    """

    def __init__(self, name, shared):
        super().__init__(name, shared)
        self.logger.info("init")
        self.init_task_limit()

    def init_task_limit(self):
        """
        Read configuration.

        Sets ``max_iterations`` (upper bound on values taken from a source's
        request generator per task, default 100), ``max_concurrent_tasks``
        (default 1000) and the ``access_task`` semaphore that limits how many
        tasks may run their web requests at the same time.
        """
        self.max_iterations = self.shared.config.config(
            self.name, "max_iterations", 100
        )
        self.max_concurrent_tasks = self.shared.config.config(
            self.name, "max_concurrent_tasks", 1000
        )
        # This semaphore limits the number of simultaneous tasks.
        # It is created from __init__, i.e. before run() starts self.loop, so
        # older Pythons need the explicit ``loop`` argument. That argument was
        # removed in Python 3.10 (TypeError); there the semaphore binds to the
        # running loop on first use instead.
        try:
            self.access_task = asyncio.Semaphore(
                self.max_concurrent_tasks, loop=self.loop
            )
        except TypeError:
            self.access_task = asyncio.Semaphore(self.max_concurrent_tasks)

    def handle_add_source(self, incoming):
        "Add source to controller"
        self.logger.debug(
            "'Add source' event for %s %s", incoming.key, incoming.source
        )
        if isinstance(incoming, ConcurrentWebRequestSource):
            self.add_source(incoming.key, incoming)
        else:
            self.logger.error("Source class not supported: %s", incoming.source)

    def handle_write_source(self, incoming, value, source_time):
        """
        Execute a command if the given value is the name of a command.

        The command is scheduled on ``self.loop`` with
        ``run_coroutine_threadsafe``, so this may be called from another thread.
        """
        if value in incoming.get_commands_list():
            asyncio.run_coroutine_threadsafe(
                self.proccess_task(incoming, value), loop=self.loop
            )
        else:
            self.logger.error("command not available: %s", value)

    async def loop_outgoing_until_interrupt(self):
        """
        Main async loop.

        Waits for the running state, then runs ``get_connect_request`` once for
        every source that has a connect request. After that it repeatedly waits
        for the next due poll interval and runs ``get_poll_request`` for every
        source with that interval. Returns on interrupt, or immediately if no
        source has a positive poll interval.
        """
        await self.enter_running_state.wait()

        if not self.has_interrupt():
            # connect_request at startup
            tasks = [
                self.proccess_task(source, "get_connect_request")
                for source in self.get_sources().values()
                if source.has_connect_request()
            ]
            if tasks:
                # ``loop=`` was removed from asyncio.gather in Python 3.10;
                # inside this coroutine the running loop is used anyway.
                await asyncio.gather(*tasks)

        interval_plan = NextInterval(time.time())
        for source in self.get_sources().values():
            if source.has_poll_request():
                pri = source.get_poll_request_interval()
                if pri > 0:
                    interval_plan.add(pri)

        while not self.has_interrupt():
            if not interval_plan.has_interval():
                return
            timeout, current_interval = interval_plan.next(time.time())
            try:
                # sleep until the next interval is due, but wake up early
                # when an interrupt is signalled
                await asyncio.wait_for(self.interrupt_loop.wait(), timeout)
            except asyncio.TimeoutError:
                pass
            if self.has_interrupt():
                return
            tasks = [
                self.proccess_task(source, "get_poll_request")
                for source in self.get_sources().values()
                if source.has_poll_request()
                and source.get_poll_request_interval() == current_interval
            ]
            if tasks:
                await asyncio.gather(*tasks)

    def run(self):
        "Main sync loop"
        self.logger.info("Running")
        # self.incoming polling synchronously in own thread
        self.loop.run_in_executor(None, self.loop_incoming_until_interrupt)
        # async polling of sockets
        self.loop.run_until_complete(self.loop_outgoing_until_interrupt())
        self.logger.info("Stopped")

    def get_client_session(self, item):
        """
        Returns a aiohttp session.

        Add new session to source if not found. Session will be initialized
        with basic auth and a default timeout.
        """
        session = item.get_client_session()
        if not session:
            if item.has_basic_auth():
                user, passw = item.get_basic_auth()
                auth = aiohttp.BasicAuth(user, passw)
            else:
                auth = None
            timeout = aiohttp.ClientTimeout(total=item.get_client_session_timeout())
            session = aiohttp.ClientSession(
                cookie_jar=aiohttp.CookieJar(unsafe=True),
                auth=auth,
                loop=self.loop,
                timeout=timeout,
            )
            item.add_client_session(session)
        return session

    async def proccess_web_request_item(self, item, method, session):
        """
        Handle IO by interfacing with the source's data generator.

        ``getattr(item, method)()`` is driven with ``send()``. Each yielded
        :class:`Request` is executed with ``session`` and its response text is
        sent back on the next step. A yielded :class:`Result` stores
        ``result.result`` as the data to return. At most
        ``self.max_iterations`` values are taken from the generator.

        :param item: the source
        :param str method: name of the generator method on ``item``
        :param session: aiohttp client session used for the requests
        :returns: ``(available, data)``. ``available`` is True if at least one
            response body was read. ``data`` is the last ``Result.result``
            received, or ``{}`` if none was.
        """
        available = False
        data = {}
        try:
            html_data = None
            # ``repeat`` hands out the same generator object max_iterations
            # times, which bounds the number of send() round trips.
            for find_data in repeat(getattr(item, method)(), self.max_iterations):
                result = find_data.send(html_data)
                assert result is not None
                if isinstance(result, Request):
                    # ``async with`` releases the connection even when
                    # reading the body fails
                    async with session.request(
                        method=result.method,
                        url=result.url,
                        params=result.params,
                        data=result.data,
                    ) as response:
                        html_data = await response.text()
                    available = True
                    assert isinstance(html_data, str)
                elif isinstance(result, Result):
                    data = result.result
                else:
                    raise ValueError(
                        "Expected Request or Result. Got %s" % type(result)
                    )
        except StopIteration:
            # the generator is exhausted; normal termination
            pass
        except (
            ValueError,
            ConnectionRefusedError,
            OSError,
            asyncio.TimeoutError,
        ) as error:
            self.logger.debug("error %s", error)
        except Exception as error:
            self.logger.error("%s: %s", item.get_reference(), error)
        return available, data

    async def proccess_task(self, item, method):
        """
        Retrieve data from a web site and pack it into the source.

        Runs ``method`` of ``item`` through :meth:`proccess_web_request_item`
        while holding a slot of ``self.access_task``. Then stores
        ``(elapsed_seconds, available, data)`` as the new source value and
        sends it outgoing if the value was updated. ``elapsed_seconds``
        includes time spent waiting for the semaphore.
        """
        if not isinstance(item, ConcurrentWebRequestSource):
            self.logger.error("source of type %s not implemented", type(item))
            return
        if not hasattr(item, method):
            self.logger.error("method %s missing in %s", method, type(item))
            return

        session = self.get_client_session(item)
        time_begin = time.time()
        # ``async with`` guarantees the slot is released even if this task
        # is cancelled while the request is in flight
        async with self.access_task:
            available, data = await self.proccess_web_request_item(
                item, method, session
            )
            time_end = time.time()

        new_val = round(time_end - time_begin, 3), available, data
        stime = datetime.datetime.utcnow()
        status_ok = True
        cmp_oldnew = False
        if self.update_source_instance_value(
            item, new_val, stime, status_ok, cmp_oldnew
        ):
            self.send_outgoing(item)
class NextInterval:
    """
    Call next() to retrieve seconds to next interval, and which interval it is.

    Every registered interval is tracked as a ``[due_timestamp, interval]``
    span whose first due time is ``start + interval``.
    """

    __slots__ = ["spans", "start"]

    def __init__(self, timestamp):
        # list of [due_timestamp, interval] pairs, one per distinct interval
        self.spans = []
        # reference time that every interval's first due time is based on
        self.start = timestamp

    def has_interval(self):
        """Return True if at least one interval has been added."""
        return bool(self.spans)

    def add(self, interval):
        """
        Register ``interval``, first due at ``start + interval``.

        Adding an interval that is already tracked is a no-op, even after
        :meth:`next` has moved that interval's due time.
        """
        if any(span[1] == interval for span in self.spans):
            return
        self.spans.append([self.start + interval, interval])

    def next(self, now):
        """
        Return ``(seconds_to_wait, interval)`` for the earliest due interval.

        The chosen span is rescheduled one interval after its due time. If it
        is already overdue at ``now``, the wait is 0 and it is rescheduled
        relative to ``now``, so missed ticks are not replayed.

        :raises ValueError: if no interval has been added
        """
        # earliest due time first; on a tie the shorter interval wins
        span = min(self.spans)
        due, interval = span
        if due < now:
            due = now
        span[0] = due + interval
        return due - now, interval