Source code for netdef.Controllers.SystemMonitorController

import datetime
import logging
import threading
import time

import psutil

from netdef.Controllers import BaseController, Controllers
from netdef.Shared.Internal import Statistics
from netdef.Sources.BaseSource import StatusCode

# import my supported sources
from ..Sources.SystemMonitorSource import (
    SystemMonitorByteSource,
    SystemMonitorPercentSource,
    SystemMonitorSource,
    bytes2human,
)


[docs]def get_vm(): """ Helperfunction. :returns: psutil.virtual_memory """ return psutil.virtual_memory()
[docs]def get_proc(): """ Helperfunction. :returns: psutil.Process """ return psutil.Process()
[docs]def get_clean_mount_point_name(node): """ Replace / or \\ with . Example: .. code-block:: python for disk in psutil.disk_partitions(): print (get_clean_mount_point_name(disk.mountpoint)) :param str node: name of mountpoint :returns: new node name """ if "/" in node: return "root" + node.replace("/", ".").rstrip(".") elif "\\" in node: return node.replace(":\\", "").rstrip(".") else: return node
[docs]def statistics_update(item): "Write internal statistics to the Statistics singleton if activated" if Statistics.on: Statistics.set( item.key, "{} ({})".format( item.get_value_and_unit(), item.source_time.strftime("%Y.%m.%d %H:%M:%S"), ), )
[docs]class DataItem: __slots__ = ("key", "source_type", "interval", "func", "args", "next") def __init__(self, source_type, key, interval, func, args=None): self.source_type = source_type "Reference to a SystemMonitorSource class" self.key = key "Unique identifier" self.interval = interval "Poll interval" self.func = func "Callback to retrieve value" self.args = args "Arguments for :attr:`self.func` callback" self.next = 0 "Next scheduled call to :attr:`self.func`"
[docs] def get_value(self): """ Returns value of :attr:`self.func` callback """ if self.args: return self.func(*self.args) else: return self.func()
[docs] def ready(self): """ Returns True if interval for this item has elapsed. """ now = time.time() if now >= self.next: self.next = now + self.interval return True return False
[docs]def get_data_items_dict(mempoll, cpupoll, poll, checkdisk, diskpoll): """ Create a dict with items to monitor. :param int mempoll: poll interval for memory callbacks :param int cpupoll: poll interval for cpu callbacks :param int poll: general poll interval :param bool checkdisk: Set True to poll disk drives :param int diskpoll: poll interval for disk drives :returns: dict of :class:`DataItem` """ NO = SystemMonitorSource BY = SystemMonitorByteSource PE = SystemMonitorPercentSource items = [ DataItem( PE, "sysmon.cpu.percent", cpupoll, lambda: psutil.cpu_percent(interval=1) ), DataItem(PE, "sysmon.memory.percent", mempoll, lambda: get_vm().percent), DataItem(BY, "sysmon.memory.total", mempoll, lambda: get_vm().total), DataItem(BY, "sysmon.memory.available", mempoll, lambda: get_vm().available), DataItem(BY, "sysmon.memory.free", mempoll, lambda: get_vm().free), DataItem(BY, "sysmon.memory.used", mempoll, lambda: get_vm().used), DataItem(NO, "sysmon.threads.total", poll, lambda: threading.active_count()), DataItem(NO, "process.pid", poll, lambda: get_proc().pid), DataItem( PE, "process.cpu.percent", cpupoll, lambda: get_proc().cpu_percent(interval=1), ), DataItem( PE, "process.memory.percent", mempoll, lambda: get_proc().memory_percent() ), DataItem( BY, "process.memory.current", mempoll, lambda: get_proc().memory_full_info().uss, ), DataItem( NO, "process.open.files.count", poll, lambda: len(get_proc().open_files()) ), ] if checkdisk: def get_name(mp, tail): return "sysmon.disk.%s.%s" % (get_clean_mount_point_name(mp), tail) for disk in psutil.disk_partitions(): mp = disk.mountpoint get_total = lambda mp: psutil.disk_usage(mp).total get_used = lambda mp: psutil.disk_usage(mp).used get_free = lambda mp: psutil.disk_usage(mp).free get_percent = lambda mp: psutil.disk_usage(mp).percent items.extend( [ DataItem(BY, get_name(mp, "total"), diskpoll, get_total, [mp]), DataItem(BY, get_name(mp, "used"), diskpoll, get_used, [mp]), DataItem(BY, get_name(mp, "free"), diskpoll, get_free, [mp]), DataItem(PE, get_name(mp, "percent"), diskpoll, get_percent, [mp]), ] ) return {data.key: data for data in items}
[docs]@Controllers.register("SystemMonitorController") class SystemMonitorController(BaseController.BaseController): """ .. tip:: Development Status :: 5 - Production/Stable """ def __init__(self, name, shared): super().__init__(name, shared) self.logger = logging.getLogger(self.name) self.logger.info("init") config = self.shared.config.config self.oldnew = config(self.name, "oldnew_comparision", 0) self.memory_poll_interval = config(self.name, "memory_poll_interval", 600) self.cpu_poll_interval = config(self.name, "cpu_poll_interval", 10) self.poll_interval = config(self.name, "general_poll_interval", 10) self.disk_monitor_on = config(self.name, "disk_monitor_on", 0) self.disk_poll_interval = config(self.name, "disk_poll_interval", 60) self.data_items = get_data_items_dict( self.memory_poll_interval, self.cpu_poll_interval, self.poll_interval, self.disk_monitor_on, self.disk_poll_interval, ) self.internal_sources = { key: data.source_type(key=key) for key, data in self.data_items.items() }
[docs] def run(self): "Main loop. Will exit when receiving interrupt signal" self.logger.info("Running") if Statistics.on: try: uss = psutil.Process().memory_full_info().uss Statistics.set("process.memory.startup", bytes2human(uss)) except psutil.AccessDenied as error: self.logger.error("memory uss: %r", error) while not self.has_interrupt(): self.loop_incoming() # dispatch handle_* functions self.poll_data() self.logger.info("Stopped")
[docs] def handle_add_source(self, incoming): self.logger.debug("'Add source' event for %s", incoming.key) self.add_source(incoming.key, incoming)
[docs] def handle_write_source(self, incoming, value, source_time): self.logger.debug( "'Write source' event to %s. value: %s at: %s", incoming.key, value, source_time, )
[docs] def poll_data(self): """ Iter the dict of :class:`DataItem` and get values. """ stime = datetime.datetime.utcnow() status_ok = True for dataitem in self.data_items.values(): if dataitem.ready(): try: value = dataitem.get_value() internal_item = self.internal_sources[dataitem.key] self.update_source_instance_value( internal_item, value, stime, status_ok, self.oldnew ) statistics_update(internal_item) if self.has_source(dataitem.key): self.send_datachange(dataitem.key, value, stime, True) except psutil.AccessDenied as error: self.logger.error("%s: error: %s", dataitem.key, error)
[docs] def send_datachange(self, source_key, value, stime, status_ok): item = self.get_source(source_key) if self.update_source_instance_value( item, value, stime, status_ok, self.oldnew ): self.send_outgoing(item)