Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add correct timing behaviour for rosbag play #32

Merged
merged 27 commits into from
Sep 20, 2018

Conversation

Martin-Idel-SI
Copy link
Contributor

This PR adds correct timing behaviour for rosbag play

  • Uses a shared queue (introduces vendor package for shared queues - header only library!)
  • Use one thread to get messages from storage
  • Use one thread to publish according to timestamp in storage
  • Use a simple heuristic to make sure that the storage will not get starved and doesn't write too many messages to memory

In addition, a few technical aspects have been improved

  • Only depend on test_msgs (not std_msgs) in rosbag2 tests
  • Fix pluginlib classloader lifetime for rosbag play

@Martin-Idel-SI Martin-Idel-SI force-pushed the feature/replay_timing branch 2 times, most recently from 8e8af82 to 6eea477 Compare September 14, 2018 07:06
@anhosi
Copy link
Contributor

anhosi commented Sep 17, 2018

CI:

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

@anhosi
Copy link
Contributor

anhosi commented Sep 17, 2018

new CI after fixing test:

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

#include "rosbag2/rosbag2.hpp"

int main(int argc, const char ** argv)
{
rclcpp::init(argc, argv);

auto options = rosbag2::Rosbag2PlayOptions();
options.queue_buffer_length_ = 1000;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly does this queue_buffer_length describe? Is it the amount of serialized messages to load?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is the queue size up to which the storage is preloaded for smooth replaying.

rosbag2/include/rosbag2/rosbag2_play_options.hpp Outdated Show resolved Hide resolved

bool is_pending(const std::future<void> & future)
{
return !(future.valid() && future.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does wait_for(std::chrono::seconds(0)) do? Wait forever or return immediately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will return immediately.


namespace rosbag2
{
struct Rosbag2PlayOptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The options should also encompass the qos options in order to guarantee correct replay behavior.

Copy link
Contributor

@anhosi anhosi Sep 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, however I would prefer to do this later with a followup pr (#34) as we do not yet have the CLI integration.

message_queue_.enqueue(message);
}

auto queue_lower_boundary = options.queue_buffer_length_ - options.queue_buffer_length_ / 10;
Copy link
Collaborator

@Karsten1987 Karsten1987 Sep 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a few comments on this:

  • Where does that magic number (90%) come from?
  • The result of this should be casted explicitely to a type. I believe you can lose decimal precision here.
  • Should this number be part of the Rosbag2PlayOptions struct?

Copy link
Contributor

@anhosi anhosi Sep 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • About being part of Rosbag2PlayOptions: I am not sure. It would be an expert user option only and we definitely would need to provide a useful default, e.g. 90%.
  • There is actually no cast necessary as / is integer division in C++ which is what we want here.
  • This is also a candidate for Allow to specify more parameters for rosbag play #34.

namespace rosbag2
{

struct ReplayableMessage
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would prefer that the SerializedBagMessage struct gets enhanced by the time_to_start field (maybe in another name), but I don't see the point of cascading these types, given that the serialized bag message was introduced for this purpose.
Because I believe this time point field can also be used otherwise besides publishing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree on that one. The SerializedBagMessage represents a message with meta data that can be stored in a bag file.
The ReplayableMessage represents a SerializedBagMessage enriched with play-specific meta data that is needed for playing. These are different things and thus should have different types.

rosbag2/src/rosbag2/rosbag2.cpp Outdated Show resolved Hide resolved
rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp Outdated Show resolved Hide resolved
rosbag2/test/rosbag2/rosbag2_test_fixture.hpp Outdated Show resolved Hide resolved
rosbag2/test/rosbag2/rosbag2_test_fixture.hpp Outdated Show resolved Hide resolved
@Karsten1987
Copy link
Collaborator

Karsten1987 commented Sep 17, 2018

The tests seem to be (at least) flaky on my OSX.
I've run the gmock suite 10 times and rosbag2_record_all_integration_test constantly fails on my machine.

Also, I guess it'd be good to cleanup the tmp_tes_dir.* folder after a build run. This might flood your build folder.

The demo applications are currently not installed. So you can't run them with the ros2 run command.

When running the demo application, I have three files in my workspace

  • test.bag
  • test.bag-shm
  • test.bag-wal
    What are these files?

The print of [INFO] [rosbag2_storage]: Stored message is flooding the console pretty heavily

for the record function: I think that an empty topic_names vector should lead to an exception. I believe the get_all_topics...() should be called outside of this function. This makes the API clearer, IMHO. Or the function can have an overload:
record(const std::string & file_name);
record(const std::string & file_name, const std::vector<std::string> topic_names);
whereas the first one fetches all available topics from the network and then calls the second function.
https://github.com/ros2/rosbag2/blob/master/rosbag2/include/rosbag2/rosbag2.hpp#L52
https://github.com/ros2/rosbag2/blob/master/rosbag2/src/rosbag2/rosbag2.cpp#L55-L57

For the generic subscription:
I think it makes sense to subscribe with info at this place to have meta data available at this time:
https://github.com/ros2/rmw/blob/master/rmw/include/rmw/types.h#L226-L231
I could see that this meta data is enhanced by a received time stamp in the future.

Overall:
The demo works and from a first test seems to perform correctly. Nice work! 🥇

rosbag2/include/rosbag2/rosbag2.hpp Outdated Show resolved Hide resolved
message_queue_.size_approx() < options.queue_buffer_length_ &&
is_pending(storage_loading_future))
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see that this parameter should be exposed or configurable. Could we pack this into the Rosbag2PlayOptions struct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely possible. I am not yet sure how useful this parameter will be. I suggest to discuss this here (#34).

@anhosi
Copy link
Contributor

anhosi commented Sep 18, 2018

  • We also see this flaky integration test. I created a issue (rosbag2_record_all_integration_test is flaky #35) because we are not sure whether this is a rosbag2 problem.
  • We noticed the broken test cleanup and are addressing it on the rosbag2_transport branch.
  • The test.bag-shm and test.bag-wal are internal files of sqlite3.
  • Not installing the demo programs is intentional and they will be removed with the rosbag2_transport branch as then the CLI will be working.
  • The "Stored message" logging has been removed.
  • I decided to use a Rosbag2::record overload for the record all usage.

@anhosi
Copy link
Contributor

anhosi commented Sep 18, 2018

New CI after working on review comments:

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

{
while (
message_queue_.size_approx() < options.queue_buffer_length_ &&
is_pending(storage_loading_future))
message_queue_.size_approx() < options.read_ahead_queue_size &&
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this queue have to be initialized in some way? Or is it guaranteed that size_approx() returns 0 by default initialization?

while (message_queue_.size_approx() != 0 || is_pending(storage_loading_future)) {
ReplayableMessage message;
if (message_queue_.try_dequeue(message)) {
std::this_thread::sleep_until(start_time + message.time_since_start);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my point is that start_time + message.time_since_start could be smaller than now, right?
But from what I read, the sleep_until command doesn't block on a time point in the past. So this should be alright.

void Rosbag2::record(const std::string & file_name)
{
auto topics_and_types = node_->get_all_topics_with_types();
std::vector<std::string> topic_names(topics_and_types.size());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this function throw when no topics are available?

node->get_all_topics_with_types() :
node->get_topics_with_types(topic_names);

auto topics_and_types = node_->get_topics_with_types(topic_names);
if (topics_and_types.empty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this check should go into the other record function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we do not have topic discovery after startup we should also report if we could not detect the topics that were explicitely specified. So I think is is an error for both record overloads and the location is fine.

Copy link
Collaborator

@Karsten1987 Karsten1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last commit fixes the record all test for me:

3255d5c

anhosi and others added 18 commits September 19, 2018 09:34
For now the publishing starts only after the reading is completly
done. This should change aufter ros2GH-68 is done and a thread-safe
queue can be used instead of std::queue.
- The main (play) thread sleeps until the time for publishing the
  message is reached.
- Using std::chrono time_point and duration for type-safe time
  arithmetic instead of rcutils time types.
- Subscribers need to maintain a longer history if the messages are
  not consumed fast enough.
The Classloader instance needs to outlive all objects created by it.
Reason: record and play have almost no common code but do the exact
opposite with the storage and rclcpp.
- only required in tests
- this decreases the amount of packages needed for a clean build without tests
- Download and install headers from moodycamel readerwriterqueue
- Download and install headers from moodycamel concurrentqueue
- Use readerwriterqueue in code to load and publish concurrently
- Only load if queue contains less than 1000 messages
- Wait a millisecond before loading again once the queue is long enough
- gets rid of string_msgs dependency
Martin-Idel-SI and others added 3 commits September 19, 2018 09:34
- Currently, we write sequentially in order of arrival time so
  reading in id order is fine
- This may change at a later time and should not change the reading
  behaviour, i.e. we need to read in order of timestamps
@anhosi
Copy link
Contributor

anhosi commented Sep 19, 2018

New CI:

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

anhosi and others added 5 commits September 19, 2018 11:14
Make the future a class member of player to avoid having to hand it
into several functions which is difficult with a move-only type.
Add an explicit overload for record without a topic_names argument to
record all topics.
@Karsten1987
Copy link
Collaborator

Karsten1987 commented Sep 20, 2018

new round of CI:

Build Status
Build Status
Build Status
Build Status

@Karsten1987
Copy link
Collaborator

I'll go ahead and merge this. The flaky test is already ticketed in #37

@Karsten1987 Karsten1987 merged commit bc822f1 into ros2:master Sep 20, 2018
@Martin-Idel-SI Martin-Idel-SI deleted the feature/replay_timing branch September 20, 2018 06:30
james-rms pushed a commit to james-rms/rosbag2 that referenced this pull request Nov 17, 2022
Signed-off-by: Jacob Bandes-Storch <jacob@foxglove.dev>
james-rms pushed a commit to james-rms/rosbag2 that referenced this pull request Nov 17, 2022
Signed-off-by: Jacob Bandes-Storch <jacob@foxglove.dev>
Signed-off-by: James Smith <james@foxglove.dev>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants