from netdef.Sources import BaseSource, Sources
from netdef.Interfaces.DefaultInterface import DefaultInterface
[docs]def setup(shared):
SubprocessSource.DEFAULT_INTERVAL = shared.config.config(
"SubprocessSource",
"default_poll_interval",
SubprocessSource.DEFAULT_INTERVAL,
)
[docs]@Sources.register("SubprocessSource")
class SubprocessSource(BaseSource.BaseSource):
DEFAULT_INTERVAL = 10
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.interface = DefaultInterface
[docs] def get_command_and_args(self, args=None):
"Get command and argument to run"
if args:
return self.key + " " + args
else:
return self.key
[docs] def parse_stdout_response(self, value):
"Implement parsing function"
return value
[docs] def has_initial_poll(self):
return self.has_poll_interval()
[docs] def has_poll_interval(self):
return True
[docs] def get_poll_interval(self):
return self.DEFAULT_INTERVAL
[docs] @staticmethod
def can_unpack_subitems(value):
"Returns False, cannot unpack subitems"
return False
[docs] @staticmethod
def unpack_subitems(value):
"Yields None, cannot unpack subitems"
yield None