Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROSBAG: add snapshot subcommand #1414

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/test_rosbag/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ if(CATKIN_ENABLE_TESTING)
add_rostest(${PROJECT_BINARY_DIR}/test/latched_sub.test)
add_rostest(test/record_two_publishers.test)
add_rostest(test/record_one_publisher_two_topics.test)
add_rostest(test/snapshot.test)

include_directories(${GTEST_INCLUDE_DIRS})
add_executable(double_pub EXCLUDE_FROM_ALL test/double_pub.cpp)
Expand Down
20 changes: 20 additions & 0 deletions test/test_rosbag/test/snapshot.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<launch>
<node name="snapshot" pkg="rosbag" type="snapshot" args="">
<rosparam>
default_duration_limit: 1 # Maximum time difference between newest and oldest message, seconds, overrides -d flag
default_memory_limit: 0.1 # Maximum memory used by messages in each topic's buffer, MB, override -s flag
topics:
- test1 # Inherit defaults
- test2: # Override duration limit, inherit memory limit
duration: 2
- test3: # Override both limits
duration: -1 # Negative means no duration limit
memory: 0.00
</rosparam>
</node>
<!-- A bunch of fixed frequency publishers, each 64bits to make size calculations easy -->
<node name="test1pub" pkg="rostopic" type="rostopic" args="pub /test1 std_msgs/Time '{data:{ secs: 0, nsecs: 0}}' -r2"/>
<node name="test2pub" pkg="rostopic" type="rostopic" args="pub /test2 std_msgs/Int64 'data: 1337' -r12"/>
<node name="test3pub" pkg="rostopic" type="rostopic" args="pub /test3 std_msgs/Float64 'data: 42.00' -r25" />
<test test-name="snapshot_test" pkg="test_rosbag" type="test_snapshot.py"/>
</launch>
250 changes: 250 additions & 0 deletions test/test_rosbag/test/test_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2018, Open Source Robotics Foundation, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Open Source Robotics Foundation, Inc. nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import os
import sys
import tempfile
import unittest
import rospy
from rosbag import Bag
from std_msgs.msg import String
from std_srvs.srv import SetBool
from rosbag_msgs.msg import SnapshotStatus
from rosbag_msgs.srv import TriggerSnapshot, TriggerSnapshotRequest, TriggerSnapshotResponse


class TestRosbagSnapshot(unittest.TestCase):
'''
Tests the "rosbag snapshot" command.
Relies on the nodes launched in snapshot.test
'''
def __init__(self, *args):
self.params = rospy.get_param("snapshot")
self._parse_params(self.params)
self.last_status = None
self.status_sub = rospy.Subscriber("snapshot_status", SnapshotStatus, self._status_cb, queue_size=5)
self.trigger = rospy.ServiceProxy("trigger_snapshot", TriggerSnapshot)
self.enable = rospy.ServiceProxy("enable_snapshot", SetBool)
super(TestRosbagSnapshot, self).__init__(*args)

def _parse_params(self, params):
'''
Parse launch parameters of snapshot to cache the topic limits in a map
so it is easier to check compliance in later tests.
'''
self.topic_limits = {}
self.default_duration_limit = params['default_duration_limit']
self.default_memory_limit = params['default_memory_limit']
for topic_obj in self.params['topics']:
duration = self.default_duration_limit
memory = self.default_memory_limit
if type(topic_obj) == dict:
topic = topic_obj.keys()[0]
duration = topic_obj[topic].get('duration', duration)
memory = topic_obj[topic].get('memory', memory)
else:
topic = topic_obj
topic = rospy.resolve_name(topic)
duration = rospy.Duration(duration)
memory = 1E6 * memory
self.topic_limits[topic] = (duration, memory)

def _status_cb(self, msg):
self.last_status = msg

def _assert_no_data(self, topics=[]):
'''
Asserts that calling TriggerWrite service with
specifed parameters responds non-success and did not create
a bag file.
'''
filename = tempfile.mktemp()
res = self.trigger(filename=filename, topics=topics)
self.assertFalse(res.success)
self.assertEqual(res.message, TriggerSnapshotResponse.NO_DATA)
self.assertFalse(os.path.isfile(filename))

def _assert_record_success(self, data):
'''
Assert that the recording SetBool service responds with success
'''
res = self.enable(data)
self.assertTrue(res.success)
self.assertEqual(res.message, "")

def _pause(self):
self._assert_record_success(False)

def _resume(self):
self._assert_record_success(True)

def _assert_write_success(self, topics=[], prefix_mode=False, **kwargs):
'''
Asserts that the TriggerWrite services succeeds for the specified request arguments
and that the specified bag file is actually created
@param prefix_mode: If True, don't put .bag at the end of reqest to check prefix filename mode
'''
if prefix_mode:
d = tempfile.mkdtemp()
filename = tempfile.mktemp(dir=d)
else:
filename = tempfile.mktemp(suffix='.bag')
req = TriggerSnapshotRequest(filename=filename, topics=topics, **kwargs)
res = self.trigger(req)
self.assertTrue(res.success, msg="snapshot should have succeeded. message: {}".format(res.message))
self.assertTrue(res.message == "")
if prefix_mode:
dircontents = os.listdir(d)
self.assertEqual(len(dircontents), 1)
filename = os.path.join(d, dircontents[0])
self.assertEqual(filename[-4:], '.bag')
self.assertTrue(os.path.isfile(filename))
return filename

def _assert_limits_enforced(self, test_topic, duration, memory):
'''
Asserts that the measured duration and memory for a topic comply with the launch parameters
@param topic: string
@param duration: rospy.Duration, age of buffer
@param memory: integer, bytes of memory used
'''
test_topic = rospy.resolve_name(test_topic)
self.assertIn(test_topic, self.topic_limits)
limits = self.topic_limits[test_topic]
if limits[0] > rospy.Duration():
self.assertLessEqual(duration, limits[0])
if limits[1] > 0:
self.assertLessEqual(memory, limits[1])

