import queue
import time
from enum import Enum
from threading import Lock
class Mode(Enum):
    """Collector modes.

    Each member selects how the arguments of the buffered calls are turned
    into the single call made to the wrapped function.
    """

    FIRST = 1
    """Use arguments from the first call"""

    LAST = 2
    """Use arguments from the last call"""

    LIST_ALL = 3
    """Convert arguments to lists with every call"""

    FIRST_WITH_EVENT = 4
    """Use arguments from the first call and an additional argument called event"""

    LAST_WITH_EVENT = 5
    """Use arguments from the last call and an additional argument called event"""
class Collector:
    """
    Takes a function but does not call it right away. After the given wait
    time has elapsed the function is called based on the given mode.

    Calls made while a wait is in progress only add their arguments to an
    internal queue; the call that started the wait later drains the queue
    and invokes the function once.

    :param callable fn: a function or callable
    :param float wait: seconds to wait
    :param Mode mode: how to call the callable
    :raises NotImplementedError: if ``mode`` is not one of the members of
        :class:`Mode` handled by :meth:`__call__`
    """

    def __init__(self, fn, wait, mode):
        if mode not in (
            Mode.FIRST,
            Mode.LAST,
            Mode.LIST_ALL,
            Mode.FIRST_WITH_EVENT,
            Mode.LAST_WITH_EVENT,
        ):
            raise NotImplementedError(
                "unsupported collector mode: {!r}".format(mode)
            )
        self.mode = mode
        self.fn = fn
        self.wait = wait
        # Argument tuples of every call received since the last drain.
        self.buffer = queue.Queue()
        # Held by the one call that is currently sleeping and draining.
        self.lock = Lock()

    def __call__(self, *args):
        """
        Add arguments to a queue. Only the first call will acquire
        :attr:`self.lock` and sleep until wait time has elapsed. After sleep
        the arguments in the queue are used to call the function
        :attr:`self.fn` based on the chosen mode.

        Calls that do not get the lock return immediately after queueing
        their arguments. The return value of :attr:`self.fn` is discarded.

        If the sleep is interrupted by an exception the lock is still
        released, and the queued arguments stay in the queue for the next
        call to pick up.

        NOTE(review): a call that fails to get the lock and enqueues *after*
        the lock holder has finished draining stays in the queue until some
        later call drains it.
        """
        # Non-blocking: exactly one caller wins and becomes responsible for
        # flushing the queue; everybody else just enqueues and leaves.
        owns_flush = self.lock.acquire(blocking=False)
        self.buffer.put(args)
        if not owns_flush:
            return

        try:
            time.sleep(self.wait)
            collected = []
            while not self.buffer.empty():
                collected.append(self.buffer.get_nowait())
        finally:
            # Always release, otherwise a failing sleep (e.g. negative wait
            # or KeyboardInterrupt) would leave the collector unable to
            # ever call fn again.
            self.lock.release()

        # The lock is released before fn runs, so a new collection window
        # can start while fn is still executing.
        if self.mode == Mode.FIRST:
            self.fn(*collected[0])
        elif self.mode == Mode.LAST:
            self.fn(*collected[-1])
        elif self.mode == Mode.LIST_ALL:
            # Transpose: one tuple per positional parameter, holding that
            # parameter's value from every collected call.
            self.fn(*zip(*collected))
        elif self.mode in (Mode.FIRST_WITH_EVENT, Mode.LAST_WITH_EVENT):
            # Every argument, across all collected calls, whose ``update``
            # or ``new`` attribute is truthy.
            events = [
                src
                for call_args in collected
                for src in call_args
                if src.update or src.new
            ]
            if self.mode == Mode.FIRST_WITH_EVENT:
                self.fn(*collected[0], events)
            elif self.mode == Mode.LAST_WITH_EVENT:
                self.fn(*collected[-1], events)
def collect(wait, mode):
    """
    A decorator for expressions.

    Returns a decorator that replaces the decorated function with a
    :class:`Collector` built from that function, ``wait`` and ``mode``.

    Usage::

        from netdef.Engines.expression.Collector import collect, Mode

        @collect(wait=0.1, mode=Mode.LIST_ALL)
        def expression(c1, c2, c3):
            pass

    :param float wait: seconds to wait, passed on to the collector
    :param Mode mode: how to call the callable, passed on to the collector
    """

    def decorator(func):
        return Collector(func, wait, mode)

    return decorator