-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTLE_sats.py
More file actions
290 lines (232 loc) · 9.21 KB
/
TLE_sats.py
File metadata and controls
290 lines (232 loc) · 9.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
'''Uses python-sgp4 to calculate which satellites will cross my zenith today.
Then, make a list of those satellites.
Print a list periodically of the satellites overhead.
requires this repo to be installed:
https://github.com/brandon-rhodes/python-sgp4
https://github.com/satellogic/orbit-predictor
'''
import glob
import os
import shutil
from time import clock
from datetime import datetime, timedelta
import geocoder
import numpy as np
from orbit_predictor import (angles, coordinate_systems, exceptions, keplerian,
locations, predictors, sources, utils)
from sgp4.earth_gravity import wgs84
from sgp4.io import twoline2rv
try:
from urllib2 import urlopen
except:
from urllib.request import urlopen
from send_cube import connect_cube, send_cube
OVERHEAD_LIMIT = 10. # Degrees
TLE_URLS = ('http://celestrak.com/NORAD/elements/weather.txt',
'http://celestrak.com/NORAD/elements/resource.txt',
'https://www.celestrak.com/NORAD/elements/cubesat.txt',
'http://celestrak.com/NORAD/elements/stations.txt',
'https://www.celestrak.com/NORAD/elements/sarsat.txt',
'https://www.celestrak.com/NORAD/elements/noaa.txt',
'https://www.celestrak.com/NORAD/elements/amateur.txt',
'https://www.celestrak.com/NORAD/elements/engineering.txt')
def fetch(urls):
"""Fetch TLE from internet and save it to `destination`."""
print("Fetching TLE data...")
# TODO: play nicely when recollecting data
try:
shutil.rmtree("TLE_data")
except:
pass
os.mkdir('TLE_data')
for url in urls:
destination = url.split('/')[-1]
destination = os.path.join("TLE_data", destination)
with open(destination, "w", encoding="utf-8") as dest:
response = urlopen(url)
dest.write(response.read().decode("utf-8"))
def update_passing_sats(dt=1):
# Where am I?
g = geocoder.ip('me')
lat, lon = g.latlng
me = locations.Location('me', lat, lon, 0)
# Get the current time.
now = datetime.utcnow()
# Get tomorrow, as the limit on when to check
tomorrow = now + timedelta(days=dt)
print('I want to find satellites that pass overhead between:\n', now, '\n', tomorrow)
# Get a database of satellites.
fetch(TLE_URLS)
created_files = glob.glob("TLE_data/*")
print("TLE files:\n{}".format(created_files))
# Now read that data into my TLE database
database = sources.MemoryTLESource()
# Also track all my satellite IDs
sat_IDs = []
print("Parsing TLE data...")
for fname in created_files:
with open(fname, 'r') as f:
while True:
name = f.readline().strip()
if name == '':
break
sat_IDs.append(name)
tle_1 = f.readline().strip()
tle_2 = f.readline().strip()
tle = (tle_1, tle_2)
database.add_tle(name, tle, now)
print("Done!")
print("Checking all satellites to see which transit overhead in the next 24 hours...")
alt_lim = abs(90 - OVERHEAD_LIMIT) # Degrees
passes = []
will_pass = []
for ID in sat_IDs:
try:
predictor = predictors.TLEPredictor(ID, database)
pred = predictor.get_next_pass(
location=me,
when_utc=now,
aos_at_dg=alt_lim,
limit_date=tomorrow
)
print("The object {} will pass over {} deg at:\n--> {}\n".format(ID, alt_lim, pred))
will_pass.append(ID)
passes.append(pred)
except AssertionError as e:
pass
# print(e)
except exceptions.NotReachable as e:
pass
# print(e)
except exceptions.PropagationError as e:
pass
# print(e)
# print(will_pass)
print("\n\nFound {} satellites that will pass through the top {} degrees above lat, lon: {}, {} within the next {} day(s)\n\n\n".format(
len(will_pass), 90-alt_lim, lat, lon, dt
))
with open('passing_sats.txt', 'w') as f:
to_write = '\n'.join(will_pass)
f.write(to_write)
for ID, pred in zip(will_pass, passes):
print("{}\n\n".format(pred))
return database
def sat_locations(database,
mylat, mylon,
time=None,
lat_bins=8, lon_bins=8,
alt_edges=[9e99, 30000, 20000, 10000, 5000, 2000, 1000, 0],
quiet=True):
'''For the satellite IDs in passing_sats.txt, check their current location. Then,
fill in a grid of how many satellites are in each cell of a grid.
Inputs:
-------
database, MemoryTLESource
The orbit_predictor TLE database, populated with satellites
time, datetime.datetime, optional
The time to calculate the grid for. If None, use datetime.datetime.utcnow(). Requires UTC
lat_bins, int, optional
The number of latitude bins. Used to construct a numpy linspace for the bin edges.
lon_bins, int, optional
The same, for longitude
alt_edges, iterable of floats, optional
A list of the altitude bins.
Output:
-------
grid, 3D numpy array
A grid populated with the number of satellites per cell, at the current time.
'''
if time is None:
time = datetime.utcnow()
# What are my bin edges?
altrange = abs(OVERHEAD_LIMIT)
lat_edges = np.linspace(-altrange, +altrange, lat_bins)
lon_edges = np.linspace(-altrange, +altrange, lon_bins)
# Init an empty grid.
grid = np.zeros((len(alt_edges), lat_bins, lon_bins), dtype=int)
# I might be interested in which satellite is currently overhead
overhead_now = []
with open('passing_sats.txt', 'r') as f:
for line in f:
if not quiet:
print("\n\n-------------------------------------------------------------------------------------\n\n")
ID = line.strip()
# create this satellite's predictor
predictor = predictors.TLEPredictor(ID, database)
# The positions are returned in Earth-centric, Earth fixed coords. I need to convert those.
ecef_location = predictor.get_only_position(time)
### Convert ECEF to LLA ###
lat, lon, alt = coordinate_systems.ecef_to_llh(ecef_location)
if not quiet:
print("satellite ID: {}".format(ID))
print("ECEF coords (x, y, z): {}, {}, {}".format(*ecef_location))
print("LLS coords (lat, lon, alt): {}, {}, {}".format(lat, lon, alt))
# transform lat, lon, to relative to me
dlat = lat - mylat
dlon = lon - mylon
if not quiet:
print("Relative to me, the satellite is at {}, {}, {}".format(dlat, dlon, alt))
if dlat < np.min(lat_edges) or dlat > np.max(lat_edges):
if not quiet:
print("Satellite is not in the visible range")
continue
if dlon < np.min(lon_edges) or dlon > np.max(lon_edges):
if not quiet:
print("Satellite is not in the visible range")
continue
if alt < np.min(alt_edges) or alt > np.max(alt_edges):
if not quiet:
print("Satellite is not in the visible range")
continue
# What indexes is this satellite in?
index_alt = np.digitize(alt, alt_edges)
index_lat = np.digitize(dlat, lat_edges)
index_lon = np.digitize(dlon, lon_edges)
# If they fall outside the range, I don't care about the object
if not quiet:
print("This satellite is in index (lat, lon, alt): ({}, {}, {})".format(index_lat, index_lon, index_alt))
# Save this satellite
grid[index_alt, index_lat, index_lon] += 1
overhead_now.append((ID, alt))
return grid, overhead_now
if __name__ == "__main__":
# Not needed in the above functions
from pprint import pprint
import time
try:
ser = connect_cube()
except:
print("No arduino detected!!")
input("> ")
ser = None
# Create the database
database = update_passing_sats()
# Where am I?
me = geocoder.ip('me')
mylat, mylon = me.latlng
while True:
# Time to evaluate
now = datetime.utcnow()
# pop the grid
t0 = clock()
grid, satlist = sat_locations(
database, mylat, mylon,
time=now, quiet=True
)
exec_time = clock() - t0
os.system("clear")
print("--------------------------------------")
print("Checked for overhead satellites in {:.3f}s".format(exec_time))
print(now)
if ser is not None:
send_cube(ser, grid)
# if np.sum(grid):
# for i, subgrid in enumerate(grid):
# if np.sum(subgrid):
# print("Layer {}".format(i))
# pprint(subgrid)
# pprint(np.sum(grid))
print("Satellites overhead now:")
for sat in satlist:
print("{}: {:.3f} km".format(*sat))
time.sleep(1 - exec_time)