Source code for libgs.monitoring

# -*- coding: utf-8 -*-
"""
..
    Copyright © 2017-2018 The University of New South Wales

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to use,
    copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
    Software, and to permit persons to whom the Software is furnished to do so,
    subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
    WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

    Except as contained in this notice, the name or trademarks of a copyright holder
    shall not be used in advertising or otherwise to promote the sale, use or other
    dealings in this Software without prior written authorization of the copyright
    holder.

    UNSW is a trademark of The University of New South Wales.


libgs.monitoring
=================

:date:   Mon Sep 18 09:22:40 2017
:author: Kjetil Wormnes

Monitoring is a stand-alone module that allows monitoring of telemetry points while
attempting to have a minimal impact on the execution of the rest of the code.

It implements a simple pythonic syntax for the creation of monitoring points. To use;

1. Create a monitor

>>> from libgs.monitoring import Monitor
>>> mon = Monitor()

2. Define a function that returns the monitored value, 
and decorate it with mon.monitor(). 

For example, to monitor the current time:

>>> from datetime import datetime
>>> @mon.monitor()
... def current_time():
...    return datetime.utcnow()

3. Start the monitor

>>> mon.start()

And that's it. The current time is now monitored at the default update interval 
under a monitor point called "current_time". 
You can view the state of the monitor by printing it::

    >>> print(mon)
    Monitor ()  -- running
    Name                          Value               Alert     Last polled   
    ----------------------------- ------------------- --------- ---------------
    .current_time                 2018-06-21 00:39:07           0.7


More complicated monitoring functions can easily be created by specifying the point
value in the decorator. See :meth:`Monitor.monitor` for syntax. You can also change
the update interval etc...

The constructor also takes some arguments to customise its default behaviour. See :class:`Monitor`.

You are able to add callbacks whenever a monitored value is updated. This is useful
if you are displaying the data on a dashboard or updating a database. See :meth:`Monitor.add_callback`.

Finally, you can set alert levels by returning an Alert object rather than a value from
your monitoring function. This module defines :class:`GreenAlert`, :class:`OrangeAlert`, 
:class:`RedAlert`, and :class:`CriticalAlert`. You are welcome to customise or make others 
by deriving a new subclass from :class:`Alert`.

The following example monitoring function gets the cpu usage every 2 seconds, and
marks the alert as Red if it is above 50%:

>>> @mon.monitor(dt = 2)
... def cpu_usage():
...     usage = psutil.cpu_percent(interval=1)
...     if usage > 50:
...         return RedAlert(usage)
...     else:
...         return GreenAlert(usage)
 
"""

import time
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import multiprocessing
import logging
import threading


########
#
# Logging
#
########
log = logging.getLogger('libgs-log')
log.addHandler(logging.NullHandler())


############################################
#
# Alert class and
# specific derived alert types.
# A user can specify others if she wishes.
#
############################################
class Alert(object):
    """
    Base class for Alerts.

    An Alert wraps a monitored value (``val``) together with an alert level.
    Any derived class must set the ``alertstr`` and ``alertcode`` class
    attributes.
    """

    def __init__(self, val):
        # The wrapped (monitored) value
        self.val = val

    def __str__(self):
        # The bare base class defines no alert level at all
        if hasattr(self, 'alertstr'):
            label = self.alertstr or "NO"
            return "'{}' ( {} alert<{}> )".format(str(self.val), label, self.alertcode)

        return "'{}' Unknown Alert".format(self.val)

    def __repr__(self):
        return str(self)
class CriticalAlert(Alert):
    """Alert level "CRITICAL" (``alertcode`` 100)."""

    alertcode = 100
    alertstr = "CRITICAL"
class RedAlert(Alert):
    """Alert level "RED" (``alertcode`` 30)."""

    alertcode = 30
    alertstr = "RED"
class OrangeAlert(Alert):
    """Alert level "ORANGE" (``alertcode`` 20)."""

    alertcode = 20
    alertstr = "ORANGE"
class GreenAlert(Alert):
    """Alert level "GREEN" (``alertcode`` 10)."""

    alertcode = 10
    alertstr = "GREEN"
class NoAlert(Alert):
    """A value without any alert attached (``alertcode`` 0, no alert string)."""

    alertcode = 0
    alertstr = None
