add laikad to process replay (#24889)
* merge

* Fix closing process executor after fetching orbits

* cleanup

* Add ref commit and revert test_processes hack

* Fix

* Fix ref

* Fix test

* Temp

* Temp

* Trying

* Trying

* Cleanup and change test

* add ref commit

* remove print

* fix test getting stuck

* cleanup fetch_orbits

Co-authored-by: Gijs Koning <gijs-koning@live.nl>
pd0wm and gijskoning committed Jun 28, 2022
1 parent 4cf63f4 commit 3823f55
Showing 6 changed files with 81 additions and 22 deletions.
50 changes: 33 additions & 17 deletions selfdrive/locationd/laikad.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import json
import os
import time
from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor
@@ -32,8 +33,10 @@ def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephe
save_ephemeris=False, last_known_position=None):
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True)
self.gnss_kf = GNSSKalman(GENERATED_DIR)
self.orbit_fetch_executor = ProcessPoolExecutor()

self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None
self.orbit_fetch_future: Optional[Future] = None

self.last_fetch_orbits_t = None
self.last_cached_t = None
self.save_ephemeris = save_ephemeris
@@ -44,9 +47,13 @@ def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephe
self.last_pos_fix_t = None

def load_cache(self):
if not self.save_ephemeris:
return

cache = Params().get(EPHEMERIS_CACHE)
if not cache:
return

try:
cache = json.loads(cache, object_hook=deserialize_hook)
self.astro_dog.add_orbits(cache['orbits'])
@@ -71,7 +78,7 @@ def get_est_pos(self, t, processed_measurements):
self.last_pos_residual = pos_fix_residual
self.last_pos_fix_t = t
return self.last_pos_fix

def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False):
if ublox_msg.which == 'measurementReport':
t = ublox_mono_time * 1e-9
@@ -152,17 +159,22 @@ def init_gnss_localizer(self, est_pos):
def fetch_orbits(self, t: GPSTime, block):
if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR):
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types
if self.orbit_fetch_future is None:

ret = None

if block:
ret = get_orbit_data(t, *astro_dog_vars)
elif self.orbit_fetch_future is None:
self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1)
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
if block:
self.orbit_fetch_future.result()
if self.orbit_fetch_future.done():
ret = self.orbit_fetch_future.result()
elif self.orbit_fetch_future.done():
self.last_fetch_orbits_t = t
if ret:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.cache_ephemeris(t=t)
self.orbit_fetch_future = None
ret = self.orbit_fetch_future.result()
self.orbit_fetch_executor = self.orbit_fetch_future = None

if ret is not None:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.cache_ephemeris(t=t)


def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
@@ -174,7 +186,7 @@ def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
astro_dog.get_orbit_data(t, only_predictions=True)
data = (astro_dog.orbits, astro_dog.orbit_fetched_times)
except RuntimeError as e:
cloudlog.info(f"No orbit data found. {e}")
cloudlog.warning(f"No orbit data found. {e}")
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s")
return data

@@ -255,17 +267,21 @@ class EphemerisSourceType(IntEnum):
glonassIacUltraRapid = 2


def main():
sm = messaging.SubMaster(['ubloxGnss'])
pm = messaging.PubMaster(['gnssMeasurements'])
def main(sm=None, pm=None):
if sm is None:
sm = messaging.SubMaster(['ubloxGnss'])
if pm is None:
pm = messaging.PubMaster(['gnssMeasurements'])

replay = "REPLAY" in os.environ
# todo get last_known_position
laikad = Laikad(save_ephemeris=True)
laikad = Laikad(save_ephemeris=not replay)
while True:
sm.update()

if sm.updated['ubloxGnss']:
ublox_msg = sm['ubloxGnss']
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'])
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'], block=replay)
if msg is not None:
pm.send('gnssMeasurements', msg)

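The fetch_orbits change above creates the ProcessPoolExecutor lazily with a single worker, polls the future on later calls, and tears both down once a result is collected; in replay (`REPLAY` in the environment) `main()` passes `block=replay` so the fetch runs synchronously and deterministically. Below is a minimal standalone sketch of that pattern; `Fetcher` and `slow_fetch` are illustrative stand-ins, not the openpilot code.

```python
# Illustrative sketch only: `slow_fetch` stands in for get_orbit_data and
# `Fetcher` for the bookkeeping Laikad does around orbit fetching.
import time
from concurrent.futures import Future, ProcessPoolExecutor
from typing import Optional


def slow_fetch(t: float) -> dict:
  # Placeholder for the expensive download/parse step that runs in a subprocess.
  return {"fetched_at": t}


class Fetcher:
  def __init__(self):
    self.executor: Optional[ProcessPoolExecutor] = None
    self.future: Optional[Future] = None
    self.data: Optional[dict] = None

  def poll(self, t: float, block: bool = False) -> None:
    if self.data is not None:
      return  # already fetched (the real code rate-limits with last_fetch_orbits_t)

    ret = None
    if block:
      # Replay/tests: run synchronously so the output is deterministic.
      ret = slow_fetch(t)
    elif self.future is None:
      # First call: spawn a single worker and return immediately.
      self.executor = ProcessPoolExecutor(max_workers=1)
      self.future = self.executor.submit(slow_fetch, t)
    elif self.future.done():
      # A later call: collect the result and release the subprocess.
      ret = self.future.result()
      self.executor.shutdown(wait=False)
      self.executor = self.future = None

    if ret is not None:
      self.data = ret


if __name__ == "__main__":
  f = Fetcher()
  f.poll(0.0)            # non-blocking: starts the background fetch
  while f.data is None:
    time.sleep(0.1)
    f.poll(0.0)          # later polls pick up the finished result
  print(f.data)
```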
27 changes: 24 additions & 3 deletions selfdrive/locationd/test/test_laikad.py
@@ -7,6 +7,7 @@
from unittest.mock import Mock, patch

from common.params import Params
from laika.constants import SECS_IN_DAY
from laika.ephemeris import EphemerisType, GPSEphemeris
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId, TimeRangeHolder
@@ -62,6 +63,26 @@ def setUpClass(cls):
def setUp(self):
Params().delete(EPHEMERIS_CACHE)

def test_fetch_orbits_non_blocking(self):
gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1))
laikad = Laikad()
laikad.fetch_orbits(gpstime, block=False)
laikad.orbit_fetch_future.result(5)
# Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime, block=False)

ephem = laikad.astro_dog.orbits['G01'][0]
self.assertIsNotNone(ephem)

laikad.fetch_orbits(gpstime+2*SECS_IN_DAY, block=False)
laikad.orbit_fetch_future.result(5)
# Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime + 2 * SECS_IN_DAY, block=False)

ephem2 = laikad.astro_dog.orbits['G01'][0]
self.assertIsNotNone(ephem2)
self.assertNotEqual(ephem, ephem2)

def test_ephemeris_source_in_msg(self):
data_mock = defaultdict(str)
data_mock['sv_id'] = 1
@@ -155,7 +176,7 @@ def wait_for_cache():
while Params().get(EPHEMERIS_CACHE) is None:
time.sleep(0.1)
max_time -= 0.1
if max_time == 0:
if max_time < 0:
self.fail("Cache has not been written after 2 seconds")

# Test cache with no ephemeris
@@ -170,7 +191,7 @@ def wait_for_cache():
wait_for_cache()

# Check both nav and orbits separate
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV)
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV, save_ephemeris=True)
# Verify orbits and nav are loaded from cache
self.dict_has_values(laikad.astro_dog.orbits)
self.dict_has_values(laikad.astro_dog.nav)
@@ -185,7 +206,7 @@ def wait_for_cache():
mock_method.assert_not_called()

# Verify cache is working for only orbits by running a segment
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT)
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True)
msg = verify_messages(self.logs, laikad, return_one_success=True)
self.assertIsNotNone(msg)
# Verify orbit data is not downloaded
3 changes: 2 additions & 1 deletion selfdrive/test/process_replay/README.md
@@ -5,7 +5,7 @@ Process replay is a regression test designed to identify any changes in the outp
If the test fails, make sure that you didn't unintentionally change anything. If there are intentional changes, the reference logs will be updated.

Use `test_processes.py` to run the test locally.
Use `FILEREADER_CACHE='1' test_processes.py` to cache log files.

Currently the following processes are tested:

@@ -15,6 +15,7 @@ Currently the following processes are tested:
* calibrationd
* dmonitoringd
* locationd
* laikad
* paramsd
* ubloxd

18 changes: 18 additions & 0 deletions selfdrive/test/process_replay/process_replay.py
@@ -236,6 +236,13 @@ def ublox_rcv_callback(msg):
return []


def laika_rcv_callback(msg, CP, cfg, fsm):
if msg.ubloxGnss.which() == "measurementReport":
return ["gnssMeasurements"], True
else:
return [], False


CONFIGS = [
ProcessConfig(
proc_name="controlsd",
@@ -338,6 +345,17 @@ def ublox_rcv_callback(msg):
tolerance=None,
fake_pubsubmaster=False,
),
ProcessConfig(
proc_name="laikad",
pub_sub={
"ubloxGnss": ["gnssMeasurements"],
},
ignore=["logMonoTime"],
init_callback=get_car_params,
should_recv_callback=laika_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
]


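The new `laika_rcv_callback` is the piece that tells process replay which outputs to expect for each logged `ubloxGnss` message: one `gnssMeasurements` message per measurement report, nothing otherwise. A quick standalone sanity check of that contract is sketched below; the `_FakeMsg`/`_FakeUblox` stubs are assumptions standing in for the real capnp log messages, and the callback body is copied from the diff above.

```python
# Hypothetical stubs mimicking the `msg.ubloxGnss.which()` interface used above.
class _FakeUblox:
  def __init__(self, kind: str):
    self._kind = kind

  def which(self) -> str:
    return self._kind


class _FakeMsg:
  def __init__(self, kind: str):
    self.ubloxGnss = _FakeUblox(kind)


def laika_rcv_callback(msg, CP, cfg, fsm):
  # Copied from the diff above so the check is self-contained; CP/cfg/fsm are unused.
  if msg.ubloxGnss.which() == "measurementReport":
    return ["gnssMeasurements"], True
  else:
    return [], False


# A measurementReport should trigger exactly one gnssMeasurements output;
# other ubloxGnss submessages (e.g. ephemeris) should produce nothing.
assert laika_rcv_callback(_FakeMsg("measurementReport"), None, None, None) == (["gnssMeasurements"], True)
assert laika_rcv_callback(_FakeMsg("ephemeris"), None, None, None) == ([], False)
```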
2 changes: 1 addition & 1 deletion selfdrive/test/process_replay/ref_commit
@@ -1 +1 @@
a16ca1082cd493f6cea5252eaaba9f8c6574334a
2ee969b34585f8055bb3eabab2dcc4061cc4bef9
3 changes: 3 additions & 0 deletions selfdrive/test/profiling/profiler.py
@@ -53,6 +53,7 @@ def profile(proc, func, car='toyota'):
msgs = list(LogReader(rlog_url)) * int(os.getenv("LOOP", "1"))

os.environ['FINGERPRINT'] = fingerprint
os.environ['REPLAY'] = "1"

def run(sm, pm, can_sock):
try:
@@ -81,12 +82,14 @@ def run(sm, pm, can_sock):
from selfdrive.controls.radard import radard_thread
from selfdrive.locationd.paramsd import main as paramsd_thread
from selfdrive.controls.plannerd import main as plannerd_thread
from selfdrive.locationd.laikad import main as laikad_thread

procs = {
'radard': radard_thread,
'controlsd': controlsd_thread,
'paramsd': paramsd_thread,
'plannerd': plannerd_thread,
'laikad': laikad_thread,
}

proc = sys.argv[1]
