Putting it all together¶
In this example we want to pass following commands to the subprocess module:
- echo hello
- ls -lah .
- ./simple_script.sh
- echo Don’t break the
We could hard code these commands in the controller but it is more flexible to create a source for each command. And we also want to read these commands from a config file so it will be easy to reuse, change or extend the commands.
To achieve this we just implement a method in the source that returns the command. the command can be extracted from the sources key:
first_app/Sources/CmdSource.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from netdef.Sources import BaseSource, Sources
from netdef.Interfaces.DefaultInterface import DefaultInterface
@Sources.register("CmdSource")
class CmdSource(BaseSource.BaseSource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.interface = DefaultInterface
def get_command_and_args(self, args=None):
if args:
return self.key + " " + args
else:
return self.key
|
The controller can retrieve the command to run by calling
get_command_and_args
first_app/Controllers/CmdController.py
:
1 2 3 4 5 6 7 8 9 | import logging
import datetime
import subprocess
import shlex
from netdef.Controllers import BaseController, Controllers
from netdef.Sources.BaseSource import StatusCode
from ..Sources.CmdSource import CmdSource
|
We will use subprocess and shlex from standard library to execute commands. To keep it simple we can create a wrapper function that run a command and return the result from stdout. In case of error the function return the error as text instead. Also, charset decoding errors is replaced with “?”.
10 11 12 13 14 15 16 17 18 19 | def stdout_from_terminal(command_as_str, err_msg=None):
command_args = shlex.split(command_as_str)
try:
res = subprocess.run(command_args, stdout=subprocess.PIPE).stdout
return str(res, errors="replace")
except Exception as error:
if err_msg is None:
return str(error)
else:
return err_msg
|
We create an option value_as_args
to use the value from the source to be
added as an argument to the command. the option is read from config file.
20 21 22 23 24 25 26 | @Controllers.register("CmdController")
class CmdController(BaseController.BaseController):
def __init__(self, name, shared):
super().__init__(name, shared)
self.logger = logging.getLogger(self.name)
self.logger.info("init")
self.value_as_args = self.shared.config.config(self.name, "value_as_args", 1)
|
The run method will be very simple in this tutorial. Normally this is where we create a polling loop or setup subscriptions and await events. In this example we only wait for WRITE_SOURCE messages. So we only have to iterate the message queue:
27 28 29 30 31 32 | def run(self):
"Main loop. Will exit when receiving interrupt signal"
self.logger.info("Running")
while not self.has_interrupt():
self.loop_incoming() # dispatch handle_* functions
self.logger.info("Stopped")
|
The rule will always send the source instance at startup as a
ADD_SOURCE message. we have to receive the message and keep it
in our controller. We can use
netdef.Controllers.BaseController.BaseController.add_source
33 34 35 | def handle_add_source(self, incoming):
self.logger.debug("'Add source' event for %s", incoming.key)
self.add_source(incoming.key, incoming)
|
When an expression changes the value on one of our sources we will receive a WRITE_SOURCE message. We have to verify that the received source is in our source list and that we know how to handle it.
To check if it is one of ours we use
netdef.Controllers.BaseController.BaseController.has_source
To check if we know how to handle it we check if it is an instance of
the source we created CmdSource
.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | def handle_write_source(self, incoming, value, source_time):
self.logger.debug("'Write source' event to %s. value: %s at: %s", incoming.key, value, source_time)
if not self.has_source(incoming.key):
self.logger.error(
"%s not found",
incoming.key
)
return
if not isinstance(incoming, CmdSource):
self.logger.error(
"Got write event for %s, but only CmdSource is supported",
type(incoming)
)
return
|
We have verified that the source is an instance of CmdSource
.
Knowing this we can safely call CmdSource.get_command_and_args
to get the command.
51 52 53 54 55 56 57 58 59 | if self.value_as_args:
cmd_as_str = incoming.get_command_and_args(value)
else:
cmd_as_str = incoming.get_command_and_args()
new_val = stdout_from_terminal(cmd_as_str)
stime = datetime.datetime.utcnow()
status_ok = True # Why not
cmp_oldew = False # compare old and new value?
|
At last we create and send a RUN_EXPRESSION message using
netdef.Controllers.BaseController.BaseController.update_source_instance_value
and netdef.Controllers.BaseController.BaseController.send_outgoing
60 61 | if self.update_source_instance_value(incoming, new_val, stime, status_ok, cmp_oldew):
self.send_outgoing(incoming)
|
We now have to create the configfile and expression that is parsed by rule. The command list can be a simple text file:
config/command_rule.txt
:
1 2 3 4 | echo hello
ls -lah .
./simple_script.sh
echo Don\'t break the
|
The expression is a python file. The rule expect to find a function called expression()
config/command_rule.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import logging
logger = logging.getLogger(__name__ + ":expression")
def expression(intern, cmd):
# triggers at startup
if intern.new:
if "hello" in cmd.key:
arg = "world"
elif "Don\\'t break the" in cmd.key:
arg = "circle"
else:
arg = ""
logger.info("{}: Send command arg: {}".format(cmd.key, arg))
cmd.set = arg
if cmd.new or cmd.update:
logger.info("{}: Result: {}".format(cmd.key, cmd.value))
|
Now we are ready to create the rule
first_app/Rules/FirstAppRule.py
:
1 2 3 4 5 6 7 | import logging
import pathlib
from netdef.Rules.utils import import_file
from netdef.Rules import BaseRule, Rules
SourceInfo = BaseRule.SourceInfo
ExpressionInfo = BaseRule.ExpressionInfo
|
We will look for the config file and expression file relative to the project folder.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | @Rules.register("FirstAppRule")
class FirstAppRule(BaseRule.BaseRule):
def __init__(self, name, shared):
super().__init__(name, shared)
self.logger = logging.getLogger(name)
self.logger.info("init")
self.proj_path = shared.config.config("proj", "path")
def read_list(self, rel_file):
full_file = pathlib.Path(self.proj_path).joinpath(rel_file)
lines = open(str(full_file), "r").readlines()
return [l.strip() for l in lines]
def import_py_file(self, rel_file):
full_file = pathlib.Path(self.proj_path).joinpath(rel_file)
nice_name = full_file.name
return import_file(str(full_file), self.name, nice_name)
|
TODO
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | def setup(self):
self.logger.info("Running setup")
self.setup_commands()
self.logger.info("Done parsing")
def setup_commands(self):
command_expression_module = self.import_py_file("config/command_rule.py")
command_list = self.read_list("config/command_rule.txt")
source_count = 0
for command in command_list:
source_count += self.add_new_expression(
ExpressionInfo(
command_expression_module,
[
SourceInfo("InternalSource", "generic"),
SourceInfo("CmdSource", command)
]
)
)
self.update_statistics(self.name + ".commands", 0, 1, source_count)
|
TODO
46 47 48 49 50 | def run(self):
self.logger.info("Running")
while not self.has_interrupt():
self.loop_incoming() # dispatch handle_* functions
self.logger.info("Stopped")
|
TODO
51 52 53 54 55 | def handle_run_expression(self, incoming, value, source_time, status_code):
expressions = self.get_expressions(incoming)
self.logger.debug("Received %s. Found expressions %s",incoming.key, len(expressions))
if expressions:
self.send_expressions_to_engine(incoming, expressions, value, source_time, status_code)
|
TODO
config/default.ini
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | [rules]
FirstAppRule = 1
[FirstAppRule]
[sources]
CmdSource = 1
InternalSource = 1
[CmdSource]
controller = CmdController
[InternalSource]
controller = InternalController
[controllers]
CmdController = 1
InternalController = 1
[InternalController]
send_init_event = 1
[CmdController]
value_as_args = 1
|
TODO
tests/test_command_rule.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | from netdef.testutils import MockExpression
from netdef.Sources.InternalSource import InternalSource
from first_app.Sources.CmdSource import CmdSource
def test_hello():
mock = MockExpression(
module="config/command_rule.py",
intern=InternalSource("generic"),
cmd=CmdSource("echo hello")
)
mock.intern.update_value(None, stat_init=True)
mock.cmd.assert_called_once_with("world")
mock.intern.assert_not_called()
def test_circle():
mock = MockExpression(
module="config/command_rule.py",
intern=InternalSource("generic"),
cmd=CmdSource("echo Don\\'t break the")
)
mock.intern.update_value(None, stat_init=True)
mock.cmd.assert_called_once_with("circle")
mock.intern.assert_not_called()
def test_ls():
mock = MockExpression(
module="config/command_rule.py",
intern=InternalSource("generic"),
cmd=CmdSource("ls -lah .")
)
mock.intern.update_value(None, stat_init=True)
mock.cmd.assert_called_once_with("")
mock.intern.assert_not_called()
|
TODO