def _assert_status_valid(self):
'''
Check that a status message contains info on all subscribed topics
and reports that their buffer complies with the configured limits.
'''
self.assertIsNotNone(self.last_status) # A message was recieved
topics = [msg.topic for msg in self.last_status.topics]
# Oneliners :)
status_topics = [rospy.resolve_name(topic.keys()[0] if type(topic) == dict else topic)
for topic in self.params['topics']]
self.assertEquals(set(topics), set(status_topics)) # Topics from params are same as topics in status message
for topic in self.last_status.topics:
duration = topic.window_stop - topic.window_start
memory = topic.traffic
self._assert_limits_enforced(topic.topic, duration, memory)

def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None):
'''
Open the bagfile at the specified filename and read it to ensure topic limits were
enforced and the optional topic list and start/stop times are also enforced.
'''
bag = Bag(filename)
topics_dict = bag.get_type_and_topic_info()[1]
bag_topics = set(topics_dict.keys())
param_topics = set(self.topic_limits.keys())
if topics:
self.assertEqual(bag_topics, set(topics))
self.assertTrue(bag_topics.issubset(param_topics))
for topic in topics_dict:
size = topics_dict[topic].message_count * 8 # Calculate stored message size as each message is 8 bytes
gen = bag.read_messages(topics=topic)
_, _, first_time = gen.next()
if start_time:
self.assertGreaterEqual(first_time, start_time)
for _, _, last_time in gen: # Read through all messages so last_time is valid
pass
if stop_time:
self.assertLessEqual(last_time, stop_time)
duration = last_time - first_time
self._assert_limits_enforced(topic, duration, size)

def test_1_service_connects(self):
'''
Check that both services provided by snapshot exist.
'''
self.trigger.wait_for_service()
self.enable.wait_for_service()

def test_write_all(self):
'''
Wait long enough for memory & duration limits to need to be used
'''
rospy.sleep(3.0) # Give some time to fill buffers to maximums
self._assert_status_valid()
filename = self._assert_write_success(prefix_mode=True)
self._assert_bag_valid(filename)

def test_write_advanced(self):
'''
Test the more advanced features: pausing and resuming, specific write times, and specific topic list.
'''
# Pause, resume, and pause again so buffer should only contain data from a known time internal
self._pause()
rospy.sleep(1.5)
start = rospy.Time.now()
self._resume()
rospy.sleep(3.0)
self._pause()
stop = rospy.Time.now()
rospy.sleep(1.0)

# Write all buffer data, check that only data from resumed interval is present
filename = self._assert_write_success()
self._assert_bag_valid(filename, start_time=start, stop_time=stop)

# With recording still paused (same buffer as last write), write only a shorter time range
cropped_start = start + rospy.Duration(0.5)
cropped_stop = stop - rospy.Duration(1.0)
filename = self._assert_write_success(start_time=cropped_start, stop_time=cropped_stop)
self._assert_bag_valid(filename, start_time=cropped_start, stop_time=cropped_stop)

# Write only specific topics and ensure that only those are present in bag file
specific_topics = [rospy.resolve_name(topic) for topic in ['/test2', 'test1']]
filename = self._assert_write_success(topics=specific_topics)
self._assert_bag_valid(filename, topics=specific_topics)

# Resume recording for other tests
self._resume()

def test_invalid_topics(self):
'''
Test that an invalid topic or one not subscribed to fails
'''
self._assert_no_data(['_invalid_graph_name'])
self._assert_no_data(['/test4'])


if __name__ == '__main__':
import rostest
PKG = 'rosbag'
rospy.init_node('test_rosbag_snapshot', anonymous=True)
rostest.run(PKG, 'TestRosbagSnapshot', TestRosbagSnapshot, sys.argv)
12 changes: 9 additions & 3 deletions tools/rosbag/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ if(NOT WIN32)
set_directory_properties(PROPERTIES COMPILE_OPTIONS "-Wall;-Wextra")
endif()

find_package(catkin REQUIRED COMPONENTS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp)
find_package(catkin REQUIRED COMPONENTS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp rosbag_msgs)
find_package(Boost REQUIRED COMPONENTS date_time regex program_options filesystem)
find_package(BZip2 REQUIRED)

Expand All @@ -21,20 +21,26 @@ include_directories(include ${catkin_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}
catkin_package(
LIBRARIES rosbag
INCLUDE_DIRS include
CATKIN_DEPENDS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp)
CATKIN_DEPENDS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp rosbag_msgs)

add_library(rosbag
src/player.cpp
src/recorder.cpp
src/snapshotter.cpp
src/time_translator.cpp)

add_dependencies(rosbag ${catkin_EXPORTED_TARGETS})

target_link_libraries(rosbag ${catkin_LIBRARIES} ${Boost_LIBRARIES}
${BZIP2_LIBRARIES}
)

add_executable(record src/record.cpp)
target_link_libraries(record rosbag)

add_executable(snapshot src/snapshot.cpp)
target_link_libraries(snapshot rosbag)

add_executable(play src/play.cpp)
target_link_libraries(play rosbag)

Expand All @@ -50,7 +56,7 @@ install(TARGETS rosbag
ARCHIVE DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION}
LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION}
RUNTIME DESTINATION ${CATKIN_GLOBAL_BIN_DESTINATION})
install(TARGETS record play
install(TARGETS record play snapshot
ARCHIVE DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION}
LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION}
RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION})
Expand Down
Loading