class MonitorItem(tuple):
    """
    A monitored item.

    An immutable record with the fields listed in ``_fields``. A field can be
    read either by position (``item[0]``) or by field name (``item['name']``).
    """

    _fields = ('name', 'value', 'alert', 'parent', 'age')

    def __new__(cls, item):
        """
        Args:
            item: Anything indexable by field name (typically a dict). Fields
                  that cannot be looked up are stored as None.
        """
        values = []
        for field in cls._fields:
            try:
                values.append(item[field])
            except (KeyError, IndexError, TypeError):
                # Missing field (or ``item`` is not indexable at all) -> None.
                # Deliberately not a bare except, so that KeyboardInterrupt /
                # SystemExit are never swallowed here.
                values.append(None)

        return super(MonitorItem, cls).__new__(cls, values)

    def __str__(self):
        # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere
        try:
            string_types = basestring
        except NameError:
            string_types = str

        def fmt(v):
            # Strings are quoted, everything else is shown through str()
            return "'{}'".format(v) if isinstance(v, string_types) else str(v)

        return '(' + ', '.join("'{}': {}".format(k, fmt(v)) for k, v in self.items()) + ')'

    def __repr__(self):
        return self.__str__()

    def keys(self):
        """Return the tuple of field names."""
        return self._fields

    def items(self):
        """Iterate over ``(field name, value)`` pairs in field order."""
        for field in self._fields:
            yield field, self[field]

    def __getitem__(self, item):
        # Field names are translated to their position; anything else
        # (int, slice) is handed to the normal tuple indexing
        if item in self._fields:
            idx = self._fields.index(item)
        else:
            idx = item

        return super(MonitorItem, self).__getitem__(idx)
