Skip to content

Commit

Permalink
ros2GH-21 Extend transport python module interface
Browse files Browse the repository at this point in the history
The python interface should accept all options that can be passed to rosbag2_transport
  • Loading branch information
greimela-si committed Sep 19, 2018
1 parent afee8fe commit d1a811c
Show file tree
Hide file tree
Showing 16 changed files with 202 additions and 121 deletions.
2 changes: 1 addition & 1 deletion ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ def add_arguments(self, parser, cli_name): # noqa: D102

def main(self, *, args): # noqa: D102
bag_file = args.bag_file
rosbag2_transport_py.play_bag(bag_file)
rosbag2_transport_py.play(uri=bag_file, storage_id='sqlite3')
7 changes: 5 additions & 2 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ def main(self, *, args): # noqa: D102
print('invalid choice: Can not specify topics and -a at the same time')
return

uri = 'test.bag'
storage_id = 'sqlite3'

if args.all:
rosbag2_transport_py.record_topics([])
rosbag2_transport_py.record(uri=uri, storage_id=storage_id, all=True)
elif args.topics and len(args.topics) > 0:
rosbag2_transport_py.record_topics(args.topics)
rosbag2_transport_py.record(uri=uri, storage_id=storage_id, topics=args.topics)
else:
self._subparser.print_help()
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TRANSPORT__ROSBAG2_PLAY_OPTIONS_HPP_
#define ROSBAG2_TRANSPORT__ROSBAG2_PLAY_OPTIONS_HPP_
#ifndef ROSBAG2_TRANSPORT__PLAY_OPTIONS_HPP_
#define ROSBAG2_TRANSPORT__PLAY_OPTIONS_HPP_

#include <cstddef>

namespace rosbag2_transport
{
struct Rosbag2PlayOptions
struct PlayOptions
{
public:
size_t read_ahead_queue_size;
};

} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__ROSBAG2_PLAY_OPTIONS_HPP_
#endif // ROSBAG2_TRANSPORT__PLAY_OPTIONS_HPP_
32 changes: 32 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/record_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_
#define ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_

#include <string>
#include <vector>

namespace rosbag2_transport
{
struct RecordOptions
{
public:
bool all;
std::vector<std::string> topics;
};

} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_
30 changes: 10 additions & 20 deletions rosbag2_transport/include/rosbag2_transport/rosbag2_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

#include "rosbag2/sequential_reader.hpp"
#include "rosbag2/writer.hpp"
#include "rosbag2_transport/rosbag2_play_options.hpp"
#include "rosbag2_transport/play_options.hpp"
#include "rosbag2_transport/record_options.hpp"
#include "rosbag2_transport/storage_options.hpp"
#include "rosbag2_transport/visibility_control.hpp"

namespace rosbag2_transport
Expand All @@ -37,9 +39,6 @@ class Player;
class Rosbag2Transport
{
public:
ROSBAG2_TRANSPORT_PUBLIC
Rosbag2Transport();

ROSBAG2_TRANSPORT_PUBLIC
void init();

Expand All @@ -50,37 +49,28 @@ class Rosbag2Transport
* Records topics to a bagfile. Subscription happens at startup time, hence the topics must
* exist when "record" is called.
*
* \param file_name Name of the bagfile to write
* \param topic_names Vector of topics to subscribe to. Topics must exist at startup time. If
* the vector is empty, all topics will be subscribed.
*/
ROSBAG2_TRANSPORT_PUBLIC
void record(const std::string & file_name, const std::vector<std::string> & topic_names);

/**
* Records all available topics to a bagfile. Subscription happens at startup time, hence only
* topics available at startup time are recorded.
*
* \param file_name Name of the bagfile to write
* \param storage_options Options regarding the storage (e.g. bag file name)
* \param record_options Options regarding the recording (e.g. the topics to record)
*/
ROSBAG2_TRANSPORT_PUBLIC
void record(const std::string & file_name);
void record(const StorageOptions & storage_options, const RecordOptions & record_options);

/**
* Replay all topics in a bagfile.
*
* \param file_name Name of the bagfile to replay
* \param storage_options Option regarding the storage (e.g. bag file name)
* \param play_options Options regarding the playback (e.g. queue size)
*/
ROSBAG2_TRANSPORT_PUBLIC
void play(const std::string & file_name, const Rosbag2PlayOptions & options);
void play(const StorageOptions & storage_options, const PlayOptions & play_options);

private:
std::shared_ptr<rosbag2_transport::GenericSubscription>
create_subscription(
rosbag2::Writer & writer,
std::shared_ptr<Rosbag2Node> & node,
const std::string & topic_name, const std::string & topic_type) const;

std::shared_ptr<Rosbag2Node> node_;
std::vector<std::shared_ptr<GenericSubscription>> subscriptions_;
};

