Skip to content

Commit

Permalink
User configuration for SHM metatraffic (#3753)
Browse files Browse the repository at this point in the history
* Refs #18966. Forcing UDP for metatraffic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19255. Allowing metatraffic depends on flags.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19255. NetworkFactory constructor receives RTPSParticipantAttributes.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19255. Parsing property from participant attributes.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19255. Remove unused mp_ResourceSemaphore.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Fixed NetworkFactoryTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Fixed TCPv6Tests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Fixed link errors on unit tests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Apply suggestions from code review.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Additional suggestions from code review.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Added possitive test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Added negative test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Configuration for avoid_builtin_multicast on PubSubWriter.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Configuration for avoid_builtin_multicast on PubSubReader.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Configuration for max_multicast_locators_number on PubSubReader/PubSubWriter.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Enable multicast discovery on new test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Apply suggestions.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Add feature to versions.md.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19263. Apply suggestion.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany committed Aug 2, 2023
1 parent 4250cfa commit 25631e6
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 34 deletions.
52 changes: 48 additions & 4 deletions src/cpp/rtps/network/NetworkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,36 @@ namespace rtps {

using SendResourceList = fastdds::rtps::SendResourceList;

NetworkFactory::NetworkFactory()
NetworkFactory::NetworkFactory(
const RTPSParticipantAttributes& PParam)
: maxMessageSizeBetweenTransports_(std::numeric_limits<uint32_t>::max())
, minSendBufferSize_(std::numeric_limits<uint32_t>::max())
{
const std::string* enforce_metatraffic = nullptr;
enforce_metatraffic = PropertyPolicyHelper::find_property(PParam.properties, "fastdds.shm.enforce_metatraffic");
if (enforce_metatraffic)
{
if (*enforce_metatraffic == "unicast")
{
enforce_shm_unicast_metatraffic_ = true;
enforce_shm_multicast_metatraffic_ = false;
}
else if (*enforce_metatraffic == "all")
{
enforce_shm_unicast_metatraffic_ = true;
enforce_shm_multicast_metatraffic_ = true;
}
else if (*enforce_metatraffic == "none")
{
enforce_shm_unicast_metatraffic_ = false;
enforce_shm_multicast_metatraffic_ = false;
}
else
{
EPROSIMA_LOG_WARNING(RTPS_NETWORK, "Unrecognized value '" << *enforce_metatraffic << "'" <<
" for 'fastdds.shm.enforce_metatraffic'. Using default value: 'none'");
}
}
}

bool NetworkFactory::build_send_resources(
Expand Down Expand Up @@ -246,9 +272,9 @@ bool NetworkFactory::getDefaultMetatrafficMulticastLocators(

for (auto& transport : mRegisteredTransports)
{
// For better fault-tolerance reasons, SHM multicast metatraffic is avoided if it is already provided
// For better fault-tolerance reasons, SHM metatraffic is avoided if it is already provided
// by another transport
if (transport->kind() != LOCATOR_KIND_SHM)
if (enforce_shm_multicast_metatraffic_ || transport->kind() != LOCATOR_KIND_SHM)
{
result |= transport->getDefaultMetatrafficMulticastLocators(locators, metatraffic_multicast_port);
}
Expand Down Expand Up @@ -286,10 +312,28 @@ bool NetworkFactory::getDefaultMetatrafficUnicastLocators(
uint32_t metatraffic_unicast_port) const
{
bool result = false;

TransportInterface* shm_transport = nullptr;

for (auto& transport : mRegisteredTransports)
{
result |= transport->getDefaultMetatrafficUnicastLocators(locators, metatraffic_unicast_port);
// For better fault-tolerance reasons, SHM metatraffic is avoided if it is already provided
// by another transport
if (enforce_shm_unicast_metatraffic_ || transport->kind() != LOCATOR_KIND_SHM)
{
result |= transport->getDefaultMetatrafficUnicastLocators(locators, metatraffic_unicast_port);
}
else
{
shm_transport = transport.get();
}
}

if (locators.size() == 0 && shm_transport)
{
result |= shm_transport->getDefaultMetatrafficUnicastLocators(locators, metatraffic_unicast_port);
}

return result;
}

Expand Down
9 changes: 8 additions & 1 deletion src/cpp/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class NetworkFactory
{
public:

NetworkFactory();
NetworkFactory(
const RTPSParticipantAttributes& PParam);

/**
* Allow registration of a transport statically, by specifying the transport type and
Expand Down Expand Up @@ -224,6 +225,12 @@ class NetworkFactory

uint32_t minSendBufferSize_;

// Whether unicast metatraffic on SHM transport should always be used
bool enforce_shm_unicast_metatraffic_ = false;

// Whether multicast metatraffic on SHM transport should always be used
bool enforce_shm_multicast_metatraffic_ = false;

/**
* Calculate well-known ports.
*/
Expand Down
19 changes: 1 addition & 18 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ RTPSParticipantImpl::RTPSParticipantImpl(
, m_att(PParam)
, m_guid(guidP, c_EntityId_RTPSParticipant)
, mp_builtinProtocols(nullptr)
, mp_ResourceSemaphore(new Semaphore(0))
, IdCounter(0)
, m_network_Factory(PParam)
, type_check_fn_(nullptr)
, client_override_(false)
, internal_metatraffic_locators_(false)
Expand Down Expand Up @@ -512,7 +512,6 @@ RTPSParticipantImpl::~RTPSParticipantImpl()
}
m_receiverResourcelist.clear();

delete mp_ResourceSemaphore;
delete mp_userParticipant;
mp_userParticipant = nullptr;
send_resource_list_.clear();
Expand Down Expand Up @@ -2034,22 +2033,6 @@ bool RTPSParticipantImpl::newRemoteEndpointDiscovered(
return false;
}

void RTPSParticipantImpl::ResourceSemaphorePost()
{
if (mp_ResourceSemaphore != nullptr)
{
mp_ResourceSemaphore->post();
}
}

void RTPSParticipantImpl::ResourceSemaphoreWait()
{
if (mp_ResourceSemaphore != nullptr)
{
mp_ResourceSemaphore->wait();
}
}

void RTPSParticipantImpl::assert_remote_participant_liveliness(
const GuidPrefix_t& remote_guid)
{
Expand Down
8 changes: 0 additions & 8 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,6 @@ class RTPSParticipantImpl
return (uint32_t)m_att.participantID;
}

//!Post to the resource semaphore
void ResourceSemaphorePost();

//!Wait for the resource semaphore
void ResourceSemaphoreWait();

//!Get Pointer to the Event Resource.
ResourceEvent& getEventResource()
{
Expand Down Expand Up @@ -529,8 +523,6 @@ class RTPSParticipantImpl
ResourceEvent mp_event_thr;
//! BuiltinProtocols of this RTPSParticipant
BuiltinProtocols* mp_builtinProtocols;
//!Semaphore to wait for the listen thread creation.
Semaphore* mp_ResourceSemaphore;
//!Id counter to correctly assign the ids to writers and readers.
std::atomic<uint32_t> IdCounter;
//! Mutex to safely access endpoints collections
Expand Down
14 changes: 14 additions & 0 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,13 @@ class PubSubReader
return *this;
}

PubSubReader& avoid_builtin_multicast(
bool value)
{
participant_qos_.wire_protocol().builtin.avoid_builtin_multicast = value;
return *this;
}

PubSubReader& property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy& property_policy)
{
Expand Down Expand Up @@ -1319,6 +1326,13 @@ class PubSubReader
return *this;
}

PubSubReader& max_multicast_locators_number(
size_t max_multicast_locators)
{
participant_qos_.allocation().locators.max_multicast_locators = max_multicast_locators;
return *this;
}

PubSubReader& lease_duration(
eprosima::fastrtps::Duration_t lease_duration,
eprosima::fastrtps::Duration_t announce_period)
Expand Down
14 changes: 14 additions & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,13 @@ class PubSubWriter
return *this;
}

PubSubWriter& avoid_builtin_multicast(
bool value)
{
participant_qos_.wire_protocol().builtin.avoid_builtin_multicast = value;
return *this;
}

PubSubWriter& property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy& property_policy)
{
Expand Down Expand Up @@ -1312,6 +1319,13 @@ class PubSubWriter
return *this;
}

PubSubWriter& max_multicast_locators_number(
size_t max_multicast_locators)
{
participant_qos_.allocation().locators.max_multicast_locators = max_multicast_locators;
return *this;
}

PubSubWriter& lease_duration(
eprosima::fastrtps::Duration_t lease_duration,
eprosima::fastrtps::Duration_t announce_period)
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,13 @@ class PubSubReader
return *this;
}

PubSubReader& avoid_builtin_multicast(
bool value)
{
participant_attr_.rtps.builtin.avoid_builtin_multicast = value;
return *this;
}

PubSubReader& property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,13 @@ class PubSubWriter
return *this;
}

PubSubWriter& avoid_builtin_multicast(
bool value)
{
participant_attr_.rtps.builtin.avoid_builtin_multicast = value;
return *this;
}

PubSubWriter& property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy& property_policy)
{
Expand Down
84 changes: 84 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsTransportSHMUDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

#include "BlackboxTests.hpp"
#include "mock/BlackboxMockConsumer.h"

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <memory>
Expand Down Expand Up @@ -178,6 +180,88 @@ TEST_P(SHMUDP, Transport_Reliable_Reliable_test)
run_parametrized_test(true, true);
}

