# -*- coding: utf-8 -*-
"""
..
Copyright © 2017-2018 The University of New South Wales
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Except as contained in this notice, the name or trademarks of a copyright holder
shall not be used in advertising or otherwise to promote the sale, use or other
dealings in this Software without prior written authorization of the copyright
holder.
UNSW is a trademark of The University of New South Wales.
libgs_ops.propagator
=====================
:date: Jan 2 10:10:48 2018
:author: Kjetil Wormnes
libgs_ops.propagator has been designed to allow you to design and plan satellite passes, and to calculate the data required by
:mod:`.scheduling` when creating the Schedules to upload to the groundstation.
Usage
---------
You can use TLEs from a database with the TLEDb interface:
>>> p = Propagator(api=TLEDb('leo_tles.txt'), gs_lat = GS_LAT, gs_lon = GS_LON, gs_elev = GS_ELEV) #<-- replace path with a txt file of recent TLEs
>>> p.get_tle(25544)
('0 ISS (ZARYA)',
'1 25544U 98067A 17330.99569027 .00003456 00000-0 59208-4 0 9997',
'2 25544 51.6427 306.6944 0004138 155.0450 324.2726 15.54239375 87077')
.. note::
This assumes the file leo_tles.txt exists, and is in the correct format. The file can contain many tles.
You can download tles from Space-Track.org. They need to be in the 3le format.
A TLE that is not in the database will fail:
>>> p.get_tle(40000)
KeyError: 40000
Or you can connect directly to spacetrack if you have login credentials. Note hat uname, pwd are your Space-Track.org
username and password. GS_LAT, GS_LON, GS_ELEV are the coordinates of your ground station.
>>> p = Propagator(api=SpaceTrackAPI(uname, pwd), gs_lat = GS_LAT, gs_lon = GS_LON, gs_elev = GS_ELEV)
Now we can get any TLE:
>>> [p.get_tle(25544), p.get_tle(40000)]
[('0 ISS (ZARYA)',
'1 25544U 98067A 18191.92601852 .00011951 00000-0 18828-3 0 9990',
'2 25544 51.6420 260.3859 0003584 301.4077 43.7815 15.54010757122205'),
('0 FENGYUN 2C DEB',
'1 40000U 04042D 17363.55183407 -.00000332 00000-0 00000+0 0 9995',
'2 40000 9.4060 40.1038 0077532 344.5713 15.1779 1.00303233 32417')]
Or you can even specify a TLE directly (only really useful for testing):
>>> tle = \"""0 ISS (ZARYA)
>>> 1 25544U 98067A 17284.88510854 .00004604 00000-0 76690-4 0 9997
>>> 2 25544 51.6422 176.5756 0004586 12.7905 64.0182 15.54151166 79907\"""
>>> p = Propagator(tles=tle, gs_lat = GS_LAT, gs_lon = GS_LON, gs_elev = GS_ELEV)
The propagator provides many utility functions to allow you to easily visualise and plan passes. For this example, we will
work with the International Space Station (ISS) which has NORAD ID 25544. To print details about it, including
where it is at the moment:
>>> p.print_info(25544)
TLE (updated 12/04/2018 09:56:44):
0 ISS (ZARYA)
1 25544U 98067A 17284.88510854 .00004604 00000-0 76690-4 0 9997
2 25544 51.6422 176.5756 0004586 12.7905 64.0182 15.54151166 79907
\
Orbit:
epoch (utc) : 2017/10/11 21:14:33
eccentricity: 0.000459
inclindation: 51.642200
RAAN: 176.575607
AP: 12.790500
revol per day: 15.541512
mean anom at epoch: 64.018204
orbit no at epoch: 7990
\
Observer:
lat: -35.291447
lon: 149.165655
elev: 614
date (utc): 2018/04/11 23:56:55
\
Satellite:
ground track pos: (lat = 33.998, lon = -179.991)
pointing: (az = 26.178, el = -35.329)
range: 7997806.500000
range rate: 5306.109375
altitude: 400772.000000
orbit number: 10820.520789
We can compute the details of the next pass using :meth:`Propagator.compute_pass`. It returns two tables, pdat and psum.
pdat includes deails about az, el and range_rate during the pass and is what the :class:`.Scheduler` will require (more about
that later), and psum shows an overview:
>>> pdat, psum = p.compute_pass(25544)
>>> psum
======= ======== ====================== =============== =========== =========== =============== =============== ============== ============ =========== ========
. norad_id tstamp_str orbit_no az el range range_rate altitude lat long eclipsed
======= ======== ====================== =============== =========== =========== =============== =============== ============== ============ =========== ========
rise 25544 2018/4/12 12:56:27 10828.934071 341.021182 0.017601 2.362804e+06 -6453.000000 404883.78125 -15.640039 142.408351 True
peak 25544 2018/4/12 13:01:33 10828.989114 50.446186 21.951227 9.426514e+05 57.450962 408999.65625 -30.219701 155.773119 True
set 25544 2018/4/12 13:06:40 10829.044337 118.978849 0.075523 2.383968e+06 6468.492188 413027.25000 -42.669076 173.889048 True
======= ======== ====================== =============== =========== =========== =============== =============== ============== ============ =========== ========
.. note::
pdat (and psum) returns more information (columns) than are required by :class:`Schedule`, which only needs az, el and range_rate.
Most of the other functionality in this module are subsets of :meth:`Propagator.get_all`:
>>> p.get_all(25544)
norad_id 25544
tstamp_str 2018/04/11 23:59:00
orbit_no 10820.5
az 28.9954
el -39.2531
range 8.64443e+06
range_rate 5033.92
altitude 402203
lat 39.1371
long -172.682
eclipsed False
Name: 2018/04/11 23:59:00, dtype: object
You can provide a when=keyword to specify when you want the details computed for, or even an array of timestamps:
>>> p.get_all(25544, when=['2018/04/11 23:59:00', '2018/04/11 23:60:00', '2018/04/11 23:61:00'])
======================= ========= ==================== ========= ======== ========= ============ =========== ========= ======== ========= =========
. norad_id tstamp_str orbit_no az el range range_rate altitude lat long eclipsed
======================= ========= ==================== ========= ======== ========= ============ =========== ========= ======== ========= =========
2018/04/11 23:59:00 25544 2018/04/11 23:59:00 10820.5 28.9954 -39.2531 8.64443e+06 5033.92 402203 39.1371 -172.682 False
2018/04/11 23:60:00 25544 2018/04/11 23:60:00 10820.6 30.3388 -41.12 8.94222e+06 4891.28 402885 41.401 -168.776 False
2018/04/11 23:61:00 25544 2018/04/11 23:61:00 10820.6 31.6915 -42.9758 9.23124e+06 4741.58 403546 43.5045 -164.581 False
======================= ========= ==================== ========= ======== ========= ============ =========== ========= ======== ========= =========
A more complicated example to show how you can do anything you can do in python.
Requires mplleaflet to be installed (pip install mplleaflet):
>>> import matplotlib.pyplot as plt
>>> import mplleaflet
>>> latlon = [p.get_ground_coord(25544, when=tstamp) for tstamp in pdat.tstamp_str]
>>> lat, lon = zip(*latlon)
>>> plt.plot(lon, lat, linewidth=3, color='r')
>>> plt.plot(p.gs_lon, p.gs_lat, 'ro', markersize=10)
>>> mplleaflet.display(tiles='esri_aerial')
It can be useful to get an overview of upcoming satellite passes. Just use :meth:`Propagator.get_passes` to very quickly calculate upcoming
passes. It also optionally accepts the ``when`` keyword in the same way as :meth:`Propagator.get_all` (if omitted when = Now) as well as ``N`` to specify the number
of upcoming passes to calculate (if omitted N=1), and ``horizon`` to filter by passes that are higher than a certain elevation. For example:
>>> p.get_passes([25544, 42778, 42017], N=1, horizon=10)
=== ======= ============= ======================= ============== ======================= ============= ======================= =============
. nid orbit_no rise_t rise_az max_elev_t max_elev set_t set_az
=== ======= ============= ======================= ============== ======================= ============= ======================= =============
0 42778 4451.856863 2018/04/12 11:00:07 150.894247 2018/04/12 11:05:31 18.542780 2018/04/12 11:10:50 19.517481
1 42017 6406.847362 2018/04/12 12:10:13 169.722360 2018/04/12 12:16:10 83.098219 2018/04/12 12:22:01 345.571864
2 25544 10832.568897 2018/04/12 14:03:15 308.388451 2018/04/12 14:08:46 69.318347 2018/04/12 14:14:22 133.721252
=== ======= ============= ======================= ============== ======================= ============= ======================= =============
.. note::
The horizon parameter just filters the rows to show anything with max_elev > horizon. rise_t and set_t still correspond to horizon=0.
This is because get_passes uses a quick computation algorithm (See :meth:`ephem.Observer.next_pass`). To get the actual
rise_t and set_t for that horizon, run :meth:`Propagator.compute_pass` with ``when`` set to the corresponding rise_t.
A common use of get_passes is to get the list of upcoming passes you want to create a schedule for. You then need to call compute_passes
for each of the rows in the get_passes table by giving the ``when`` parameter as the corresponding rise_t. The below example uses the
:func:`mpl_plot_pass` convenience function to plot a representation for each of the upcoming passes:
>>> for k,satpass in p.get_passes([25544, 42778, 42017], N=1, horizon=10).iterrows():
>>> pdat, psum = p.compute_pass(satpass.nid, when=satpass.rise_t)
>>> mpl_plot_pass(pdat)
Module reference
----------------
"""
from __future__ import print_function
import requests
import ephem
from datetime import datetime
import time
from math import pi
import pandas as pd
from pandas import DataFrame
import numpy as np
from lxml import html
import re
from functools import wraps
def _print( *arg, **kwarg):
"""
Function to print.
This wrapper funciton is provided to allow for more easy redirection
of output to other destinations (including Bokeh GUI, or files) if
necessary in the futrue.
It should be used for information targeted at an interactive user, not
for logging messages
"""
print(*arg, **kwarg)
[docs]class Error(Exception):
""" A generic exception """
pass
try:
import matplotlib.pyplot as plt
_HAS_MATPLOTLIB = True
except:
_HAS_MATPLOTLIB = False
from collections import Iterable
import logging
log = logging.getLogger('libgs_ops-log')
log.addHandler(logging.NullHandler())
# Decorator to more easily tag matplotlib functions
def _mpl(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not _HAS_MATPLOTLIB:
raise Error("matplotib.pyplot not available, before using this funciton you need to install it : pip install matplotlib")
else:
func(*args, **kwargs)
return wrapper
def _tstamp_array(start, end, dt=None):
"""
Helper function to create an array of ephem.Date
"""
t0 = ephem.Date(start)
t1 = ephem.Date(end)
if dt is None:
t_array = np.linspace(t0,t1, 100)
else:
dt_days = dt/86400.0
t_array = np.arange(t0,t1,dt_days)
t_array2 = [ephem.Date(t) for t in t_array]
return t_array2
[docs]@_mpl
def mpl_plot_pass(angs, vishor=10):
"""
A helper function to plot a pass in polar coordinates.
This function requires matplotlib to be installed.
Args:
angs: The antenna angles (pandas dataframe with az+el columns)
vishor: The horizon (deg)
Returns:
None
"""
angs = angs[angs.el>vishor]
if angs.empty:
raise Error("Pass never comes above visiblity horizon")
plot_fig = plt.gcf()
ax = plot_fig.add_subplot(111, projection='polar')
ax.set_ylim([0,90])
az = angs.az/180.0*pi
el = 90 - angs.el
ax.plot(az,el, 'b', linewidth=2)
# plot visibility horizon
min_el = vishor
h_az = np.linspace(0,2*pi, 100)
h_el = [90-min_el]*len(h_az)
ax.plot(h_az, h_el, 'r')
# print timestamps for start/end pass
maxe = angs[angs.el == angs.el.max()]
if type(maxe) == DataFrame:
maxe = maxe.iloc[0]
kaz = np.array([angs.iloc[0].az, maxe.az, angs.iloc[-1].az])
kel = np.array([angs.iloc[0].el, maxe.el, angs.iloc[-1].el])
kt = np.array([angs.iloc[0].tstamp_str, maxe.tstamp_str, angs.iloc[-1].tstamp_str])
ax.plot(kaz/180.0*pi, 90-kel, 'b.', markersize=10)
for i in range(0, len(kaz)):
ax.text(kaz[i]/180.0*pi, 90-kel[i], kt[i], color='b', weight='bold')
ax.set_yticklabels([])
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
[docs]class SpaceTrackAPI(object):
"""
Class to interface with `space-track.org <https://www.space-track.org>`_.
This class connects to the space-track.org public API, and therefore requires a username and password.
To obtain such credentials you can register for free on the space-track website.
This class allows the execution of arbitrary API queries, but provides a simplified method to download TLEs in
the format used by :class:`.Propagator`.
"""
ST_URL='https://www.space-track.org'
def __init__(self, uname, pwd):
"""
:param uname: space-track.org user name
:param pwd: space-track.org password
"""
r=requests.post(self.ST_URL+"/ajaxauth/login", json={'identity':uname, 'password':pwd})
#
# Raise error if login request fails
#
r.raise_for_status()
self.cookies = r.cookies
[docs] def query_raw(self, uri):
"""
Perform a raw query to spacetrack API. Can return anything the API
can.
See `spacetrack <https://www.space-track.org/documentation#/api>`_
for the URI syntax.
Args:
uri (string): URI, starting with '/'. It will be appended to the
spacetrack URL.
Example:
>>> query_raw('/basicspacedata/query/class/boxscore/format/csv')
Will return Space Objects Box Score, part 1 of the Space Situation Report (SSR), in CSV format
"""
r = requests.get(self.ST_URL+uri, cookies=self.cookies)
r.raise_for_status()
return r.content
[docs] def query_tle(self, params):
"""
Perform a (fairly) raw query to the spacetrack API for the most recent
three-line-elements.
Args:
params (dict): A dictionary of parameter/argument pairs that
specify the exact query to make.
.. Note::
This function will always return three-line-elements. I.e.
the param pairs are added to the API URI "/basicspacedata/query/class/tle_latest/format/3le/ORDINAL/1"
**Parameter syntax**
See `spacetrack <https://www.space-track.org/documentation#/api>`_
for the most up-to-date descripions of the possible parameter/value pairs.
Each entry in the param dict follows key:value as per the model
definition. For the latest model definition see `here <https://www.space-track.org/basicspacedata/modeldef/class/tle_latest/format/html>`_.
===================== =========================
Field Type
===================== =========================
ORDINAL tinyint(3) unsigned
COMMENT varchar(32)
ORIGINATOR varchar(5)
NORAD_CAT_ID mediumint(8) unsigned
OBJECT_NAME varchar(60)
OBJECT_TYPE varchar(11)
CLASSIFICATION_TYPE char(1)
INTLDES varchar(8)
EPOCH datetime
EPOCH_MICROSECONDS mediumint(8) unsigned
MEAN_MOTION double
ECCENTRICITY double
INCLINATION double
RA_OF_ASC_NODE double
ARG_OF_PERICENTER double
MEAN_ANOMALY double
EPHEMERIS_TYPE tinyint(3) unsigned
ELEMENT_SET_NO smallint(5) unsigned
REV_AT_EPOCH float
BSTAR double
MEAN_MOTION_DOT double
MEAN_MOTION_DDOT double
FILE int(10) unsigned
TLE_LINE0 varchar(62)
TLE_LINE1 char(71)
TLE_LINE2 char(71)
OBJECT_ID varchar(11)
OBJECT_NUMBER mediumint(8) unsigned
SEMIMAJOR_AXIS double(20,3)
PERIOD double(20,3)
APOGEE double(20,3)
PERIGEE double(20,3)
===================== =========================
The following operators can be used with the values
========= =======================================================
Operator Description
========= =======================================================
> Greater Than (alternate is %3E)
< Less Than (alternate is %3C)
<> Not Equal (alternate is %3C%3E)
, Comma Delimited 'OR' (ex. 1,2,3)
-- Inclusive Range
(ex. ``1--100`` returns 1 and 100 and everything in between).
Date ranges are expressed as
``YYYY-MM-DD%20HH:MM:SS--YYYY-MM-DD%20HH:MM:SS`` or
``YYYY-MM-DD--YYYY-MM-DD``
null-val Value for 'NULL', can only be used with Not Equal (<>) or by itself.
~~ "Like" or Wildcard search. You may put the ``~~`` before or
after the text; wildcard is evaluated regardless of
location of ``~~`` in the URL. For example, ``~~OB`` will return
'OBJECT 1', 'GLOBALSTAR', 'PROBA 1', etc.
^ Wildcard after value with a minimum of two characters.
(alternate is %5E) The wildcard is evaluated after the
text regardless of location of ^ in the URL. For example,
``^OB`` will return 'OBJECT 1', 'OBJECT 2', etc. but not 'GLOBALSTAR'
now Variable that contains the current system date and time.
Add or subtract days (or fractions thereof) after 'now'
to modify the date/time, e.g. ``now-7``, ``now+14``, ``now-6.5``,
``now+2.3``. Use <,>,and -- to get a range of dates;
e.g. ``>now-7``, ``now-14--now``
========= =======================================================
You can also specify to order results by a specific field by using
they
Examples:
>>> query_tle({'PERIOD': '<128'})
Satellites currently in Low Earth Orbit (LEO = <2000KM altitude).
>>> query_tle({'PERIOD':'1430--1450', 'orderby':'NORAD_CAT_ID'})
Satellites in geosynchronous orbit 1430 <= period <= 1450 minutes.
Sort results by Norad ID.
"""
URI = "/basicspacedata/query/class/tle_latest/format/3le/ordinal/1"
for k,v in params.items():
URI += "/" + k +"/"+v
log.debug("Downloading TLE list using URI: %s"%(URI))
r = requests.get(self.ST_URL+URI, cookies=self.cookies)
r.raise_for_status()
return r.content
[docs] def get_tle_sets(self, name):
"""
A shortcut to some predefined sets of satellites.
Avaliable sets are:
============== ============================================================
'amateur_all' space-track.org curated list of amateur radio satellites
'leo_all' All satellites in LEO (period < 128 )
'geo_all' All satellites in GEO (period between 1430 and 1450)
============== ============================================================
Args:
name (str): The set to get (see above)
Returns:
Concatenated string of TLEs
"""
if name == "amateur_all":
l = self.query_tle({
'favorites': 'Amateur',
'EPOCH': '>now-30'})
elif name == "leo_all":
l = self.query_tle({
'PERIOD': '<128'})
elif name == 'geo_all':
l = self.query_tle({
'PERIOD': '1430--1450'})
else:
raise Error('Set named %s is not defined'%(name))
return l
[docs] def get_tles(self, nids):
"""
Query space-track for a series of norad IDs, and return
the NIDs and TLEs in a dict.
.. note::
In some cases space-track can not find a TLE. **No error**
will be raised by this function and it is up to the caller
to check whether the returned NIDs match the passed ones.
Args:
nids (list(int)): List of norad ids to query for
Returns:
Python dictionary of NIDS/TLES
"""
idstr=",".join(str(x) for x in nids)
tlestr = self.query_tle({'NORAD_CAT_ID':idstr})#.decode()
tlestr = tlestr.strip().replace(b'\r', b'').split(b'\n')
L0 = tlestr[0::3]
L1 = tlestr[1::3]
L2 = tlestr[2::3]
tle_list = zip(L0,L1,L2)
nids = [int(x.split(b' ')[1]) for x in L2]
tles = dict(zip(nids, tle_list))
return tles
[docs]class TLEDb(object):
"""
Database of TLEs.
The Database can be loaded from a file, or input directly.
This class can serve as a drop-in replacement for :class:`.SpaceTrackAPI`
when creating a :class:`.Propagator` as it implements the ``get_tles`` method.
"""
def __init__(self, fname=None, tles=None):
"""
.. note::
Only one of the arguments (fname or tles) must be specified, and not both.
Args:
fname (str): The filename to load TLEs from
tles (dict or str): Manual specificaiton of TLES
"""
if (fname is not None) and (tles is not None):
raise Error("only enter fname OR tles")
if tles is not None:
if type(tles) is str:
#parse string
self.tles = self._parse_tlestr(tles)
elif type(tles) is dict:
# load directly
self.tles = tles
elif fname is not None:
# load from file
fp = open(fname, 'r')
tlestr = fp.read()
self.tles = self._parse_tlestr(tlestr)
fp.close()
else:
raise Error("Either a file name or a tle string must be entered")
[docs] def save_tles(self, fname, fformat='txt'):
"""
Save the database to file.
Args:
fname (str): Filename to save to
fformat (str): Currently has to be 'txt'
"""
if fformat == 'txt':
fp = open(fname, 'w')
tlestr = '\r\n'.join(sum(self.tles.values(), ()))
fp.write(tlestr)
fp.close()
elif fformat == 'pickle':
pass
else:
raise Exception('Invalid file format {}'.format(fformat))
def _parse_tlestr(self, tlestr):
tlestr = tlestr.strip().replace('\r', '').split('\n')
L0 = [l.strip() for l in tlestr[0::3]]
L1 = [l.strip() for l in tlestr[1::3]]
L2 = [l.strip() for l in tlestr[2::3]]
# do a tiny bit of error checking and extract nids
if all([x[0] == '0' for x in L0]) is False:
raise Error('Malformed TLE string')
if all([x[0] == '1' for x in L1]) is False:
raise Error('Malformed TLE string')
if all([x[0] == '2' for x in L2]) is False:
raise Error('Malformed TLE string')
tle_list = zip(L0,L1,L2)
nids = [int(x[2:7]) for x in L2]
tles = dict(zip(nids, tle_list))
return tles
[docs] def get_tles(self, nids):
"""
Query database for a series of norad IDs, and return
the NIDs and TLEs in a dict.
.. note::
If the NID is not in the database, **no error**
will be raised by this function and it is up to the caller
to check whether the returned NIDs match the passed ones.
Args:
nids (list(int)): List of Norad IDs
Returns:
Python dictionary of NIDS/TLES
"""
tles = dict()
for n in nids:
if n in self.tles.keys():
tles[n] = self.tles[n]
return tles
[docs]class Propagator(object):
"""
Class to compute pointing angles for a satellite based on its Norad ID
You can connect to a local Db using the TLEDb class, to spacetrack
using the SpaceTrackAPI class, or simply just specify the TLEs
directly as a string. The latter will use the manually input tles and never query space-track.
For a reference on TLEs see `Wikipedia <https://en.wikipedia.org/wiki/Two-line_element_set>`_
.. note::
This class relies heavily on `py-ephem <http://rhodesmill.org/pyephem/>`_
"""
#: Where to find space-track
ST_URL="https://www.space-track.org/"
def __init__(
self,
api = None,
tles = None,
gs_lat = None,
gs_lon = None,
gs_elev = 0,
tle_timeout=1,
nids=[]
):
"""
Args:
api : API for TLE updates
tles (str) : TLEstring
gs_lat (float, optional) : latitude of ground station
gs_lon (float, optional) : longitude of ground station
gs_elev (float, optional) : elevation of ground station
tle_timeout (float, optional) : Time in days before re-requesting new TLEs
nids (list(int), optional) : List of NORAD IDs to track.
If a satellite is requested that is not in this list it will be
added and a full refresh of the TLEs from space-track initiated.
"""
if api is None and tles is None:
raise Error('You have to specify either uname/pwd or tls')
if gs_lat is None or gs_lon is None:
raise Error("gs lon and lat must be specified")
#: Ground station latitude
self.gs_lat = gs_lat
#: Ground station longitude
self.gs_lon = gs_lon
#: Ground station elevation
self.gs_elev = gs_elev
if tles is not None:
self._update_tles(tles)
# big number to avoid tles timing out in manual mode
self.tle_timeout = 9999999
else:
#: List of NIDS to track
self.nids_to_track = nids
#: Current TLEs
self.tles = None
#: Last requested TLE
self.tle_timestamp = None
#: Timeout (in days) before requesting new TLE
self.tle_timeout = tle_timeout
#:
self.api = api
self._update_tles()
def _is_visible(self, nid, horizon=0):
"""
Check if a satellite is above the horizon
"""
az, el = self.get_angles(nid)
return (el >= horizon)
def _update_tles(self, ext_tles=None):#, nids=None):
"""
Query space-track.org for the trackable tles and save to memory
Args:
ext_tles: This argument allows to update the TLEs manually
for example for testing. ext_tles should be a string
exactly identical in format to what space-track would return
"""
#if nids is None:
# nids = self.nids_to_track
if ext_tles is None:
self.tles = self.api.get_tles(self.nids_to_track)
self.nids_to_track = list(self.tles.keys())
else:
raw_tles = ext_tles.replace('\r', '').split('\n')
raw_tles = [rtle.strip() for rtle in raw_tles]
# do a tiny bit of error checking and extract nids
if all([x[0] == '0' for x in raw_tles[0::3]]) is False:
raise Error('Malformed TLE string')
if all([x[0] == '1' for x in raw_tles[1::3]]) is False:
raise Error('Malformed TLE string')
if all([x[0] == '2' for x in raw_tles[2::3]]) is False:
raise Error('Malformed TLE string')
nids = [int(x.split(' ')[1]) for x in raw_tles[2::3]]
self.nids_to_track = nids
#
# Create a tle dict
#
self.tles = {} #dict to map nid->TLE
#self.name2nid = {} #dict to map name->nid
cnt = 0
for nid in nids:
self.tles[nid] = raw_tles[cnt:cnt+3]
#self.name2nid[self.raw_tles[cnt][2:]] = nid
cnt += 3
#
# Save time for timeout calculation
#
self.tle_timestamp = time.time()
def _tles_have_expired(self):
if self.tle_timestamp is None:
return True
dt = time.time() - self.tle_timestamp
# Calculate time in days and compare with timeout
if dt/86400.0 > self.tle_timeout:
return True
else:
return False
[docs] def get_visible_satellites(self, tz='AEST', future_only=True, allow_overlap = True):
"""
Get a list of current and upcoming radio amateur satellite passes.
This function gets its data from heavens above. Heavens above does
not provide an official API and this function therefore scrapes
the data from the website. It will break without notice if anything changes on
the heavens above website. That is considered acceptable since
this function is for information only and does not form any part
of the core ground station software.
Args:
tz (str, optional): The timezone designator, or None for UTC.
"""
resp = requests.get('http://www.heavens-above.com/AmateurSats.aspx?lat=%f&lng=%f&loc=%s&alt=%.0f%s'%\
(self.gs_lat, self.gs_lon, 'adfags', self.gs_elev,
('' if tz is None else '&tz=%s'%(tz))))
h = html.document_fromstring(resp.content)
r = h.xpath('//table[@class="standardTable"]')[0]
htmlstr = html.tostring(r)
sats = pd.read_html(htmlstr)[0]
sats.columns=['Satellite', 'Date', 'Start', 'HP_time', 'HP_Elev', 'HP_Az', 'End', 'Freq']
sats.HP_Elev = (sats.HP_Elev.str.extract('(\d+)', expand=False)).astype(float)
sats.HP_Az = (sats.HP_Az.str.extract('(\d+)', expand=False)).astype(float)
tz = h.xpath('//span[@id="ctl00_lblTZ"]')[0].text_content()
sats['NID'] = -1
for index, row in sats.iterrows():
idx2 = htmlstr.find(row.Satellite)
idx1 = htmlstr[:idx2].rfind('<a href')
linkstr = htmlstr[idx1:idx2]
satid = int(re.findall('satid=(\d+)',linkstr)[0])
sats.loc[index, 'NID'] = satid
sats['tz'] = tz
if future_only:
# minimum time to wait before first pass starts (in s)
BUFFER= 60
sats = sats[sats.Start.apply(pd.to_datetime) > pd.Timestamp.now() + pd.Timedelta(seconds=BUFFER)]
if not allow_overlap:
# get rid of overlapping
no_overl = [True] + [pd.to_datetime(sats.iloc[k].Start) > pd.to_datetime(sats.iloc[k-1].End) for k in range(1, len(sats))]
sats = sats[no_overl]
return sats
[docs] def get_tle(self, nid):
"""
Return TLEs from a NORAD ID by
quering Space-Track if data is older than the specified timeout
from memory otherwise
Args:
nid (int): Norad ID
Returns:
Three line element (TLE)
If the timeout delay has not been hit it will return the
most recently fetched TLE, otherwise it will retreive the
most recent TLE from spacetrack.
Example:
>>> A.get_tle(25544)
['0 ISS (ZARYA)', '1 25544U 98067A 17164.50922405 +.00002016 +00000-0 +37842-4 0 9993', '2 25544 051.6431 056.6611 0004416 268.2130 149.1141 15.54021074061180']
"""
if self.tles is None or self._tles_have_expired():
self._update_tles()
tles = self.tles.keys()
tles.sort()
self.nids_to_track.sort()
if tles != self.nids_to_track:
raise Error('Unexpected Error')
if nid not in tles:
if not hasattr(self, 'api'):
raise Error('Class has not been connected with an API. Cant request NID')
self.nids_to_track += [nid]
self._update_tles()
return self.tles[nid]
[docs] def get_range(self, nid, gs_lat=None, gs_lon=None, gs_elev=None, when=None):
return self.get_all(nid, gs_lat, gs_lon, gs_elev, when)['range']
[docs] def get_range_rate(self, nid, gs_lat=None, gs_lon=None, gs_elev=None, when=None):
return self.get_all(nid, gs_lat, gs_lon, gs_elev, when)['range_rate']
[docs] def get_doppler(self, nid, freq, gs_lat=None, gs_lon=None, gs_elev=None, when=None):
dv = self.get_range_rate(nid, gs_lat, gs_lon, gs_elev, when)
return (dv/299792458.0)*freq
[docs] def get_eclipsed(self, nid, gs_lat=None, gs_lon=None, gs_elev=None, when=None):
return self.get_all(nid, gs_lat, gs_lon, gs_elev, when)['eclipsed']
[docs] def get_angles(self, nid, gs_lat=None, gs_lon=None, gs_elev=None, when=None):
x = self.get_all(nid, gs_lat, gs_lon, gs_elev, when)[['az', 'el']]
if isinstance(x, pd.Series):
return x.az, x.el
else:
return x
[docs] def get_all(self, nid, gs_lat=None, gs_lon=None, gs_elev=None, when=None):
"""
Just a wrapper around pyephem _compute function, which allows the extraction
of a bunch of different parameters through other functions that use
this one (such as get_angles, get_range, etc...)
"""
#
# Allow user to pass gs coordinates, or use stored ones otherwise
#
if gs_lat is not None:
lat = gs_lat
else:
lat = self.gs_lat
if gs_lon is not None:
lon = gs_lon
else:
lon = self.gs_lon
if gs_elev is not None:
elev = gs_elev
else:
elev = self.gs_elev
if (lat is None) or (lon is None) or (elev is None):
raise Error("GS Lat, Lon or Elev have not been specified")
obs = ephem.Observer()
obs.lat = str(lat)
obs.lon = str(lon)
obs.elevation = elev
v = ephem.readtle(*self.get_tle(nid))
if when is None:
when = [datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S')]
elif isinstance(when, basestring):
when = [when]
elif not isinstance(when, Iterable):
when = [when]
data = DataFrame(index=when, columns= ['norad_id', 'tstamp_str', 'orbit_no', 'az', 'el', 'range', 'range_rate', 'altitude', 'lat', 'long', 'eclipsed'])
for w in when:
obs.date = w
v.compute(obs)
data.az[w] = v.az/pi*180.0
data.el[w] = v.alt/pi*180.0
data.range[w] = v.range
data.range_rate[w] = v.range_velocity
data.eclipsed[w] = v.eclipsed
data.altitude[w] = v.elevation
data.lat[w] = v.sublat/pi*180.0
data.long[w] = v.sublong/pi*180.0
data.tstamp_str[w] = str(w)
data.norad_id[w] = nid
data.orbit_no[w] = self._calculate_orbit_no(nid, w)
if pd.isnull(data).any().any():
raise Error('Unexpected error: dataframe has not filled as expected')
data.index.name='tstamp'
# This is not a nice way to store attributes since it does not persist
# if we do something to the dataframe, so in this funciton it is also
# saved as a column (also not very nice). Anyway, the below is relied
# on for several functions so dont remove.
data.nid = nid
# For convenience we also store the TLE this way. It is not relied on
# but if it is not set then the TLE will not end up in the libgs log
# (unless manually added as metadata to CommsPass under the TLE key)
data.TLE = '\n'.join(self.get_tle(nid))
if len(data) == 1:
return(data.iloc[0])
else:
return data
[docs] def get_angles_old(self, nid, gs_lat=None, gs_lon=None, gs_elev=None, when=None):
"""
If the user prefers to calculate the angles at a specific time he
should set when=YYYY/MM/DD HH:mm:ss. when can also be an array
"""
#
# Allow user to pass gs coordinates, or use stored ones otherwise
#
if gs_lat is not None:
lat = gs_lat
else:
lat = self.gs_lat
if gs_lon is not None:
lon = gs_lon
else:
lon = self.gs_lon
if gs_elev is not None:
elev = gs_elev
else:
elev = self.gs_elev
if (lat is None) or (lon is None) or (elev is None):
raise Error("GS Lat, Lon or Elev have not been specified")
obs = ephem.Observer()
obs.lat = str(lat)
obs.lon = str(lon)
obs.elevation = elev
v = ephem.readtle(*self.get_tle(nid))
if when is None:
obs.date = datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S')
elif type(when) is not list:
obs.date = when
if type(when) is list:
pos = DataFrame(index=when, columns= ['tstamp_str', 'az', 'el'])
for w in when:
obs.date = w
v.compute(obs)
pos.az[w] = v.az/pi*180.0
pos.el[w] = v.alt/pi*180.0
pos.tstamp_str[w] = str(w)
if pd.isnull(pos).any().any():
raise Error('Unexpected error: dataframe has not filled as expected')
pos.nid = nid
return pos
else:
v.compute(obs)
#
# Return Azimuth, Elevation (in deg)
#
return v.az/pi*180.0, v.alt/pi*180.0
[docs] def get_ground_coord(self, nid, when=None):
"""
Return the ground track lat/long coord without needing the ground station
coordinates
"""
obs = ephem.Observer()
if when is None:
obs.date = datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S')
else:
obs.date = when
v = ephem.readtle(*self.get_tle(nid))
v.compute(obs)
return v.sublat/pi*180.0, v.sublong/pi*180.0
# TODO: Add eclipsed and get_ground_coord and get_orbit to extend a base
# function taht doesnt depend on gs lat/lon (in same way as get_all above)
[docs] def get_orbit(self, nid, when=None):
"""
Return the orbital elements
"""
obs = ephem.Observer()
if when is None:
obs.date = datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S')
else:
obs.date = when
v = ephem.readtle(*self.get_tle(nid))
v.compute(obs)
data = {\
'epoch' : v._epoch,
'e' : v._e,
'inc' : v._inc/pi*180.0,
'raan' : v._raan/pi*180.0,
'ap' : v._ap/pi*180.0,
'n' : v._n,
'M' : v._M/pi*180.0,
'orbit' : v._orbit}
return data
def _get_observer(self, gs_lat=None, gs_lon = None, gs_elev = None, when=None):
#
# Allow user to pass gs coordinates, or use stored ones otherwise
#
if gs_lat is not None:
lat = gs_lat
else:
lat = self.gs_lat
if gs_lon is not None:
lon = gs_lon
else:
lon = self.gs_lon
if gs_elev is not None:
elev = gs_elev
else:
elev = self.gs_elev
if (lat is None) or (lon is None) or (elev is None):
raise Error("GS Lat, Lon or Elev have not been specified")
obs = ephem.Observer()
obs.lat = lat/180.0*pi
obs.lon = lon/180.0*pi
obs.elevation = elev
if when is None:
obs.date = datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S')
elif isinstance(when, unicode):
obs.date = when.encode()
else:
obs.date = when
return obs
[docs] def get_passes(self, nid=None, N=1, when=None, horizon=0):
"""
Compute the next passes
Args:
nid (int, optional): compute the next pass for a given nid.
If nid is omitted, compute the next pass for all tracked
satellites.
hor (float, optional): horizon angle
"""
passes = []
if nid is None:
iterable = self.nids_to_track
elif type(nid) is list:
iterable = nid
elif type(nid) is int:
iterable = [nid]
else:
raise Error('nid is malformed')
# Check if satellite is currently visible without having to call
# get_angles. Just do two next_pass calculations: one for now, one
# for 10 minutes ago. Check that they match. If not use the one from
#
# if yes: either delay time enough to get next pass later
# or advance time to include this pass... tbd
for nid in iterable:
#print(nid)
i = 0
nextwhen = when
while i < N:
obs = self._get_observer(when=nextwhen)
soon_date = ephem.Date(obs.date + (1.0/86400))
tle = ephem.readtle(*self.get_tle(nid))
try:
p = obs.next_pass(tle)
except Exception as e:
log.warning("Trying to get pass for NID %d, but cant: %s"%(nid, e))
i += 1
continue
if not any(p[2:]):
log.warning("Trying to get pass for NID %d, but cant. It may no longer be in orbit"%(nid))
i+=1
continue
if not (p[0]< p[4]):
# satellite is probably currently visible
# compute pos in 1 sec and set as rising
az,el = self.get_angles(nid, when=soon_date)
if soon_date <= p[2]:
p = (soon_date, el/180.0*pi, p[2], p[3], p[4], p[5])
# if maximum has also been passed, set it to be the same
elif soon_date > p[2]:
p = (soon_date, el/180.0*pi, soon_date, az/180.0*pi, p[4], p[5])
else:
raise Error('Unexpected')
nextwhen = ephem.Date(p[4] + 60.0/86400)
# convert to a str format that
# a) parses with pd.to_datetime
# b) parses with ephem.Date
# c) sorts correctly
dstr = lambda x : x.datetime().strftime('%Y/%m/%d %H:%M:%S')
if p[3]/pi*180.0 < horizon:
continue
else:
passes += [(nid, self._calculate_orbit_no(nid, str(p[0])),
dstr(p[0]), p[1]/pi*180.0,
dstr(p[2]), p[3]/pi*180.0,
dstr(p[4]), p[5]/pi*180.0)]
i += 1
passes = DataFrame(passes, columns = ['nid', 'orbit_no', 'rise_t', 'rise_az', 'max_elev_t', 'max_elev', 'set_t', 'set_az'])
if len(passes) > 1:
passes = passes.sort_values(by='rise_t').reset_index(drop=True)
passes.index.name="pass_no"
if len(passes) == 1:
passes.index.name = "pass_no"
passes = passes.iloc[0]
elif len(passes) < 1:
raise Error('An unexpected error occurred when computing passes')
return passes
[docs] def compute_pass(self, nid, key_els = [], when=None, dt=1.0):
"""
Perform a detailed calculation of the pass and
return the trajectory in a dataframe
Also compute the keypoints for which the trajectory passes through
a set of elevations (specified)
Args:
key_els (list(float), optional): list of elevations to compute
keypoints for. elevations less than 1 deg apart are ignored.
"""
# get upcoming pass
p = self.get_passes(nid, when=when)
ta = _tstamp_array(p.rise_t, p.set_t, dt=dt)
if len(ta) < 3:
raise Error('pass is too short to compute')
angs = self.get_all(nid, when=ta)
p_max = angs[angs.el == angs.el.max()].iloc[0]
p_rise = angs[angs.el > 0].iloc[0]
p_set = angs[angs.el > 0].iloc[-1]
if len(key_els) > 0:
key_els.sort()
key_els = np.unique([int(k) for k in key_els])
keypoints = [None] * len(key_els) * 2
i = 0
for el in key_els:
if (angs.el > el).any() == False:
raise Error('Pass never reaches requested keypoint elevation')
keypoints[i] = angs[angs.el > el].iloc[0]
keypoints[-1-i] = angs[angs.el > el].iloc[-1]
i += 1
keypoints = [p_rise] + keypoints[:len(key_els)] + [p_max] + keypoints[len(key_els):] + [p_set]
else:
keypoints= [p_rise, p_max, p_set]
kpoints = DataFrame(keypoints)
kpoints.index = ['rise'] + ['rise%02d'%(d) for d in key_els] + ['peak'] + ['set%02d'%(d) for d in key_els[::-1]] + ['set']
return angs, kpoints
def _calculate_orbit_no(self, NID, when):
orbit = self.get_orbit(NID, when=when)
ep = orbit['epoch'] # time at TLE epoch
n = orbit['n'] # mean motion (rev/day)
orb_e = orbit['orbit'] # orbit no at epoch
M = orbit['M'] # mean anomaly at epoch ("angle" of epoch)
ap = orbit['ap'] # argument of perigee (angle of perigee)
t = ephem.Date(when)
ep_an = ep - (1.0/n) * (M + ap)/360.0 #time of last ascending node
return orb_e + n*(t - ep_an)
[docs] def print_info(self, nid, when=None):
"""
Print all info about a specific space item
Args:
nid: Norad ID of object
"""
if when is None:
tstamp = datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S')
else:
tstamp = when
tle = self.get_tle(nid)
#lat,lon = self.get_ground_coord(nid, when=tstamp)
#az,el = self.get_angles(nid, when=tstamp)
sdata = self.get_all(nid, when=tstamp)
_print("TLE (updated %s):"%(datetime.fromtimestamp(self.tle_timestamp).strftime('%d/%m/%Y %H:%M:%S')))
_print("\n ".join(tle))
odata = self.get_orbit(nid)
_print("\nOrbit:")
_print(" epoch (utc) : %s"%(odata['epoch']))
_print(" eccentricity: %f"%(odata['e']))
_print(" inclindation: %f"%(odata['inc']))
_print(" RAAN: %f"%(odata['raan']))
_print(" AP: %f"%(odata['ap']))
_print(" revol per day: %f"%(odata['n']))
_print(" mean anom at epoch: %f"%(odata['M']))
_print(" orbit no at epoch: %d"%(odata['orbit']))
_print("\nObserver:")
_print(" lat: %f\n lon: %f\n elev: %.0f\n date (utc): %s"%(
self.gs_lat, self.gs_lon, self.gs_elev, tstamp))
_print("\nSatellite:")
_print(" ground track pos: (lat = %.3f, lon = %.3f)"%(sdata['lat'],sdata['long']))
_print(" pointing: (az = %.3f, el = %.3f)"%(sdata['az'],sdata['el']))
_print(" range: %f"%(sdata['range']))
_print(" range rate: %f"%(sdata['range_rate']))
_print(" altitude: %f"%(sdata['altitude']))
_print(" orbit number: %f"%(sdata['orbit_no']))
if __name__ == '__main__':
pass