Expand Down
31 changes: 31 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/storage_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TRANSPORT__STORAGE_OPTIONS_HPP_
#define ROSBAG2_TRANSPORT__STORAGE_OPTIONS_HPP_

#include <string>

namespace rosbag2_transport
{
struct StorageOptions
{
public:
std::string uri;
std::string storage_id;
};

} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__STORAGE_OPTIONS_HPP_
10 changes: 5 additions & 5 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
namespace rosbag2_transport
{

Player::Player(std::shared_ptr<rosbag2::SequentialReader> reader)
: reader_(reader), node_(std::make_shared<Rosbag2Node>("rosbag2_node"))
Player::Player(std::unique_ptr<rosbag2::SequentialReader> reader)
: reader_(std::move(reader)), node_(std::make_shared<Rosbag2Node>("rosbag2_node"))
{}

bool Player::is_storage_completely_loaded() const
Expand All @@ -48,7 +48,7 @@ bool Player::is_storage_completely_loaded() const
return !storage_loading_future_.valid();
}

void Player::play(const Rosbag2PlayOptions & options)
void Player::play(const PlayOptions & options)
{
prepare_publishers();

Expand All @@ -60,7 +60,7 @@ void Player::play(const Rosbag2PlayOptions & options)
play_messages_from_queue();
}

void Player::wait_for_filled_queue(const Rosbag2PlayOptions & options) const
void Player::wait_for_filled_queue(const PlayOptions & options) const
{
while (
message_queue_.size_approx() < options.read_ahead_queue_size &&
Expand All @@ -70,7 +70,7 @@ void Player::wait_for_filled_queue(const Rosbag2PlayOptions & options) const
}
}

