Source code for libgs.utils

# -*- coding: utf-8 -*-
"""
..
    Copyright © 2017-2018 The University of New South Wales

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to use,
    copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
    Software, and to permit persons to whom the Software is furnished to do so,
    subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
    WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

    Except as contained in this notice, the name or trademarks of a copyright holder

    shall not be used in advertising or otherwise to promote the sale, use or other
    dealings in this Software without prior written authorization of the copyright
    holder.

    UNSW is a trademark of The University of New South Wales.


libgs.utils
===========

:date: Thu Jul 27 20:40:36 2017
:author: Kjetil Wormnes


This module contains utilities and settings for the libgs package. 

Functions and classes from utils can be imported to provide useful and common functionality across the application.

.. note::

   Implementers should take particular note of :func:`wait_loop` which will allow you to wait
   for events, or perform an equivalent of time.sleep while aborting in case of termination events. The blocking
   time.sleep should be avoided as much as possible.

"""
from __future__ import print_function

import threading

import logging
import time
import os
import pandas as pd
import traceback
import collections
from datetime import datetime
import ephem
from libgs_ops.rpc import RPCClient

log = logging.getLogger('libgs-log')
log.addHandler(logging.NullHandler())


#: abort_all is an importanle event that is being used all through
#: libgs. When set all wait_loops will exit.
abort_all = threading.Event()


##########################
#
# Exceptions
#
##########################

