diff --git a/data_gatherer.py b/data_gatherer.py index cc8adaccd..1811a2eb4 100644 --- a/data_gatherer.py +++ b/data_gatherer.py @@ -5,6 +5,7 @@ import erdos import pylot.flags +import pylot.component_creator import pylot.operator_creator import pylot.simulation.utils import pylot.utils @@ -61,10 +62,8 @@ def main(argv): ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - global_trajectory_stream, - ) = pylot.operator_creator.add_simulator_bridge( - control_loop_stream, release_sensor_stream, - pipeline_finish_notify_stream) + ) = pylot.component_creator.add_simulator(control_loop_stream, + pipeline_finish_notify_stream) # Add sensors. rgb_camera_setup = RGBCameraSetup('center_camera', diff --git a/pylot.py b/pylot.py index 1d7665e74..9109d9634 100644 --- a/pylot.py +++ b/pylot.py @@ -34,7 +34,7 @@ def driver(): pipeline_finish_notify_stream = erdos.IngestStream() notify_streams = [] - # Create operator that bridges between pipeline and the simulator. + # Create operators that bridge between pipeline and the simulator. ( pose_stream, pose_stream_for_control, @@ -44,13 +44,29 @@ def driver(): ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - global_trajectory_stream, - ) = pylot.operator_creator.add_simulator_bridge( + ) = pylot.component_creator.add_simulator( control_loop_stream, - release_sensor_stream, pipeline_finish_notify_stream, ) + # ( + # pose_stream, + # pose_stream_for_control, + # ground_traffic_lights_stream, + # ground_obstacles_stream, + # ground_speed_limit_signs_stream, + # ground_stop_signs_stream, + # vehicle_id_stream, + # open_drive_stream, + # ) = pylot.operator_creator.add_simulator_bridge_old( + # control_loop_stream, + # release_sensor_stream, + # pipeline_finish_notify_stream, + # ) + + global_trajectory_stream = erdos.IngestStream() + streams_to_send_top_on.append(global_trajectory_stream) + # Add sensors. center_camera_setup = RGBCameraSetup('center_camera', FLAGS.camera_image_width, diff --git a/pylot/component_creator.py b/pylot/component_creator.py index 61942f717..daa9541d0 100644 --- a/pylot/component_creator.py +++ b/pylot/component_creator.py @@ -11,6 +11,62 @@ logger = logging.getLogger(__name__) +def add_simulator(control_stream, pipeline_finish_notify_stream): + """Adds operators to interface with and retrieve ground truth information + from the simulator. + + Args: + control_stream: Control commands to actuate the ego vehicle. + pipeline_finish_notify_stream: Sends watermarks when the pipeline + completes execution. Used in pseudo-async mode. + + Returns: + pose_stream: Sends the ego vehicle's pose at the frequency provided by + ``--simulator_localization_frequency``. + pose_stream_for_control: Sends the ego vehicle's pose at the frequency + provided by ``--simulator_control_frequency``. + ground_traffic_lights_stream: Sends the state of all traffic lights. + ground_obstacles_stream: Sends the locations and bounding boxes of all + vehicles and pedestrians. + ground_speed_limit_signs_stream: Sends the locations and values of all + speed limit signs. + ground_stop_signs_stream: Sends the locations and values of all stop + signs. + vehicle_id_stream: Sends the ID of the ego vehicle, followed by a top + watermark. + open_drive_stream: Sends the map in OpenDRIVE format, followed by a top + watermark. + """ + vehicle_id_stream = pylot.operator_creator.add_simulator_bridge( + control_stream, pipeline_finish_notify_stream) + pose_stream = pylot.operator_creator.add_pose( + vehicle_id_stream, FLAGS.simulator_localization_frequency) + pose_stream_for_control = pylot.operator_creator.add_pose( + vehicle_id_stream, FLAGS.simulator_control_frequency, + 'pose_for_control') + ground_traffic_lights_stream = ( + pylot.operator_creator.add_simulator_traffic_lights(vehicle_id_stream)) + ground_obstacles_stream = pylot.operator_creator.add_simulator_obstacles( + vehicle_id_stream) + ground_speed_limit_signs_stream = ( + pylot.operator_creator.add_simulator_speed_limit_signs( + vehicle_id_stream)) + ground_stop_signs_stream = pylot.operator_creator.add_simulator_stop_signs( + vehicle_id_stream) + open_drive_stream = pylot.operator_creator.add_simulator_open_drive() + + return ( + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + ) + + def add_obstacle_detection(center_camera_stream, center_camera_setup=None, pose_stream=None, diff --git a/pylot/drivers/carla_base_gnss_driver_operator.py b/pylot/drivers/carla_base_gnss_driver_operator.py new file mode 100644 index 000000000..a4a8f8449 --- /dev/null +++ b/pylot/drivers/carla_base_gnss_driver_operator.py @@ -0,0 +1,145 @@ +"""This module implements an operator which executes a callback at a the +provided frequency. + +The operator attaches a GNSS sensor to the ego vehicle, receives GNSS +measurements from the simulator, and invokes the user-defined callback. +""" +import threading +from abc import abstractmethod + +import carla + +import erdos + +from pylot.drivers.sensor_setup import GNSSSetup +from pylot.simulation.utils import get_vehicle_handle, get_world, \ + set_simulation_mode + + +class CarlaBaseGNSSDriverOperator(erdos.Operator): + """Invokes ``process_gnss`` at the provided frequency. + + This operator attaches to a vehicle at the required position with respect + to the vehicle, registers callback functions to execute a ``process_gnss`` + at the provided frequency. + + Attributes: + _vehicle_id_stream (ReadStream): Stream on which the operator receives + the id of the ego vehicle. The ID is used to get a simulator handle + to the vehicle. + _output_stream (WriteStream): Stream on which the operator sends + messages. + _flags: Object to be used to access absl flags. + _logger (logging.Logger): Used to log information. + _gnss_setup (GNSSSetup): Setup of the GNSS sensor. + _frequency (float): Rate at which the callback is invoked. + _vehicle (Optional[carla.Vehicle]): Handle to the CARLA vehicle. + Initialized once the vehicle ID is received. + _world (Optional[carla.World]): Handle to the CARLA world. Initialized + once the vehicle ID is received. + _gnss (Optional[carla.Actor]): Handle to the CARLA GNSS sensor. + Initialized once the vehicle ID is received. + _log (threading.Lock): used to ensure that only 1 GNSS reading is + processed at a time. + + + Args: + vehicle_id_stream: Stream on which the operator receives the id of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + output_stream: Stream on which the operator sends messages. + gnss_setup: Setup of the GNSS sensor. + frequency: Rate at which the pose is published, in Hertz. Set to -1 to + invoke on every simulator tick. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + output_stream: erdos.WriteStream, gnss_setup: GNSSSetup, + frequency: float, flags): + # Save the streams. + self._vehicle_id_stream = vehicle_id_stream + self._output_stream = output_stream + + # Save the flags and initialize logging. + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + # Save the setup, the vehicle, the world, and the sensor. + self._gnss_setup = gnss_setup + self._frequency = frequency + self._vehicle = None + self._world = None + self._gnss = None + self._lock = threading.Lock() + + @staticmethod + def connect(ground_vehicle_id_stream): + gnss_stream = erdos.WriteStream() + return [gnss_stream] + + @abstractmethod + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """Invoked when a GNSS measurement is received from the simulator. + + Note: + Only 1 invocation of this callback will run at a time. + """ + raise NotImplementedError + + def on_gnss(self, gnss_msg: carla.GnssMeasurement): + """Invoked when a GNSS measurement is received from the simulator. + """ + game_time = int(gnss_msg.timestamp * 1000) + timestamp = erdos.Timestamp(coordinates=[game_time]) + with erdos.profile(self.config.name + '.process_gnss', + self, + event_data={'timestamp': str(timestamp)}): + with self._lock: + self.process_gnss(timestamp, gnss_msg) + + def run(self): + # Read the vehicle ID from the vehicle ID stream. + vehicle_id = self._vehicle_id_stream.read().data + self._logger.debug("received the vehicle id: {}".format(vehicle_id)) + + # Connect to the world. + _, self._world = get_world(self._flags.simulator_host, + self._flags.simulator_port, + self._flags.simulator_timeout) + set_simulation_mode(self._world, self._flags) + + # Retrieve the vehicle and install the GNSS sensor. + self._vehicle = get_vehicle_handle(self._world, vehicle_id) + gnss_blueprint = self._world.get_blueprint_library().find( + 'sensor.other.gnss') + + # Set the noise and bias parameters. + gnss_blueprint.set_attribute('noise_alt_stddev', + str(self._flags.gnss_noise_stddev_alt)) + gnss_blueprint.set_attribute('noise_lat_stddev', + str(self._flags.gnss_noise_stddev_lat)) + gnss_blueprint.set_attribute('noise_lon_stddev', + str(self._flags.gnss_noise_stddev_lon)) + gnss_blueprint.set_attribute('noise_alt_bias', + str(self._flags.gnss_bias_alt)) + gnss_blueprint.set_attribute('noise_lat_bias', + str(self._flags.gnss_bias_lat)) + gnss_blueprint.set_attribute('noise_lon_bias', + str(self._flags.gnss_bias_lon)) + + if self._frequency == -1: + gnss_blueprint.set_attribute('sensor_tick', '0.0') + else: + gnss_blueprint.set_attribute('sensor_tick', + str(1.0 / self._frequency)) + transform = self._gnss_setup.get_transform().as_simulator_transform() + self._logger.debug("Spawning a GNSS sensor: {}".format( + self._gnss_setup)) + self._gnss = self._world.spawn_actor(gnss_blueprint, + transform, + attach_to=self._vehicle) + + # Register the callback on the GNSS sensor. + self._gnss.listen(self.on_gnss) diff --git a/pylot/drivers/carla_gnss_driver_operator.py b/pylot/drivers/carla_gnss_driver_operator.py index 7121d46b1..adbdccda3 100644 --- a/pylot/drivers/carla_gnss_driver_operator.py +++ b/pylot/drivers/carla_gnss_driver_operator.py @@ -4,18 +4,18 @@ The operator attaches a GNSS sensor to the ego vehicle, receives GNSS measurements from the simulator, and sends them on its output stream. """ - -import threading +import carla import erdos +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) +from pylot.drivers.sensor_setup import GNSSSetup from pylot.localization.messages import GNSSMessage -from pylot.simulation.utils import get_vehicle_handle, get_world, \ - set_simulation_mode from pylot.utils import Transform -class CarlaGNSSDriverOperator(erdos.Operator): +class CarlaGNSSDriverOperator(CarlaBaseGNSSDriverOperator): """Publishes GNSSMessages (transform, altitude, latitude, longitude) from the GNSS sensor. @@ -24,99 +24,29 @@ class CarlaGNSSDriverOperator(erdos.Operator): measurements and publishes it to downstream operators. Args: - ground_vehicle_id_stream (:py:class:`erdos.ReadStream`): Stream on - which the operator receives the id of the ego vehicle. It uses this - id to get a simulator handle to the vehicle. - gnss_stream (:py:class:`erdos.WriteStream`): Stream on which the - operator sends GNSS info. - gnss_setup (:py:class:`pylot.drivers.sensor_setup.GNSSSetup`): - Setup of the GNSS sensor. - flags (absl.flags): Object to be used to access absl flags. + vehicle_id_stream: Stream on which the operator receives the id of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + gnss_stream: Stream on which the operator sends GNSS info. + gnss_setup: Setup of the GNSS sensor. + flags: Object used to access absl flags. """ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream, - gnss_stream: erdos.WriteStream, gnss_setup, flags): - # Save the streams. - self._vehicle_id_stream = ground_vehicle_id_stream - self._gnss_stream = gnss_stream - - # Save the flags and initialize logging. - self._flags = flags - self._logger = erdos.utils.setup_logging(self.config.name, - self.config.log_file_name) - - # Save the setup, the vehicle and the sensor. - self._gnss_setup = gnss_setup - self._vehicle = None - self._gnss = None - self._lock = threading.Lock() + gnss_stream: erdos.WriteStream, gnss_setup: GNSSSetup, flags): - @staticmethod - def connect(ground_vehicle_id_stream): - gnss_stream = erdos.WriteStream() - return [gnss_stream] + frequency = flags.simulator_gnss_frequency + super().__init__(ground_vehicle_id_stream, gnss_stream, gnss_setup, + frequency, flags) - def process_gnss(self, gnss_msg): + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): """Invoked when a GNSS measurement is received from the simulator. Sends GNSS measurements to downstream operators. """ - game_time = int(gnss_msg.timestamp * 1000) - timestamp = erdos.Timestamp(coordinates=[game_time]) - watermark_msg = erdos.WatermarkMessage(timestamp) - with erdos.profile(self.config.name + '.process_gnss', - self, - event_data={'timestamp': str(timestamp)}): - with self._lock: - msg = GNSSMessage( - timestamp, - Transform.from_simulator_transform(gnss_msg.transform), - gnss_msg.altitude, gnss_msg.latitude, gnss_msg.longitude) - self._gnss_stream.send(msg) - self._gnss_stream.send(watermark_msg) - - def run(self): - # Read the vehicle ID from the vehicle ID stream. - vehicle_id = self._vehicle_id_stream.read().data - self._logger.debug( - "The GNSSDriverOperator received the vehicle id: {}".format( - vehicle_id)) - - # Connect to the world. - _, world = get_world(self._flags.simulator_host, - self._flags.simulator_port, - self._flags.simulator_timeout) - set_simulation_mode(world, self._flags) - - # Retrieve the vehicle and install the GNSS sensor. - self._vehicle = get_vehicle_handle(world, vehicle_id) - gnss_blueprint = world.get_blueprint_library().find( - 'sensor.other.gnss') - - # Set the noise and bias parameters. - gnss_blueprint.set_attribute('noise_alt_stddev', - str(self._flags.gnss_noise_stddev_alt)) - gnss_blueprint.set_attribute('noise_lat_stddev', - str(self._flags.gnss_noise_stddev_lat)) - gnss_blueprint.set_attribute('noise_lon_stddev', - str(self._flags.gnss_noise_stddev_lon)) - gnss_blueprint.set_attribute('noise_alt_bias', - str(self._flags.gnss_bias_alt)) - gnss_blueprint.set_attribute('noise_lat_bias', - str(self._flags.gnss_bias_lat)) - gnss_blueprint.set_attribute('noise_lon_bias', - str(self._flags.gnss_bias_lon)) - - if self._flags.simulator_gnss_frequency == -1: - gnss_blueprint.set_attribute('sensor_tick', '0.0') - else: - gnss_blueprint.set_attribute( - 'sensor_tick', str(1.0 / self._flags.simulator_gnss_frequency)) - transform = self._gnss_setup.get_transform().as_simulator_transform() - self._logger.debug("Spawning a GNSS sensor: {}".format( - self._gnss_setup)) - self._gnss = world.spawn_actor(gnss_blueprint, - transform, - attach_to=self._vehicle) - - # Register the callback on the GNSS sensor. - self._gnss.listen(self.process_gnss) + self._logger.debug('@{}: sending GNSS reading'.format(timestamp)) + msg = GNSSMessage( + timestamp, Transform.from_simulator_transform(gnss_msg.transform), + gnss_msg.altitude, gnss_msg.latitude, gnss_msg.longitude) + self._output_stream.send(msg) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) diff --git a/pylot/drivers/carla_obstacles_driver_operator.py b/pylot/drivers/carla_obstacles_driver_operator.py new file mode 100644 index 000000000..cc267435c --- /dev/null +++ b/pylot/drivers/carla_obstacles_driver_operator.py @@ -0,0 +1,48 @@ +import carla + +import erdos + +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.obstacle import Obstacle +from pylot.perception.messages import ObstaclesMessage + + +class CarlaObstaclesDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the bounding boxes of all vehicles and people retrieved from + the simulator at the provided frequency. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + obstacles_stream: Stream on which the operator sends the obstacles. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + obstacles_stream: erdos.WriteStream, frequency: float, flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, obstacles_stream, gnss_setup, + frequency, flags) + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """Sends obstacles (vehicles and people) followed by a watermark.""" + self._logger.debug('@{}: sending obstacles'.format(timestamp)) + + actor_list = self._world.get_actors() + + vec_actors = actor_list.filter('vehicle.*') + vehicles = list(map(Obstacle.from_simulator_actor, vec_actors)) + + person_actors = actor_list.filter('walker.pedestrian.*') + people = list(map(Obstacle.from_simulator_actor, person_actors)) + + self._output_stream.send(ObstaclesMessage(timestamp, + vehicles + people)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) diff --git a/pylot/drivers/carla_open_drive_driver_operator.py b/pylot/drivers/carla_open_drive_driver_operator.py new file mode 100644 index 000000000..0c3c32438 --- /dev/null +++ b/pylot/drivers/carla_open_drive_driver_operator.py @@ -0,0 +1,32 @@ +import erdos + +from pylot.simulation.utils import get_world, set_simulation_mode + + +class CarlaOpenDriveDriverOperator(erdos.Operator): + """Sends a string containing the ground-truth map of the world in OpenDRIVE + format, followed by a top watermark.""" + def __init__(self, open_drive_stream: erdos.WriteStream, flags): + self._open_drive_stream = open_drive_stream + + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + @staticmethod + def connect(): + open_drive_stream = erdos.WriteStream() + return [open_drive_stream] + + def run(self): + _, world = get_world(self._flags.simulator_host, + self._flags.simulator_port, + self._flags.simulator_timeout) + set_simulation_mode(world, self._flags) + + self._logger.debug('Sending the map in OpenDRIVE format') + self._open_drive_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + world.get_map().to_opendrive())) + self._open_drive_stream.send( + erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) diff --git a/pylot/drivers/carla_pose_driver_operator.py b/pylot/drivers/carla_pose_driver_operator.py new file mode 100644 index 000000000..51c06c45d --- /dev/null +++ b/pylot/drivers/carla_pose_driver_operator.py @@ -0,0 +1,47 @@ +import carla + +import erdos + +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) + + +class CarlaPoseDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the pose (location, orientation, and velocity) at the provided + frequency. + + This operator attaches to the vehicle using the vehicle ID provided by the + ``vehicle_id_stream``, registers callback functions to retrieve the + pose at the provided frequency, and publishes it to downstream operators. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + pose_stream: Stream on which the operator sends the vehicle's pose. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + pose_stream: erdos.WriteStream, frequency: float, flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, pose_stream, gnss_setup, frequency, + flags) + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """Sends pose information followed by a watermark.""" + self._logger.debug('@{}: sending pose'.format(timestamp)) + vec_transform = pylot.utils.Transform.from_simulator_transform( + self._vehicle.get_transform()) + velocity_vector = pylot.utils.Vector3D.from_simulator_vector( + self._vehicle.get_velocity()) + forward_speed = velocity_vector.magnitude() + pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, + timestamp.coordinates[0]) + self._output_stream.send(erdos.Message(timestamp, pose)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) diff --git a/pylot/drivers/carla_speed_limit_signs_driver_operator.py b/pylot/drivers/carla_speed_limit_signs_driver_operator.py new file mode 100644 index 000000000..bf92accd3 --- /dev/null +++ b/pylot/drivers/carla_speed_limit_signs_driver_operator.py @@ -0,0 +1,49 @@ +import carla + +import erdos + +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.speed_limit_sign import SpeedLimitSign +from pylot.perception.messages import SpeedSignsMessage + + +class CarlaSpeedLimitSignsDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the locations and values of all speed limit signs retrieved + from the simulator at the provided frequency. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + speed_limit_signs_stream: Stream on which the operator sends the speed + limit signs. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + speed_limit_signs_stream: erdos.WriteStream, frequency: float, + flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, speed_limit_signs_stream, + gnss_setup, frequency, flags) + self._speed_limit_actors = None + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """Sends speed limit signs followed by a watermark.""" + self._logger.debug('@{}: sending speed limit signs'.format(timestamp)) + speed_limits = list( + map(SpeedLimitSign.from_simulator_actor, self._speed_limit_actors)) + self._output_stream.send(SpeedSignsMessage(timestamp, speed_limits)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) + + def run(self): + super().run() + # Get speed limit actors + self._speed_limit_actors = self._world.get_actors().filter( + 'traffic.speed_limit*') diff --git a/pylot/drivers/carla_stop_signs_driver_operator.py b/pylot/drivers/carla_stop_signs_driver_operator.py new file mode 100644 index 000000000..c1a95e5b8 --- /dev/null +++ b/pylot/drivers/carla_stop_signs_driver_operator.py @@ -0,0 +1,49 @@ +import carla + +import erdos + +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.stop_sign import StopSign +from pylot.perception.messages import StopSignsMessage + + +class CarlaStopSignsDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the locations and values of all stop signs retrieved from the + simulator at the provided frequency. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + stop_signs_stream: Stream on which the operator sends the speed + limit signs. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + stop_signs_stream: erdos.WriteStream, frequency: float, + flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, stop_signs_stream, gnss_setup, + frequency, flags) + self._stop_sign_actors = None + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """Send stop signs followed by a watermark.""" + self._logger.debug('@{}:sending stop signs'.format(timestamp)) + stop_signs = list( + map(StopSign.from_simulator_actor, self._stop_sign_actors)) + self._output_stream.send(StopSignsMessage(timestamp, stop_signs)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) + + def run(self): + super().run() + # Get speed limit actors + self._stop_sign_actors = self._world.get_actors().filter( + 'traffic.stop') diff --git a/pylot/drivers/carla_traffic_lights_driver_operator.py b/pylot/drivers/carla_traffic_lights_driver_operator.py new file mode 100644 index 000000000..5f05e95a9 --- /dev/null +++ b/pylot/drivers/carla_traffic_lights_driver_operator.py @@ -0,0 +1,54 @@ +import carla + +import erdos + +import pylot.utils +from pylot.drivers.carla_base_gnss_driver_operator import ( + CarlaBaseGNSSDriverOperator) +from pylot.perception.detection.traffic_light import TrafficLight +from pylot.perception.messages import TrafficLightsMessage + + +class CarlaTrafficLightsDriverOperator(CarlaBaseGNSSDriverOperator): + """Publishes the location and state of all traffic lights. + + This operator attaches to the vehicle using the vehicle ID provided by the + ``vehicle_id_stream``, registers callback functions to retrieve the + state of the traffic lights, and publishes them to downstream operators. + + Args: + vehicle_id_stream: Stream on which the operator receives the ID of the + ego vehicle. The ID is used to get a simulator handle to the + vehicle. + traffic_lights_stream: Stream on which the operator sends the status of + all traffic lights. + frequency: Rate at which the pose is published, in Hertz. + flags: Object used to access absl flags. + """ + def __init__(self, vehicle_id_stream: erdos.ReadStream, + traffic_lights_stream: erdos.WriteStream, frequency: float, + flags): + transform = pylot.utils.Transform(pylot.utils.Location(), + pylot.utils.Rotation()) + gnss_setup = pylot.drivers.sensor_setup.GNSSSetup( + self.config.name, transform) + super().__init__(vehicle_id_stream, traffic_lights_stream, gnss_setup, + frequency, flags) + self._traffic_light_actors = None + + def process_gnss(self, timestamp: erdos.Timestamp, + gnss_msg: carla.GnssMeasurement): + """Sends traffic light information followed by a watermark.""" + self._logger.debug( + '@{}: sending traffic light information'.format(timestamp)) + traffic_lights = list( + map(TrafficLight.from_simulator_actor, self._traffic_light_actors)) + self._output_stream.send( + TrafficLightsMessage(timestamp, traffic_lights)) + self._output_stream.send(erdos.WatermarkMessage(timestamp)) + + def run(self): + super().run() + # Get traffic light actors + self._traffic_light_actors = self._world.get_actors().filter( + 'traffic.traffic_light*') diff --git a/pylot/operator_creator.py b/pylot/operator_creator.py index 2274901e6..7a6349e12 100644 --- a/pylot/operator_creator.py +++ b/pylot/operator_creator.py @@ -10,19 +10,147 @@ FLAGS = flags.FLAGS -def add_simulator_bridge(control_stream, sensor_ready_stream, - pipeline_finish_notify_stream): +def add_simulator_bridge(control_stream, pipeline_finish_notify_stream): from pylot.simulation.carla_operator import CarlaOperator + op_config = erdos.OperatorConfig(name='simulator_bridge_operator', flow_watermarks=False, log_file_name=FLAGS.log_file_name, csv_log_file_name=FLAGS.csv_log_file_name, profile_file_name=FLAGS.profile_file_name) - return erdos.connect( + vehicle_id_stream, = erdos.connect( CarlaOperator, op_config, + [control_stream, pipeline_finish_notify_stream], FLAGS) + + return vehicle_id_stream + + +def add_simulator_bridge_old(control_stream, sensor_ready_stream, + pipeline_finish_notify_stream): + from pylot.simulation.old_carla_operator import OldCarlaOperator + + op_config = erdos.OperatorConfig(name='simulator_bridge_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + ( + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + global_trajectory_stream, + ) = erdos.connect( + OldCarlaOperator, op_config, [control_stream, sensor_ready_stream, pipeline_finish_notify_stream], FLAGS) + # Check that Pose matches + from pylot.utils import MatchesOperator + pose_driver_stream = add_pose(vehicle_id_stream, + FLAGS.simulator_localization_frequency) + + matches_pose_driver_config = erdos.OperatorConfig( + name='matches_pose_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_pose_driver_config, + [pose_stream, pose_driver_stream]) + + # Check that Pose for control matches + pose_for_control_driver_stream = add_pose( + vehicle_id_stream, flags.simulator_control_frequency, + 'pose_for_control') + + matches_pose_for_control_driver_config = erdos.OperatorConfig( + name='matches_pose_for_control_driver_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_pose_for_control_driver_config, + [pose_stream_for_control, pose_for_control_driver_stream]) + + # Check that traffic lights match + traffic_lights_driver_stream = add_simulator_traffic_lights( + vehicle_id_stream) + + matches_ground_obstacles_config = erdos.OperatorConfig( + name='matches_ground_traffic_lights_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_ground_obstacles_config, + [traffic_lights_driver_stream, ground_traffic_lights_stream]) + + # Check that obstacles match + obstacles_driver_stream = add_simulator_obstacles(vehicle_id_stream) + + matches_ground_obstacles_config = erdos.OperatorConfig( + name='matches_obstacles_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_ground_obstacles_config, + [ground_obstacles_stream, obstacles_driver_stream]) + + # Check that speed signs match + speed_limit_signs_driver_stream = add_simulator_speed_limit_signs( + vehicle_id_stream) + + matches_speed_limit_signs_config = erdos.OperatorConfig( + name='matches_speed_limit_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect( + MatchesOperator, matches_speed_limit_signs_config, + [ground_speed_limit_signs_stream, speed_limit_signs_driver_stream]) + + # Check that stop signs match + stop_signs_driver_stream = add_simulator_stop_signs(vehicle_id_stream) + + matches_stop_signs_config = erdos.OperatorConfig( + name='matches_stop_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_stop_signs_config, + [ground_stop_signs_stream, stop_signs_driver_stream]) + + # Check that open drive matches + open_drive_driver_stream = add_simulator_open_drive() + + matches_open_drive_config = erdos.OperatorConfig( + name='matches_open_drive_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + erdos.connect(MatchesOperator, matches_open_drive_config, + [open_drive_stream, open_drive_driver_stream]) + + return ( + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + ) + def add_efficientdet_obstacle_detection(camera_stream, time_to_decision_stream, @@ -598,6 +726,88 @@ def add_localization(imu_stream, return pose_stream +def add_pose(vehicle_id_stream, frequency: float, name: str = 'pose'): + from pylot.drivers.carla_pose_driver_operator import ( + CarlaPoseDriverOperator) + pose_driver_config = erdos.OperatorConfig( + name=name + '_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaPoseDriverOperator, pose_driver_config, + [vehicle_id_stream], frequency, FLAGS)[0] + + +def add_simulator_traffic_lights(vehicle_id_stream): + from pylot.drivers.carla_traffic_lights_driver_operator import ( + CarlaTrafficLightsDriverOperator) + ground_traffic_lights_config = erdos.OperatorConfig( + name='simulator_traffic_lights_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaTrafficLightsDriverOperator, + ground_traffic_lights_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_obstacles(vehicle_id_stream): + from pylot.drivers.carla_obstacles_driver_operator import ( + CarlaObstaclesDriverOperator) + ground_obstacles_config = erdos.OperatorConfig( + name='simulator_obstacles_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaObstaclesDriverOperator, ground_obstacles_config, + [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_speed_limit_signs(vehicle_id_stream): + from pylot.drivers.carla_speed_limit_signs_driver_operator import ( + CarlaSpeedLimitSignsDriverOperator) + speed_limit_signs_config = erdos.OperatorConfig( + name='simulator_speed_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaSpeedLimitSignsDriverOperator, + speed_limit_signs_config, [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_stop_signs(vehicle_id_stream): + from pylot.drivers.carla_stop_signs_driver_operator import ( + CarlaStopSignsDriverOperator) + stop_signs_config = erdos.OperatorConfig( + name='simulator_stop_signs_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaStopSignsDriverOperator, stop_signs_config, + [vehicle_id_stream], + FLAGS.simulator_localization_frequency, FLAGS)[0] + + +def add_simulator_open_drive(): + from pylot.drivers.carla_open_drive_driver_operator import ( + CarlaOpenDriveDriverOperator) + open_drive_driver_config = erdos.OperatorConfig( + name='simulator_open_drive_operator', + flow_watermarks=False, + log_file_name=FLAGS.log_file_name, + csv_log_file_name=FLAGS.csv_log_file_name, + profile_file_name=FLAGS.profile_file_name) + return erdos.connect(CarlaOpenDriveDriverOperator, + open_drive_driver_config, [], FLAGS)[0] + + def add_fusion(pose_stream, obstacles_stream, depth_stream, ground_obstacles_stream): from pylot.perception.fusion.fusion_operator import FusionOperator diff --git a/pylot/simulation/carla_operator.py b/pylot/simulation/carla_operator.py index da59b7ea5..c4a94c6bb 100644 --- a/pylot/simulation/carla_operator.py +++ b/pylot/simulation/carla_operator.py @@ -1,9 +1,6 @@ -import enum -import heapq import random import threading import time -from functools import total_ordering from carla import Location, VehicleControl, command @@ -13,8 +10,6 @@ import pylot.simulation.utils import pylot.utils from pylot.control.messages import ControlMessage -from pylot.perception.messages import ObstaclesMessage, SpeedSignsMessage, \ - StopSignsMessage, TrafficLightsMessage class CarlaOperator(erdos.Operator): @@ -25,6 +20,10 @@ class CarlaOperator(erdos.Operator): vehicle that the rest of the pipeline drives. Args: + control_stream: Stream on which the operator receives control messages + to apply to the ego vehicle. + pipeline_finish_notify_stream: In pseudo-async mode, notifies the + operator that the pipeline has finished executing. flags: A handle to the global flags instance to retrieve the configuration. @@ -33,35 +32,18 @@ class CarlaOperator(erdos.Operator): _world: A handle to the world running inside the simulation. _vehicles: A list of identifiers of the vehicles inside the simulation. """ - def __init__( - self, control_stream: ReadStream, - release_sensor_stream: ReadStream, - pipeline_finish_notify_stream: ReadStream, - pose_stream: WriteStream, pose_stream_for_control: WriteStream, - ground_traffic_lights_stream: WriteStream, - ground_obstacles_stream: WriteStream, - ground_speed_limit_signs_stream: WriteStream, - ground_stop_signs_stream: WriteStream, - vehicle_id_stream: WriteStream, open_drive_stream: WriteStream, - global_trajectory_stream: WriteStream, flags): + def __init__(self, control_stream: ReadStream, + pipeline_finish_notify_stream: ReadStream, + vehicle_id_stream: WriteStream, flags): if flags.random_seed: random.seed(flags.random_seed) # Register callback on control stream. control_stream.add_callback(self.on_control_msg) - erdos.add_watermark_callback([release_sensor_stream], [], - self.on_sensor_ready) if flags.simulator_mode == "pseudo-asynchronous": erdos.add_watermark_callback([pipeline_finish_notify_stream], [], self.on_pipeline_finish) - self.pose_stream = pose_stream - self.pose_stream_for_control = pose_stream_for_control - self.ground_traffic_lights_stream = ground_traffic_lights_stream - self.ground_obstacles_stream = ground_obstacles_stream - self.ground_speed_limit_signs_stream = ground_speed_limit_signs_stream - self.ground_stop_signs_stream = ground_stop_signs_stream + self.vehicle_id_stream = vehicle_id_stream - self.open_drive_stream = open_drive_stream - self.global_trajectory_stream = global_trajectory_stream self._flags = flags self._logger = erdos.utils.setup_logging(self.config.name, @@ -126,39 +108,11 @@ def __init__( # concurrently. self._lock = threading.Lock() - # Dictionary that stores the processing times when sensors are ready - # to realease data. This info is used to calculate the real processing - # time of our pipeline without including simulator-induced sensor - # delays. - self._next_localization_sensor_reading = None - self._next_control_sensor_reading = None - self._simulator_in_sync = False - self._tick_events = [] - self._control_msgs = {} - @staticmethod - def connect(control_stream: ReadStream, release_sensor_stream: ReadStream, + def connect(control_stream: ReadStream, pipeline_finish_notify_stream: ReadStream): - pose_stream = erdos.WriteStream() - pose_stream_for_control = erdos.WriteStream() - ground_traffic_lights_stream = erdos.WriteStream() - ground_obstacles_stream = erdos.WriteStream() - ground_speed_limit_signs_stream = erdos.WriteStream() - ground_stop_signs_stream = erdos.WriteStream() vehicle_id_stream = erdos.WriteStream() - open_drive_stream = erdos.WriteStream() - global_trajectory_stream = erdos.WriteStream() - return [ - pose_stream, - pose_stream_for_control, - ground_traffic_lights_stream, - ground_obstacles_stream, - ground_speed_limit_signs_stream, - ground_stop_signs_stream, - vehicle_id_stream, - open_drive_stream, - global_trajectory_stream, - ] + return [vehicle_id_stream] @erdos.profile_method() def on_control_msg(self, msg: ControlMessage): @@ -167,135 +121,40 @@ def on_control_msg(self, msg: ControlMessage): Args: msg: A control.messages.ControlMessage message. """ - self._logger.debug('@{}: received control message'.format( - msg.timestamp)) + self._logger.debug('@{}: received control message {}'.format( + msg.timestamp, msg)) + if self._flags.control not in ['simulator_auto_pilot', 'manual']: + self._apply_control_msg(msg) + # If auto pilot or manual mode is enabled then we do not apply the + # control, but we still want to tick in this method to ensure that + # all operators finished work before the world ticks. + if not self._flags.simulator_mode == 'pseudo-asynchronous': + self._tick_simulator() if self._flags.simulator_mode == 'pseudo-asynchronous': - heapq.heappush( - self._tick_events, - (msg.timestamp.coordinates[0], TickEvent.CONTROL_CMD)) - self._control_msgs[msg.timestamp.coordinates[0]] = msg - # Tick until the next sensor read game time to ensure that the - # data-flow has a new round of sensor inputs. Apply control - # commands if they must be applied before the next sensor read. - self._consume_next_event() - else: - # If auto pilot or manual mode is enabled then we do not apply the - # control, but we still want to tick in this method to ensure that - # all operators finished work before the world ticks. - if self._flags.control not in ['simulator_auto_pilot', 'manual']: - self._apply_control_msg(msg) # Tick the world after the operator received a control command. # This usually indicates that all the operators have completed # processing the previous timestamp (with the exception of logging # operators that are not part of the main loop). - self._tick_simulator() - - def _consume_next_event(self): - while True: - (sim_time, event_type) = heapq.heappop(self._tick_events) - if event_type == TickEvent.SENSOR_READ: - self._tick_simulator_until(sim_time) - break - elif event_type == TickEvent.CONTROL_CMD: - self._tick_simulator_until(sim_time) - control_msg = self._control_msgs[sim_time] - self._apply_control_msg(control_msg) + self._apply_control_msg(msg) def on_pipeline_finish(self, timestamp: Timestamp): + """Only invoked in pseudo-asynchronous mode.""" self._logger.debug("@{}: Received pipeline finish.".format(timestamp)) + # Tick simulator forward until the next expected control message. game_time = timestamp.coordinates[0] - if (self._flags.simulator_control_frequency == -1 - or self._next_control_sensor_reading is None - or game_time == self._next_control_sensor_reading): - # There was supposed to be a control message for this timestamp - # too. Send the Pose message and continue after the control message - # is received. - self._update_next_control_pseudo_asynchronous_ticks(game_time) - self.__send_hero_vehicle_data(self.pose_stream_for_control, - timestamp) - self.__update_spectactor_pose() - else: - # No pose message was supposed to be sent for this timestamp, we - # need to consume the next event to move the dataflow forward. - self._consume_next_event() - - def on_sensor_ready(self, timestamp: Timestamp): - # The first sensor reading needs to be discarded because it might - # not be correctly spaced out. - if not self._simulator_in_sync: - self._simulator_in_sync = True - - def send_actor_data(self, msg): - """ Callback function that gets called when the world is ticked. - This function sends a WatermarkMessage to the downstream operators as - a signal that they need to release data to the rest of the pipeline. - - Args: - msg: Data recieved from the simulation at a tick. - """ - # Ensure that the callback executes serially. - with self._lock: - game_time = int(msg.elapsed_seconds * 1000) - self._logger.info( - 'The world is at the timestamp {}'.format(game_time)) - timestamp = erdos.Timestamp(coordinates=[game_time]) - with erdos.profile(self.config.name + '.send_actor_data', - self, - event_data={'timestamp': str(timestamp)}): - if (self._flags.simulator_localization_frequency == -1 - or self._next_localization_sensor_reading is None or - game_time == self._next_localization_sensor_reading): - if self._flags.simulator_mode == 'pseudo-asynchronous': - self._update_next_localization_pseudo_async_ticks( - game_time) - self.__send_hero_vehicle_data(self.pose_stream, timestamp) - self.__send_ground_actors_data(timestamp) - self.__update_spectactor_pose() - - if self._flags.simulator_mode == "pseudo-asynchronous" and ( - self._flags.simulator_control_frequency == -1 - or self._next_control_sensor_reading is None - or game_time == self._next_control_sensor_reading): - self._update_next_control_pseudo_asynchronous_ticks( - game_time) - self.__send_hero_vehicle_data(self.pose_stream_for_control, - timestamp) - self.__update_spectactor_pose() - - def _update_next_localization_pseudo_async_ticks(self, game_time: int): - if self._flags.simulator_localization_frequency > -1: - self._next_localization_sensor_reading = ( - game_time + - int(1000 / self._flags.simulator_localization_frequency)) - if not self._simulator_in_sync: - # If this is the first sensor reading, then tick - # one more time because the second sensor reading - # is sometimes delayed by 1 tick. - self._next_localization_sensor_reading += int( - 1000 / self._flags.simulator_fps) - else: - self._next_localization_sensor_reading = ( - game_time + int(1000 / self._flags.simulator_fps)) - heapq.heappush( - self._tick_events, - (self._next_localization_sensor_reading, TickEvent.SENSOR_READ)) - - def _update_next_control_pseudo_asynchronous_ticks(self, game_time: int): if self._flags.simulator_control_frequency > -1: - self._next_control_sensor_reading = ( - game_time + - int(1000 / self._flags.simulator_control_frequency)) + control_frequency = self._flags.simulator_control_frequency else: - self._next_control_sensor_reading = ( - game_time + int(1000 / self._flags.simulator_fps)) - if (self._next_control_sensor_reading != - self._next_localization_sensor_reading): - heapq.heappush( - self._tick_events, - (self._next_control_sensor_reading, TickEvent.SENSOR_READ)) + control_frequency = self._flags.simulator_fps + next_control_msg_time = game_time + round(1000 / control_frequency) + self._tick_simulator_until(next_control_msg_time) def run(self): - self.__send_world_data() + self.vehicle_id_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + self._ego_vehicle.id)) + self.vehicle_id_stream.send( + erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) # Tick here once to ensure that the driver operators can get a handle # to the ego vehicle. # XXX(ionel): Hack to fix a race condition. Driver operators @@ -308,21 +167,12 @@ def run(self): time.sleep(4) # The older CARLA versions require an additional tick to sync # sensors. - self._world.on_tick(self.send_actor_data) self._tick_simulator() def _initialize_world(self): - """ Setups the world town, and activates the desired weather.""" - if self._simulator_version == '0.9.5': - # TODO (Sukrit) :: ERDOS provides no way to retrieve handles to the - # class objects to do garbage collection. Hence, objects from - # previous runs of the simulation may persist. We need to clean - # them up right now. In future, move this logic to a seperate - # destroy function. - pylot.simulation.utils.reset_world(self._world) - else: - self._world = self._client.load_world('Town{:02d}'.format( - self._flags.simulator_town)) + """Sets up the world town, and activates the desired weather.""" + self._world = self._client.load_world('Town{:02d}'.format( + self._flags.simulator_town)) self._logger.info('Setting the weather to {}'.format( self._flags.simulator_weather)) pylot.simulation.utils.set_weather(self._world, @@ -336,6 +186,7 @@ def _tick_simulator(self): self._world.tick() def _tick_simulator_until(self, goal_time: int): + self._logger.debug("ticking simulator until {}".format(goal_time)) while True: snapshot = self._world.get_snapshot() sim_time = int(snapshot.timestamp.elapsed_seconds * 1000) @@ -353,63 +204,7 @@ def _apply_control_msg(self, msg: ControlMessage): reverse=msg.reverse) self._client.apply_batch_sync( [command.ApplyVehicleControl(self._ego_vehicle.id, vec_control)]) - - def __send_hero_vehicle_data(self, stream: WriteStream, - timestamp: Timestamp): - vec_transform = pylot.utils.Transform.from_simulator_transform( - self._ego_vehicle.get_transform()) - velocity_vector = pylot.utils.Vector3D.from_simulator_vector( - self._ego_vehicle.get_velocity()) - forward_speed = velocity_vector.magnitude() - pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, - timestamp.coordinates[0]) - stream.send(erdos.Message(timestamp, pose)) - stream.send(erdos.WatermarkMessage(timestamp)) - - def __send_ground_actors_data(self, timestamp: Timestamp): - # Get all the actors in the simulation. - actor_list = self._world.get_actors() - - (vehicles, people, traffic_lights, speed_limits, traffic_stops - ) = pylot.simulation.utils.extract_data_in_pylot_format(actor_list) - - # Send ground people and vehicles. - self.ground_obstacles_stream.send( - ObstaclesMessage(timestamp, vehicles + people)) - self.ground_obstacles_stream.send(erdos.WatermarkMessage(timestamp)) - # Send ground traffic lights. - self.ground_traffic_lights_stream.send( - TrafficLightsMessage(timestamp, traffic_lights)) - self.ground_traffic_lights_stream.send( - erdos.WatermarkMessage(timestamp)) - # Send ground speed signs. - self.ground_speed_limit_signs_stream.send( - SpeedSignsMessage(timestamp, speed_limits)) - self.ground_speed_limit_signs_stream.send( - erdos.WatermarkMessage(timestamp)) - # Send stop signs. - self.ground_stop_signs_stream.send( - StopSignsMessage(timestamp, traffic_stops)) - self.ground_stop_signs_stream.send(erdos.WatermarkMessage(timestamp)) - - def __send_world_data(self): - """ Sends ego vehicle id, open drive and trajectory messages.""" - # Send the id of the ego vehicle. This id is used by the driver - # operators to get a handle to the ego vehicle, which they use to - # attach sensors. - self.vehicle_id_stream.send( - erdos.Message(erdos.Timestamp(coordinates=[0]), - self._ego_vehicle.id)) - self.vehicle_id_stream.send( - erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) - - # Send open drive string. - self.open_drive_stream.send( - erdos.Message(erdos.Timestamp(coordinates=[0]), - self._world.get_map().to_opendrive())) - top_watermark = erdos.WatermarkMessage(erdos.Timestamp(is_top=True)) - self.open_drive_stream.send(top_watermark) - self.global_trajectory_stream.send(top_watermark) + self.__update_spectactor_pose() def __update_spectactor_pose(self): # Set the world simulation view with respect to the vehicle. @@ -417,14 +212,3 @@ def __update_spectactor_pose(self): v_pose.location -= 10 * Location(v_pose.get_forward_vector()) v_pose.location.z = 5 self._spectator.set_transform(v_pose) - - -@total_ordering -class TickEvent(enum.Enum): - CONTROL_CMD = 1 - SENSOR_READ = 2 - - def __lt__(self, other): - if self.__class__ is other.__class__: - return self.value < other.value - return NotImplemented diff --git a/pylot/simulation/old_carla_operator.py b/pylot/simulation/old_carla_operator.py new file mode 100644 index 000000000..623ad12ba --- /dev/null +++ b/pylot/simulation/old_carla_operator.py @@ -0,0 +1,430 @@ +import enum +import heapq +import random +import threading +import time +from functools import total_ordering + +from carla import Location, VehicleControl, command + +import erdos +from erdos import ReadStream, Timestamp, WriteStream + +import pylot.simulation.utils +import pylot.utils +from pylot.control.messages import ControlMessage +from pylot.perception.messages import ObstaclesMessage, SpeedSignsMessage, \ + StopSignsMessage, TrafficLightsMessage + + +class OldCarlaOperator(erdos.Operator): + """Initializes and controls a CARLA simulation. + + This operator connects to the simulation, sets the required weather in the + simulation world, initializes the required number of actors, and the + vehicle that the rest of the pipeline drives. + + Args: + flags: A handle to the global flags instance to retrieve the + configuration. + + Attributes: + _client: A connection to the simulator. + _world: A handle to the world running inside the simulation. + _vehicles: A list of identifiers of the vehicles inside the simulation. + """ + def __init__( + self, control_stream: ReadStream, + release_sensor_stream: ReadStream, + pipeline_finish_notify_stream: ReadStream, + pose_stream: WriteStream, pose_stream_for_control: WriteStream, + ground_traffic_lights_stream: WriteStream, + ground_obstacles_stream: WriteStream, + ground_speed_limit_signs_stream: WriteStream, + ground_stop_signs_stream: WriteStream, + vehicle_id_stream: WriteStream, open_drive_stream: WriteStream, + global_trajectory_stream: WriteStream, flags): + if flags.random_seed: + random.seed(flags.random_seed) + # Register callback on control stream. + control_stream.add_callback(self.on_control_msg) + erdos.add_watermark_callback([release_sensor_stream], [], + self.on_sensor_ready) + if flags.simulator_mode == "pseudo-asynchronous": + erdos.add_watermark_callback([pipeline_finish_notify_stream], [], + self.on_pipeline_finish) + self.pose_stream = pose_stream + self.pose_stream_for_control = pose_stream_for_control + self.ground_traffic_lights_stream = ground_traffic_lights_stream + self.ground_obstacles_stream = ground_obstacles_stream + self.ground_speed_limit_signs_stream = ground_speed_limit_signs_stream + self.ground_stop_signs_stream = ground_stop_signs_stream + self.vehicle_id_stream = vehicle_id_stream + self.open_drive_stream = open_drive_stream + self.global_trajectory_stream = global_trajectory_stream + + self._flags = flags + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + self._csv_logger = erdos.utils.setup_csv_logging( + self.config.name + '-csv', self.config.csv_log_file_name) + # Connect to simulator and retrieve the world running. + self._client, self._world = pylot.simulation.utils.get_world( + self._flags.simulator_host, self._flags.simulator_port, + self._flags.simulator_timeout) + self._simulator_version = self._client.get_client_version() + + if not self._flags.scenario_runner and \ + self._flags.control != "manual": + # Load the appropriate town. + self._initialize_world() + + # Save the spectator handle so that we don't have to repeteadly get the + # handle (which is slow). + self._spectator = self._world.get_spectator() + + if pylot.simulation.utils.check_simulator_version( + self._simulator_version, required_minor=9, required_patch=8): + # Any simulator version after 0.9.7. + # Create a traffic manager to that auto pilot works. + self._traffic_manager = self._client.get_trafficmanager( + self._flags.carla_traffic_manager_port) + self._traffic_manager.set_synchronous_mode( + self._flags.simulator_mode == 'synchronous') + + if self._flags.scenario_runner: + # Tick until 4.0 seconds time so that all synchronous scenario runs + # start at exactly the same game time. + pylot.simulation.utils.set_synchronous_mode(self._world, 1000) + self._tick_simulator_until(4000) + + pylot.simulation.utils.set_simulation_mode(self._world, self._flags) + + if self._flags.scenario_runner or self._flags.control == "manual": + # Wait until the ego vehicle is spawned by the scenario runner. + self._logger.info("Waiting for the scenario to be ready ...") + self._ego_vehicle = pylot.simulation.utils.wait_for_ego_vehicle( + self._world) + self._logger.info("Found ego vehicle") + else: + # Spawn ego vehicle, people and vehicles. + (self._ego_vehicle, self._vehicle_ids, + self._people) = pylot.simulation.utils.spawn_actors( + self._client, self._world, + self._flags.carla_traffic_manager_port, + self._simulator_version, + self._flags.simulator_spawn_point_index, + self._flags.control == 'simulator_auto_pilot', + self._flags.simulator_num_people, + self._flags.simulator_num_vehicles, self._logger) + + pylot.simulation.utils.set_vehicle_physics( + self._ego_vehicle, self._flags.simulator_vehicle_moi, + self._flags.simulator_vehicle_mass) + + # Lock used to ensure that simulator callbacks are not executed + # concurrently. + self._lock = threading.Lock() + + # Dictionary that stores the processing times when sensors are ready + # to realease data. This info is used to calculate the real processing + # time of our pipeline without including simulator-induced sensor + # delays. + self._next_localization_sensor_reading = None + self._next_control_sensor_reading = None + self._simulator_in_sync = False + self._tick_events = [] + self._control_msgs = {} + + @staticmethod + def connect(control_stream: ReadStream, release_sensor_stream: ReadStream, + pipeline_finish_notify_stream: ReadStream): + pose_stream = erdos.WriteStream() + pose_stream_for_control = erdos.WriteStream() + ground_traffic_lights_stream = erdos.WriteStream() + ground_obstacles_stream = erdos.WriteStream() + ground_speed_limit_signs_stream = erdos.WriteStream() + ground_stop_signs_stream = erdos.WriteStream() + vehicle_id_stream = erdos.WriteStream() + open_drive_stream = erdos.WriteStream() + global_trajectory_stream = erdos.WriteStream() + return [ + pose_stream, + pose_stream_for_control, + ground_traffic_lights_stream, + ground_obstacles_stream, + ground_speed_limit_signs_stream, + ground_stop_signs_stream, + vehicle_id_stream, + open_drive_stream, + global_trajectory_stream, + ] + + @erdos.profile_method() + def on_control_msg(self, msg: ControlMessage): + """ Invoked when a ControlMessage is received. + + Args: + msg: A control.messages.ControlMessage message. + """ + self._logger.debug('@{}: received control message'.format( + msg.timestamp)) + if self._flags.simulator_mode == 'pseudo-asynchronous': + heapq.heappush( + self._tick_events, + (msg.timestamp.coordinates[0], TickEvent.CONTROL_CMD)) + self._control_msgs[msg.timestamp.coordinates[0]] = msg + # Tick until the next sensor read game time to ensure that the + # data-flow has a new round of sensor inputs. Apply control + # commands if they must be applied before the next sensor read. + self._consume_next_event() + else: + # If auto pilot or manual mode is enabled then we do not apply the + # control, but we still want to tick in this method to ensure that + # all operators finished work before the world ticks. + if self._flags.control not in ['simulator_auto_pilot', 'manual']: + self._apply_control_msg(msg) + # Tick the world after the operator received a control command. + # This usually indicates that all the operators have completed + # processing the previous timestamp (with the exception of logging + # operators that are not part of the main loop). + self._tick_simulator() + + def _consume_next_event(self): + while True: + (sim_time, event_type) = heapq.heappop(self._tick_events) + if event_type == TickEvent.SENSOR_READ: + self._tick_simulator_until(sim_time) + break + elif event_type == TickEvent.CONTROL_CMD: + self._tick_simulator_until(sim_time) + control_msg = self._control_msgs[sim_time] + self._apply_control_msg(control_msg) + + def on_pipeline_finish(self, timestamp: Timestamp): + self._logger.debug("@{}: Received pipeline finish.".format(timestamp)) + game_time = timestamp.coordinates[0] + if (self._flags.simulator_control_frequency == -1 + or self._next_control_sensor_reading is None + or game_time == self._next_control_sensor_reading): + # There was supposed to be a control message for this timestamp + # too. Send the Pose message and continue after the control message + # is received. + self._update_next_control_pseudo_asynchronous_ticks(game_time) + self.__send_hero_vehicle_data(self.pose_stream_for_control, + timestamp) + self.__update_spectactor_pose() + else: + # No pose message was supposed to be sent for this timestamp, we + # need to consume the next event to move the dataflow forward. + self._consume_next_event() + + def on_sensor_ready(self, timestamp: Timestamp): + # The first sensor reading needs to be discarded because it might + # not be correctly spaced out. + if not self._simulator_in_sync: + self._simulator_in_sync = True + + def send_actor_data(self, msg): + """ Callback function that gets called when the world is ticked. + This function sends a WatermarkMessage to the downstream operators as + a signal that they need to release data to the rest of the pipeline. + + Args: + msg: Data recieved from the simulation at a tick. + """ + # Ensure that the callback executes serially. + with self._lock: + game_time = int(msg.elapsed_seconds * 1000) + self._logger.info( + 'The world is at the timestamp {}'.format(game_time)) + timestamp = erdos.Timestamp(coordinates=[game_time]) + with erdos.profile(self.config.name + '.send_actor_data', + self, + event_data={'timestamp': str(timestamp)}): + if (self._flags.simulator_localization_frequency == -1 + or self._next_localization_sensor_reading is None or + game_time == self._next_localization_sensor_reading): + if self._flags.simulator_mode == 'pseudo-asynchronous': + self._update_next_localization_pseudo_async_ticks( + game_time) + self.__send_hero_vehicle_data(self.pose_stream, timestamp) + self.__send_ground_actors_data(timestamp) + self.__update_spectactor_pose() + + if self._flags.simulator_mode == "pseudo-asynchronous" and ( + self._flags.simulator_control_frequency == -1 + or self._next_control_sensor_reading is None + or game_time == self._next_control_sensor_reading): + self._update_next_control_pseudo_asynchronous_ticks( + game_time) + self.__send_hero_vehicle_data(self.pose_stream_for_control, + timestamp) + self.__update_spectactor_pose() + + def _update_next_localization_pseudo_async_ticks(self, game_time: int): + if self._flags.simulator_localization_frequency > -1: + self._next_localization_sensor_reading = ( + game_time + + int(1000 / self._flags.simulator_localization_frequency)) + if not self._simulator_in_sync: + # If this is the first sensor reading, then tick + # one more time because the second sensor reading + # is sometimes delayed by 1 tick. + self._next_localization_sensor_reading += int( + 1000 / self._flags.simulator_fps) + else: + self._next_localization_sensor_reading = ( + game_time + int(1000 / self._flags.simulator_fps)) + heapq.heappush( + self._tick_events, + (self._next_localization_sensor_reading, TickEvent.SENSOR_READ)) + + def _update_next_control_pseudo_asynchronous_ticks(self, game_time: int): + if self._flags.simulator_control_frequency > -1: + self._next_control_sensor_reading = ( + game_time + + int(1000 / self._flags.simulator_control_frequency)) + else: + self._next_control_sensor_reading = ( + game_time + int(1000 / self._flags.simulator_fps)) + if (self._next_control_sensor_reading != + self._next_localization_sensor_reading): + heapq.heappush( + self._tick_events, + (self._next_control_sensor_reading, TickEvent.SENSOR_READ)) + + def run(self): + self.__send_world_data() + # Tick here once to ensure that the driver operators can get a handle + # to the ego vehicle. + # XXX(ionel): Hack to fix a race condition. Driver operators + # register a simulator listen callback only after they've received + # the vehicle id value. We miss frames if we tick before + # they register a listener. Thus, we sleep here a bit to + # give them sufficient time to register a callback. + time.sleep(4) + self._tick_simulator() + time.sleep(4) + # The older CARLA versions require an additional tick to sync + # sensors. + self._world.on_tick(self.send_actor_data) + self._tick_simulator() + + def _initialize_world(self): + """ Setups the world town, and activates the desired weather.""" + if self._simulator_version == '0.9.5': + # TODO (Sukrit) :: ERDOS provides no way to retrieve handles to the + # class objects to do garbage collection. Hence, objects from + # previous runs of the simulation may persist. We need to clean + # them up right now. In future, move this logic to a seperate + # destroy function. + pylot.simulation.utils.reset_world(self._world) + else: + self._world = self._client.load_world('Town{:02d}'.format( + self._flags.simulator_town)) + self._logger.info('Setting the weather to {}'.format( + self._flags.simulator_weather)) + pylot.simulation.utils.set_weather(self._world, + self._flags.simulator_weather) + + def _tick_simulator(self): + if (self._flags.simulator_mode == 'asynchronous-fixed-time-step' + or self._flags.simulator_mode == 'asynchronous'): + # No need to tick when running in these modes. + return + self._world.tick() + + def _tick_simulator_until(self, goal_time: int): + while True: + snapshot = self._world.get_snapshot() + sim_time = int(snapshot.timestamp.elapsed_seconds * 1000) + if sim_time < goal_time: + self._world.tick() + else: + return + + def _apply_control_msg(self, msg: ControlMessage): + # Transform the message to a simulator control cmd. + vec_control = VehicleControl(throttle=msg.throttle, + steer=msg.steer, + brake=msg.brake, + hand_brake=msg.hand_brake, + reverse=msg.reverse) + self._client.apply_batch_sync( + [command.ApplyVehicleControl(self._ego_vehicle.id, vec_control)]) + + def __send_hero_vehicle_data(self, stream: WriteStream, + timestamp: Timestamp): + vec_transform = pylot.utils.Transform.from_simulator_transform( + self._ego_vehicle.get_transform()) + velocity_vector = pylot.utils.Vector3D.from_simulator_vector( + self._ego_vehicle.get_velocity()) + forward_speed = velocity_vector.magnitude() + pose = pylot.utils.Pose(vec_transform, forward_speed, velocity_vector, + timestamp.coordinates[0]) + stream.send(erdos.Message(timestamp, pose)) + stream.send(erdos.WatermarkMessage(timestamp)) + + def __send_ground_actors_data(self, timestamp: Timestamp): + # Get all the actors in the simulation. + actor_list = self._world.get_actors() + + (vehicles, people, traffic_lights, speed_limits, traffic_stops + ) = pylot.simulation.utils.extract_data_in_pylot_format(actor_list) + + # Send ground people and vehicles. + self.ground_obstacles_stream.send( + ObstaclesMessage(timestamp, vehicles + people)) + self.ground_obstacles_stream.send(erdos.WatermarkMessage(timestamp)) + # Send ground traffic lights. + self.ground_traffic_lights_stream.send( + TrafficLightsMessage(timestamp, traffic_lights)) + self.ground_traffic_lights_stream.send( + erdos.WatermarkMessage(timestamp)) + # Send ground speed signs. + self.ground_speed_limit_signs_stream.send( + SpeedSignsMessage(timestamp, speed_limits)) + self.ground_speed_limit_signs_stream.send( + erdos.WatermarkMessage(timestamp)) + # Send stop signs. + self.ground_stop_signs_stream.send( + StopSignsMessage(timestamp, traffic_stops)) + self.ground_stop_signs_stream.send(erdos.WatermarkMessage(timestamp)) + + def __send_world_data(self): + """ Sends ego vehicle id, open drive and trajectory messages.""" + # Send the id of the ego vehicle. This id is used by the driver + # operators to get a handle to the ego vehicle, which they use to + # attach sensors. + self.vehicle_id_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + self._ego_vehicle.id)) + self.vehicle_id_stream.send( + erdos.WatermarkMessage(erdos.Timestamp(is_top=True))) + + # Send open drive string. + self.open_drive_stream.send( + erdos.Message(erdos.Timestamp(coordinates=[0]), + self._world.get_map().to_opendrive())) + top_watermark = erdos.WatermarkMessage(erdos.Timestamp(is_top=True)) + self.open_drive_stream.send(top_watermark) + self.global_trajectory_stream.send(top_watermark) + + def __update_spectactor_pose(self): + # Set the world simulation view with respect to the vehicle. + v_pose = self._ego_vehicle.get_transform() + v_pose.location -= 10 * Location(v_pose.get_forward_vector()) + v_pose.location.z = 5 + self._spectator.set_transform(v_pose) + + +@total_ordering +class TickEvent(enum.Enum): + CONTROL_CMD = 1 + SENSOR_READ = 2 + + def __lt__(self, other): + if self.__class__ is other.__class__: + return self.value < other.value + return NotImplemented diff --git a/pylot/utils.py b/pylot/utils.py index d3774e130..0d271cbd5 100644 --- a/pylot/utils.py +++ b/pylot/utils.py @@ -4,6 +4,8 @@ import numpy as np +import erdos + class Rotation(object): """Used to represent the rotation of an actor or obstacle. @@ -63,6 +65,12 @@ def __str__(self): return 'Rotation(pitch={}, yaw={}, roll={})'.format( self.pitch, self.yaw, self.roll) + def __eq__(self, other: object) -> bool: + if isinstance(other, Rotation): + return (self.pitch == other.pitch and self.yaw == other.yaw + and self.roll == other.roll) + return NotImplemented + class Quaternion(object): """ Represents the Rotation of an obstacle or vehicle in quaternion @@ -369,6 +377,12 @@ def __repr__(self): def __str__(self): return 'Vector3D(x={}, y={}, z={})'.format(self.x, self.y, self.z) + def __eq__(self, other: object) -> bool: + if isinstance(other, Vector3D): + return (self.x == other.x and self.y == other.y + and self.z == other.z) + return NotImplemented + class Vector2D(object): """Represents a 2D vector and provides helper functions.""" @@ -826,6 +840,12 @@ def __str__(self): else: return "Transform({})".format(str(self.matrix)) + def __eq__(self, other: object) -> bool: + if isinstance(other, Transform): + return (self.location == other.location + and self.rotation == other.rotation) + return NotImplemented + class Pose(object): """Class used to wrap ego-vehicle information. @@ -868,6 +888,12 @@ def __str__(self): return "Pose(transform: {}, forward speed: {}, velocity vector: {})"\ .format(self.transform, self.forward_speed, self.velocity_vector) + def __eq__(self, other: object) -> bool: + if isinstance(other, Pose): + return (self.transform == other.transform + and self.forward_speed == other.forward_speed + and self.velocity_vector == other.velocity_vector) + class LaneMarkingColor(Enum): """Enum that defines the lane marking colors according to OpenDrive 1.4. @@ -1079,3 +1105,54 @@ def run_visualizer_control_loop(control_display_stream): def verify_keys_in_dict(required_keys, arg_dict): assert set(required_keys).issubset(set(arg_dict.keys())), \ "one or more of {} not found in {}".format(required_keys, arg_dict) + + +def matches_data(left_msgs, right_msgs): + length_matches = (len(left_msgs) == len(right_msgs)) + return length_matches and all( + map( + lambda x: x[0].timestamp == x[1].timestamp and x[0].data == x[1]. + data, zip(left_msgs, right_msgs))) + + +class MatchesOperator(erdos.Operator): + def __init__(self, left_stream, right_stream, matches_fn=matches_data): + self._matches_fn = matches_fn + self._left_msgs = [] + self._right_msgs = [] + self._logger = erdos.utils.setup_logging(self.config.name, + self.config.log_file_name) + + left_stream.add_callback(self.on_left_stream) + right_stream.add_callback(self.on_right_stream) + left_stream.add_watermark_callback( + lambda t: self._logger.debug("@{}: got left watermark".format(t))) + right_stream.add_watermark_callback( + lambda t: self._logger.debug("@{}: got right watermark".format(t))) + + erdos.add_watermark_callback([left_stream, right_stream], [], + self.on_watermark) + + @staticmethod + def connect(left_stream, right_stream): + return [] + + def on_left_stream(self, msg): + self._logger.debug("got left msg") + self._left_msgs.append(msg) + + def on_right_stream(self, msg): + self._logger.debug("got right msg") + self._right_msgs.append(msg) + + def on_watermark(self, t): + left_msgs, self._left_msgs = self._left_msgs, [] + right_msgs, self._right_msgs = self._right_msgs, [] + matches = self._matches_fn(left_msgs, right_msgs) + + if matches: + self._logger.debug("@{}: left matches right".format(t)) + else: + self._logger.warn( + "@{}: left does not match right\n\tleft: {}\n\tright: {}". + format(t, left_msgs, right_msgs)) diff --git a/scripts/compute_decay.py b/scripts/compute_decay.py index 88a39e4cb..42d4b3f83 100644 --- a/scripts/compute_decay.py +++ b/scripts/compute_decay.py @@ -4,6 +4,7 @@ import erdos import pylot.flags +import pylot.component_creator import pylot.operator_creator import pylot.utils from pylot.control.messages import ControlMessage @@ -60,9 +61,8 @@ def main(argv): ground_stop_signs_stream, vehicle_id_stream, open_drive_stream, - global_trajectory_stream, - ) = pylot.operator_creator.add_simulator_bridge(control_loop_stream, - release_sensor_stream) + ) = pylot.component_creator.add_simulator(control_loop_stream, + release_sensor_stream) # Add camera sensors. rgb_camera_setup = RGBCameraSetup('center_camera',