############################################ # # The monitor magic # ############################################
class Monitor(object):
    """
    Class to provide functionality for monitoring.

    The monitor class works by spawning a threadpool in which it will call a number of
    user-defined monitoring functions.

    Every tick, it will check the list of monitoring functions to find the ones that are due
    and then invoke them in the threadpool. It then *immediately proceeds to the next tick*.
    In other words, monitoring functions are free to do whatever they want, including to block
    for a while, since while one is running the Monitor class will just proceed with executing
    other monitoring functions (up to the limit of the workers in the threadpool).

    Monitoring functions are added to the class instance preferably using the
    :meth:`monitor` decorator.

    It is possible to add callbacks to monitor points that are invoked every time the value
    is updated (see :meth:`add_callback`).

    It is also possible to add a single tick callback when creating the Monitor object. Take
    care when doing this as it is invoked at the end of every tick, and the next tick will not
    execute before it completes. Tick callbacks should therefore return very fast. This
    restriction does not apply to monitor callbacks, as they are executed in the threadpool
    together with the call to the monitor function and will therefore not hold up the Monitor
    overall.
    """

    def __init__(self, workers=10, tick=0.5,
                 tick_cb = lambda x: None,   #TODO: Perhaps get rid of tick callbacks.
                 default_dt = 10):
        """
        Args:
            workers:    Maximum number of simultaneous threads to spawn in the ThreadPool
            tick:       The delay between successive runs of the monitor loop. Should be
                        fairly small (< 1 sec).
            tick_cb:    A callback that is invoked (with this Monitor as its only argument)
                        at the end of every tick.
            default_dt: Default dt to apply to monitor generators (if dt not specified explicitly)
        """
        # The monitoring loop runs in a separate thread. These are set first so
        # that stop()/__del__ are safe even if construction fails further down.
        self._pthr = None
        self._abort = threading.Event()

        # Defaults
        self.default_dt = default_dt

        # Callbacks
        self.tick_cb = tick_cb
        self._callbacks = []    #<-- added with add_callback

        # Set up the executor
        self._executor = ThreadPoolExecutor(workers)    #<-- max threads we permit at a time

        # This is the tick
        self._tick = tick

        # A dictionary to hold the parent headings
        self._parents = {}

        # The exec map keeps track of all the monitored values and their
        # execution status and times
        self._exec_map = pd.DataFrame(
            columns=['exec_t', 'next_exec_t', 'name', 'parent', 'dt', 'gen',
                     'loglvl_values', 'loglvl_alerts', 'logthr_alerts',
                     'dependents', 'value', 'alert'])

        # A dictionary to map alert values to human readable strings
        self._alertstr_map = {}

    def _get_alertstr(self, key):
        """Return the human readable string for alert code ``key`` ("" if unknown)."""
        return self._alertstr_map.get(key, "")

    def __del__(self):
        try:
            self.stop()
        finally:
            # getattr: __init__ may have failed before the executor was created
            executor = getattr(self, '_executor', None)
            if executor is not None:
                executor.shutdown(wait=False)

    @property
    def alertcode(self):
        """
        The current worst alert level.

        Points that have not been polled yet carry no alert and are ignored.
        If no alert is known at all, ``NoAlert.alertcode`` is returned.
        """
        known = self._exec_map['alert'].dropna()
        if len(known) == 0:
            return NoAlert.alertcode
        return max(known)

    def register_parent(self, name, parent=None):
        """
        A parent monitor is not really a monitor. It is merely the product of its children
        and exists for visualisation/grouping purposes only.

        Args:
            name (str)             : The name of the parent to register
            parent (str(optional)) : The parent of the parent you are registering
        """
        # If parent hadnt been explicitly defined already, define it at the top level
        if parent is not None and parent not in self._parents:
            self.register_parent(parent)

        self._parents[name] = dict(parent=parent)

    def add_callback(self, callable):
        """
        Add a callable to be invoked every time a value is polled. Multiple
        callbacks can be added.

        .. note::
            The callback will be executed in the same sub-thread that does the polling

        The prototype of the callable should be::

            some_function(point_name, tstamp, exc, ret)

        where:

        * point_name is the name of the monitor point that has been polled
        * tstamp is the unix timestamp at which the monitor function returned a value
          (obtained with time.time()),
        * exc is None if no exception happened, otherwise it is set to the exception
          that occurred.
        * ret is the return value. Will always be an :class:`Alert` object.

        Args:
            callable: The callable
        """
        self._callbacks.append(callable)

    def to_gen_in_executor(self, fn, point_names, *args, **kwargs):
        """
        Take any function and turn it into a generator appropriately formatted for adding
        using :meth:`.register_monitor`.

        The call to fn is delegated to the executor. While it is not done, calls to next()
        on the generator return None, otherwise they return a tuple containing:

        * The timestamp at which fn returned
        * Any Exception that occurred (None otherwise)
        * The return value, always wrapped in :class:`Alert` object(s)

        This is the format that is required in order to add it to the Monitor.

        Args:
            fn:             The monitoring function
            point_names:    Tuple with the name(s) of the point(s) fn provides values for
            ``*args``, ``**kwargs``: Passed on to fn every time it is invoked

        Returns:
            The generator
        """
        assert isinstance(point_names, tuple)

        numvals = len(point_names)

        #
        # Further wrap the call to catch any exceptions and timestamp
        # the output
        #
        def exc_wrapped(*args, **kwargs):
            try:
                res = fn(*args, **kwargs)

                # Check the type *before* calling len(); a scalar has no len()
                # and would otherwise be reported with a misleading TypeError
                if not (numvals == 1 or (isinstance(res, tuple) and len(res) == numvals)):
                    lres = len(res) if isinstance(res, tuple) else 1
                    raise Exception("Unexpected number of returned values from monitor function. expected {} value(s), got {} value(s)".format(numvals, lres))
                exc = None
            except Exception as e:
                exc = e
                res = None

            # Make sure every returned value is an Alert
            if res is not None and numvals > 1:
                res = tuple(r if isinstance(r, Alert) else NoAlert(r) for r in res)
            elif not isinstance(res, Alert):
                res = NoAlert(res)

            tstamp = time.time()
            ret = tstamp, exc, res

            # One result per point (a failed multi-point call repeats its single result)
            res2 = res if isinstance(res, tuple) else (res,) * len(point_names)

            # Invoke callbacks
            for cb in self._callbacks:
                try:
                    for k, p in enumerate(point_names):
                        cb(p, tstamp, exc, res2[k])
                except Exception as e:
                    # Not every callable has a __name__ (e.g. functools.partial)
                    log.error("Error invoking callback function {}. {}: {}".format(
                        getattr(cb, '__name__', repr(cb)), e.__class__.__name__, e))

            return ret

        def generator(*args, **kwargs):
            while True:
                future = self._executor.submit(exc_wrapped, *args, **kwargs)

                # Ensure a call returns immediately, and returns None if not ready
                while not future.done():
                    yield None

                yield future.result()

        # Hand the call arguments on so that they actually reach fn
        return generator(*args, **kwargs)

    #############
    #
    # Convenience decorators
    #
    #############

    def callback(self):
        """
        Decorator that can be applied to a function to automatically add it
        to the monitor as a callback (see :meth:`add_callback`)
        """
        def decorator(fn):
            self.add_callback(fn)
            return fn

        return decorator

    def monitor(self, point = None, *args, **kwargs):
        """
        Creates a decorator that can be applied to any function to add it to be monitored.

        It will do two things:

        1. Convert the function to a generator in which the function call is run in an
           executor, so that any call to the generator's next() will return immediately and
           not hold up execution. If the function has not finished the generator will return
           None. If it has completed, it will return the value as well as a timestamp and any
           potential exception in the format required for the monitoring loop.
        2. Add the function to the Monitor class polling schedule.

        The decorator can create multiple monitor points from a single callable (that returns
        a tuple) by specifying point as a tuple.

        Please see :meth:`.register_monitor` for the full list of the remaining arguments that
        can be applied. ``*args`` and ``**kwargs`` can be anything accepted by
        :meth:`.register_monitor` except name, gen or dependents.

        Example: register 3 monitoring points from a function that returns random values at an
        interval of once every 10 seconds, grouped under the heading Test:

        >>> @mon.monitor(point=("Point 1", "Point 2", "Point 3"), dt=10, parent="Test")
        ... def monitoring_function():
        ...     return random.random(), random.random(), random.random()

        Args:
            point: The name of the monitor point. If omitted it will use the function name.
                   Can also be a tuple if function monitors several variables.
            ``*args``, ``**kwargs``: Also accepts all the arguments of :meth:`.register_monitor`

        Returns:
            Decorator
        """
        def decorator(fn):
            if point is None:
                point_names = (fn.__name__,)
            elif not isinstance(point, tuple):
                point_names = (point,)
            else:
                point_names = point

            def decorated(*args2, **kwargs2):
                # fn/point_names are passed positionally so that extra
                # positional arguments do not collide with them
                return self.to_gen_in_executor(fn, point_names, *args2, **kwargs2)

            # The first point owns the generator; the others are updated from it
            self.register_monitor(point_names[0], decorated(), *args,
                                  dependents=point_names[1:], **kwargs)

            for p in point_names[1:]:
                self.register_monitor(p, None, *args, **kwargs)

            return decorated

        return decorator

    def register_monitor(self, name, gen, dt = -1, parent=None,
                         loglvl_values = None,
                         loglvl_alerts = logging.DEBUG,
                         logthr_alerts = RedAlert.alertcode,
                         dependents = tuple(),
                         alert_exc = NoAlert):
        """
        This is a low level function to register a monitor generator with the monitor class.

        The format of the generator is quite specific, and it should therefore ideally have
        been created from a callable using the :meth:`.to_gen_in_executor` method. Using that
        method will ensure the generator returns values in the right format for the monitor
        class, and that it never blocks.

        Args:
            name:           The name of the monitor point
            gen:            The monitor generator (* see description above)
            dt:             The time interval in which to poll the monitored point.
                            None = one shot, 0 = every tick, negative = use the default dt.
            parent:         Assign a parent to the monitored point (for grouping purposes
                            only). If the parent does not exist, it will be created.
            loglvl_values:  If not None, any change in value will be logged with the logging
                            level specified.
            loglvl_alerts:  If not None, a change in alert level may be logged with the
                            logging level specified
            logthr_alerts:  The threshold for the alertcode above by which to log.
                            Default = :attr:`RedAlert.alertcode`
            dependents:     Other monitors to update from same generator
            alert_exc:      Alert to set in case an exception is raised in monitor function.
                            Default = :class:`NoAlert`

        Raises:
            ValueError: If a monitor point called ``name`` has already been registered.
        """
        # Compare against the column *values*; ``name in series`` would only
        # look at the index labels and never detect a duplicate
        if (self._exec_map['name'] == name).any():
            raise ValueError("{} already exists. Names must be unique".format(name))

        # One-shot can be enabled by setting dt == None. Negative dt = unset, and defaults
        # will be applied. Continuous running (ie every tick) is enabled by setting dt = 0
        if dt is None:
            dt = 1e11   #<-- large number to prevent it from running again
        elif dt < 0:
            dt = self.default_dt

        log.debug("Adding monitor point '{}'".format(name))

        # If parent hadnt been explicitly defined already, define it at the top level
        if parent is not None and parent not in self._parents:
            self.register_parent(parent)

        row = pd.DataFrame([dict(
            exec_t=None,
            next_exec_t=time.time(),
            name=name,
            dt=dt,
            gen=gen,
            parent=parent,
            dependents=dependents,
            loglvl_values=loglvl_values,
            loglvl_alerts=loglvl_alerts,
            logthr_alerts=logthr_alerts,
            alert_exc=alert_exc)])

        # pd.concat rather than DataFrame.append (which newer pandas no longer provides)
        self._exec_map = pd.concat([self._exec_map, row], ignore_index=True)

        # Make sure value and alert are of dtype=object so they can
        # accept values of different types
        for col in ('value', 'alert'):
            if self._exec_map[col].dtype != 'object':
                self._exec_map[col] = self._exec_map[col].astype('object')

        self._exec_map.sort_values('exec_t', inplace=True)

    def start(self, subprocess = False):
        """
        Start polling loop. Does nothing if it has already been started.

        Args:
            subprocess (bool (optional)): Start the polling loop in a subprocess rather
                                          than a thread.
        """
        if self._pthr is not None:
            return

        # A previous stop() leaves the abort flag set; without clearing it a
        # restarted loop would exit immediately
        self._abort.clear()

        # NOTE(review): in subprocess mode the abort event is a plain
        # threading.Event, so stop() presumably cannot signal the child - confirm.
        if subprocess:
            worker = multiprocessing.Process(target=self._polling_loop)
        else:
            worker = threading.Thread(target=self._polling_loop)

        worker.daemon = True
        self._pthr = worker
        worker.start()

    def stop(self):
        """
        Stop the polling loop and wait for it to finish. Does nothing if it is not running.
        """
        # getattr: also called from __del__ on instances whose __init__ failed
        if getattr(self, '_pthr', None) is None:
            return

        self._abort.set()
        self._pthr.join()
        self._pthr = None

    def _polling_loop(self):
        """Poll due monitors, call the tick callback, sleep one tick; repeat until aborted."""
        while not self._abort.is_set():
            self._poll_due()

            # TODO: Generalise this to be able to call add_cb and add any number of callbacks at different rates
            try:
                self.tick_cb(self)
            except Exception as e:
                log.error("Error invoking tick callbak. {}: {}".format(e.__class__.__name__, e))

            # TICK
            time.sleep(self._tick)

    def _poll_due(self):
        """
        Poll whatever is due
        """
        #
        # Get the variables that are due for polling and loop through them
        # to poll.
        #
        due = self._exec_map[
            (~pd.isnull(self._exec_map['gen'])) &
            (time.time() > self._exec_map['next_exec_t'])].sort_values('next_exec_t')

        # Try to get values for everything that is due
        for k, item in due.iterrows():
            # next() builtin rather than gen.next(), which only exists on Python 2
            val = next(item['gen'])

            # No value means the poller hasnt returned yet so just leave it to
            # try again at the next tick
            if val is None:
                continue

            tstamp, exc, ret = val
            dependents = tuple(self._exec_map.at[k, 'dependents'])

            if exc is not None:
                if ret.val is not None:
                    raise Exception("Unexpected. It should not be possible to get an exception and return value at the same time. ret='{}', exc='{}'".format(ret, exc))

                # Every point fed by this generator gets the same error alert
                alertclass = self._exec_map.at[k, 'alert_exc']
                ret = (alertclass("ERROR {}: {}".format(exc.__class__.__name__, exc)),) * (len(dependents) + 1)
            elif len(dependents) == 0:
                # Ensure that the result is always a tuple so that iteration works properly
                ret = (ret,)

            # populate the point itself and its dependents
            for k1, dep in enumerate((item['name'],) + dependents):

                # Look up the index of the dependent
                dep_k = self._exec_map[self._exec_map['name'] == dep].index
                assert len(dep_k) == 1
                dep_k = dep_k[0]

                # Log against the row being updated (dep_k), so that dependents are
                # compared with their own previous value and logged under their own name
                self._log_value(dep_k, ret[k1].val)         #<-- TODO: Could be implmented using the generalised callback functionality we have
                self._log_alert(dep_k, ret[k1].alertcode)   #<-- TODO: Could be implmented using the generalised callback functionality we have

                # Update the dependent. The schedule (dt) is that of the polled point.
                dt = self._exec_map.at[k, 'dt']
                self._exec_map.at[dep_k, 'value'] = ret[k1].val
                self._exec_map.at[dep_k, 'alert'] = ret[k1].alertcode
                self._exec_map.at[dep_k, 'exec_t'] = tstamp
                self._exec_map.at[dep_k, 'next_exec_t'] = None if dt is None else tstamp + dt

                if ret[k1].alertcode not in self._alertstr_map:
                    self._alertstr_map[ret[k1].alertcode] = ret[k1].alertstr if ret[k1].alertstr is not None else ''

    def _log_value(self, k, val):
        #
        # Log a change in value if requested and as requested
        #
        item = self._exec_map.loc[k]
        loglvl = item['loglvl_values']

        # Logging of values is off unless a level was given (None may be stored as NaN)
        if loglvl is None or pd.isnull(loglvl):
            return

        if item['value'] != val:
            log.log(int(loglvl), 'Monitor: value "{}" --> {}'.format(item['name'], val))

    def _log_alert(self, k, alertcode):
        #
        # Log a change in alert if requested and as requested
        #
        item = self._exec_map.loc[k]
        loglvl = item['loglvl_alerts']

        if loglvl is None or pd.isnull(loglvl):
            return

        # Only log changes that cross, or come back from, the logging threshold
        if ((item['alert'] != alertcode) and
                (item['logthr_alerts'] <= alertcode or item['alert'] >= item['logthr_alerts'])):
            log.log(int(loglvl), 'Monitor: alert "{}" --> {}'.format(item['name'], alertcode))

    @staticmethod
    def _age(exec_t):
        # Seconds since exec_t, or None if the point has not been polled yet
        if exec_t is None or pd.isnull(exec_t) or not exec_t:
            return None
        return time.time() - exec_t

    def __getitem__(self, item):
        """
        Note: Returns a copy of the entry so changing variables will not change it in the monitor

        Args:
            item: Name of a monitor point or of a registered parent

        Returns:
            :class:`MonitorItem`
        """
        if item in self._parents:
            return MonitorItem({'name': item, 'parent': self._parents[item]['parent']})

        it = self._exec_map[self._exec_map['name'] == item]

        if len(it) > 1:
            raise Exception("Monitor name is not unique. This should not be possible")

        if len(it) < 1:
            raise Exception("Monitor name does not exist")

        itd = it.iloc[0].to_dict()
        itd['age'] = self._age(itd['exec_t'])

        return MonitorItem(itd)  #<-- TODO: Decide if we should return all fields or a subset

    def __iter__(self):
        # Yields (name, value, alert, age); age is None for points not polled yet
        for _, r in self._exec_map.iterrows():
            yield r['name'], r['value'], r['alert'], self._age(r['exec_t'])

    def __str__(self):
        header = " Monitor "
        if self.alertcode > 0:
            header += "({}) ".format(self._get_alertstr(self.alertcode))

        # _pthr always exists; it is None whenever the loop is not running
        header += "--- running\n" if self._pthr is not None else '--- not running\n'

        parts = [
            header,
            " {:<30s}{:<40s}{:<10s}{:<15s}\n".format("Name", "Value", "Alert", "Last polled"),
            " " + "-" * 29 + " " + "-" * 39 + " " + "-" * 9 + " " + "-" * 15 + "\n",
        ]

        for item in self.itertree():
            # Depth in the tree = number of ancestors
            level = 0
            parent = item['parent']
            while parent is not None and not pd.isnull(parent):
                parent = self[parent]['parent']
                level += 1

            parts.append(" {:30.29s}{:40.39s}{:10.9s}{:s}\n".format(
                '.' * (level + 1) + item['name'],
                '' if item['value'] is None else str(item['value']),
                '' if item['alert'] is None else self._get_alertstr(item['alert']),
                '{:.2f}'.format(item['age']) if item['age'] else ''))

        return "".join(parts)

    def __repr__(self):
        return self.__str__()

    def itertree(self, parent=None):
        """
        Iterator that will iterate through the full tree of monitor points, or alternatively
        just a specific branch by specifying the parent of that branch.

        >>> for item in mon.itertree():
        ...     print(item)

        Args:
            parent: Name of the parent whose branch to iterate (None = the whole tree)
        """
        if parent is None:
            top = self._exec_map['name'][pd.isnull(self._exec_map['parent'])].tolist()
            top += [k for k, v in self._parents.items() if v['parent'] is None]
        else:
            top = self._exec_map['name'][self._exec_map['parent'] == parent].tolist()
            top += [k for k, v in self._parents.items() if v['parent'] == parent]

        # A plain return ends a generator; raising StopIteration inside one is
        # turned into a RuntimeError on newer Pythons (PEP 479)
        if len(top) == 0:
            return

        top.sort()

        for k in top:
            yield self[k]
            for it in self.itertree(parent=self[k]['name']):
                yield it

    def keys(self):
        """
        Just include this since dicts etc all use keys to describe the access keys.
        Although in this class names is more descriptive.

        This method is the same as .names()

        Returns:
            list of monitor names
        """
        return list(self._exec_map['name'])

    def names(self):
        """
        Returns:
            list of monitor names
        """
        return self.keys()