[docs]class Error(Exception): """Generic Error for libgs""" def __init__(self, msg, original_Error=None): if original_Error is not None: super(Error, self).__init__(msg + (": %s" % original_Error)) self.original_Error = original_Error else: super(Error, self).__init__(msg)
[docs]class AbortAllException(Error): """ Exception representing the abort_all event """ pass
########################## # # Default values used throughout # ##########################
[docs]class Defaults: """ Defines default values used by the different classes. .. note:: Many (probably most) of these values are configurable in other ways (through class interfaces, constructors, etc...) This class just holds all the fallback values (defaults). """ # # The interval at which to save monitoring values to database # when tracking and when not tracking # #: The time inteval (in seconds) at which to save monitor values to database while tracking MONITOR_SAVE_DT_TRACK = 5 # 5 seconds #: The time inteval (in seconds) at which to save monitor values to database while not tracking MONITOR_SAVE_DT_NOTRACK = 900 # 15 minutes ############################################## # # Hardware interfaces # ############################################## # # XMLRPC interface to scheduler # #: The host to bind the XMLRPC scheduler interface to XMLRPC_SCH_SERVER_ADDR = "localhost" #: The port to bind the XMLRPC scheduler interface to XMLRPC_SCH_SERVER_PORT = 8000 # # XMLRPC interface to ground station # #: The host to bind the XMLRPC ground station interface to XMLRPC_GS_SERVER_ADDR = "localhost" #: The port to bind the XMLRPC ground station interface to XMLRPC_GS_SERVER_PORT = 8001 #: #: Default client timeout for xmlrpc calls #: XMLRPC_CLI_TIMEOUT = 2 #: #: Dashboard port #: DASH_PORT = 5001 # # REST API # #: The host to bind the REST API to API_ADDR = "localhost" #: The port to bind the REST API to API_PORT = 8080 ############################################## # # Configuration parameters # ############################################## # # Default rotator configurations # (Set in rotator base class, but usually overwritten in derived classes) # #: Default azimuth angle to stow antennae to (normally overridden in rotator class) ROTATOR_STOWED_AZ = 0 #: Default elevation angle to stow antennae to (normally overridden in rotator class) ROTATOR_STOWED_EL = 90 #: Default antenna beamwidth (normally overridden in rotator class). Sets the granularity at which the rotator #: moves the antenna ROTATOR_ANTENNA_BEAMWIDTH = 10 #<-- sets the granularity at which the rotator moves the antenna #: Default maximum azimuth angle for antenna rotator (normally overridden in rotator class) ROTATOR_MAX_AZ = 360 #: Default minimum azimuth angle for antenna rotator (normally overridden in rotator class) ROTATOR_MIN_AZ = 0 #: Default maximum elevation for antenna rotator (normally overridden in rotator class) ROTATOR_MAX_EL = 90 #: Default minimum elevation for antenna rotator (normally overridden in rotator class) ROTATOR_MIN_EL = 0 #: Default timeout for slewing antennae (normally overridden in rotator class) ROTATOR_SLEW_TIMEOUT = 100 # # Log config # LOG_PATH = './logs' #: Path to store log files in LOG_FILE = 'libgs.log' #: Log file name LOG_BACKUPCOUNT = 100 #: Number of log files to keep LOG_MAXFILESIZE = 1000000 #: Max log filesize in bytes LOG_FORMAT = '%(asctime)s - %(name)14s - %(levelname)5s - %(module)20s:%(lineno)4s - "%(message)s"' #: log format string LOG_STRING_LEN = 23+3+14+3+5+3 + 24+3+1 #: length of log format string #: #: Length of log to keep in UI #: UI_LOG_LEN = 200 #: #: Database connection (See SQLAlchemy for syntax) #: DB = 'sqlite:///libgs.db' # # Rotctld ppolling # TODO: This might also be an old one that should be removed # GS_ROTATOR_POLLING_INTERVAL = 0.25 #: Deprecated. Will be removed in future updates #: #: Communications timeout after requesting data, and waiting for reply #: TX_REPLY_TIMEOUT=1000 #<--- large number to let spacelink handle the timeouts. #: #: Color log entries #: USE_LOG_COLOUR = True #: #: Log level for extra verbose logging (normally disabled but can be enabled) #: # VERY_VERB_LOG = 5 #: #: Update delay for frequency plot (should be around .2 - 2) #: VIS_FREQUENCY_PLOT_UPDATE_DELAY = .5 # # Spectrum recording # RECORD_SPECTRA = True #: If false, spectra will not be recorded and stored in database RECORD_SPECTRA_MAX_LEN = 900 #: If spectrum recording enabled, maximum number of spectra to record in a single pass RECORD_SPECTRA_DT = 1.0 #: If spectrum recording enabled, time delay between subsequent recorded spectra (in seconds) #: #: Max allowed dt (in seconds) in data used for gs tracking #: MAX_TRACK_DT = 1.0 #: #: Path to store datafiles in (that are not stored in db) #: DB_BIN_PATH = '.' #: For backwards compatability only. Will be removed in future updates. DATA_PATH = DB_BIN_PATH #<-- todo: refactor and delete # # Visualisation waterfalls # WFALL_COLORMAP = 'Viridis256' #: Bokeh colormap, see bokeh.pydata.org/en/0.12.6/docs/reference/palettes.html WFALL_JPG_COLORMAP = 'viridis' #: Matplotlib colormap, see matplotlib.org/examples/color/colormaps_reference.html WFALL_JPG_WIDTH = 250 #: Width of each recorded waterfall plot (in pixels) if dpi=100 WFALL_JPG_HEIGHT = 600 #: Height of each recorded waterfall plot(in pixels) if dpi = 100 WFALL_JPG_WIDTH_EXTRA = 200 #: Extra width of each recorded waterfall plot in pixels if dpi = 100 WFALL_JPG_DPI = 100 #: DPI for recorded waterfall plots WFALL_JPG_OUT_SCALE = 1 #: Scaling of DPI on output of recorded waterfall plots.. Affects font size # # REST API # RESTAPI_DEBUG_MODE = False #: Put restapi in debug mode (more verbose error messages) RESTAPI_TABLE_LIMIT = 400 #: Max rows to return by restapi if nothing has been explicitly given
######################## # # Functions and classes # ########################
[docs]def raise_if_aborted(): """ Raises an exception if the abort_all global event has been set. """ if abort_all.is_set(): raise AbortAllException("abort_all event has been set")
[docs]class RegularCallback(object): """ Invokes a callback at regular intervals. Example: >>> def callback(): >>> print("Called callback") >>> rb = RegularCallback(callback, 5) >>> rb.start() """ def __init__(self, func, delay, min_interval=0.05): """ Args: func: Callback funciton to call delay: Time between two subsequent calls min_interval: Minimum time between the end of one call and the beginning of th next """ self.func = func self.delay = delay self._pthr = threading.Thread(target=self._regular_callback) self._pthr.daemon = True self.min_interval=min_interval self._suppressing_errors = False self._last_e = (None, None) def _regular_callback(self): """ A function to regularly call a callback based on the specified delay """ # This property can be changed to stop the callback self._stop_thread = False stop_thread_fn = lambda: self._stop_thread log.debug("Regular callback of '{}' started".format(self.func.__name__)) while True: t0 = time.time() try: self.func() except Exception, e: new_e = (e.__class__.__name__, str(e)) if new_e == self._last_e: if not self._suppressing_errors: log.error("Repeat Exception '{}:{}' in regular callback, suppressing output until something new happens".format(*new_e)) self._suppressing_errors = True else: if self._suppressing_errors: log.info("Repeat Exception '{}:{}' has stopped occurring. Regular callback resumed normal operation".format(*self._last_e)) self._suppressing_errors = False self._last_e = new_e err = traceback.format_exc() log.error("Exception in regular callback function. Stack trace:\n%s"%(err)) #Error("Exception in regular callback function: %s"%(e)) # Sleep until next call unless an event happens in which case we break from the loop try: if wait_loop([stop_thread_fn], timeout=max(self.delay - (time.time() - t0), self.min_interval)) is not None: break except AbortAllException as e: log.debug("Abort All event caught. Shutting down regular callback") break log.debug("Regular callback of '{}' ended".format(self.func.__name__)) #time.sleep(max(self.delay - (time.time() - t0), self.min_interval))
[docs] def start(self): """ Start the regular callback """ self._pthr.start()
[docs] def stop(self): """ Stop the regular callback """ self._stop_thread = True self._pthr.join()
[docs]def conv_time(d, to='iso', float_t='julian', ignore_ambig_types = False): """ Convert between different time/date types used in libgs There are a number of different date formats used around libgs ephem.Date, datetime, pandas.Timestamp to mention some. Also modified Julian time and various string formats. Time formats that can be converted: ========== ========================== iso string in `ISO 8601 <https://en.wikipedia.org/wiki/ISO_8601>`_ time format. E.g. "2019-01-14T21:49:12.321Z" julian float representing juilan time (days since 12:00:00 1 Jan 2713 BC in the Julian calendar) unix integer representing UNIX time (seconds since 00:00:00 1 Jan 1970) datetime python :class:`datetime.datetime` class pandas :class:`pandas.Timestamp` class ephem :class:`ephem.Date` class ========== ========================== Args: to (str): One of the abovementioned formats float_t (str): If converting from a numeric, specify one if it is 'julian' or 'unix', can be omitted if converting from other formats ignore_ambig_types: Only allow conversions to non-ambigous types (i.e. none of the numeric or string formats) """ if ignore_ambig_types and not (isinstance(d, ephem.Date) or isinstance(d, pd.Timestamp) or isinstance(d, datetime)): return d # first make a pandas Timestamp object if isinstance(d, ephem.Date): d2 = pd.to_datetime(d.datetime()) elif isinstance(d, pd.Timestamp): d2 = d elif isinstance(d, float): if float_t == 'julian': d2 = pd.to_datetime(d, unit='D', origin='julian') elif float_t == 'unix': d2 = pd.to_datetime(d, unit="s", origin="unix" ) else: raise Error("Unknown float time type {}".format(float_t)) elif isinstance(d, basestring): # Try to work out if we have year first (iso like or day first) if len(d.split('-')[0].strip()) == 4: d2 = pd.to_datetime(d) else: d2 = pd.to_datetime(d, dayfirst=True) elif isinstance(d, datetime): d2 = pd.to_datetime(d) else: raise Error("Dont know how to convert obejct of type {} to time".format(type(d))) d2 = d2.replace(tzinfo=None) if to == 'iso': #w milliseconds d3 = d2.strftime('%Y-%m-%dT%H:%M:%S,%f')[:-3] d3 += 'Z' elif to == 'julian': d3 =d2.replace(tzinfo=None).to_julian_date() elif to == 'unix': d3= time.mktime(d2.timetuple()) + d2.microsecond/1e6 elif to == 'datetime': d3 = d2.to_pydatetime() elif to == 'pandas': d3 = d2 elif to == 'ephem': d3 = ephem.Date(d2.to_pydatetime()) else: raise Error("Dont know how to convert to {}".format(to)) return d3
[docs]def wait_loop(events_or_callables = [], timeout=None, dt=0.1): """ Generic waiting loop. It will wait for any event (of type threading.Event) to be set, or for any callable to return True Example: >>> ev = wait_loop([fn, event], timeout=10) >>> if ev is None: >>> print("Timeout") >>> elif fn in ev: >>> print("fn callable returned true) >>> elif event in ev: >>> print("threading event became true) Args: events_or_callables (list(callable or threading.Event)): A list of events or callables to monitor. If any return true the wait loop will be broken timeout (int): Maximum number of seconds to wait until breaking regardless dt (float): The internal sleep interval between checking events (default = 0.1 sec) Returns: A list of the events that have returned true. In case of timeout returns None. An exception will be raised if the global abort_event has been set. """ t0 = time.time() if not isinstance(events_or_callables, collections.Iterable): events_or_callables = [events_or_callables] fns = [ec.is_set if isinstance(ec, threading._Event) else ec for ec in events_or_callables] for ec in events_or_callables: if not (isinstance(ec, threading._Event) or callable(ec)): raise Error("events_or_callable must be a list of events or callables, got {}".format(type(ec))) while True: if abort_all.is_set(): raise AbortAllException("global wait_loop abort_all has been set") if timeout is not None and time.time() - t0 > timeout: return None e_set = [e for (e,f) in zip(events_or_callables, fns) if f()] if len(e_set) > 0: return e_set rdt = max(0, timeout - (time.time() - t0)) if timeout is not None else 0 time.sleep(min(rdt, dt))
[docs]def safe_sleep(t): """ Sleep for a specified number of seconds. safe_sleep uses a subset of the :func:`wait_loop` capabilities to implement an anologe to the time.sleep() function. The advantage is that it will raise an exception if the global abort event gets set, and as such is safe to use in thread loops that need to exit on adfa-gs exit. """ wait_loop(timeout=t)
[docs]class UTCLogFormatter(logging.Formatter): """ A logging formatter that displays records in UTC (zulu) time """ converter = time.gmtime use_colour = Defaults.USE_LOG_COLOUR
[docs] def formatTime(self, record, datefmt=None): ct = self.converter(record.created) t = time.strftime("%Y-%m-%dT%H:%M:%S", ct) s = "%s.%03dZ" % (t, record.msecs) return s
[docs] def format(self, record): s = super(UTCLogFormatter, self).format(record) if record.levelno >= logging.ERROR and self.use_colour: s = "\033[1;31m%s\033[1;0m"%(s) return s
[docs]class UTCLogFormatterHTML(UTCLogFormatter): """ A logging formatter that displays records in UTC (zulu) time and uses HTML tags for formatting. """
[docs] def format(self, record): frec = super(UTCLogFormatter, self).format(record) if record.levelno >= logging.ERROR and self.use_colour: s = '<pre style="color:red;' else: s = '<pre style="' s += '">{}</pre>\n'.format(frec) return s
[docs]def setup_logger(logger, cons_loglvl = logging.INFO, file_logpath=None, file_loglvl=logging.INFO, fmt=Defaults.LOG_FORMAT): """ Function to set up logging for libgs. Should only ever be called once. It sill set up logging to console and to file if requested. It will also make sure all logging is timestamped in UTC time following ISO 8601 Args: logger: The log to setup cons_loglvl: log level for console logging, file_logpath: Path to log to if logging to file (None for no file logging), file_loglvl: log level for file log fmt: log formatting string """ if hasattr(logger, '_has_been_setup'): raise Error("setup_logger can only be called once per logger") # # TODO: NOTE: When moving to a python module, this stuff should probably # be in the __init__ ... # # # Add custom log level # DEBUG_LEVELV_NUM = 9 logging.addLevelName(DEBUG_LEVELV_NUM, "DBG_V") def debugv(self, message, *args, **kws): # Yes, logger takes its '*args' as 'args'. self._log(DEBUG_LEVELV_NUM, message, args, **kws) logging.Logger.debugv = debugv # create logger logger.setLevel(logging.DEBUG) formatter = UTCLogFormatter(fmt) # create console handler and set level to debug ch = logging.StreamHandler() ch.setLevel(cons_loglvl) # add formatter to ch ch.setFormatter(formatter) # add ch to logger logger.addHandler(ch) # # Set up file logging as specificed # if file_logpath is not None: logpath = os.path.realpath(file_logpath) if not os.path.exists(logpath): os.mkdir(logpath) cf = logging.handlers.TimedRotatingFileHandler(logpath +'/' + Defaults.LOG_FILE,when='midnight',interval=1,backupCount=0,utc=True) cf.setFormatter(formatter) cf.setLevel(file_loglvl) logger.addHandler(cf) logger.info("File-logging enabled to %s"%(logpath)) logger._has_been_setup = True
[docs]def schedule_regular_callback(func, delay): """ A wrapper for :class:`RegularCallback` """ cb = RegularCallback(func, delay) cb.start() return cb
def _print( *arg, **kwarg): """ Function to print. This wrapper funciton is provided to allow for more easy redirection of output to other destinations (including Bokeh GUI, or files) if necessary. It should be used for information targeted at an interactive user, not for logging messages """ print(*arg, **kwarg)
[docs]def bytes2prettyhex(data, whitespacelen= Defaults.LOG_STRING_LEN): """ Helper function to convert a bytearray to a pretty printed hex string. Example: >>> print(bytes2prettyhex(range(59), whitespacelen=10)) 000000 00-01-02-03-04-05-06-07-08-09-0A-0B-0C-0D-0E-0F 000010 10-11-12-13-14-15-16-17-18-19-1A-1B-1C-1D-1E-1F 000020 20-21-22-23-24-25-26-27-28-29-2A-2B-2C-2D-2E-2F 000030 30-31-32-33-34-35-36-37-38-39-3A """ st = '' if data is not None: data_array = bytearray(data) datastr=["%02X"%(x) for x in data_array] N = range(0, len(datastr), 16) for n in N: st += '\n' + (' '*whitespacelen) + "{:06x} ".format(n) + "-".join(datastr[0+n:16+n]) return st
[docs]def bytes2hex(data): """ Helper function to convert a bytearray to a hex string AA-BB-CC-... Example: >>> bytes2hex([0xaa, 0xbb, 0x00, 0x01, 0x10]) 'AA-BB-00-01-10' """ data =bytearray(data) return('-'.join(["%02X"%(x) for x in data]))
[docs]def hex2bytes(hexstr): """ Helper function to convert a hex string (AA-BB-CC-...) to a python bytearray Example: >>> hex2bytes("AA-BB-00-01-10") bytearray(b'\\xaa\\xbb\\x00\\x01\\x10') """ data = hexstr.split('-') return bytearray(''.join([chr(int(x, 16)) for x in data]))
[docs]class XMLRPCTimeoutServerProxy(RPCClient): """ Deprecated in favor of :class:`libgs_ops.rpc.RPCClient`. The implmentation is now just that this is an alias for RPCClient .. todo:: Refactor the code to no longer use this so we can delete """ pass
if __name__ == '__main__': setup_logger(log, cons_loglvl=logging.DEBUG)