Source code for netdef.Sources.SubprocessSource

from netdef.Sources import BaseSource, Sources
from netdef.Interfaces.DefaultInterface import DefaultInterface

def setup(shared):
    """Override ``SubprocessSource.DEFAULT_INTERVAL`` from the shared config.

    Looks up ``default_poll_interval`` under ``SubprocessSource`` through
    ``shared.config.config`` and stores the result on the class.  The current
    class value is passed as the third argument (presumably the fallback used
    when the option is absent -- confirm against the config API).
    """
    read_option = shared.config.config
    current_interval = SubprocessSource.DEFAULT_INTERVAL
    SubprocessSource.DEFAULT_INTERVAL = read_option(
        "SubprocessSource",
        "default_poll_interval",
        current_interval,
    )
@Sources.register("SubprocessSource")
class SubprocessSource(BaseSource.BaseSource):
    """Source whose key is used as the command string to run.

    Registered with ``Sources`` under the name ``"SubprocessSource"``.
    """

    # Value returned by get_poll_interval(); setup() may replace it with a
    # configured value.
    DEFAULT_INTERVAL = 10

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base class and select DefaultInterface."""
        super().__init__(*args, **kwargs)
        # The interface class itself is stored, not an instance of it.
        self.interface = DefaultInterface

    def get_command_and_args(self, args=None):
        """Return the command string: the key, plus ``args`` when truthy.

        When ``args`` is falsy (``None`` or empty) only ``self.key`` is
        returned; otherwise key and args are joined by a single space.
        """
        if not args:
            return self.key
        return self.key + " " + args

    def parse_stdout_response(self, value):
        """Return ``value`` unchanged; subclasses may implement real parsing."""
        return value

    def has_initial_poll(self):
        """Return whatever ``has_poll_interval()`` reports."""
        return self.has_poll_interval()

    def has_poll_interval(self):
        """Return True unconditionally."""
        return True

    def get_poll_interval(self):
        """Return the current ``DEFAULT_INTERVAL``."""
        return self.DEFAULT_INTERVAL

    @staticmethod
    def can_unpack_subitems(value):
        """Return False: this source cannot unpack subitems."""
        return False

    @staticmethod
    def unpack_subitems(value):
        """Yield a single ``None``: this source cannot unpack subitems."""
        yield from (None,)