if __name__ == '__main__':
    #
    # Manual test/demo: set up a monitor with a handful of points and start it
    #
    from utils import setup_logger
    import random

    setup_logger(log, cons_loglvl=logging.DEBUG)

    from database import MonitorDb
    db = MonitorDb(db='sqlite:///mondbtest.db')

    mon = Monitor(workers=2, default_dt=2)

    # Further (disabled) example monitor points:
    #
    # @mon.monitor(parent="test", dt=5, dtdb=5)
    # # @alert_a
    # def blah1():
    #     time.sleep(2)
    #     return 'a'
    #
    # @mon.monitor("b", dt=2, dtdb=5)
    # def blah2():
    #     #time.sleep(2)
    #     return 'b'
    #
    # @mon.monitor("c", dt=10)
    # def blah3():
    #     time.sleep(2)
    #     return RedAlert('c')
    #
    # @mon.monitor(dt=30)
    # # @alert_a
    # def fail():
    #     time.sleep(1)
    #     1/0
    #     return("done")
    #
    # @mon.monitor(("test1","test2", "test3", "test4", "test5"), parent='test_many_ret')
    # def test():
    #     time.sleep(2)
    #     return 1,CriticalAlert(2),GreenAlert(3),RedAlert(4),5

    from datetime import datetime
    import psutil

    #from libgs.monitoring import Monitor
    #mon = Monitor(default_dt=5)

    # Counters bumped by the monitoring functions below
    count = {'a': 1, 'b': 1, 'c': 1, 'd': 1, 'e': 1, 'f': 1,
             'current_time': 1, 'cpu_usage': 1}

    @mon.monitor(point=("a", "b", "c", "d", "e", "f"))
    def randmon():
        # One random value for each of the six points
        #time.sleep(1)
        for key in count:
            count[key] += 1
        return tuple(random.random() for _ in range(6))

    @mon.monitor(parent='grandchild')
    def current_time(x=2):
        count['current_time'] += 1
        return datetime.utcnow()

    @mon.monitor(parent='parent')
    def cpu_usage():
        # Red alert above 50% cpu usage, green otherwise
        count['cpu_usage'] += 1
        usage = psutil.cpu_percent(interval=1)
        alert_class = RedAlert if usage > 50 else GreenAlert
        return alert_class(usage)

    # Disabled example of a rate limited callback that saves to the database:
    #
    # last_t = {}
    # dt = 10
    # @mon.callback()
    # def save( name, tstamp, exc, res):
    #     if name in last_t.keys() and time.time() < last_t[name] + dt:
    #         # print('NOT saving {}'.format(name))
    #         return
    #
    #     last_t[name] = time.time()
    #     # print('saving {}'.format(name))
    #     #db.put(0, args[0], args[3].val)

    #mon.register_parent('parent')
    # mon.register_parent('child', parent='parent')
    # mon.register_parent('grandchild', parent='child')

    mon.start()