static bool has_shm_locators(
const ResourceLimitedVector<Locator_t>& locators)
{
auto loc_is_shm = [](const Locator_t& loc)
{
return LOCATOR_KIND_SHM == loc.kind;
};
return std::any_of(locators.cbegin(), locators.cend(), loc_is_shm);
}

static void check_shm_locators(
const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info,
bool unicast,
bool multicast)
{
EXPECT_EQ(multicast, has_shm_locators(info.info.metatraffic_locators.multicast));
EXPECT_EQ(unicast, has_shm_locators(info.info.metatraffic_locators.unicast));
}

static void shm_metatraffic_test(
const std::string& topic_name,
const char* const value,
bool unicast,
bool multicast)
{
PubSubWriter<HelloWorldPubSubType> writer(topic_name + "/" + value);
PubSubReader<HelloWorldPubSubType> reader(topic_name + "/" + value);

auto discovery_checker = [unicast, multicast](const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info)
{
check_shm_locators(info, unicast, multicast);
return true;
};
reader.setOnDiscoveryFunction(discovery_checker);
reader.max_multicast_locators_number(2);
reader.init();
ASSERT_TRUE(reader.isInitialized());

PropertyPolicy properties;
Property p;
p.name("fastdds.shm.enforce_metatraffic");
p.value(value);
properties.properties().push_back(p);
writer.property_policy(properties).avoid_builtin_multicast(false).max_multicast_locators_number(2);
writer.init();
ASSERT_TRUE(writer.isInitialized());

reader.wait_discovery();
writer.wait_discovery();
}