void Player::load_storage_content(const Rosbag2PlayOptions & options)
void Player::load_storage_content(const PlayOptions & options)
{
TimePoint time_first_message;

Expand Down
12 changes: 6 additions & 6 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "replayable_message.hpp"
#include "rosbag2/sequential_reader.hpp"
#include "rosbag2/types.hpp"
#include "rosbag2_transport/rosbag2_play_options.hpp"
#include "rosbag2_transport/play_options.hpp"

using TimePoint = std::chrono::time_point<std::chrono::high_resolution_clock>;

Expand All @@ -38,21 +38,21 @@ class Rosbag2Node;
class Player
{
public:
explicit Player(std::shared_ptr<rosbag2::SequentialReader> reader);
explicit Player(std::unique_ptr<rosbag2::SequentialReader> reader);

void play(const Rosbag2PlayOptions & options);
void play(const PlayOptions & options);

private:
void load_storage_content(const Rosbag2PlayOptions & options);
void load_storage_content(const PlayOptions & options);
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary);
void wait_for_filled_queue(const Rosbag2PlayOptions & options) const;
void wait_for_filled_queue(const PlayOptions & options) const;
void play_messages_from_queue();
void prepare_publishers();

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;

std::shared_ptr<rosbag2::SequentialReader> reader_;
std::unique_ptr<rosbag2::SequentialReader> reader_;
moodycamel::ReaderWriterQueue<ReplayableMessage> message_queue_;
mutable std::future<void> storage_loading_future_;
std::shared_ptr<Rosbag2Node> node_;
Expand Down
55 changes: 21 additions & 34 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include "rcl/graph.h"
Expand All @@ -28,21 +29,17 @@
#include "rosbag2_transport/logging.hpp"
#include "rosbag2/sequential_reader.hpp"
#include "rosbag2/types.hpp"
#include "rosbag2/typesupport_helpers.hpp"
#include "rosbag2/writer.hpp"

#include "generic_subscription.hpp"
#include "rosbag2_node.hpp"
#include "player.hpp"
#include "replayable_message.hpp"
#include "rosbag2/typesupport_helpers.hpp"
#include "rosbag2_node.hpp"

namespace rosbag2_transport
{

Rosbag2Transport::Rosbag2Transport()
: node_(std::make_shared<Rosbag2Node>("rosbag2"))
{}

void Rosbag2Transport::init()
{
rclcpp::init(0, nullptr);
Expand All @@ -54,11 +51,16 @@ void Rosbag2Transport::shutdown()
}

void Rosbag2Transport::record(
const std::string & file_name, const std::vector<std::string> & topic_names)
const StorageOptions & storage_options, const RecordOptions & record_options)
{
rosbag2::Writer writer(file_name, "sqlite3");
rosbag2::Writer writer(storage_options.uri, storage_options.storage_id);

auto node = std::make_shared<Rosbag2Node>("rosbag2");

auto topics_and_types = record_options.all ?
node->get_all_topics_with_types() :
node->get_topics_with_types(record_options.topics);

auto topics_and_types = node_->get_topics_with_types(topic_names);
if (topics_and_types.empty()) {
throw std::runtime_error("No topics found. Abort");
}
Expand All @@ -68,7 +70,7 @@ void Rosbag2Transport::record(
auto topic_type = topic_and_type.second;

std::shared_ptr<GenericSubscription> subscription = create_subscription(
writer, topic_name, topic_type);
writer, node, topic_name, topic_type);

if (subscription) {
subscriptions_.push_back(subscription);
Expand All @@ -83,35 +85,18 @@ void Rosbag2Transport::record(

ROSBAG2_TRANSPORT_LOG_INFO("Waiting for messages...");
while (rclcpp::ok()) {
rclcpp::spin(node_);
rclcpp::spin(node);
}
subscriptions_.clear();
}


void Rosbag2Transport::record(const std::string & file_name)
{
auto topics_and_types = node_->get_all_topics_with_types();
std::vector<std::string> topic_names;
topic_names.reserve(topics_and_types.size());
for (const auto & topic_and_type : topics_and_types) {
topic_names.push_back(topic_and_type.first);
}

if (topic_names.empty()) {
ROSBAG2_TRANSPORT_LOG_ERROR("no topics found to record");
return;
}

record(file_name, topic_names);
}

std::shared_ptr<GenericSubscription>
Rosbag2Transport::create_subscription(
rosbag2::Writer & writer,
std::shared_ptr<Rosbag2Node> & node,
const std::string & topic_name, const std::string & topic_type) const
{
auto subscription = node_->create_generic_subscription(
auto subscription = node->create_generic_subscription(
topic_name,
topic_type,
[&writer, topic_name](std::shared_ptr<rmw_serialized_message_t> message) {
Expand All @@ -131,12 +116,14 @@ Rosbag2Transport::create_subscription(
return subscription;
}

void Rosbag2Transport::play(const std::string & file_name, const Rosbag2PlayOptions & options)
void Rosbag2Transport::play(
const StorageOptions & storage_options, const PlayOptions & play_options)
{
auto reader = std::make_shared<rosbag2::SequentialReader>(file_name, "sqlite3");
auto reader = std::make_unique<rosbag2::SequentialReader>(
storage_options.uri, storage_options.storage_id);

Player player(reader);
player.play(options);
Player player(std::move(reader));
player.play(play_options);
}

} // namespace rosbag2_transport
Loading

0 comments on commit d1a811c

Please sign in to comment.