Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Replace send_pickled in CARLA drivers #233

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions pylot/drivers/carla_camera_driver_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream,
self._vehicle = None
# The camera sensor actor object we obtain from the simulator.
self._camera = None
self._pickle_lock = threading.Lock()
self._pickled_messages = {}
self._message_lock = threading.Lock()
self._messages = {}
# Lock to ensure that the callbacks do not execute simultaneously.
self._lock = threading.Lock()
# If false then the operator does not send data until it receives
Expand All @@ -81,14 +81,13 @@ def release_data(self, timestamp):
self._logger.debug("@{}: {} releasing sensor data".format(
timestamp, self.config.name))
watermark_msg = erdos.WatermarkMessage(timestamp)
self._camera_stream.send_pickled(timestamp,
self._pickled_messages[timestamp])
self._camera_stream.send(self._messages[timestamp])
# Note: The operator is set not to automatically propagate
# watermark messages received on input streams. Thus, we can
# issue watermarks only after the simulator callback is invoked.
self._camera_stream.send(watermark_msg)
with self._pickle_lock:
del self._pickled_messages[timestamp]
with self._message_lock:
del self._messages[timestamp]

def run(self):
# Read the vehicle id from the vehicle id stream
Expand Down Expand Up @@ -170,9 +169,6 @@ def process_images(self, simulator_image):
self._camera_stream.send(msg)
self._camera_stream.send(watermark_msg)
else:
# Pickle the data, and release it upon release msg receipt.
pickled_msg = pickle.dumps(
msg, protocol=pickle.HIGHEST_PROTOCOL)
with self._pickle_lock:
self._pickled_messages[msg.timestamp] = pickled_msg
with self._message_lock:
self._messages[msg.timestamp] = msg
self._notify_reading_stream.send(watermark_msg)
18 changes: 7 additions & 11 deletions pylot/drivers/carla_lidar_driver_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream,
self._vehicle = None
# Handle to the Lidar simulator actor.
self._lidar = None
self._pickle_lock = threading.Lock()
self._pickled_messages = {}
self._message_lock = threading.Lock()
self._messages = {}
self._lock = threading.Lock()
# If false then the operator does not send data until it receives
# release data watermark. Otherwise, it sends as soon as it
Expand All @@ -71,14 +71,13 @@ def release_data(self, timestamp):
self._release_data = True
else:
watermark_msg = erdos.WatermarkMessage(timestamp)
self._lidar_stream.send_pickled(timestamp,
self._pickled_messages[timestamp])
self._lidar_stream.send(self._messages[timestamp])
# Note: The operator is set not to automatically propagate
# watermark messages received on input streams. Thus, we can
# issue watermarks only after the simulator callback is invoked.
self._lidar_stream.send(watermark_msg)
with self._pickle_lock:
del self._pickled_messages[timestamp]
with self._message_lock:
del self._messages[timestamp]

def process_point_clouds(self, simulator_pc):
""" Invoked when a point cloud is received from the simulator.
Expand All @@ -105,11 +104,8 @@ def process_point_clouds(self, simulator_pc):
self._lidar_stream.send(msg)
self._lidar_stream.send(watermark_msg)
else:
# Pickle the data, and release it upon release msg receipt.
pickled_msg = pickle.dumps(
msg, protocol=pickle.HIGHEST_PROTOCOL)
with self._pickle_lock:
self._pickled_messages[msg.timestamp] = pickled_msg
with self._message_lock:
self._messages[msg.timestamp] = msg
self._notify_reading_stream.send(watermark_msg)

def run(self):
Expand Down