TEST(SHMUDP, SHM_metatraffic_config)
{
shm_metatraffic_test(TEST_TOPIC_NAME, "none", false, false);
shm_metatraffic_test(TEST_TOPIC_NAME, "unicast", true, false);
shm_metatraffic_test(TEST_TOPIC_NAME, "all", true, true);
}

TEST(SHMUDP, SHM_metatraffic_wrong_config)
{
using eprosima::fastdds::dds::BlackboxMockConsumer;

/* Set up log */
BlackboxMockConsumer* helper_consumer = new BlackboxMockConsumer();
Log::ClearConsumers(); // Remove default consumers
Log::RegisterConsumer(std::unique_ptr<LogConsumer>(helper_consumer)); // Registering a consumer transfer ownership
// Filter specific message
Log::SetVerbosity(Log::Kind::Warning);
Log::SetCategoryFilter(std::regex("RTPS_NETWORK"));
Log::SetErrorStringFilter(std::regex(".*__WRONG_VALUE__.*"));

// Perform test
shm_metatraffic_test(TEST_TOPIC_NAME, "__WRONG_VALUE__", false, false);

/* Check logs */
Log::Flush();
EXPECT_EQ(helper_consumer->ConsumedEntries().size(), 1u);

/* Clean-up */
Log::Reset(); // This calls to ClearConsumers, which deletes the registered consumer
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
4 changes: 4 additions & 0 deletions test/unittest/rtps/network/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ set(NETWORKFACTORYTESTS_SOURCE
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp

${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/PropertyPolicy.cpp

${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp

${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp
Expand Down
6 changes: 4 additions & 2 deletions test/unittest/rtps/network/NetworkFactoryTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <gtest/gtest.h>

#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/transport/TCPv6TransportDescriptor.h>
#include <fastrtps/transport/UDPv4TransportDescriptor.h>
Expand All @@ -34,7 +35,8 @@ class NetworkTests : public ::testing::Test
{
public:

NetworkFactory networkFactoryUnderTest;
RTPSParticipantAttributes pattr{};
NetworkFactory networkFactoryUnderTest{pattr};
void HELPER_RegisterTransportWithKindAndChannels(
int kind,
unsigned int channels);
Expand Down Expand Up @@ -648,7 +650,7 @@ TEST_F(NetworkTests, LocatorShrink)
std::vector<ShrinkLocatorCase_t> test_cases;
fill_blackbox_locators_test_cases(test_cases);

NetworkFactory f;
NetworkFactory f{pattr};
UDPv4TransportDescriptor udpv4;
f.RegisterTransport(&udpv4);
// TODO: Register more transports
Expand Down
Loading

0 comments on commit 25631e6

Please sign in to comment.