From 6dfdc238a8f5c2693a480d20117a64f7685123d8 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Fri, 12 Aug 2022 00:34:32 +0000 Subject: [PATCH 01/20] Implement __eq__ for Pose --- pylot/utils.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pylot/utils.py b/pylot/utils.py index d3774e130..2b9734e93 100644 --- a/pylot/utils.py +++ b/pylot/utils.py @@ -63,6 +63,12 @@ def __str__(self): return 'Rotation(pitch={}, yaw={}, roll={})'.format( self.pitch, self.yaw, self.roll) + def __eq__(self, other: object) -> bool: + if isinstance(other, Rotation): + return (self.pitch == other.pitch and self.yaw == other.yaw + and self.roll == other.roll) + return NotImplemented + class Quaternion(object): """ Represents the Rotation of an obstacle or vehicle in quaternion @@ -369,6 +375,12 @@ def __repr__(self): def __str__(self): return 'Vector3D(x={}, y={}, z={})'.format(self.x, self.y, self.z) + def __eq__(self, other: object) -> bool: + if isinstance(other, Vector3D): + return (self.x == other.x and self.y == other.y + and self.z == other.z) + return NotImplemented + class Vector2D(object): """Represents a 2D vector and provides helper functions.""" @@ -826,6 +838,12 @@ def __str__(self): else: return "Transform({})".format(str(self.matrix)) + def __eq__(self, other: object) -> bool: + if isinstance(other, Transform): + return (self.location == other.location + and self.rotation == other.rotation) + return NotImplemented + class Pose(object): """Class used to wrap ego-vehicle information. @@ -868,6 +886,12 @@ def __str__(self): return "Pose(transform: {}, forward speed: {}, velocity vector: {})"\ .format(self.transform, self.forward_speed, self.velocity_vector) + def __eq__(self, other: object) -> bool: + if isinstance(other, Pose): + return (self.transform == other.transform + and self.forward_speed == other.forward_speed + and self.velocity_vector == other.velocity_vector) + class LaneMarkingColor(Enum): """Enum that defines the lane marking colors according to OpenDrive 1.4. From 39f8b75ec928acabdd1456c0e973f29a10e2ccfd Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Fri, 12 Aug 2022 00:55:17 +0000 Subject: [PATCH 02/20] Add CarlaPoseDriverOperator --- pylot/drivers/carla_pose_driver_operator.py | 137 ++++++++++++++++++++ pylot/operator_creator.py | 68 +++++++++- 2 files changed, 204 insertions(+), 1 deletion(-) create mode 100644 pylot/drivers/carla_pose_driver_operator.py diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py new file mode 100644 index 000000000..dd64871d2 --- /dev/null +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -0,0 +1,137 @@ +import threading +import erdos +from erdos import ReadStream, Timestamp, WriteStream + +from pylot.simulation.utils import get_vehicle_handle, get_world, \ + set_simulation_mode +import pylot.utils + + +class MatchesOperator(erdos.Operator): + def __init__(self, left_stream, right_stream): + self._left_msgs = [] + self._right_msgs = [] + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + left_stream.add_callback(self.on_left_stream) + right_stream.add_callback(self.on_right_stream) + left_stream.add_watermark_callback( + lambda t: self._logger.debug(f"@{t}: got left watermark")) + right_stream.add_watermark_callback( + lambda t: self._logger.debug(f"@{t}: got right watermark")) + + erdos.add_watermark_callback([left_stream, right_stream], [], + self.on_watermark) + + @staticmethod + def connect(left_stream, right_stream): + return [] + + def on_left_stream(self, msg): + self._logger.debug("got left msg") + self._left_msgs.append(msg) + + def on_right_stream(self, msg): + self._logger.debug("got right msg") + self._right_msgs.append(msg) + + def on_watermark(self, t): + left_msgs, self._left_msgs = self._left_msgs, [] + right_msgs, self._right_msgs = self._right_msgs, [] + length_matches = (len(left_msgs) == len(right_msgs)) + left_tuples = [(m.timestamp, m.data) for m in left_msgs] + right_tuples = [(m.timestamp, m.data) for m in right_msgs] + matches = length_matches and all( + map( + lambda x: x[0].timestamp == x[1].timestamp and x[0].data == x[ + 1].data, zip(left_msgs, right_msgs))) + + if matches: + self._logger.debug(f"@{t}: left matches right") + else: + self._logger.warn( + f"@{t}: left does not match right\n\tleft: {left_tuples}\n\tright: {right_tuples}" + ) + + +class CarlaPoseDriverOperator(erdos.Operator): + """Sends pose information.""" + def __init__(self, vehicle_id_stream: ReadStream, pose_stream: WriteStream, + frequency: float, flags): + # Save the streams. + self._vehicle_id_stream = vehicle_id_stream + self._pose_stream = pose_stream + + # Save the flags and initialize logging. + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + # Save the setup, the vehicle and the sensor. + self._frequency = frequency + self._vehicle = None + self._gnss = None # Use GNSS sensor to tick at a regular interval. + self._lock = threading.Lock() + + @staticmethod + def connect(vehicle_id_stream: ReadStream): + pose_stream = WriteStream() + return [pose_stream] + + def send_pose(self, timestamp: Timestamp): + """Sends pose information followed by a watermark.""" + vec_transform = pylot.utils.Transform.from_simulator_transform( + self._vehicle.get_transform()) + velocity_vector = pylot.utils.Vector3D.from_simulator_vector( + self._vehicle.get_velocity()) + forward_speed = velocity_vector.magnitude() + pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, + timestamp.coordinates[0]) + self._logger.debug(f"@{timestamp}: sending {pose}") + self._pose_stream.send(erdos.Message(timestamp, pose)) + self._pose_stream.send(erdos.WatermarkMessage(timestamp)) + + def process_gnss(self, gnss_msg): + """Callback attached to a GNSS sensor to ensure that pose is sent at a + fixed frequency.""" + game_time = int(gnss_msg.timestamp * 1000) + timestamp = erdos.Timestamp(coordinates=[game_time]) + with erdos.profile(self.config.name + '.process_gnss', + self, + event_data={'timestamp': str(timestamp)}): + with self._lock: + self.send_pose(timestamp) + + def run(self): + # Read the vehicle ID from the vehicle ID stream. + vehicle_id = self._vehicle_id_stream.read().data + self._logger.debug("{} received the vehicle id: {}".format( + self.config.name, vehicle_id)) + + # Connect to the world. + _, world = get_world(self._flags.simulator_host, + self._flags.simulator_port, + self._flags.simulator_timeout) + set_simulation_mode(world, self._flags) + + # Retrieve the vehicle and install the GNSS sensor. + self._vehicle = get_vehicle_handle(world, vehicle_id) + gnss_blueprint = world.get_blueprint_library().find( + 'sensor.other.gnss') + + if self._frequency == -1: + gnss_blueprint.set_attribute('sensor_tick', '0.0') + else: + gnss_blueprint.set_attribute('sensor_tick', + str(1.0 / self._frequency)) + transform = pylot.utils.Transform( + pylot.utils.Location(), + pylot.utils.Rotation()).as_simulator_transform() + self._logger.debug("Spawning a GNSS sensor") + self._gnss = world.spawn_actor(gnss_blueprint, + transform, + attach_to=self._vehicle) + + # Register the callback on the GNSS sensor. + self._gnss.listen(self.process_gnss) \ No newline at end of file diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index 2274901e6..e7ea1f2c1 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -13,16 +13,82 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, pipeline_finish_notify_stream): from pylot.simulation.carla_operator import CarlaOperator + from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator, MatchesOperator + op_config = erdos.OperatorConfig(name='simulator_bridge_operator', flow_watermarks=False, log_file_name=FLAGS.log_file_name, csv_log_file_name=FLAGS.csv_log_file_name, profile_file_name=FLAGS.profile_file_name) - return erdos.connect( + ( + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + global_trajectory_stream, + ) = erdos.connect( CarlaOperator, op_config, [control_stream, sensor_ready_stream, pipeline_finish_notify_stream], FLAGS) + # Check that Pose matches + pose_driver_config = erdos.OperatorConfig( + name='pose_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + pose_driver_stream, = erdos.connect(CarlaPoseDriverOperator, + pose_driver_config, + [vehicle_id_stream], + FLAGS.simulator_localization_frequency, + FLAGS) + + matches_pose_driver_config = erdos.OperatorConfig( + name='matches_pose_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_pose_driver_config, + [pose_stream, pose_driver_stream]) + + # Check that Pose for control matches + pose_for_control_config = erdos.OperatorConfig( + name='pose_for_control_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + pose_for_control_driver_stream, = erdos.connect( + CarlaPoseDriverOperator, pose_for_control_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS) + + matches_pose_for_control_driver_config = erdos.OperatorConfig( + name='matches_pose_for_control_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_pose_for_control_driver_config, + [pose_stream_for_control, pose_for_control_driver_stream]) + + return ( + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + global_trajectory_stream, + ) + def add_efficientdet_obstacle_detection(camera_stream, time_to_decision_stream, From 0a8d78df3c71a515bb11f115e8644083df014d38 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Fri, 12 Aug 2022 01:06:30 +0000 Subject: [PATCH 03/20] Move MatchesOperator to pylot.utils --- pylot/drivers/carla_pose_driver_operator.py | 48 ------------------- pylot/operator_creator.py | 3 +- pylot/utils.py | 51 +++++++++++++++++++++ 3 files changed, 53 insertions(+), 49 deletions(-) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index dd64871d2..88d2b3497 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -7,54 +7,6 @@ import pylot.utils -class MatchesOperator(erdos.Operator): - def __init__(self, left_stream, right_stream): - self._left_msgs = [] - self._right_msgs = [] - self._logger = erdos.utils.setup_logging(self.config.name, - self.config.log_file_name) - - left_stream.add_callback(self.on_left_stream) - right_stream.add_callback(self.on_right_stream) - left_stream.add_watermark_callback( - lambda t: self._logger.debug(f"@{t}: got left watermark")) - right_stream.add_watermark_callback( - lambda t: self._logger.debug(f"@{t}: got right watermark")) - - erdos.add_watermark_callback([left_stream, right_stream], [], - self.on_watermark) - - @staticmethod - def connect(left_stream, right_stream): - return [] - - def on_left_stream(self, msg): - self._logger.debug("got left msg") - self._left_msgs.append(msg) - - def on_right_stream(self, msg): - self._logger.debug("got right msg") - self._right_msgs.append(msg) - - def on_watermark(self, t): - left_msgs, self._left_msgs = self._left_msgs, [] - right_msgs, self._right_msgs = self._right_msgs, [] - length_matches = (len(left_msgs) == len(right_msgs)) - left_tuples = [(m.timestamp, m.data) for m in left_msgs] - right_tuples = [(m.timestamp, m.data) for m in right_msgs] - matches = length_matches and all( - map( - lambda x: x[0].timestamp == x[1].timestamp and x[0].data == x[ - 1].data, zip(left_msgs, right_msgs))) - - if matches: - self._logger.debug(f"@{t}: left matches right") - else: - self._logger.warn( - f"@{t}: left does not match right\n\tleft: {left_tuples}\n\tright: {right_tuples}" - ) - - class CarlaPoseDriverOperator(erdos.Operator): """Sends pose information.""" def __init__(self, vehicle_id_stream: ReadStream, pose_stream: WriteStream, diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index e7ea1f2c1..d1b8d4928 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -13,7 +13,6 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, pipeline_finish_notify_stream): from pylot.simulation.carla_operator import CarlaOperator - from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator, MatchesOperator op_config = erdos.OperatorConfig(name='simulator_bridge_operator', flow_watermarks=False, @@ -36,6 +35,8 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, FLAGS) # Check that Pose matches + from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator + from pylot.utils import MatchesOperator pose_driver_config = erdos.OperatorConfig( name='pose_driver_operator', flow_watermarks=False, diff --git a/pylot/utils.py b/pylot/utils.py index 2b9734e93..d684325cb 100644 --- a/pylot/utils.py +++ b/pylot/utils.py @@ -1103,3 +1103,54 @@ def run_visualizer_control_loop(control_display_stream): def verify_keys_in_dict(required_keys, arg_dict): assert set(required_keys).issubset(set(arg_dict.keys())), \ "one or more of {} not found in {}".format(required_keys, arg_dict) + + +import erdos + + +class MatchesOperator(erdos.Operator): + def __init__(self, left_stream, right_stream): + self._left_msgs = [] + self._right_msgs = [] + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + left_stream.add_callback(self.on_left_stream) + right_stream.add_callback(self.on_right_stream) + left_stream.add_watermark_callback( + lambda t: self._logger.debug(f"@{t}: got left watermark")) + right_stream.add_watermark_callback( + lambda t: self._logger.debug(f"@{t}: got right watermark")) + + erdos.add_watermark_callback([left_stream, right_stream], [], + self.on_watermark) + + @staticmethod + def connect(left_stream, right_stream): + return [] + + def on_left_stream(self, msg): + self._logger.debug("got left msg") + self._left_msgs.append(msg) + + def on_right_stream(self, msg): + self._logger.debug("got right msg") + self._right_msgs.append(msg) + + def on_watermark(self, t): + left_msgs, self._left_msgs = self._left_msgs, [] + right_msgs, self._right_msgs = self._right_msgs, [] + length_matches = (len(left_msgs) == len(right_msgs)) + left_tuples = [(m.timestamp, m.data) for m in left_msgs] + right_tuples = [(m.timestamp, m.data) for m in right_msgs] + matches = length_matches and all( + map( + lambda x: x[0].timestamp == x[1].timestamp and x[0].data == x[ + 1].data, zip(left_msgs, right_msgs))) + + if matches: + self._logger.debug(f"@{t}: left matches right") + else: + self._logger.warn( + f"@{t}: left does not match right\n\tleft: {left_tuples}\n\tright: {right_tuples}" + ) \ No newline at end of file From 2465f42fb3a8a43f3c608e06fa6a9498b48d4cc6 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Fri, 12 Aug 2022 01:17:30 +0000 Subject: [PATCH 04/20] Add newline to the end of the file --- pylot/drivers/carla_pose_driver_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index 88d2b3497..dcfa08e8c 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -86,4 +86,4 @@ def run(self): attach_to=self._vehicle) # Register the callback on the GNSS sensor. - self._gnss.listen(self.process_gnss) \ No newline at end of file + self._gnss.listen(self.process_gnss) From 7c94d9a211a25ff85f9f2ce282129afb7b5af9d0 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Fri, 12 Aug 2022 01:20:32 +0000 Subject: [PATCH 05/20] Remove log message --- pylot/drivers/carla_pose_driver_operator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index dcfa08e8c..159352779 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -40,7 +40,6 @@ def send_pose(self, timestamp: Timestamp): forward_speed = velocity_vector.magnitude() pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, timestamp.coordinates[0]) - self._logger.debug(f"@{timestamp}: sending {pose}") self._pose_stream.send(erdos.Message(timestamp, pose)) self._pose_stream.send(erdos.WatermarkMessage(timestamp)) From 667cc1f2e3e6f37b2a06924a4ed358bfef84ec70 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 13 Aug 2022 00:34:31 +0000 Subject: [PATCH 06/20] Add CarlaPoseDriverOperator documentation --- pylot/drivers/carla_pose_driver_operator.py | 23 +++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index 159352779..ea07b8b81 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -1,4 +1,5 @@ import threading + import erdos from erdos import ReadStream, Timestamp, WriteStream @@ -8,7 +9,21 @@ class CarlaPoseDriverOperator(erdos.Operator): - """Sends pose information.""" + """Publishes the pose (location, orientation, and velocity) at the provided + frequency. + + This operator attaches to the vehicle using the vehicle ID provided by the + ``vehicle_id_stream``, registers callback functions to retrieve the + pose at the provided frequency, and publishes it to downstream operators. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + pose_stream: Stream on which the operator sends the vehicle's pose. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ def __init__(self, vehicle_id_stream: ReadStream, pose_stream: WriteStream, frequency: float, flags): # Save the streams. @@ -44,8 +59,8 @@ def send_pose(self, timestamp: Timestamp): self._pose_stream.send(erdos.WatermarkMessage(timestamp)) def process_gnss(self, gnss_msg): - """Callback attached to a GNSS sensor to ensure that pose is sent at a - fixed frequency.""" + """Callback attached to a GNSS sensor to ensure that pose is sent at + the provided frequency.""" game_time = int(gnss_msg.timestamp * 1000) timestamp = erdos.Timestamp(coordinates=[game_time]) with erdos.profile(self.config.name + '.process_gnss', @@ -57,7 +72,7 @@ def process_gnss(self, gnss_msg): def run(self): # Read the vehicle ID from the vehicle ID stream. vehicle_id = self._vehicle_id_stream.read().data - self._logger.debug("{} received the vehicle id: {}".format( + self._logger.debug("{} received vehicle id: {}".format( self.config.name, vehicle_id)) # Connect to the world. From 147b7dfc6db8973c6907acfcf4ea79eafda9b8d7 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 13 Aug 2022 00:35:34 +0000 Subject: [PATCH 07/20] Add and test CarlaTrafficLightsDriverOperator --- .../carla_traffic_lights_driver_operator.py | 105 ++++++++++++++++++ pylot/operator_creator.py | 22 ++++ 2 files changed, 127 insertions(+) create mode 100644 pylot/drivers/carla_traffic_lights_driver_operator.py diff --git a/pylot/drivers/carla_traffic_lights_driver_operator.py b/pylot/drivers/carla_traffic_lights_driver_operator.py new file mode 100644 index 000000000..2f77d6ed7 --- /dev/null +++ b/pylot/drivers/carla_traffic_lights_driver_operator.py @@ -0,0 +1,105 @@ +import threading + +import erdos +from erdos import ReadStream, Timestamp, WriteStream + +from pylot.simulation.utils import get_vehicle_handle, get_world, \ + set_simulation_mode +import pylot.utils +from pylot.perception.detection.traffic_light import TrafficLight +from pylot.perception.messages import TrafficLightsMessage + + +class CarlaTrafficLightsDriverOperator(erdos.Operator): + """Publishes the location and state of all traffic lights. + + This operator attaches to the vehicle using the vehicle ID provided by the + ``vehicle_id_stream``, registers callback functions to retrieve the + state of the traffic lights, and publishes them to downstream operators. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + traffic_lights_stream: Stream on which the operator sends the status of + all traffic lights. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: ReadStream, + traffic_lights_stream: WriteStream, frequency: float, flags): + # Save the streams. + self._vehicle_id_stream = vehicle_id_stream + self._traffic_lights_stream = traffic_lights_stream + + # Save the flags and initialize logging. + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + # Save the setup, the vehicle and the sensor. + self._frequency = frequency + self._traffic_light_actors = [] + self._gnss = None # Use GNSS sensor to tick at a regular interval. + self._lock = threading.Lock() + + @staticmethod + def connect(vehicle_id_stream: ReadStream): + traffic_lights_stream = WriteStream() + return [traffic_lights_stream] + + def send_traffic_lights(self, timestamp: Timestamp): + """Sends pose information followed by a watermark.""" + traffic_lights = list( + map(TrafficLight.from_simulator_actor, self._traffic_light_actors)) + self._traffic_lights_stream.send( + TrafficLightsMessage(timestamp, traffic_lights)) + self._traffic_lights_stream.send(erdos.WatermarkMessage(timestamp)) + + def process_gnss(self, gnss_msg): + """Callback attached to a GNSS sensor to ensure that traffic lights are + sent at the provided frequency.""" + game_time = int(gnss_msg.timestamp * 1000) + timestamp = erdos.Timestamp(coordinates=[game_time]) + with erdos.profile(self.config.name + '.process_gnss', + self, + event_data={'timestamp': str(timestamp)}): + with self._lock: + self.send_traffic_lights(timestamp) + + def run(self): + # Read the vehicle ID from the vehicle ID stream. + vehicle_id = self._vehicle_id_stream.read().data + self._logger.debug("{} received vehicle id: {}".format( + self.config.name, vehicle_id)) + + # Connect to the world. + _, world = get_world(self._flags.simulator_host, + self._flags.simulator_port, + self._flags.simulator_timeout) + set_simulation_mode(world, self._flags) + + # Get traffic light actors + self._traffic_light_actors = world.get_actors().filter( + 'traffic.traffic_light*') + + # Retrieve the vehicle and install the GNSS sensor. + vehicle = get_vehicle_handle(world, vehicle_id) + gnss_blueprint = world.get_blueprint_library().find( + 'sensor.other.gnss') + + if self._frequency == -1: + gnss_blueprint.set_attribute('sensor_tick', '0.0') + else: + gnss_blueprint.set_attribute('sensor_tick', + str(1.0 / self._frequency)) + transform = pylot.utils.Transform( + pylot.utils.Location(), + pylot.utils.Rotation()).as_simulator_transform() + self._logger.debug("Spawning a GNSS sensor") + self._gnss = world.spawn_actor(gnss_blueprint, + transform, + attach_to=vehicle) + + # Register the callback on the GNSS sensor. + self._gnss.listen(self.process_gnss) diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index d1b8d4928..7ff521033 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -78,6 +78,28 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, erdos.connect(MatchesOperator, matches_pose_for_control_driver_config, [pose_stream_for_control, pose_for_control_driver_stream]) + # Check that traffic lights match + from pylot.drivers.carla_traffic_lights_driver_operator import CarlaTrafficLightsDriverOperator + ground_traffic_lights_config = erdos.OperatorConfig( + name='carla_traffic_lights_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + ground_traffic_lights_driver_stream, = erdos.connect( + CarlaTrafficLightsDriverOperator, ground_traffic_lights_config, + [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + + matches_ground_traffic_lights_config = erdos.OperatorConfig( + name='matches_ground_traffic_lights_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect( + MatchesOperator, matches_ground_traffic_lights_config, + [ground_traffic_lights_driver_stream, ground_traffic_lights_stream]) + return ( pose_stream, pose_stream_for_control, From 1f9929c108ce45732fd88a9d38cbf1c172b3bb59 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 27 Aug 2022 02:20:56 +0000 Subject: [PATCH 08/20] Add CarlaBaseGNSSDriverOperator --- .../carla_base_gnss_driver_operator.py | 146 ++++++++++++++++++ pylot/drivers/carla_gnss_driver_operator.py | 112 +++----------- pylot/drivers/carla_pose_driver_operator.py | 89 ++--------- .../carla_traffic_lights_driver_operator.py | 92 +++-------- 4 files changed, 206 insertions(+), 233 deletions(-) create mode 100644 pylot/drivers/carla_base_gnss_driver_operator.py diff --git a/pylot/drivers/carla_base_gnss_driver_operator.py b/pylot/drivers/carla_base_gnss_driver_operator.py new file mode 100644 index 000000000..364589838 --- /dev/null +++ b/pylot/drivers/carla_base_gnss_driver_operator.py @@ -0,0 +1,146 @@ +"""This module implements an operator which executes a callback at a the +provided frequency. + +The operator attaches a GNSS sensor to the ego vehicle, receives GNSS +measurements from the simulator, and invokes the user-defined callback. +""" + +from abc import abstractmethod +import threading + +import carla + +import erdos + +from pylot.simulation.utils import get_vehicle_handle, get_world, \ + set_simulation_mode +from pylot.drivers.sensor_setup import GNSSSetup + + +class CarlaBaseGNSSDriverOperator(erdos.Operator): + """Invokes ``process_gnss`` at the provided frequency. + + This operator attaches to a vehicle at the required position with respect + to the vehicle, registers callback functions to execute a ``process_gnss`` + at the provided frequency. + + Attributes: + _vehicle_id_stream (ReadStream): Stream on which the operator receives + the id of the ego vehicle. The ID is used to get a simulator handle + to the vehicle. + _output_stream (WriteStream): Stream on which the operator sends + messages. + _flags: Object to be used to access absl flags. + _logger (logging.Logger): Used to log information. + _gnss_setup (GNSSSetup): Setup of the GNSS sensor. + _frequency (float): Rate at which the callback is invoked. + _vehicle (Optional[carla.Vehicle]): Handle to the CARLA vehicle. + Initialized once the vehicle ID is received. + _world (Optional[carla.World]): Handle to the CARLA world. Initialized + once the vehicle ID is received. + _gnss (Optional[carla.Actor]): Handle to the CARLA GNSS sensor. + Initialized once the vehicle ID is received. + _log (threading.Lock): used to ensure that only 1 GNSS reading is + processed at a time. + + + Args: + vehicle_id_stream: Stream on which the operator receives the id of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + output_stream: Stream on which the operator sends messages. + gnss_setup: Setup of the GNSS sensor. + frequency: Rate at which the pose is published, in Hertz. Set to -1 to + invoke on every simulator tick. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + output_stream: erdos.WriteStream, gnss_setup: GNSSSetup, + frequency: float, flags): + # Save the streams. + self._vehicle_id_stream = vehicle_id_stream + self._output_stream = output_stream + + # Save the flags and initialize logging. + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + # Save the setup, the vehicle, the world, and the sensor. + self._gnss_setup = gnss_setup + self._frequency = frequency + self._vehicle = None + self._world = None + self._gnss = None + self._lock = threading.Lock() + + @staticmethod + def connect(ground_vehicle_id_stream): + gnss_stream = erdos.WriteStream() + return [gnss_stream] + + @abstractmethod + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """Invoked when a GNSS measurement is received from the simulator. + + Note: + Only 1 invocation of this callback will run at a time. + """ + raise NotImplementedError + + def on_gnss(self, gnss_msg: carla.GnssMeasurement): + """Invoked when a GNSS measurement is received from the simulator. + """ + game_time = int(gnss_msg.timestamp * 1000) + timestamp = erdos.Timestamp(coordinates=[game_time]) + with erdos.profile(self.config.name + '.process_gnss', + self, + event_data={'timestamp': str(timestamp)}): + with self._lock: + self.process_gnss(timestamp, gnss_msg) + + def run(self): + # Read the vehicle ID from the vehicle ID stream. + vehicle_id = self._vehicle_id_stream.read().data + self._logger.debug("received the vehicle id: {}".format(vehicle_id)) + + # Connect to the world. + _, self._world = get_world(self._flags.simulator_host, + self._flags.simulator_port, + self._flags.simulator_timeout) + set_simulation_mode(self._world, self._flags) + + # Retrieve the vehicle and install the GNSS sensor. + self._vehicle = get_vehicle_handle(self._world, vehicle_id) + gnss_blueprint = self._world.get_blueprint_library().find( + 'sensor.other.gnss') + + # Set the noise and bias parameters. + gnss_blueprint.set_attribute('noise_alt_stddev', + str(self._flags.gnss_noise_stddev_alt)) + gnss_blueprint.set_attribute('noise_lat_stddev', + str(self._flags.gnss_noise_stddev_lat)) + gnss_blueprint.set_attribute('noise_lon_stddev', + str(self._flags.gnss_noise_stddev_lon)) + gnss_blueprint.set_attribute('noise_alt_bias', + str(self._flags.gnss_bias_alt)) + gnss_blueprint.set_attribute('noise_lat_bias', + str(self._flags.gnss_bias_lat)) + gnss_blueprint.set_attribute('noise_lon_bias', + str(self._flags.gnss_bias_lon)) + + if self._frequency == -1: + gnss_blueprint.set_attribute('sensor_tick', '0.0') + else: + gnss_blueprint.set_attribute('sensor_tick', + str(1.0 / self._frequency)) + transform = self._gnss_setup.get_transform().as_simulator_transform() + self._logger.debug("Spawning a GNSS sensor: {}".format( + self._gnss_setup)) + self._gnss = self._world.spawn_actor(gnss_blueprint, + transform, + attach_to=self._vehicle) + + # Register the callback on the GNSS sensor. + self._gnss.listen(self.on_gnss) diff --git a/pylot/drivers/carla_gnss_driver_operator.py b/pylot/drivers/carla_gnss_driver_operator.py index 7121d46b1..76a787083 100644 --- a/pylot/drivers/carla_gnss_driver_operator.py +++ b/pylot/drivers/carla_gnss_driver_operator.py @@ -4,18 +4,20 @@ The operator attaches a GNSS sensor to the ego vehicle, receives GNSS measurements from the simulator, and sends them on its output stream. """ - -import threading +import carla import erdos +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) +from pylot.drivers.sensor_setup import GNSSSetup from pylot.localization.messages import GNSSMessage from pylot.simulation.utils import get_vehicle_handle, get_world, \ set_simulation_mode from pylot.utils import Transform -class CarlaGNSSDriverOperator(erdos.Operator): +class CarlaGNSSDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes GNSSMessages (transform, altitude, latitude, longitude) from the GNSS sensor. @@ -24,99 +26,29 @@ class CarlaGNSSDriverOperator(erdos.Operator): measurements and publishes it to downstream operators. Args: - ground_vehicle_id_stream (:py:class:`erdos.ReadStream`): Stream on - which the operator receives the id of the ego vehicle. It uses this - id to get a simulator handle to the vehicle. - gnss_stream (:py:class:`erdos.WriteStream`): Stream on which the - operator sends GNSS info. - gnss_setup (:py:class:`pylot.drivers.sensor_setup.GNSSSetup`): - Setup of the GNSS sensor. - flags (absl.flags): Object to be used to access absl flags. + vehicle_id_stream: Stream on which the operator receives the id of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + gnss_stream: Stream on which the operator sends GNSS info. + gnss_setup: Setup of the GNSS sensor. + flags: Object used to access absl flags. """ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream, - gnss_stream: erdos.WriteStream, gnss_setup, flags): - # Save the streams. - self._vehicle_id_stream = ground_vehicle_id_stream - self._gnss_stream = gnss_stream - - # Save the flags and initialize logging. - self._flags = flags - self._logger = erdos.utils.setup_logging(self.config.name, - self.config.log_file_name) - - # Save the setup, the vehicle and the sensor. - self._gnss_setup = gnss_setup - self._vehicle = None - self._gnss = None - self._lock = threading.Lock() + gnss_stream: erdos.WriteStream, gnss_setup: GNSSSetup, flags): - @staticmethod - def connect(ground_vehicle_id_stream): - gnss_stream = erdos.WriteStream() - return [gnss_stream] + frequency = flags.simulator_gnss_frequency + super().__init__(ground_vehicle_id_stream, gnss_stream, gnss_setup, + frequency, flags) - def process_gnss(self, gnss_msg): + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): """Invoked when a GNSS measurement is received from the simulator. Sends GNSS measurements to downstream operators. """ - game_time = int(gnss_msg.timestamp * 1000) - timestamp = erdos.Timestamp(coordinates=[game_time]) watermark_msg = erdos.WatermarkMessage(timestamp) - with erdos.profile(self.config.name + '.process_gnss', - self, - event_data={'timestamp': str(timestamp)}): - with self._lock: - msg = GNSSMessage( - timestamp, - Transform.from_simulator_transform(gnss_msg.transform), - gnss_msg.altitude, gnss_msg.latitude, gnss_msg.longitude) - self._gnss_stream.send(msg) - self._gnss_stream.send(watermark_msg) - - def run(self): - # Read the vehicle ID from the vehicle ID stream. - vehicle_id = self._vehicle_id_stream.read().data - self._logger.debug( - "The GNSSDriverOperator received the vehicle id: {}".format( - vehicle_id)) - - # Connect to the world. - _, world = get_world(self._flags.simulator_host, - self._flags.simulator_port, - self._flags.simulator_timeout) - set_simulation_mode(world, self._flags) - - # Retrieve the vehicle and install the GNSS sensor. - self._vehicle = get_vehicle_handle(world, vehicle_id) - gnss_blueprint = world.get_blueprint_library().find( - 'sensor.other.gnss') - - # Set the noise and bias parameters. - gnss_blueprint.set_attribute('noise_alt_stddev', - str(self._flags.gnss_noise_stddev_alt)) - gnss_blueprint.set_attribute('noise_lat_stddev', - str(self._flags.gnss_noise_stddev_lat)) - gnss_blueprint.set_attribute('noise_lon_stddev', - str(self._flags.gnss_noise_stddev_lon)) - gnss_blueprint.set_attribute('noise_alt_bias', - str(self._flags.gnss_bias_alt)) - gnss_blueprint.set_attribute('noise_lat_bias', - str(self._flags.gnss_bias_lat)) - gnss_blueprint.set_attribute('noise_lon_bias', - str(self._flags.gnss_bias_lon)) - - if self._flags.simulator_gnss_frequency == -1: - gnss_blueprint.set_attribute('sensor_tick', '0.0') - else: - gnss_blueprint.set_attribute( - 'sensor_tick', str(1.0 / self._flags.simulator_gnss_frequency)) - transform = self._gnss_setup.get_transform().as_simulator_transform() - self._logger.debug("Spawning a GNSS sensor: {}".format( - self._gnss_setup)) - self._gnss = world.spawn_actor(gnss_blueprint, - transform, - attach_to=self._vehicle) - - # Register the callback on the GNSS sensor. - self._gnss.listen(self.process_gnss) + msg = GNSSMessage( + timestamp, Transform.from_simulator_transform(gnss_msg.transform), + gnss_msg.altitude, gnss_msg.latitude, gnss_msg.longitude) + self._output_stream.send(msg) + self._output_stream.send(watermark_msg) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index ea07b8b81..7e103013e 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -1,14 +1,15 @@ -import threading +import carla import erdos -from erdos import ReadStream, Timestamp, WriteStream from pylot.simulation.utils import get_vehicle_handle, get_world, \ set_simulation_mode import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) -class CarlaPoseDriverOperator(erdos.Operator): +class CarlaPoseDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes the pose (location, orientation, and velocity) at the provided frequency. @@ -24,29 +25,17 @@ class CarlaPoseDriverOperator(erdos.Operator): frequency: Rate at which the pose is published, in Hertz. flags: Object used to access absl flags. """ - def __init__(self, vehicle_id_stream: ReadStream, pose_stream: WriteStream, - frequency: float, flags): - # Save the streams. - self._vehicle_id_stream = vehicle_id_stream - self._pose_stream = pose_stream - - # Save the flags and initialize logging. - self._flags = flags - self._logger = erdos.utils.setup_logging(self.config.name, - self.config.log_file_name) - - # Save the setup, the vehicle and the sensor. - self._frequency = frequency - self._vehicle = None - self._gnss = None # Use GNSS sensor to tick at a regular interval. - self._lock = threading.Lock() - - @staticmethod - def connect(vehicle_id_stream: ReadStream): - pose_stream = WriteStream() - return [pose_stream] - - def send_pose(self, timestamp: Timestamp): + def __init__(self, vehicle_id_stream: erdos.ReadStream, + pose_stream: erdos.WriteStream, frequency: float, flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, pose_stream, gnss_setup, frequency, + flags) + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): """Sends pose information followed by a watermark.""" vec_transform = pylot.utils.Transform.from_simulator_transform( self._vehicle.get_transform()) @@ -55,49 +44,5 @@ def send_pose(self, timestamp: Timestamp): forward_speed = velocity_vector.magnitude() pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, timestamp.coordinates[0]) - self._pose_stream.send(erdos.Message(timestamp, pose)) - self._pose_stream.send(erdos.WatermarkMessage(timestamp)) - - def process_gnss(self, gnss_msg): - """Callback attached to a GNSS sensor to ensure that pose is sent at - the provided frequency.""" - game_time = int(gnss_msg.timestamp * 1000) - timestamp = erdos.Timestamp(coordinates=[game_time]) - with erdos.profile(self.config.name + '.process_gnss', - self, - event_data={'timestamp': str(timestamp)}): - with self._lock: - self.send_pose(timestamp) - - def run(self): - # Read the vehicle ID from the vehicle ID stream. - vehicle_id = self._vehicle_id_stream.read().data - self._logger.debug("{} received vehicle id: {}".format( - self.config.name, vehicle_id)) - - # Connect to the world. - _, world = get_world(self._flags.simulator_host, - self._flags.simulator_port, - self._flags.simulator_timeout) - set_simulation_mode(world, self._flags) - - # Retrieve the vehicle and install the GNSS sensor. - self._vehicle = get_vehicle_handle(world, vehicle_id) - gnss_blueprint = world.get_blueprint_library().find( - 'sensor.other.gnss') - - if self._frequency == -1: - gnss_blueprint.set_attribute('sensor_tick', '0.0') - else: - gnss_blueprint.set_attribute('sensor_tick', - str(1.0 / self._frequency)) - transform = pylot.utils.Transform( - pylot.utils.Location(), - pylot.utils.Rotation()).as_simulator_transform() - self._logger.debug("Spawning a GNSS sensor") - self._gnss = world.spawn_actor(gnss_blueprint, - transform, - attach_to=self._vehicle) - - # Register the callback on the GNSS sensor. - self._gnss.listen(self.process_gnss) + self._output_stream.send(erdos.Message(timestamp, pose)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) diff --git a/pylot/drivers/carla_traffic_lights_driver_operator.py b/pylot/drivers/carla_traffic_lights_driver_operator.py index 2f77d6ed7..4ddfb12db 100644 --- a/pylot/drivers/carla_traffic_lights_driver_operator.py +++ b/pylot/drivers/carla_traffic_lights_driver_operator.py @@ -1,16 +1,19 @@ import threading +import carla + import erdos -from erdos import ReadStream, Timestamp, WriteStream from pylot.simulation.utils import get_vehicle_handle, get_world, \ set_simulation_mode import pylot.utils from pylot.perception.detection.traffic_light import TrafficLight from pylot.perception.messages import TrafficLightsMessage +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) -class CarlaTrafficLightsDriverOperator(erdos.Operator): +class CarlaTrafficLightsDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes the location and state of all traffic lights. This operator attaches to the vehicle using the vehicle ID provided by the @@ -26,80 +29,27 @@ class CarlaTrafficLightsDriverOperator(erdos.Operator): frequency: Rate at which the pose is published, in Hertz. flags: Object used to access absl flags. """ - def __init__(self, vehicle_id_stream: ReadStream, - traffic_lights_stream: WriteStream, frequency: float, flags): - # Save the streams. - self._vehicle_id_stream = vehicle_id_stream - self._traffic_lights_stream = traffic_lights_stream - - # Save the flags and initialize logging. - self._flags = flags - self._logger = erdos.utils.setup_logging(self.config.name, - self.config.log_file_name) - - # Save the setup, the vehicle and the sensor. - self._frequency = frequency - self._traffic_light_actors = [] - self._gnss = None # Use GNSS sensor to tick at a regular interval. - self._lock = threading.Lock() - - @staticmethod - def connect(vehicle_id_stream: ReadStream): - traffic_lights_stream = WriteStream() - return [traffic_lights_stream] - - def send_traffic_lights(self, timestamp: Timestamp): + def __init__(self, vehicle_id_stream: erdos.ReadStream, + traffic_lights_stream: erdos.WriteStream, frequency: float, + flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, traffic_lights_stream, gnss_setup, + frequency, flags) + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): """Sends pose information followed by a watermark.""" traffic_lights = list( map(TrafficLight.from_simulator_actor, self._traffic_light_actors)) - self._traffic_lights_stream.send( + self._output_stream.send( TrafficLightsMessage(timestamp, traffic_lights)) - self._traffic_lights_stream.send(erdos.WatermarkMessage(timestamp)) - - def process_gnss(self, gnss_msg): - """Callback attached to a GNSS sensor to ensure that traffic lights are - sent at the provided frequency.""" - game_time = int(gnss_msg.timestamp * 1000) - timestamp = erdos.Timestamp(coordinates=[game_time]) - with erdos.profile(self.config.name + '.process_gnss', - self, - event_data={'timestamp': str(timestamp)}): - with self._lock: - self.send_traffic_lights(timestamp) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) def run(self): - # Read the vehicle ID from the vehicle ID stream. - vehicle_id = self._vehicle_id_stream.read().data - self._logger.debug("{} received vehicle id: {}".format( - self.config.name, vehicle_id)) - - # Connect to the world. - _, world = get_world(self._flags.simulator_host, - self._flags.simulator_port, - self._flags.simulator_timeout) - set_simulation_mode(world, self._flags) - + super().run() # Get traffic light actors - self._traffic_light_actors = world.get_actors().filter( + self._traffic_light_actors = self._world.get_actors().filter( 'traffic.traffic_light*') - - # Retrieve the vehicle and install the GNSS sensor. - vehicle = get_vehicle_handle(world, vehicle_id) - gnss_blueprint = world.get_blueprint_library().find( - 'sensor.other.gnss') - - if self._frequency == -1: - gnss_blueprint.set_attribute('sensor_tick', '0.0') - else: - gnss_blueprint.set_attribute('sensor_tick', - str(1.0 / self._frequency)) - transform = pylot.utils.Transform( - pylot.utils.Location(), - pylot.utils.Rotation()).as_simulator_transform() - self._logger.debug("Spawning a GNSS sensor") - self._gnss = world.spawn_actor(gnss_blueprint, - transform, - attach_to=vehicle) - - # Register the callback on the GNSS sensor. - self._gnss.listen(self.process_gnss) From 0f08a4fe19dd202b60581bba0d9f41769285d275 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 27 Aug 2022 09:07:51 +0000 Subject: [PATCH 09/20] Add CarlaObstaclesDriverOperator --- .../carla_obstacles_driver_operator.py | 48 +++++++++++++++++++ pylot/operator_creator.py | 30 ++++++++++-- pylot/utils.py | 21 ++++---- 3 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 pylot/drivers/carla_obstacles_driver_operator.py diff --git a/pylot/drivers/carla_obstacles_driver_operator.py b/pylot/drivers/carla_obstacles_driver_operator.py new file mode 100644 index 000000000..fd105d549 --- /dev/null +++ b/pylot/drivers/carla_obstacles_driver_operator.py @@ -0,0 +1,48 @@ +import carla + +import erdos +from pylot.perception.detection.obstacle import Obstacle +from pylot.perception.messages import ObstaclesMessage + +from pylot.simulation.utils import get_vehicle_handle, get_world, \ + set_simulation_mode +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) + + +class CarlaObstaclesDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the bounding boxes of all vehicles and people retrieved from + the simulator at the provided frequency. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + obstacles_stream: Stream on which the operator sends the obstacles. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + obstacles_stream: erdos.WriteStream, frequency: float, flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, obstacles_stream, gnss_setup, + frequency, flags) + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """""" + actor_list = self._world.get_actors() + + vec_actors = actor_list.filter('vehicle.*') + vehicles = list(map(Obstacle.from_simulator_actor, vec_actors)) + + person_actors = actor_list.filter('walker.pedestrian.*') + people = list(map(Obstacle.from_simulator_actor, person_actors)) + + self._output_stream.send(ObstaclesMessage(timestamp, + vehicles + people)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index 7ff521033..5a2d6b1d4 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -86,19 +86,39 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, log_file_name=FLAGS.log_file_name, csv_log_file_name=FLAGS.csv_log_file_name, profile_file_name=FLAGS.profile_file_name) - ground_traffic_lights_driver_stream, = erdos.connect( + traffic_lights_driver_stream, = erdos.connect( CarlaTrafficLightsDriverOperator, ground_traffic_lights_config, [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) - matches_ground_traffic_lights_config = erdos.OperatorConfig( + matches_ground_obstacles_config = erdos.OperatorConfig( name='matches_ground_traffic_lights_operator', flow_watermarks=False, log_file_name=FLAGS.log_file_name, csv_log_file_name=FLAGS.csv_log_file_name, profile_file_name=FLAGS.profile_file_name) - erdos.connect( - MatchesOperator, matches_ground_traffic_lights_config, - [ground_traffic_lights_driver_stream, ground_traffic_lights_stream]) + erdos.connect(MatchesOperator, matches_ground_obstacles_config, + [traffic_lights_driver_stream, ground_traffic_lights_stream]) + + # Check that obstacles match + from pylot.drivers.carla_obstacles_driver_operator import CarlaObstaclesDriverOperator + ground_obstacles_config = erdos.OperatorConfig( + name='carla_obstacles_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + obstacles_driver_stream, = erdos.connect( + CarlaObstaclesDriverOperator, ground_obstacles_config, + [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + + matches_ground_obstacles_config = erdos.OperatorConfig( + name='matches_obstacles_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_ground_obstacles_config, + [ground_obstacles_stream, obstacles_driver_stream]) return ( pose_stream, diff --git a/pylot/utils.py b/pylot/utils.py index d684325cb..b60ed19f9 100644 --- a/pylot/utils.py +++ b/pylot/utils.py @@ -1108,8 +1108,17 @@ def verify_keys_in_dict(required_keys, arg_dict): import erdos +def matches_data(left_msgs, right_msgs): + length_matches = (len(left_msgs) == len(right_msgs)) + return length_matches and all( + map( + lambda x: x[0].timestamp == x[1].timestamp and x[0].data == x[1]. + data, zip(left_msgs, right_msgs))) + + class MatchesOperator(erdos.Operator): - def __init__(self, left_stream, right_stream): + def __init__(self, left_stream, right_stream, matches_fn=matches_data): + self._matches_fn = matches_fn self._left_msgs = [] self._right_msgs = [] self._logger = erdos.utils.setup_logging(self.config.name, @@ -1140,17 +1149,11 @@ def on_right_stream(self, msg): def on_watermark(self, t): left_msgs, self._left_msgs = self._left_msgs, [] right_msgs, self._right_msgs = self._right_msgs, [] - length_matches = (len(left_msgs) == len(right_msgs)) - left_tuples = [(m.timestamp, m.data) for m in left_msgs] - right_tuples = [(m.timestamp, m.data) for m in right_msgs] - matches = length_matches and all( - map( - lambda x: x[0].timestamp == x[1].timestamp and x[0].data == x[ - 1].data, zip(left_msgs, right_msgs))) + matches = self._matches_fn(left_msgs, right_msgs) if matches: self._logger.debug(f"@{t}: left matches right") else: self._logger.warn( - f"@{t}: left does not match right\n\tleft: {left_tuples}\n\tright: {right_tuples}" + f"@{t}: left does not match right\n\tleft: {left_msgs}\n\tright: {right_msgs}" ) \ No newline at end of file From b20153d48d420af44b4b749e5fd3f6114245a5f0 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 27 Aug 2022 17:17:00 +0000 Subject: [PATCH 10/20] Remove unused imports --- pylot/drivers/carla_gnss_driver_operator.py | 2 -- pylot/drivers/carla_obstacles_driver_operator.py | 4 +--- pylot/drivers/carla_pose_driver_operator.py | 2 -- pylot/drivers/carla_traffic_lights_driver_operator.py | 3 +-- 4 files changed, 2 insertions(+), 9 deletions(-) diff --git a/pylot/drivers/carla_gnss_driver_operator.py b/pylot/drivers/carla_gnss_driver_operator.py index 76a787083..aa2bcd132 100644 --- a/pylot/drivers/carla_gnss_driver_operator.py +++ b/pylot/drivers/carla_gnss_driver_operator.py @@ -12,8 +12,6 @@ CarlaBaseGNSSDriverOperator) from pylot.drivers.sensor_setup import GNSSSetup from pylot.localization.messages import GNSSMessage -from pylot.simulation.utils import get_vehicle_handle, get_world, \ - set_simulation_mode from pylot.utils import Transform diff --git a/pylot/drivers/carla_obstacles_driver_operator.py b/pylot/drivers/carla_obstacles_driver_operator.py index fd105d549..9819bcb47 100644 --- a/pylot/drivers/carla_obstacles_driver_operator.py +++ b/pylot/drivers/carla_obstacles_driver_operator.py @@ -1,11 +1,9 @@ import carla import erdos + from pylot.perception.detection.obstacle import Obstacle from pylot.perception.messages import ObstaclesMessage - -from pylot.simulation.utils import get_vehicle_handle, get_world, \ - set_simulation_mode import pylot.utils from pylot.drivers.carla_base_gnss_driver_operator import ( CarlaBaseGNSSDriverOperator) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index 7e103013e..b74db3503 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -2,8 +2,6 @@ import erdos -from pylot.simulation.utils import get_vehicle_handle, get_world, \ - set_simulation_mode import pylot.utils from pylot.drivers.carla_base_gnss_driver_operator import ( CarlaBaseGNSSDriverOperator) diff --git a/pylot/drivers/carla_traffic_lights_driver_operator.py b/pylot/drivers/carla_traffic_lights_driver_operator.py index 4ddfb12db..c3f2644bf 100644 --- a/pylot/drivers/carla_traffic_lights_driver_operator.py +++ b/pylot/drivers/carla_traffic_lights_driver_operator.py @@ -4,8 +4,6 @@ import erdos -from pylot.simulation.utils import get_vehicle_handle, get_world, \ - set_simulation_mode import pylot.utils from pylot.perception.detection.traffic_light import TrafficLight from pylot.perception.messages import TrafficLightsMessage @@ -38,6 +36,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, self.config.name, transform) super().__init__(vehicle_id_stream, traffic_lights_stream, gnss_setup, frequency, flags) + self._traffic_light_actors = None def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): From 7acf6d48233ec915dc10be8f1745e6788d399775 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 27 Aug 2022 17:27:08 +0000 Subject: [PATCH 11/20] Add CarlaSpeedLimitSignsDriverOperator --- ...carla_speed_limit_signs_driver_operator.py | 52 +++++++++++++++++++ pylot/operator_creator.py | 24 +++++++++ 2 files changed, 76 insertions(+) create mode 100644 pylot/drivers/carla_speed_limit_signs_driver_operator.py diff --git a/pylot/drivers/carla_speed_limit_signs_driver_operator.py b/pylot/drivers/carla_speed_limit_signs_driver_operator.py new file mode 100644 index 000000000..7eab37d1c --- /dev/null +++ b/pylot/drivers/carla_speed_limit_signs_driver_operator.py @@ -0,0 +1,52 @@ +import carla + +import erdos + +from pylot.perception.detection.speed_limit_sign import SpeedLimitSign +from pylot.perception.messages import ObstaclesMessage, SpeedSignsMessage +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) + + +class CarlaSpeedLimitSignsDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the locations and values of all speed limit signs retrieved + from the simulator at the provided frequency. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + speed_limit_signs_stream: Stream on which the operator sends the speed + limit signs. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + speed_limit_signs_stream: erdos.WriteStream, frequency: float, + flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, speed_limit_signs_stream, + gnss_setup, frequency, flags) + self._speed_limit_actors = None + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """""" + actor_list = self._world.get_actors() + + speed_limit_actors = actor_list.filter('traffic.speed_limit*') + speed_limits = list( + map(SpeedLimitSign.from_simulator_actor, speed_limit_actors)) + + self._output_stream.send(SpeedSignsMessage(timestamp, speed_limits)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) + + def run(self): + super().run() + # Get speed limit actors + self._speed_limit_actors = self._world.get_actors().filter( + 'traffic.speed_limit*') diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index 5a2d6b1d4..6648c352c 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -1,6 +1,7 @@ from absl import flags import erdos +from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator import pylot.utils @@ -120,6 +121,29 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, erdos.connect(MatchesOperator, matches_ground_obstacles_config, [ground_obstacles_stream, obstacles_driver_stream]) + # Check that speed signs match + from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator + speed_signs_config = erdos.OperatorConfig( + name='carla_speed_limit_signs_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + speed_limit_signs_driver_stream, = erdos.connect( + CarlaSpeedLimitSignsDriverOperator, speed_signs_config, + [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + + # Check that speed limit signs match + matches_speed_limit_signs_config = erdos.OperatorConfig( + name='matches_speed_limit_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect( + MatchesOperator, matches_speed_limit_signs_config, + [ground_speed_limit_signs_stream, speed_limit_signs_driver_stream]) + return ( pose_stream, pose_stream_for_control, From 07c034271ae898d220c1aa48b11a062ae2faa03e Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 27 Aug 2022 18:36:42 +0000 Subject: [PATCH 12/20] Small fixes --- pylot/drivers/carla_speed_limit_signs_driver_operator.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pylot/drivers/carla_speed_limit_signs_driver_operator.py b/pylot/drivers/carla_speed_limit_signs_driver_operator.py index 7eab37d1c..c18a7f5f1 100644 --- a/pylot/drivers/carla_speed_limit_signs_driver_operator.py +++ b/pylot/drivers/carla_speed_limit_signs_driver_operator.py @@ -3,7 +3,7 @@ import erdos from pylot.perception.detection.speed_limit_sign import SpeedLimitSign -from pylot.perception.messages import ObstaclesMessage, SpeedSignsMessage +from pylot.perception.messages import SpeedSignsMessage import pylot.utils from pylot.drivers.carla_base_gnss_driver_operator import ( CarlaBaseGNSSDriverOperator) @@ -36,12 +36,8 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): """""" - actor_list = self._world.get_actors() - - speed_limit_actors = actor_list.filter('traffic.speed_limit*') speed_limits = list( - map(SpeedLimitSign.from_simulator_actor, speed_limit_actors)) - + map(SpeedLimitSign.from_simulator_actor, self._speed_limit_actors)) self._output_stream.send(SpeedSignsMessage(timestamp, speed_limits)) self._output_stream.send(erdos.WatermarkMessage(timestamp)) From 7a0658bc547c808ec088926eab569642d8ec14bb Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 27 Aug 2022 18:52:46 +0000 Subject: [PATCH 13/20] Add CarlaStopSignsDriverOperator --- .../carla_stop_signs_driver_operator.py | 48 +++++++++++++++++++ pylot/operator_creator.py | 22 +++++++++ 2 files changed, 70 insertions(+) create mode 100644 pylot/drivers/carla_stop_signs_driver_operator.py diff --git a/pylot/drivers/carla_stop_signs_driver_operator.py b/pylot/drivers/carla_stop_signs_driver_operator.py new file mode 100644 index 000000000..386b378e7 --- /dev/null +++ b/pylot/drivers/carla_stop_signs_driver_operator.py @@ -0,0 +1,48 @@ +import carla + +import erdos + +from pylot.perception.detection.stop_sign import StopSign +from pylot.perception.messages import StopSignsMessage +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) + + +class CarlaStopSignsDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the locations and values of all stop signs retrieved from the + simulator at the provided frequency. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + stop_signs_stream: Stream on which the operator sends the speed + limit signs. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + stop_signs_stream: erdos.WriteStream, frequency: float, + flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, stop_signs_stream, gnss_setup, + frequency, flags) + self._stop_sign_actors = None + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """""" + stop_signs = list( + map(StopSign.from_simulator_actor, self._stop_sign_actors)) + self._output_stream.send(StopSignsMessage(timestamp, stop_signs)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) + + def run(self): + super().run() + # Get speed limit actors + self._stop_sign_actors = self._world.get_actors().filter( + 'traffic.stop') diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index 6648c352c..a203168f9 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -144,6 +144,28 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, MatchesOperator, matches_speed_limit_signs_config, [ground_speed_limit_signs_stream, speed_limit_signs_driver_stream]) + # Check that stop signs match + from pylot.drivers.carla_stop_signs_driver_operator import CarlaStopSignsDriverOperator + stop_signs_config = erdos.OperatorConfig( + name='carla_stop_signs_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + stop_signs_driver_stream, = erdos.connect( + CarlaStopSignsDriverOperator, stop_signs_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS) + + # Check that stop signs match + matches_stop_signs_config = erdos.OperatorConfig( + name='matches_stop_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_stop_signs_config, + [ground_stop_signs_stream, stop_signs_driver_stream]) + return ( pose_stream, pose_stream_for_control, From 09cdb87f90b28526b255bb336027ff04f24e12de Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Sat, 27 Aug 2022 18:56:26 +0000 Subject: [PATCH 14/20] Update docs --- pylot/drivers/carla_obstacles_driver_operator.py | 2 +- pylot/drivers/carla_speed_limit_signs_driver_operator.py | 2 +- pylot/drivers/carla_stop_signs_driver_operator.py | 2 +- pylot/drivers/carla_traffic_lights_driver_operator.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pylot/drivers/carla_obstacles_driver_operator.py b/pylot/drivers/carla_obstacles_driver_operator.py index 9819bcb47..ad4101d1a 100644 --- a/pylot/drivers/carla_obstacles_driver_operator.py +++ b/pylot/drivers/carla_obstacles_driver_operator.py @@ -32,7 +32,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): - """""" + """Sends obstacles (vehicles and people) followed by a watermark.""" actor_list = self._world.get_actors() vec_actors = actor_list.filter('vehicle.*') diff --git a/pylot/drivers/carla_speed_limit_signs_driver_operator.py b/pylot/drivers/carla_speed_limit_signs_driver_operator.py index c18a7f5f1..f864a2bd0 100644 --- a/pylot/drivers/carla_speed_limit_signs_driver_operator.py +++ b/pylot/drivers/carla_speed_limit_signs_driver_operator.py @@ -35,7 +35,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): - """""" + """Sends speed limit signs followed by a watermark.""" speed_limits = list( map(SpeedLimitSign.from_simulator_actor, self._speed_limit_actors)) self._output_stream.send(SpeedSignsMessage(timestamp, speed_limits)) diff --git a/pylot/drivers/carla_stop_signs_driver_operator.py b/pylot/drivers/carla_stop_signs_driver_operator.py index 386b378e7..658081f83 100644 --- a/pylot/drivers/carla_stop_signs_driver_operator.py +++ b/pylot/drivers/carla_stop_signs_driver_operator.py @@ -35,7 +35,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): - """""" + """Send stop signs followed by a watermark.""" stop_signs = list( map(StopSign.from_simulator_actor, self._stop_sign_actors)) self._output_stream.send(StopSignsMessage(timestamp, stop_signs)) diff --git a/pylot/drivers/carla_traffic_lights_driver_operator.py b/pylot/drivers/carla_traffic_lights_driver_operator.py index c3f2644bf..8c64b1f28 100644 --- a/pylot/drivers/carla_traffic_lights_driver_operator.py +++ b/pylot/drivers/carla_traffic_lights_driver_operator.py @@ -40,7 +40,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): - """Sends pose information followed by a watermark.""" + """Sends traffic light information followed by a watermark.""" traffic_lights = list( map(TrafficLight.from_simulator_actor, self._traffic_light_actors)) self._output_stream.send( From 78dda3ebed8befd7b9651bd864e834818c9de530 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Mon, 29 Aug 2022 23:19:50 +0000 Subject: [PATCH 15/20] Add CarlaOpenDriveDriverOperator and improve logging --- pylot/drivers/carla_gnss_driver_operator.py | 4 +-- .../carla_obstacles_driver_operator.py | 2 ++ .../carla_open_drive_driver_operator.py | 32 +++++++++++++++++++ pylot/drivers/carla_pose_driver_operator.py | 1 + ...carla_speed_limit_signs_driver_operator.py | 1 + .../carla_stop_signs_driver_operator.py | 1 + .../carla_traffic_lights_driver_operator.py | 2 ++ 7 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 pylot/drivers/carla_open_drive_driver_operator.py diff --git a/pylot/drivers/carla_gnss_driver_operator.py b/pylot/drivers/carla_gnss_driver_operator.py index aa2bcd132..adbdccda3 100644 --- a/pylot/drivers/carla_gnss_driver_operator.py +++ b/pylot/drivers/carla_gnss_driver_operator.py @@ -44,9 +44,9 @@ def process_gnss(self, timestamp: erdos.Timestamp, Sends GNSS measurements to downstream operators. """ - watermark_msg = erdos.WatermarkMessage(timestamp) + self._logger.debug('@{}: sending GNSS reading'.format(timestamp)) msg = GNSSMessage( timestamp, Transform.from_simulator_transform(gnss_msg.transform), gnss_msg.altitude, gnss_msg.latitude, gnss_msg.longitude) self._output_stream.send(msg) - self._output_stream.send(watermark_msg) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) diff --git a/pylot/drivers/carla_obstacles_driver_operator.py b/pylot/drivers/carla_obstacles_driver_operator.py index ad4101d1a..c3ed7c8a0 100644 --- a/pylot/drivers/carla_obstacles_driver_operator.py +++ b/pylot/drivers/carla_obstacles_driver_operator.py @@ -33,6 +33,8 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): """Sends obstacles (vehicles and people) followed by a watermark.""" + self._logger.debug('@{}: sending obstacles'.format(timestamp)) + actor_list = self._world.get_actors() vec_actors = actor_list.filter('vehicle.*') diff --git a/pylot/drivers/carla_open_drive_driver_operator.py b/pylot/drivers/carla_open_drive_driver_operator.py new file mode 100644 index 000000000..0c3c32438 --- /dev/null +++ b/pylot/drivers/carla_open_drive_driver_operator.py @@ -0,0 +1,32 @@ +import erdos + +from pylot.simulation.utils import get_world, set_simulation_mode + + +class CarlaOpenDriveDriverOperator(erdos.Operator): + """Sends a string containing the ground-truth map of the world in OpenDRIVE + format, followed by a top watermark.""" + def __init__(self, open_drive_stream: erdos.WriteStream, flags): + self._open_drive_stream = open_drive_stream + + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + @staticmethod + def connect(): + open_drive_stream = erdos.WriteStream() + return [open_drive_stream] + + def run(self): + _, world = get_world(self._flags.simulator_host, + self._flags.simulator_port, + self._flags.simulator_timeout) + set_simulation_mode(world, self._flags) + + self._logger.debug('Sending the map in OpenDRIVE format') + self._open_drive_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + world.get_map().to_opendrive())) + self._open_drive_stream.send( + erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index b74db3503..17980afee 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -35,6 +35,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): """Sends pose information followed by a watermark.""" + self._logger.debug('@{}: sending pose'.format(timestamp)) vec_transform = pylot.utils.Transform.from_simulator_transform( self._vehicle.get_transform()) velocity_vector = pylot.utils.Vector3D.from_simulator_vector( diff --git a/pylot/drivers/carla_speed_limit_signs_driver_operator.py b/pylot/drivers/carla_speed_limit_signs_driver_operator.py index f864a2bd0..ef93e0cb7 100644 --- a/pylot/drivers/carla_speed_limit_signs_driver_operator.py +++ b/pylot/drivers/carla_speed_limit_signs_driver_operator.py @@ -36,6 +36,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): """Sends speed limit signs followed by a watermark.""" + self._logger.debug('@{}: sending speed limit signs'.format(timestamp)) speed_limits = list( map(SpeedLimitSign.from_simulator_actor, self._speed_limit_actors)) self._output_stream.send(SpeedSignsMessage(timestamp, speed_limits)) diff --git a/pylot/drivers/carla_stop_signs_driver_operator.py b/pylot/drivers/carla_stop_signs_driver_operator.py index 658081f83..0996a58d2 100644 --- a/pylot/drivers/carla_stop_signs_driver_operator.py +++ b/pylot/drivers/carla_stop_signs_driver_operator.py @@ -36,6 +36,7 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): """Send stop signs followed by a watermark.""" + self._logger.debug('@{}:sending stop signs'.format(timestamp)) stop_signs = list( map(StopSign.from_simulator_actor, self._stop_sign_actors)) self._output_stream.send(StopSignsMessage(timestamp, stop_signs)) diff --git a/pylot/drivers/carla_traffic_lights_driver_operator.py b/pylot/drivers/carla_traffic_lights_driver_operator.py index 8c64b1f28..969ce832d 100644 --- a/pylot/drivers/carla_traffic_lights_driver_operator.py +++ b/pylot/drivers/carla_traffic_lights_driver_operator.py @@ -41,6 +41,8 @@ def __init__(self, vehicle_id_stream: erdos.ReadStream, def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): """Sends traffic light information followed by a watermark.""" + self._logger.debug( + '@{}: sending traffic light information'.format(timestamp)) traffic_lights = list( map(TrafficLight.from_simulator_actor, self._traffic_light_actors)) self._output_stream.send( From b29214d9c69e9e63a0de74599f8d650aa81dac6a Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Mon, 29 Aug 2022 23:25:56 +0000 Subject: [PATCH 16/20] Update CARLA operator --- pylot.py | 6 +- pylot/operator_creator.py | 131 +++++++- pylot/simulation/carla_operator.py | 288 +++-------------- pylot/simulation/old_carla_operator.py | 430 +++++++++++++++++++++++++ 4 files changed, 599 insertions(+), 256 deletions(-) create mode 100644 pylot/simulation/old_carla_operator.py diff --git a/pylot.py b/pylot.py index 1d7665e74..64aceff02 100644 --- a/pylot.py +++ b/pylot.py @@ -44,13 +44,15 @@ def driver(): ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - global_trajectory_stream, - ) = pylot.operator_creator.add_simulator_bridge( + ) = pylot.operator_creator.add_simulator_bridge( # .add_simulator_bridge( control_loop_stream, release_sensor_stream, pipeline_finish_notify_stream, ) + global_trajectory_stream = erdos.IngestStream() + streams_to_send_top_on.append(global_trajectory_stream) + # Add sensors. center_camera_setup = RGBCameraSetup('center_camera', FLAGS.camera_image_width, diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index a203168f9..701fb97ae 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -15,6 +15,113 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, pipeline_finish_notify_stream): from pylot.simulation.carla_operator import CarlaOperator + op_config = erdos.OperatorConfig(name='simulator_bridge_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + + vehicle_id_stream, = erdos.connect( + CarlaOperator, op_config, + [control_stream, pipeline_finish_notify_stream], FLAGS) + + # Check that Pose matches + from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator + pose_driver_config = erdos.OperatorConfig( + name='pose_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + pose_stream, = erdos.connect(CarlaPoseDriverOperator, pose_driver_config, + [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS) + + # Check that Pose for control matches + pose_for_control_config = erdos.OperatorConfig( + name='pose_for_control_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + pose_stream_for_control, = erdos.connect( + CarlaPoseDriverOperator, pose_for_control_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS) + + # Check that traffic lights match + from pylot.drivers.carla_traffic_lights_driver_operator import CarlaTrafficLightsDriverOperator + ground_traffic_lights_config = erdos.OperatorConfig( + name='carla_traffic_lights_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + ground_traffic_lights_stream, = erdos.connect( + CarlaTrafficLightsDriverOperator, ground_traffic_lights_config, + [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + + # Check that obstacles match + from pylot.drivers.carla_obstacles_driver_operator import CarlaObstaclesDriverOperator + ground_obstacles_config = erdos.OperatorConfig( + name='carla_obstacles_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + ground_obstacles_stream, = erdos.connect( + CarlaObstaclesDriverOperator, ground_obstacles_config, + [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + + # Check that speed signs match + from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator + speed_signs_config = erdos.OperatorConfig( + name='carla_speed_limit_signs_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + ground_speed_limit_signs_stream, = erdos.connect( + CarlaSpeedLimitSignsDriverOperator, speed_signs_config, + [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + + # Check that stop signs match + from pylot.drivers.carla_stop_signs_driver_operator import CarlaStopSignsDriverOperator + stop_signs_config = erdos.OperatorConfig( + name='carla_stop_signs_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + ground_stop_signs_stream, = erdos.connect( + CarlaStopSignsDriverOperator, stop_signs_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS) + + from pylot.drivers.carla_open_drive_driver_operator import CarlaOpenDriveDriverOperator + open_drive_driver_config = erdos.OperatorConfig( + name='carla_open_drive_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + open_drive_stream, = erdos.connect(CarlaOpenDriveDriverOperator, + open_drive_driver_config, [], FLAGS) + + return ( + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + ) + + +def add_simulator_bridge_old(control_stream, sensor_ready_stream, + pipeline_finish_notify_stream): + from pylot.simulation.old_carla_operator import OldCarlaOperator + op_config = erdos.OperatorConfig(name='simulator_bridge_operator', flow_watermarks=False, log_file_name=FLAGS.log_file_name, @@ -31,7 +138,7 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, open_drive_stream, global_trajectory_stream, ) = erdos.connect( - CarlaOperator, op_config, + OldCarlaOperator, op_config, [control_stream, sensor_ready_stream, pipeline_finish_notify_stream], FLAGS) @@ -166,6 +273,27 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, erdos.connect(MatchesOperator, matches_stop_signs_config, [ground_stop_signs_stream, stop_signs_driver_stream]) + # Check that open drive matches + from pylot.drivers.carla_open_drive_driver_operator import CarlaOpenDriveDriverOperator + open_drive_driver_config = erdos.OperatorConfig( + name='carla_open_drive_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + open_drive_driver_stream, = erdos.connect(CarlaOpenDriveDriverOperator, + open_drive_driver_config, [], + FLAGS) + + matches_open_drive_config = erdos.OperatorConfig( + name='matches_open_drive_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_open_drive_config, + [open_drive_stream, open_drive_driver_stream]) + return ( pose_stream, pose_stream_for_control, @@ -175,7 +303,6 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - global_trajectory_stream, ) diff --git a/pylot/simulation/carla_operator.py b/pylot/simulation/carla_operator.py index da59b7ea5..c4a94c6bb 100644 --- a/pylot/simulation/carla_operator.py +++ b/pylot/simulation/carla_operator.py @@ -1,9 +1,6 @@ -import enum -import heapq import random import threading import time -from functools import total_ordering from carla import Location, VehicleControl, command @@ -13,8 +10,6 @@ import pylot.simulation.utils import pylot.utils from pylot.control.messages import ControlMessage -from pylot.perception.messages import ObstaclesMessage, SpeedSignsMessage, \ - StopSignsMessage, TrafficLightsMessage class CarlaOperator(erdos.Operator): @@ -25,6 +20,10 @@ class CarlaOperator(erdos.Operator): vehicle that the rest of the pipeline drives. Args: + control_stream: Stream on which the operator receives control messages + to apply to the ego vehicle. + pipeline_finish_notify_stream: In pseudo-async mode, notifies the + operator that the pipeline has finished executing. flags: A handle to the global flags instance to retrieve the configuration. @@ -33,35 +32,18 @@ class CarlaOperator(erdos.Operator): _world: A handle to the world running inside the simulation. _vehicles: A list of identifiers of the vehicles inside the simulation. """ - def __init__( - self, control_stream: ReadStream, - release_sensor_stream: ReadStream, - pipeline_finish_notify_stream: ReadStream, - pose_stream: WriteStream, pose_stream_for_control: WriteStream, - ground_traffic_lights_stream: WriteStream, - ground_obstacles_stream: WriteStream, - ground_speed_limit_signs_stream: WriteStream, - ground_stop_signs_stream: WriteStream, - vehicle_id_stream: WriteStream, open_drive_stream: WriteStream, - global_trajectory_stream: WriteStream, flags): + def __init__(self, control_stream: ReadStream, + pipeline_finish_notify_stream: ReadStream, + vehicle_id_stream: WriteStream, flags): if flags.random_seed: random.seed(flags.random_seed) # Register callback on control stream. control_stream.add_callback(self.on_control_msg) - erdos.add_watermark_callback([release_sensor_stream], [], - self.on_sensor_ready) if flags.simulator_mode == "pseudo-asynchronous": erdos.add_watermark_callback([pipeline_finish_notify_stream], [], self.on_pipeline_finish) - self.pose_stream = pose_stream - self.pose_stream_for_control = pose_stream_for_control - self.ground_traffic_lights_stream = ground_traffic_lights_stream - self.ground_obstacles_stream = ground_obstacles_stream - self.ground_speed_limit_signs_stream = ground_speed_limit_signs_stream - self.ground_stop_signs_stream = ground_stop_signs_stream + self.vehicle_id_stream = vehicle_id_stream - self.open_drive_stream = open_drive_stream - self.global_trajectory_stream = global_trajectory_stream self._flags = flags self._logger = erdos.utils.setup_logging(self.config.name, @@ -126,39 +108,11 @@ def __init__( # concurrently. self._lock = threading.Lock() - # Dictionary that stores the processing times when sensors are ready - # to realease data. This info is used to calculate the real processing - # time of our pipeline without including simulator-induced sensor - # delays. - self._next_localization_sensor_reading = None - self._next_control_sensor_reading = None - self._simulator_in_sync = False - self._tick_events = [] - self._control_msgs = {} - @staticmethod - def connect(control_stream: ReadStream, release_sensor_stream: ReadStream, + def connect(control_stream: ReadStream, pipeline_finish_notify_stream: ReadStream): - pose_stream = erdos.WriteStream() - pose_stream_for_control = erdos.WriteStream() - ground_traffic_lights_stream = erdos.WriteStream() - ground_obstacles_stream = erdos.WriteStream() - ground_speed_limit_signs_stream = erdos.WriteStream() - ground_stop_signs_stream = erdos.WriteStream() vehicle_id_stream = erdos.WriteStream() - open_drive_stream = erdos.WriteStream() - global_trajectory_stream = erdos.WriteStream() - return [ - pose_stream, - pose_stream_for_control, - ground_traffic_lights_stream, - ground_obstacles_stream, - ground_speed_limit_signs_stream, - ground_stop_signs_stream, - vehicle_id_stream, - open_drive_stream, - global_trajectory_stream, - ] + return [vehicle_id_stream] @erdos.profile_method() def on_control_msg(self, msg: ControlMessage): @@ -167,135 +121,40 @@ def on_control_msg(self, msg: ControlMessage): Args: msg: A control.messages.ControlMessage message. """ - self._logger.debug('@{}: received control message'.format( - msg.timestamp)) + self._logger.debug('@{}: received control message {}'.format( + msg.timestamp, msg)) + if self._flags.control not in ['simulator_auto_pilot', 'manual']: + self._apply_control_msg(msg) + # If auto pilot or manual mode is enabled then we do not apply the + # control, but we still want to tick in this method to ensure that + # all operators finished work before the world ticks. + if not self._flags.simulator_mode == 'pseudo-asynchronous': + self._tick_simulator() if self._flags.simulator_mode == 'pseudo-asynchronous': - heapq.heappush( - self._tick_events, - (msg.timestamp.coordinates[0], TickEvent.CONTROL_CMD)) - self._control_msgs[msg.timestamp.coordinates[0]] = msg - # Tick until the next sensor read game time to ensure that the - # data-flow has a new round of sensor inputs. Apply control - # commands if they must be applied before the next sensor read. - self._consume_next_event() - else: - # If auto pilot or manual mode is enabled then we do not apply the - # control, but we still want to tick in this method to ensure that - # all operators finished work before the world ticks. - if self._flags.control not in ['simulator_auto_pilot', 'manual']: - self._apply_control_msg(msg) # Tick the world after the operator received a control command. # This usually indicates that all the operators have completed # processing the previous timestamp (with the exception of logging # operators that are not part of the main loop). - self._tick_simulator() - - def _consume_next_event(self): - while True: - (sim_time, event_type) = heapq.heappop(self._tick_events) - if event_type == TickEvent.SENSOR_READ: - self._tick_simulator_until(sim_time) - break - elif event_type == TickEvent.CONTROL_CMD: - self._tick_simulator_until(sim_time) - control_msg = self._control_msgs[sim_time] - self._apply_control_msg(control_msg) + self._apply_control_msg(msg) def on_pipeline_finish(self, timestamp: Timestamp): + """Only invoked in pseudo-asynchronous mode.""" self._logger.debug("@{}: Received pipeline finish.".format(timestamp)) + # Tick simulator forward until the next expected control message. game_time = timestamp.coordinates[0] - if (self._flags.simulator_control_frequency == -1 - or self._next_control_sensor_reading is None - or game_time == self._next_control_sensor_reading): - # There was supposed to be a control message for this timestamp - # too. Send the Pose message and continue after the control message - # is received. - self._update_next_control_pseudo_asynchronous_ticks(game_time) - self.__send_hero_vehicle_data(self.pose_stream_for_control, - timestamp) - self.__update_spectactor_pose() - else: - # No pose message was supposed to be sent for this timestamp, we - # need to consume the next event to move the dataflow forward. - self._consume_next_event() - - def on_sensor_ready(self, timestamp: Timestamp): - # The first sensor reading needs to be discarded because it might - # not be correctly spaced out. - if not self._simulator_in_sync: - self._simulator_in_sync = True - - def send_actor_data(self, msg): - """ Callback function that gets called when the world is ticked. - This function sends a WatermarkMessage to the downstream operators as - a signal that they need to release data to the rest of the pipeline. - - Args: - msg: Data recieved from the simulation at a tick. - """ - # Ensure that the callback executes serially. - with self._lock: - game_time = int(msg.elapsed_seconds * 1000) - self._logger.info( - 'The world is at the timestamp {}'.format(game_time)) - timestamp = erdos.Timestamp(coordinates=[game_time]) - with erdos.profile(self.config.name + '.send_actor_data', - self, - event_data={'timestamp': str(timestamp)}): - if (self._flags.simulator_localization_frequency == -1 - or self._next_localization_sensor_reading is None or - game_time == self._next_localization_sensor_reading): - if self._flags.simulator_mode == 'pseudo-asynchronous': - self._update_next_localization_pseudo_async_ticks( - game_time) - self.__send_hero_vehicle_data(self.pose_stream, timestamp) - self.__send_ground_actors_data(timestamp) - self.__update_spectactor_pose() - - if self._flags.simulator_mode == "pseudo-asynchronous" and ( - self._flags.simulator_control_frequency == -1 - or self._next_control_sensor_reading is None - or game_time == self._next_control_sensor_reading): - self._update_next_control_pseudo_asynchronous_ticks( - game_time) - self.__send_hero_vehicle_data(self.pose_stream_for_control, - timestamp) - self.__update_spectactor_pose() - - def _update_next_localization_pseudo_async_ticks(self, game_time: int): - if self._flags.simulator_localization_frequency > -1: - self._next_localization_sensor_reading = ( - game_time + - int(1000 / self._flags.simulator_localization_frequency)) - if not self._simulator_in_sync: - # If this is the first sensor reading, then tick - # one more time because the second sensor reading - # is sometimes delayed by 1 tick. - self._next_localization_sensor_reading += int( - 1000 / self._flags.simulator_fps) - else: - self._next_localization_sensor_reading = ( - game_time + int(1000 / self._flags.simulator_fps)) - heapq.heappush( - self._tick_events, - (self._next_localization_sensor_reading, TickEvent.SENSOR_READ)) - - def _update_next_control_pseudo_asynchronous_ticks(self, game_time: int): if self._flags.simulator_control_frequency > -1: - self._next_control_sensor_reading = ( - game_time + - int(1000 / self._flags.simulator_control_frequency)) + control_frequency = self._flags.simulator_control_frequency else: - self._next_control_sensor_reading = ( - game_time + int(1000 / self._flags.simulator_fps)) - if (self._next_control_sensor_reading != - self._next_localization_sensor_reading): - heapq.heappush( - self._tick_events, - (self._next_control_sensor_reading, TickEvent.SENSOR_READ)) + control_frequency = self._flags.simulator_fps + next_control_msg_time = game_time + round(1000 / control_frequency) + self._tick_simulator_until(next_control_msg_time) def run(self): - self.__send_world_data() + self.vehicle_id_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + self._ego_vehicle.id)) + self.vehicle_id_stream.send( + erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) # Tick here once to ensure that the driver operators can get a handle # to the ego vehicle. # XXX(ionel): Hack to fix a race condition. Driver operators @@ -308,21 +167,12 @@ def run(self): time.sleep(4) # The older CARLA versions require an additional tick to sync # sensors. - self._world.on_tick(self.send_actor_data) self._tick_simulator() def _initialize_world(self): - """ Setups the world town, and activates the desired weather.""" - if self._simulator_version == '0.9.5': - # TODO (Sukrit) :: ERDOS provides no way to retrieve handles to the - # class objects to do garbage collection. Hence, objects from - # previous runs of the simulation may persist. We need to clean - # them up right now. In future, move this logic to a seperate - # destroy function. - pylot.simulation.utils.reset_world(self._world) - else: - self._world = self._client.load_world('Town{:02d}'.format( - self._flags.simulator_town)) + """Sets up the world town, and activates the desired weather.""" + self._world = self._client.load_world('Town{:02d}'.format( + self._flags.simulator_town)) self._logger.info('Setting the weather to {}'.format( self._flags.simulator_weather)) pylot.simulation.utils.set_weather(self._world, @@ -336,6 +186,7 @@ def _tick_simulator(self): self._world.tick() def _tick_simulator_until(self, goal_time: int): + self._logger.debug("ticking simulator until {}".format(goal_time)) while True: snapshot = self._world.get_snapshot() sim_time = int(snapshot.timestamp.elapsed_seconds * 1000) @@ -353,63 +204,7 @@ def _apply_control_msg(self, msg: ControlMessage): reverse=msg.reverse) self._client.apply_batch_sync( [command.ApplyVehicleControl(self._ego_vehicle.id, vec_control)]) - - def __send_hero_vehicle_data(self, stream: WriteStream, - timestamp: Timestamp): - vec_transform = pylot.utils.Transform.from_simulator_transform( - self._ego_vehicle.get_transform()) - velocity_vector = pylot.utils.Vector3D.from_simulator_vector( - self._ego_vehicle.get_velocity()) - forward_speed = velocity_vector.magnitude() - pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, - timestamp.coordinates[0]) - stream.send(erdos.Message(timestamp, pose)) - stream.send(erdos.WatermarkMessage(timestamp)) - - def __send_ground_actors_data(self, timestamp: Timestamp): - # Get all the actors in the simulation. - actor_list = self._world.get_actors() - - (vehicles, people, traffic_lights, speed_limits, traffic_stops - ) = pylot.simulation.utils.extract_data_in_pylot_format(actor_list) - - # Send ground people and vehicles. - self.ground_obstacles_stream.send( - ObstaclesMessage(timestamp, vehicles + people)) - self.ground_obstacles_stream.send(erdos.WatermarkMessage(timestamp)) - # Send ground traffic lights. - self.ground_traffic_lights_stream.send( - TrafficLightsMessage(timestamp, traffic_lights)) - self.ground_traffic_lights_stream.send( - erdos.WatermarkMessage(timestamp)) - # Send ground speed signs. - self.ground_speed_limit_signs_stream.send( - SpeedSignsMessage(timestamp, speed_limits)) - self.ground_speed_limit_signs_stream.send( - erdos.WatermarkMessage(timestamp)) - # Send stop signs. - self.ground_stop_signs_stream.send( - StopSignsMessage(timestamp, traffic_stops)) - self.ground_stop_signs_stream.send(erdos.WatermarkMessage(timestamp)) - - def __send_world_data(self): - """ Sends ego vehicle id, open drive and trajectory messages.""" - # Send the id of the ego vehicle. This id is used by the driver - # operators to get a handle to the ego vehicle, which they use to - # attach sensors. - self.vehicle_id_stream.send( - erdos.Message(erdos.Timestamp(coordinates=[0]), - self._ego_vehicle.id)) - self.vehicle_id_stream.send( - erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) - - # Send open drive string. - self.open_drive_stream.send( - erdos.Message(erdos.Timestamp(coordinates=[0]), - self._world.get_map().to_opendrive())) - top_watermark = erdos.WatermarkMessage(erdos.Timestamp(is_top=True)) - self.open_drive_stream.send(top_watermark) - self.global_trajectory_stream.send(top_watermark) + self.__update_spectactor_pose() def __update_spectactor_pose(self): # Set the world simulation view with respect to the vehicle. @@ -417,14 +212,3 @@ def __update_spectactor_pose(self): v_pose.location -= 10 * Location(v_pose.get_forward_vector()) v_pose.location.z = 5 self._spectator.set_transform(v_pose) - - -@total_ordering -class TickEvent(enum.Enum): - CONTROL_CMD = 1 - SENSOR_READ = 2 - - def __lt__(self, other): - if self.__class__ is other.__class__: - return self.value < other.value - return NotImplemented diff --git a/pylot/simulation/old_carla_operator.py b/pylot/simulation/old_carla_operator.py new file mode 100644 index 000000000..623ad12ba --- /dev/null +++ b/pylot/simulation/old_carla_operator.py @@ -0,0 +1,430 @@ +import enum +import heapq +import random +import threading +import time +from functools import total_ordering + +from carla import Location, VehicleControl, command + +import erdos +from erdos import ReadStream, Timestamp, WriteStream + +import pylot.simulation.utils +import pylot.utils +from pylot.control.messages import ControlMessage +from pylot.perception.messages import ObstaclesMessage, SpeedSignsMessage, \ + StopSignsMessage, TrafficLightsMessage + + +class OldCarlaOperator(erdos.Operator): + """Initializes and controls a CARLA simulation. + + This operator connects to the simulation, sets the required weather in the + simulation world, initializes the required number of actors, and the + vehicle that the rest of the pipeline drives. + + Args: + flags: A handle to the global flags instance to retrieve the + configuration. + + Attributes: + _client: A connection to the simulator. + _world: A handle to the world running inside the simulation. + _vehicles: A list of identifiers of the vehicles inside the simulation. + """ + def __init__( + self, control_stream: ReadStream, + release_sensor_stream: ReadStream, + pipeline_finish_notify_stream: ReadStream, + pose_stream: WriteStream, pose_stream_for_control: WriteStream, + ground_traffic_lights_stream: WriteStream, + ground_obstacles_stream: WriteStream, + ground_speed_limit_signs_stream: WriteStream, + ground_stop_signs_stream: WriteStream, + vehicle_id_stream: WriteStream, open_drive_stream: WriteStream, + global_trajectory_stream: WriteStream, flags): + if flags.random_seed: + random.seed(flags.random_seed) + # Register callback on control stream. + control_stream.add_callback(self.on_control_msg) + erdos.add_watermark_callback([release_sensor_stream], [], + self.on_sensor_ready) + if flags.simulator_mode == "pseudo-asynchronous": + erdos.add_watermark_callback([pipeline_finish_notify_stream], [], + self.on_pipeline_finish) + self.pose_stream = pose_stream + self.pose_stream_for_control = pose_stream_for_control + self.ground_traffic_lights_stream = ground_traffic_lights_stream + self.ground_obstacles_stream = ground_obstacles_stream + self.ground_speed_limit_signs_stream = ground_speed_limit_signs_stream + self.ground_stop_signs_stream = ground_stop_signs_stream + self.vehicle_id_stream = vehicle_id_stream + self.open_drive_stream = open_drive_stream + self.global_trajectory_stream = global_trajectory_stream + + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + self._csv_logger = erdos.utils.setup_csv_logging( + self.config.name + '-csv', self.config.csv_log_file_name) + # Connect to simulator and retrieve the world running. + self._client, self._world = pylot.simulation.utils.get_world( + self._flags.simulator_host, self._flags.simulator_port, + self._flags.simulator_timeout) + self._simulator_version = self._client.get_client_version() + + if not self._flags.scenario_runner and \ + self._flags.control != "manual": + # Load the appropriate town. + self._initialize_world() + + # Save the spectator handle so that we don't have to repeteadly get the + # handle (which is slow). + self._spectator = self._world.get_spectator() + + if pylot.simulation.utils.check_simulator_version( + self._simulator_version, required_minor=9, required_patch=8): + # Any simulator version after 0.9.7. + # Create a traffic manager to that auto pilot works. + self._traffic_manager = self._client.get_trafficmanager( + self._flags.carla_traffic_manager_port) + self._traffic_manager.set_synchronous_mode( + self._flags.simulator_mode == 'synchronous') + + if self._flags.scenario_runner: + # Tick until 4.0 seconds time so that all synchronous scenario runs + # start at exactly the same game time. + pylot.simulation.utils.set_synchronous_mode(self._world, 1000) + self._tick_simulator_until(4000) + + pylot.simulation.utils.set_simulation_mode(self._world, self._flags) + + if self._flags.scenario_runner or self._flags.control == "manual": + # Wait until the ego vehicle is spawned by the scenario runner. + self._logger.info("Waiting for the scenario to be ready ...") + self._ego_vehicle = pylot.simulation.utils.wait_for_ego_vehicle( + self._world) + self._logger.info("Found ego vehicle") + else: + # Spawn ego vehicle, people and vehicles. + (self._ego_vehicle, self._vehicle_ids, + self._people) = pylot.simulation.utils.spawn_actors( + self._client, self._world, + self._flags.carla_traffic_manager_port, + self._simulator_version, + self._flags.simulator_spawn_point_index, + self._flags.control == 'simulator_auto_pilot', + self._flags.simulator_num_people, + self._flags.simulator_num_vehicles, self._logger) + + pylot.simulation.utils.set_vehicle_physics( + self._ego_vehicle, self._flags.simulator_vehicle_moi, + self._flags.simulator_vehicle_mass) + + # Lock used to ensure that simulator callbacks are not executed + # concurrently. + self._lock = threading.Lock() + + # Dictionary that stores the processing times when sensors are ready + # to realease data. This info is used to calculate the real processing + # time of our pipeline without including simulator-induced sensor + # delays. + self._next_localization_sensor_reading = None + self._next_control_sensor_reading = None + self._simulator_in_sync = False + self._tick_events = [] + self._control_msgs = {} + + @staticmethod + def connect(control_stream: ReadStream, release_sensor_stream: ReadStream, + pipeline_finish_notify_stream: ReadStream): + pose_stream = erdos.WriteStream() + pose_stream_for_control = erdos.WriteStream() + ground_traffic_lights_stream = erdos.WriteStream() + ground_obstacles_stream = erdos.WriteStream() + ground_speed_limit_signs_stream = erdos.WriteStream() + ground_stop_signs_stream = erdos.WriteStream() + vehicle_id_stream = erdos.WriteStream() + open_drive_stream = erdos.WriteStream() + global_trajectory_stream = erdos.WriteStream() + return [ + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + global_trajectory_stream, + ] + + @erdos.profile_method() + def on_control_msg(self, msg: ControlMessage): + """ Invoked when a ControlMessage is received. + + Args: + msg: A control.messages.ControlMessage message. + """ + self._logger.debug('@{}: received control message'.format( + msg.timestamp)) + if self._flags.simulator_mode == 'pseudo-asynchronous': + heapq.heappush( + self._tick_events, + (msg.timestamp.coordinates[0], TickEvent.CONTROL_CMD)) + self._control_msgs[msg.timestamp.coordinates[0]] = msg + # Tick until the next sensor read game time to ensure that the + # data-flow has a new round of sensor inputs. Apply control + # commands if they must be applied before the next sensor read. + self._consume_next_event() + else: + # If auto pilot or manual mode is enabled then we do not apply the + # control, but we still want to tick in this method to ensure that + # all operators finished work before the world ticks. + if self._flags.control not in ['simulator_auto_pilot', 'manual']: + self._apply_control_msg(msg) + # Tick the world after the operator received a control command. + # This usually indicates that all the operators have completed + # processing the previous timestamp (with the exception of logging + # operators that are not part of the main loop). + self._tick_simulator() + + def _consume_next_event(self): + while True: + (sim_time, event_type) = heapq.heappop(self._tick_events) + if event_type == TickEvent.SENSOR_READ: + self._tick_simulator_until(sim_time) + break + elif event_type == TickEvent.CONTROL_CMD: + self._tick_simulator_until(sim_time) + control_msg = self._control_msgs[sim_time] + self._apply_control_msg(control_msg) + + def on_pipeline_finish(self, timestamp: Timestamp): + self._logger.debug("@{}: Received pipeline finish.".format(timestamp)) + game_time = timestamp.coordinates[0] + if (self._flags.simulator_control_frequency == -1 + or self._next_control_sensor_reading is None + or game_time == self._next_control_sensor_reading): + # There was supposed to be a control message for this timestamp + # too. Send the Pose message and continue after the control message + # is received. + self._update_next_control_pseudo_asynchronous_ticks(game_time) + self.__send_hero_vehicle_data(self.pose_stream_for_control, + timestamp) + self.__update_spectactor_pose() + else: + # No pose message was supposed to be sent for this timestamp, we + # need to consume the next event to move the dataflow forward. + self._consume_next_event() + + def on_sensor_ready(self, timestamp: Timestamp): + # The first sensor reading needs to be discarded because it might + # not be correctly spaced out. + if not self._simulator_in_sync: + self._simulator_in_sync = True + + def send_actor_data(self, msg): + """ Callback function that gets called when the world is ticked. + This function sends a WatermarkMessage to the downstream operators as + a signal that they need to release data to the rest of the pipeline. + + Args: + msg: Data recieved from the simulation at a tick. + """ + # Ensure that the callback executes serially. + with self._lock: + game_time = int(msg.elapsed_seconds * 1000) + self._logger.info( + 'The world is at the timestamp {}'.format(game_time)) + timestamp = erdos.Timestamp(coordinates=[game_time]) + with erdos.profile(self.config.name + '.send_actor_data', + self, + event_data={'timestamp': str(timestamp)}): + if (self._flags.simulator_localization_frequency == -1 + or self._next_localization_sensor_reading is None or + game_time == self._next_localization_sensor_reading): + if self._flags.simulator_mode == 'pseudo-asynchronous': + self._update_next_localization_pseudo_async_ticks( + game_time) + self.__send_hero_vehicle_data(self.pose_stream, timestamp) + self.__send_ground_actors_data(timestamp) + self.__update_spectactor_pose() + + if self._flags.simulator_mode == "pseudo-asynchronous" and ( + self._flags.simulator_control_frequency == -1 + or self._next_control_sensor_reading is None + or game_time == self._next_control_sensor_reading): + self._update_next_control_pseudo_asynchronous_ticks( + game_time) + self.__send_hero_vehicle_data(self.pose_stream_for_control, + timestamp) + self.__update_spectactor_pose() + + def _update_next_localization_pseudo_async_ticks(self, game_time: int): + if self._flags.simulator_localization_frequency > -1: + self._next_localization_sensor_reading = ( + game_time + + int(1000 / self._flags.simulator_localization_frequency)) + if not self._simulator_in_sync: + # If this is the first sensor reading, then tick + # one more time because the second sensor reading + # is sometimes delayed by 1 tick. + self._next_localization_sensor_reading += int( + 1000 / self._flags.simulator_fps) + else: + self._next_localization_sensor_reading = ( + game_time + int(1000 / self._flags.simulator_fps)) + heapq.heappush( + self._tick_events, + (self._next_localization_sensor_reading, TickEvent.SENSOR_READ)) + + def _update_next_control_pseudo_asynchronous_ticks(self, game_time: int): + if self._flags.simulator_control_frequency > -1: + self._next_control_sensor_reading = ( + game_time + + int(1000 / self._flags.simulator_control_frequency)) + else: + self._next_control_sensor_reading = ( + game_time + int(1000 / self._flags.simulator_fps)) + if (self._next_control_sensor_reading != + self._next_localization_sensor_reading): + heapq.heappush( + self._tick_events, + (self._next_control_sensor_reading, TickEvent.SENSOR_READ)) + + def run(self): + self.__send_world_data() + # Tick here once to ensure that the driver operators can get a handle + # to the ego vehicle. + # XXX(ionel): Hack to fix a race condition. Driver operators + # register a simulator listen callback only after they've received + # the vehicle id value. We miss frames if we tick before + # they register a listener. Thus, we sleep here a bit to + # give them sufficient time to register a callback. + time.sleep(4) + self._tick_simulator() + time.sleep(4) + # The older CARLA versions require an additional tick to sync + # sensors. + self._world.on_tick(self.send_actor_data) + self._tick_simulator() + + def _initialize_world(self): + """ Setups the world town, and activates the desired weather.""" + if self._simulator_version == '0.9.5': + # TODO (Sukrit) :: ERDOS provides no way to retrieve handles to the + # class objects to do garbage collection. Hence, objects from + # previous runs of the simulation may persist. We need to clean + # them up right now. In future, move this logic to a seperate + # destroy function. + pylot.simulation.utils.reset_world(self._world) + else: + self._world = self._client.load_world('Town{:02d}'.format( + self._flags.simulator_town)) + self._logger.info('Setting the weather to {}'.format( + self._flags.simulator_weather)) + pylot.simulation.utils.set_weather(self._world, + self._flags.simulator_weather) + + def _tick_simulator(self): + if (self._flags.simulator_mode == 'asynchronous-fixed-time-step' + or self._flags.simulator_mode == 'asynchronous'): + # No need to tick when running in these modes. + return + self._world.tick() + + def _tick_simulator_until(self, goal_time: int): + while True: + snapshot = self._world.get_snapshot() + sim_time = int(snapshot.timestamp.elapsed_seconds * 1000) + if sim_time < goal_time: + self._world.tick() + else: + return + + def _apply_control_msg(self, msg: ControlMessage): + # Transform the message to a simulator control cmd. + vec_control = VehicleControl(throttle=msg.throttle, + steer=msg.steer, + brake=msg.brake, + hand_brake=msg.hand_brake, + reverse=msg.reverse) + self._client.apply_batch_sync( + [command.ApplyVehicleControl(self._ego_vehicle.id, vec_control)]) + + def __send_hero_vehicle_data(self, stream: WriteStream, + timestamp: Timestamp): + vec_transform = pylot.utils.Transform.from_simulator_transform( + self._ego_vehicle.get_transform()) + velocity_vector = pylot.utils.Vector3D.from_simulator_vector( + self._ego_vehicle.get_velocity()) + forward_speed = velocity_vector.magnitude() + pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, + timestamp.coordinates[0]) + stream.send(erdos.Message(timestamp, pose)) + stream.send(erdos.WatermarkMessage(timestamp)) + + def __send_ground_actors_data(self, timestamp: Timestamp): + # Get all the actors in the simulation. + actor_list = self._world.get_actors() + + (vehicles, people, traffic_lights, speed_limits, traffic_stops + ) = pylot.simulation.utils.extract_data_in_pylot_format(actor_list) + + # Send ground people and vehicles. + self.ground_obstacles_stream.send( + ObstaclesMessage(timestamp, vehicles + people)) + self.ground_obstacles_stream.send(erdos.WatermarkMessage(timestamp)) + # Send ground traffic lights. + self.ground_traffic_lights_stream.send( + TrafficLightsMessage(timestamp, traffic_lights)) + self.ground_traffic_lights_stream.send( + erdos.WatermarkMessage(timestamp)) + # Send ground speed signs. + self.ground_speed_limit_signs_stream.send( + SpeedSignsMessage(timestamp, speed_limits)) + self.ground_speed_limit_signs_stream.send( + erdos.WatermarkMessage(timestamp)) + # Send stop signs. + self.ground_stop_signs_stream.send( + StopSignsMessage(timestamp, traffic_stops)) + self.ground_stop_signs_stream.send(erdos.WatermarkMessage(timestamp)) + + def __send_world_data(self): + """ Sends ego vehicle id, open drive and trajectory messages.""" + # Send the id of the ego vehicle. This id is used by the driver + # operators to get a handle to the ego vehicle, which they use to + # attach sensors. + self.vehicle_id_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + self._ego_vehicle.id)) + self.vehicle_id_stream.send( + erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) + + # Send open drive string. + self.open_drive_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + self._world.get_map().to_opendrive())) + top_watermark = erdos.WatermarkMessage(erdos.Timestamp(is_top=True)) + self.open_drive_stream.send(top_watermark) + self.global_trajectory_stream.send(top_watermark) + + def __update_spectactor_pose(self): + # Set the world simulation view with respect to the vehicle. + v_pose = self._ego_vehicle.get_transform() + v_pose.location -= 10 * Location(v_pose.get_forward_vector()) + v_pose.location.z = 5 + self._spectator.set_transform(v_pose) + + +@total_ordering +class TickEvent(enum.Enum): + CONTROL_CMD = 1 + SENSOR_READ = 2 + + def __lt__(self, other): + if self.__class__ is other.__class__: + return self.value < other.value + return NotImplemented From bab666483f238aeed65c6d55d1568049575ba79c Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Tue, 30 Aug 2022 06:40:52 +0000 Subject: [PATCH 17/20] Add simulator component, helper functions to operator creator --- data_gatherer.py | 7 +- pylot.py | 20 ++- pylot/component_creator.py | 55 ++++++++ pylot/operator_creator.py | 251 +++++++++++++------------------------ scripts/compute_decay.py | 6 +- 5 files changed, 168 insertions(+), 171 deletions(-) diff --git a/data_gatherer.py b/data_gatherer.py index cc8adaccd..1811a2eb4 100644 --- a/data_gatherer.py +++ b/data_gatherer.py @@ -5,6 +5,7 @@ import erdos import pylot.flags +import pylot.component_creator import pylot.operator_creator import pylot.simulation.utils import pylot.utils @@ -61,10 +62,8 @@ def main(argv): ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - global_trajectory_stream, - ) = pylot.operator_creator.add_simulator_bridge( - control_loop_stream, release_sensor_stream, - pipeline_finish_notify_stream) + ) = pylot.component_creator.add_simulator(control_loop_stream, + pipeline_finish_notify_stream) # Add sensors. rgb_camera_setup = RGBCameraSetup('center_camera', diff --git a/pylot.py b/pylot.py index 64aceff02..9109d9634 100644 --- a/pylot.py +++ b/pylot.py @@ -34,7 +34,7 @@ def driver(): pipeline_finish_notify_stream = erdos.IngestStream() notify_streams = [] - # Create operator that bridges between pipeline and the simulator. + # Create operators that bridge between pipeline and the simulator. ( pose_stream, pose_stream_for_control, @@ -44,12 +44,26 @@ def driver(): ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - ) = pylot.operator_creator.add_simulator_bridge( # .add_simulator_bridge( + ) = pylot.component_creator.add_simulator( control_loop_stream, - release_sensor_stream, pipeline_finish_notify_stream, ) + # ( + # pose_stream, + # pose_stream_for_control, + # ground_traffic_lights_stream, + # ground_obstacles_stream, + # ground_speed_limit_signs_stream, + # ground_stop_signs_stream, + # vehicle_id_stream, + # open_drive_stream, + # ) = pylot.operator_creator.add_simulator_bridge_old( + # control_loop_stream, + # release_sensor_stream, + # pipeline_finish_notify_stream, + # ) + global_trajectory_stream = erdos.IngestStream() streams_to_send_top_on.append(global_trajectory_stream) diff --git a/pylot/component_creator.py b/pylot/component_creator.py index 61942f717..2cb958ce9 100644 --- a/pylot/component_creator.py +++ b/pylot/component_creator.py @@ -11,6 +11,61 @@ logger = logging.getLogger(__name__) +def add_simulator(control_stream, pipeline_finish_notify_stream): + """Adds operators to interface with and retrieve ground truth information + from the simulator. + + Args: + control_stream: Control commands to actuate the ego vehicle. + pipeline_finish_notify_stream: Sends watermarks when the pipeline + completes execution. Used in pseudo-async mode. + + Returns: + pose_stream: Sends the ego vehicle's pose at the frequency provided by + ``--simulator_localization_frequency``. + pose_stream_for_control: Sends the ego vehicle's pose at the frequency + provided by ``--simulator_control_frequency``. + ground_traffic_lights_stream: Sends the state of all traffic lights. + ground_obstacles_stream: Sends the locations and bounding boxes of all + vehicles and pedestrians. + ground_speed_limit_signs_stream: Sends the locations and values of all + speed limit signs. + ground_stop_signs_stream: Sends the locations and values of all stop + signs. + vehicle_id_stream: Sends the ID of the ego vehicle, followed by a top + watermark. + open_drive_stream: Sends the map in OpenDRIVE format, followed by a top + watermark. + """ + vehicle_id_stream = pylot.operator_creator.add_simulator_bridge( + control_stream, pipeline_finish_notify_stream) + pose_stream = pylot.operator_creator.add_pose( + vehicle_id_stream, FLAGS.simulator_localization_frequency) + pose_stream_for_control = pylot.operator_creator.add_pose( + vehicle_id_stream, FLAGS.simulator_control_frequency, + 'pose_for_control') + ground_traffic_lights_stream = pylot.operator_creator.add_simulator_traffic_lights( + vehicle_id_stream) + ground_obstacles_stream = pylot.operator_creator.add_simulator_obstacles( + vehicle_id_stream) + ground_speed_limit_signs_stream = pylot.operator_creator.add_simulator_speed_limit_signs( + vehicle_id_stream) + ground_stop_signs_stream = pylot.operator_creator.add_simulator_stop_signs( + vehicle_id_stream) + open_drive_stream = pylot.operator_creator.add_simulator_open_drive() + + return ( + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + ) + + def add_obstacle_detection(center_camera_stream, center_camera_setup=None, pose_stream=None, diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index 701fb97ae..0cf15419b 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -11,8 +11,7 @@ FLAGS = flags.FLAGS -def add_simulator_bridge(control_stream, sensor_ready_stream, - pipeline_finish_notify_stream): +def add_simulator_bridge(control_stream, pipeline_finish_notify_stream): from pylot.simulation.carla_operator import CarlaOperator op_config = erdos.OperatorConfig(name='simulator_bridge_operator', @@ -20,102 +19,11 @@ def add_simulator_bridge(control_stream, sensor_ready_stream, log_file_name=FLAGS.log_file_name, csv_log_file_name=FLAGS.csv_log_file_name, profile_file_name=FLAGS.profile_file_name) - vehicle_id_stream, = erdos.connect( CarlaOperator, op_config, [control_stream, pipeline_finish_notify_stream], FLAGS) - # Check that Pose matches - from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator - pose_driver_config = erdos.OperatorConfig( - name='pose_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - pose_stream, = erdos.connect(CarlaPoseDriverOperator, pose_driver_config, - [vehicle_id_stream], - FLAGS.simulator_localization_frequency, FLAGS) - - # Check that Pose for control matches - pose_for_control_config = erdos.OperatorConfig( - name='pose_for_control_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - pose_stream_for_control, = erdos.connect( - CarlaPoseDriverOperator, pose_for_control_config, [vehicle_id_stream], - FLAGS.simulator_localization_frequency, FLAGS) - - # Check that traffic lights match - from pylot.drivers.carla_traffic_lights_driver_operator import CarlaTrafficLightsDriverOperator - ground_traffic_lights_config = erdos.OperatorConfig( - name='carla_traffic_lights_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - ground_traffic_lights_stream, = erdos.connect( - CarlaTrafficLightsDriverOperator, ground_traffic_lights_config, - [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) - - # Check that obstacles match - from pylot.drivers.carla_obstacles_driver_operator import CarlaObstaclesDriverOperator - ground_obstacles_config = erdos.OperatorConfig( - name='carla_obstacles_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - ground_obstacles_stream, = erdos.connect( - CarlaObstaclesDriverOperator, ground_obstacles_config, - [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) - - # Check that speed signs match - from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator - speed_signs_config = erdos.OperatorConfig( - name='carla_speed_limit_signs_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - ground_speed_limit_signs_stream, = erdos.connect( - CarlaSpeedLimitSignsDriverOperator, speed_signs_config, - [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) - - # Check that stop signs match - from pylot.drivers.carla_stop_signs_driver_operator import CarlaStopSignsDriverOperator - stop_signs_config = erdos.OperatorConfig( - name='carla_stop_signs_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - ground_stop_signs_stream, = erdos.connect( - CarlaStopSignsDriverOperator, stop_signs_config, [vehicle_id_stream], - FLAGS.simulator_localization_frequency, FLAGS) - - from pylot.drivers.carla_open_drive_driver_operator import CarlaOpenDriveDriverOperator - open_drive_driver_config = erdos.OperatorConfig( - name='carla_open_drive_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - open_drive_stream, = erdos.connect(CarlaOpenDriveDriverOperator, - open_drive_driver_config, [], FLAGS) - - return ( - pose_stream, - pose_stream_for_control, - ground_traffic_lights_stream, - ground_obstacles_stream, - ground_speed_limit_signs_stream, - ground_stop_signs_stream, - vehicle_id_stream, - open_drive_stream, - ) + return vehicle_id_stream def add_simulator_bridge_old(control_stream, sensor_ready_stream, @@ -143,7 +51,6 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, FLAGS) # Check that Pose matches - from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator from pylot.utils import MatchesOperator pose_driver_config = erdos.OperatorConfig( name='pose_driver_operator', @@ -151,11 +58,8 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, log_file_name=FLAGS.log_file_name, csv_log_file_name=FLAGS.csv_log_file_name, profile_file_name=FLAGS.profile_file_name) - pose_driver_stream, = erdos.connect(CarlaPoseDriverOperator, - pose_driver_config, - [vehicle_id_stream], - FLAGS.simulator_localization_frequency, - FLAGS) + pose_driver_stream = add_pose(vehicle_id_stream, + FLAGS.simulator_localization_frequency) matches_pose_driver_config = erdos.OperatorConfig( name='matches_pose_driver_operator', @@ -167,15 +71,9 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, [pose_stream, pose_driver_stream]) # Check that Pose for control matches - pose_for_control_config = erdos.OperatorConfig( - name='pose_for_control_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - pose_for_control_driver_stream, = erdos.connect( - CarlaPoseDriverOperator, pose_for_control_config, [vehicle_id_stream], - FLAGS.simulator_localization_frequency, FLAGS) + pose_for_control_driver_stream = add_pose( + vehicle_id_stream, flags.simulator_control_frequency, + 'pose_for_control') matches_pose_for_control_driver_config = erdos.OperatorConfig( name='matches_pose_for_control_driver_operator', @@ -187,16 +85,8 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, [pose_stream_for_control, pose_for_control_driver_stream]) # Check that traffic lights match - from pylot.drivers.carla_traffic_lights_driver_operator import CarlaTrafficLightsDriverOperator - ground_traffic_lights_config = erdos.OperatorConfig( - name='carla_traffic_lights_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - traffic_lights_driver_stream, = erdos.connect( - CarlaTrafficLightsDriverOperator, ground_traffic_lights_config, - [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + traffic_lights_driver_stream = add_simulator_traffic_lights( + vehicle_id_stream) matches_ground_obstacles_config = erdos.OperatorConfig( name='matches_ground_traffic_lights_operator', @@ -208,16 +98,7 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, [traffic_lights_driver_stream, ground_traffic_lights_stream]) # Check that obstacles match - from pylot.drivers.carla_obstacles_driver_operator import CarlaObstaclesDriverOperator - ground_obstacles_config = erdos.OperatorConfig( - name='carla_obstacles_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - obstacles_driver_stream, = erdos.connect( - CarlaObstaclesDriverOperator, ground_obstacles_config, - [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + obstacles_driver_stream = add_simulator_obstacles(vehicle_id_stream) matches_ground_obstacles_config = erdos.OperatorConfig( name='matches_obstacles_operator', @@ -229,18 +110,9 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, [ground_obstacles_stream, obstacles_driver_stream]) # Check that speed signs match - from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator - speed_signs_config = erdos.OperatorConfig( - name='carla_speed_limit_signs_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - speed_limit_signs_driver_stream, = erdos.connect( - CarlaSpeedLimitSignsDriverOperator, speed_signs_config, - [vehicle_id_stream], FLAGS.simulator_localization_frequency, FLAGS) + speed_limit_signs_driver_stream = add_simulator_speed_limit_signs( + vehicle_id_stream) - # Check that speed limit signs match matches_speed_limit_signs_config = erdos.OperatorConfig( name='matches_speed_limit_signs_operator', flow_watermarks=False, @@ -252,18 +124,8 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, [ground_speed_limit_signs_stream, speed_limit_signs_driver_stream]) # Check that stop signs match - from pylot.drivers.carla_stop_signs_driver_operator import CarlaStopSignsDriverOperator - stop_signs_config = erdos.OperatorConfig( - name='carla_stop_signs_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - stop_signs_driver_stream, = erdos.connect( - CarlaStopSignsDriverOperator, stop_signs_config, [vehicle_id_stream], - FLAGS.simulator_localization_frequency, FLAGS) + stop_signs_driver_stream = add_simulator_stop_signs(vehicle_id_stream) - # Check that stop signs match matches_stop_signs_config = erdos.OperatorConfig( name='matches_stop_signs_operator', flow_watermarks=False, @@ -274,16 +136,7 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, [ground_stop_signs_stream, stop_signs_driver_stream]) # Check that open drive matches - from pylot.drivers.carla_open_drive_driver_operator import CarlaOpenDriveDriverOperator - open_drive_driver_config = erdos.OperatorConfig( - name='carla_open_drive_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) - open_drive_driver_stream, = erdos.connect(CarlaOpenDriveDriverOperator, - open_drive_driver_config, [], - FLAGS) + open_drive_driver_stream = add_simulator_open_drive() matches_open_drive_config = erdos.OperatorConfig( name='matches_open_drive_operator', @@ -880,6 +733,82 @@ def add_localization(imu_stream, return pose_stream +def add_pose(vehicle_id_stream, frequency: float, name: str = 'pose'): + from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator + pose_driver_config = erdos.OperatorConfig( + name=name + '_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaPoseDriverOperator, pose_driver_config, + [vehicle_id_stream], frequency, FLAGS)[0] + + +def add_simulator_traffic_lights(vehicle_id_stream): + from pylot.drivers.carla_traffic_lights_driver_operator import CarlaTrafficLightsDriverOperator + ground_traffic_lights_config = erdos.OperatorConfig( + name='simulator_traffic_lights_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaTrafficLightsDriverOperator, + ground_traffic_lights_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_obstacles(vehicle_id_stream): + from pylot.drivers.carla_obstacles_driver_operator import CarlaObstaclesDriverOperator + ground_obstacles_config = erdos.OperatorConfig( + name='simulator_obstacles_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaObstaclesDriverOperator, ground_obstacles_config, + [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_speed_limit_signs(vehicle_id_stream): + from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator + speed_limit_signs_config = erdos.OperatorConfig( + name='simulator_speed_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaSpeedLimitSignsDriverOperator, + speed_limit_signs_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_stop_signs(vehicle_id_stream): + from pylot.drivers.carla_stop_signs_driver_operator import CarlaStopSignsDriverOperator + stop_signs_config = erdos.OperatorConfig( + name='simulator_stop_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaStopSignsDriverOperator, stop_signs_config, + [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_open_drive(): + from pylot.drivers.carla_open_drive_driver_operator import CarlaOpenDriveDriverOperator + open_drive_driver_config = erdos.OperatorConfig( + name='simulator_open_drive_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaOpenDriveDriverOperator, + open_drive_driver_config, [], FLAGS)[0] + + def add_fusion(pose_stream, obstacles_stream, depth_stream, ground_obstacles_stream): from pylot.perception.fusion.fusion_operator import FusionOperator diff --git a/scripts/compute_decay.py b/scripts/compute_decay.py index 88a39e4cb..42d4b3f83 100644 --- a/scripts/compute_decay.py +++ b/scripts/compute_decay.py @@ -4,6 +4,7 @@ import erdos import pylot.flags +import pylot.component_creator import pylot.operator_creator import pylot.utils from pylot.control.messages import ControlMessage @@ -60,9 +61,8 @@ def main(argv): ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - global_trajectory_stream, - ) = pylot.operator_creator.add_simulator_bridge(control_loop_stream, - release_sensor_stream) + ) = pylot.component_creator.add_simulator(control_loop_stream, + release_sensor_stream) # Add camera sensors. rgb_camera_setup = RGBCameraSetup('center_camera', From 49a4afe75d15874ed274b479f12cc52a97cf8670 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Tue, 30 Aug 2022 06:43:32 +0000 Subject: [PATCH 18/20] Formatting --- pylot/utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pylot/utils.py b/pylot/utils.py index b60ed19f9..d7486ebc4 100644 --- a/pylot/utils.py +++ b/pylot/utils.py @@ -1127,9 +1127,9 @@ def __init__(self, left_stream, right_stream, matches_fn=matches_data): left_stream.add_callback(self.on_left_stream) right_stream.add_callback(self.on_right_stream) left_stream.add_watermark_callback( - lambda t: self._logger.debug(f"@{t}: got left watermark")) + lambda t: self._logger.debug("@{}: got left watermark".format(t))) right_stream.add_watermark_callback( - lambda t: self._logger.debug(f"@{t}: got right watermark")) + lambda t: self._logger.debug("@{}: got right watermark".format(t))) erdos.add_watermark_callback([left_stream, right_stream], [], self.on_watermark) @@ -1152,8 +1152,8 @@ def on_watermark(self, t): matches = self._matches_fn(left_msgs, right_msgs) if matches: - self._logger.debug(f"@{t}: left matches right") + self._logger.debug("@{}: left matches right".format(t)) else: self._logger.warn( - f"@{t}: left does not match right\n\tleft: {left_msgs}\n\tright: {right_msgs}" - ) \ No newline at end of file + "@{}: left does not match right\n\tleft: {}\n\tright: {}". + format(t, left_msgs, right_msgs)) From a21d27753a7bf36d6987763868533f3c860314fa Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Tue, 30 Aug 2022 06:52:00 +0000 Subject: [PATCH 19/20] Formatting --- pylot/component_creator.py | 13 +++++++------ pylot/operator_creator.py | 25 ++++++++++++------------- pylot/utils.py | 5 ++--- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/pylot/component_creator.py b/pylot/component_creator.py index 2cb958ce9..daa9541d0 100644 --- a/pylot/component_creator.py +++ b/pylot/component_creator.py @@ -14,12 +14,12 @@ def add_simulator(control_stream, pipeline_finish_notify_stream): """Adds operators to interface with and retrieve ground truth information from the simulator. - + Args: control_stream: Control commands to actuate the ego vehicle. pipeline_finish_notify_stream: Sends watermarks when the pipeline completes execution. Used in pseudo-async mode. - + Returns: pose_stream: Sends the ego vehicle's pose at the frequency provided by ``--simulator_localization_frequency``. @@ -44,12 +44,13 @@ def add_simulator(control_stream, pipeline_finish_notify_stream): pose_stream_for_control = pylot.operator_creator.add_pose( vehicle_id_stream, FLAGS.simulator_control_frequency, 'pose_for_control') - ground_traffic_lights_stream = pylot.operator_creator.add_simulator_traffic_lights( - vehicle_id_stream) + ground_traffic_lights_stream = ( + pylot.operator_creator.add_simulator_traffic_lights(vehicle_id_stream)) ground_obstacles_stream = pylot.operator_creator.add_simulator_obstacles( vehicle_id_stream) - ground_speed_limit_signs_stream = pylot.operator_creator.add_simulator_speed_limit_signs( - vehicle_id_stream) + ground_speed_limit_signs_stream = ( + pylot.operator_creator.add_simulator_speed_limit_signs( + vehicle_id_stream)) ground_stop_signs_stream = pylot.operator_creator.add_simulator_stop_signs( vehicle_id_stream) open_drive_stream = pylot.operator_creator.add_simulator_open_drive() diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index 0cf15419b..7a6349e12 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -1,7 +1,6 @@ from absl import flags import erdos -from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator import pylot.utils @@ -52,12 +51,6 @@ def add_simulator_bridge_old(control_stream, sensor_ready_stream, # Check that Pose matches from pylot.utils import MatchesOperator - pose_driver_config = erdos.OperatorConfig( - name='pose_driver_operator', - flow_watermarks=False, - log_file_name=FLAGS.log_file_name, - csv_log_file_name=FLAGS.csv_log_file_name, - profile_file_name=FLAGS.profile_file_name) pose_driver_stream = add_pose(vehicle_id_stream, FLAGS.simulator_localization_frequency) @@ -734,7 +727,8 @@ def add_localization(imu_stream, def add_pose(vehicle_id_stream, frequency: float, name: str = 'pose'): - from pylot.drivers.carla_pose_driver_operator import CarlaPoseDriverOperator + from pylot.drivers.carla_pose_driver_operator import ( + CarlaPoseDriverOperator) pose_driver_config = erdos.OperatorConfig( name=name + '_operator', flow_watermarks=False, @@ -746,7 +740,8 @@ def add_pose(vehicle_id_stream, frequency: float, name: str = 'pose'): def add_simulator_traffic_lights(vehicle_id_stream): - from pylot.drivers.carla_traffic_lights_driver_operator import CarlaTrafficLightsDriverOperator + from pylot.drivers.carla_traffic_lights_driver_operator import ( + CarlaTrafficLightsDriverOperator) ground_traffic_lights_config = erdos.OperatorConfig( name='simulator_traffic_lights_operator', flow_watermarks=False, @@ -759,7 +754,8 @@ def add_simulator_traffic_lights(vehicle_id_stream): def add_simulator_obstacles(vehicle_id_stream): - from pylot.drivers.carla_obstacles_driver_operator import CarlaObstaclesDriverOperator + from pylot.drivers.carla_obstacles_driver_operator import ( + CarlaObstaclesDriverOperator) ground_obstacles_config = erdos.OperatorConfig( name='simulator_obstacles_operator', flow_watermarks=False, @@ -772,7 +768,8 @@ def add_simulator_obstacles(vehicle_id_stream): def add_simulator_speed_limit_signs(vehicle_id_stream): - from pylot.drivers.carla_speed_limit_signs_driver_operator import CarlaSpeedLimitSignsDriverOperator + from pylot.drivers.carla_speed_limit_signs_driver_operator import ( + CarlaSpeedLimitSignsDriverOperator) speed_limit_signs_config = erdos.OperatorConfig( name='simulator_speed_signs_operator', flow_watermarks=False, @@ -785,7 +782,8 @@ def add_simulator_speed_limit_signs(vehicle_id_stream): def add_simulator_stop_signs(vehicle_id_stream): - from pylot.drivers.carla_stop_signs_driver_operator import CarlaStopSignsDriverOperator + from pylot.drivers.carla_stop_signs_driver_operator import ( + CarlaStopSignsDriverOperator) stop_signs_config = erdos.OperatorConfig( name='simulator_stop_signs_operator', flow_watermarks=False, @@ -798,7 +796,8 @@ def add_simulator_stop_signs(vehicle_id_stream): def add_simulator_open_drive(): - from pylot.drivers.carla_open_drive_driver_operator import CarlaOpenDriveDriverOperator + from pylot.drivers.carla_open_drive_driver_operator import ( + CarlaOpenDriveDriverOperator) open_drive_driver_config = erdos.OperatorConfig( name='simulator_open_drive_operator', flow_watermarks=False, diff --git a/pylot/utils.py b/pylot/utils.py index d7486ebc4..0d271cbd5 100644 --- a/pylot/utils.py +++ b/pylot/utils.py @@ -4,6 +4,8 @@ import numpy as np +import erdos + class Rotation(object): """Used to represent the rotation of an actor or obstacle. @@ -1105,9 +1107,6 @@ def verify_keys_in_dict(required_keys, arg_dict): "one or more of {} not found in {}".format(required_keys, arg_dict) -import erdos - - def matches_data(left_msgs, right_msgs): length_matches = (len(left_msgs) == len(right_msgs)) return length_matches and all( From aeb71ad69c82e7638a4531c41926a232edbbd481 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Tue, 30 Aug 2022 06:57:15 +0000 Subject: [PATCH 20/20] Formatting --- pylot/drivers/carla_base_gnss_driver_operator.py | 9 ++++----- pylot/drivers/carla_obstacles_driver_operator.py | 6 +++--- pylot/drivers/carla_pose_driver_operator.py | 4 ++-- .../drivers/carla_speed_limit_signs_driver_operator.py | 6 +++--- pylot/drivers/carla_stop_signs_driver_operator.py | 6 +++--- pylot/drivers/carla_traffic_lights_driver_operator.py | 10 ++++------ 6 files changed, 19 insertions(+), 22 deletions(-) diff --git a/pylot/drivers/carla_base_gnss_driver_operator.py b/pylot/drivers/carla_base_gnss_driver_operator.py index 364589838..a4a8f8449 100644 --- a/pylot/drivers/carla_base_gnss_driver_operator.py +++ b/pylot/drivers/carla_base_gnss_driver_operator.py @@ -4,17 +4,16 @@ The operator attaches a GNSS sensor to the ego vehicle, receives GNSS measurements from the simulator, and invokes the user-defined callback. """ - -from abc import abstractmethod import threading +from abc import abstractmethod import carla import erdos +from pylot.drivers.sensor_setup import GNSSSetup from pylot.simulation.utils import get_vehicle_handle, get_world, \ set_simulation_mode -from pylot.drivers.sensor_setup import GNSSSetup class CarlaBaseGNSSDriverOperator(erdos.Operator): @@ -42,7 +41,7 @@ class CarlaBaseGNSSDriverOperator(erdos.Operator): Initialized once the vehicle ID is received. _log (threading.Lock): used to ensure that only 1 GNSS reading is processed at a time. - + Args: vehicle_id_stream: Stream on which the operator receives the id of the @@ -83,7 +82,7 @@ def connect(ground_vehicle_id_stream): def process_gnss(self, timestamp: erdos.Timestamp, gnss_msg: carla.GnssMeasurement): """Invoked when a GNSS measurement is received from the simulator. - + Note: Only 1 invocation of this callback will run at a time. """ diff --git a/pylot/drivers/carla_obstacles_driver_operator.py b/pylot/drivers/carla_obstacles_driver_operator.py index c3ed7c8a0..cc267435c 100644 --- a/pylot/drivers/carla_obstacles_driver_operator.py +++ b/pylot/drivers/carla_obstacles_driver_operator.py @@ -2,17 +2,17 @@ import erdos -from pylot.perception.detection.obstacle import Obstacle -from pylot.perception.messages import ObstaclesMessage import pylot.utils from pylot.drivers.carla_base_gnss_driver_operator import ( CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.obstacle import Obstacle +from pylot.perception.messages import ObstaclesMessage class CarlaObstaclesDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes the bounding boxes of all vehicles and people retrieved from the simulator at the provided frequency. - + Args: vehicle_id_stream: Stream on which the operator receives the ID of the ego vehicle. The ID is used to get a simulator handle to the diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py index 17980afee..51c06c45d 100644 --- a/pylot/drivers/carla_pose_driver_operator.py +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -10,11 +10,11 @@ class CarlaPoseDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes the pose (location, orientation, and velocity) at the provided frequency. - + This operator attaches to the vehicle using the vehicle ID provided by the ``vehicle_id_stream``, registers callback functions to retrieve the pose at the provided frequency, and publishes it to downstream operators. - + Args: vehicle_id_stream: Stream on which the operator receives the ID of the ego vehicle. The ID is used to get a simulator handle to the diff --git a/pylot/drivers/carla_speed_limit_signs_driver_operator.py b/pylot/drivers/carla_speed_limit_signs_driver_operator.py index ef93e0cb7..bf92accd3 100644 --- a/pylot/drivers/carla_speed_limit_signs_driver_operator.py +++ b/pylot/drivers/carla_speed_limit_signs_driver_operator.py @@ -2,17 +2,17 @@ import erdos -from pylot.perception.detection.speed_limit_sign import SpeedLimitSign -from pylot.perception.messages import SpeedSignsMessage import pylot.utils from pylot.drivers.carla_base_gnss_driver_operator import ( CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.speed_limit_sign import SpeedLimitSign +from pylot.perception.messages import SpeedSignsMessage class CarlaSpeedLimitSignsDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes the locations and values of all speed limit signs retrieved from the simulator at the provided frequency. - + Args: vehicle_id_stream: Stream on which the operator receives the ID of the ego vehicle. The ID is used to get a simulator handle to the diff --git a/pylot/drivers/carla_stop_signs_driver_operator.py b/pylot/drivers/carla_stop_signs_driver_operator.py index 0996a58d2..c1a95e5b8 100644 --- a/pylot/drivers/carla_stop_signs_driver_operator.py +++ b/pylot/drivers/carla_stop_signs_driver_operator.py @@ -2,17 +2,17 @@ import erdos -from pylot.perception.detection.stop_sign import StopSign -from pylot.perception.messages import StopSignsMessage import pylot.utils from pylot.drivers.carla_base_gnss_driver_operator import ( CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.stop_sign import StopSign +from pylot.perception.messages import StopSignsMessage class CarlaStopSignsDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes the locations and values of all stop signs retrieved from the simulator at the provided frequency. - + Args: vehicle_id_stream: Stream on which the operator receives the ID of the ego vehicle. The ID is used to get a simulator handle to the diff --git a/pylot/drivers/carla_traffic_lights_driver_operator.py b/pylot/drivers/carla_traffic_lights_driver_operator.py index 969ce832d..5f05e95a9 100644 --- a/pylot/drivers/carla_traffic_lights_driver_operator.py +++ b/pylot/drivers/carla_traffic_lights_driver_operator.py @@ -1,23 +1,21 @@ -import threading - import carla import erdos import pylot.utils -from pylot.perception.detection.traffic_light import TrafficLight -from pylot.perception.messages import TrafficLightsMessage from pylot.drivers.carla_base_gnss_driver_operator import ( CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.traffic_light import TrafficLight +from pylot.perception.messages import TrafficLightsMessage class CarlaTrafficLightsDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes the location and state of all traffic lights. - + This operator attaches to the vehicle using the vehicle ID provided by the ``vehicle_id_stream``, registers callback functions to retrieve the state of the traffic lights, and publishes them to downstream operators. - + Args: vehicle_id_stream: Stream on which the operator receives the ID of the ego vehicle. The ID is used to get a simulator handle to the