Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Participant & liveliness QoS overrides #12448

Merged
merged 4 commits into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions third-party/realdds/doc/device.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ If available, the [`query-option` and `set-option` controls](control.md#query-op

The following device options may be available:

| Setting | Default | Type | `option-name` | Description
|----------------|--------:|------------------|-----------------|---------------
| Domain ID | 0 | int 0-232 | `domain-id` | The DDS domain number to use to segment communications on the network
| IP address | ? | string "#.#.#.#" | `ip-address` | The static IP that the server uses for itself (if DHCP is off)
| DHCP enable | ? | bool | `dhcp` | If on, the `ip-address` is ignored and retrieved on startup from a DHCP server
| Setting | Default | Type | `option-name` | Description |
|----------------|--------:|------------------|-----------------|---------------|
| Domain ID | 0 | int, 0-232 | `domain-id` | The DDS domain number to use to segment communications on the network
| IP address | empty | string "#.#.#.#" | `ip-address` | The static IP that the server uses for itself (empty=DHCP on)
| Multicast IP | - | string "#.#.#.#" | `multicast-ip` | The IP address to use for multicasting (empty to disable)


Expand Down
9 changes: 5 additions & 4 deletions third-party/realdds/doc/discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ When a context is created, a JSON representation may be passed to it, e.g.: `{"d

The `dds` is there by default (i.e., not `false`). The value may contain the following settings dealing with discovery:

| Field | Description |
|----------------------|----------------------------------------|
| domain | The domain number to use (0-232); `0` is the default
| participant | The name given this context (how other participants will see it); defaults to the executable name
| Field | Default | Description |
|------------------|----------------:|----------------------------------|
| domain | `0` | The domain number to use (0-232)
| participant | Executable name | The name given this context (how other participants will see it)
| participant-id | Automatic | The ID; not recommended to use, but may be needed in special circumstances

See a comprehensive list of settings under [device](device.md#Settings).
37 changes: 34 additions & 3 deletions third-party/realdds/include/realdds/dds-serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,22 @@ std::ostream & operator<<( std::ostream &, ReliabilityQosPolicy const & );
std::ostream & operator<<( std::ostream &, DurabilityQosPolicyKind );
std::ostream & operator<<( std::ostream &, DurabilityQosPolicy const & );
std::ostream & operator<<( std::ostream &, HistoryQosPolicy const & );
std::ostream & operator<<( std::ostream &, LivelinessQosPolicyKind );
std::ostream & operator<<( std::ostream &, LivelinessQosPolicy const & );
std::ostream & operator<<( std::ostream &, DataSharingQosPolicy const & );
std::ostream & operator<<( std::ostream &, RTPSEndpointQos const & );

class DomainParticipantQos;
} // namespace dds
} // namespace fastdds
} // namespace eprosima

namespace eprosima {
namespace fastrtps {
// Allow j["key"] = qos.lease_duration;
void to_json( nlohmann::json &, Duration_t const & );
// Allow j.get< eprosima::fastrtps::Duration_t >();
void from_json( nlohmann::json const &, Duration_t & );
namespace rtps {
std::ostream & operator<<( std::ostream &, class WriterProxyData const & );
std::ostream & operator<<( std::ostream &, class ReaderProxyData const & );
Expand All @@ -38,6 +46,7 @@ namespace realdds {
eprosima::fastdds::dds::ReliabilityQosPolicyKind reliability_kind_from_string( std::string const & );
eprosima::fastdds::dds::DurabilityQosPolicyKind durability_kind_from_string( std::string const & );
eprosima::fastdds::dds::HistoryQosPolicyKind history_kind_from_string( std::string const & );
eprosima::fastdds::dds::LivelinessQosPolicyKind liveliness_kind_from_string( std::string const & );
eprosima::fastrtps::rtps::MemoryManagementPolicy_t history_memory_policy_from_string( std::string const & );

// Override QoS reliability from a JSON source.
Expand Down Expand Up @@ -71,19 +80,41 @@ void override_durability_qos_from_json( eprosima::fastdds::dds::DurabilityQosPol
//
void override_history_qos_from_json( eprosima::fastdds::dds::HistoryQosPolicy & qos, nlohmann::json const & );


// Override QoS liveliness from a JSON source.
// Liveliness settings are an object
// {
// "kind": "automatic",
// "lease-duration": 5, // seconds
// "announcement-period": 3 // seconds
// }
//
void override_liveliness_qos_from_json( eprosima::fastdds::dds::LivelinessQosPolicy & qos, nlohmann::json const & );


// Override QoS data-sharing from a JSON source.
// The JSON can be a simple boolean indicating off or automatic mode:
// "data-sharing": true // <-- the JSON is the true value
//
void override_data_sharing_qos_from_json( eprosima::fastdds::dds::DataSharingQosPolicy & qos, nlohmann::json const & );

// Override QoS endpoint from a JSON source.
// The JSON can is an object:
// "endpoint": { // <-- the JSON is the true value
// The JSON is an object:
// {
// "history-memory-policy": "preallocated-with-realloc"
// }
// }
//
void override_endpoint_qos_from_json( eprosima::fastdds::dds::RTPSEndpointQos & qos, nlohmann::json const & );


// Override participant QoS from a JSON source.
// The JSON is an object:
// {
// "participant-id": -1,
// "lease-duration": 10, // seconds
// }
//
void override_participant_qos_from_json( eprosima::fastdds::dds::DomainParticipantQos & qos, nlohmann::json const & );


} // namespace realdds
7 changes: 5 additions & 2 deletions third-party/realdds/src/dds-participant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici
if( is_valid() )
{
DDS_THROW( runtime_error,
"participant is already initialized; cannot init '" + participant_name + "' on domain id "
+ std::to_string( domain_id ) );
"participant is already initialized; cannot init '" << participant_name << "' on domain id "
<< domain_id );
}

if( domain_id == -1 )
Expand Down Expand Up @@ -225,6 +225,9 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici
pqos.transport().use_builtin_transports = false;
pqos.transport().user_transports.push_back( udp_transport );

// Above are defaults
override_participant_qos_from_json( pqos, settings );

// Listener will call DataReaderListener::on_data_available for a specific reader,
// not SubscriberListener::on_data_on_readers for any reader
// ( See note on https://fast-dds.docs.eprosima.com/en/v2.7.0/fastdds/dds_layer/core/entity/entity.html )
Expand Down
163 changes: 159 additions & 4 deletions third-party/realdds/src/dds-serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@

#include <realdds/dds-serialization.h>
#include <realdds/dds-utilities.h>
#include <realdds/dds-time.h>

#include <fastdds/rtps/writer/WriterDiscoveryInfo.h>
#include <fastdds/rtps/reader/ReaderDiscoveryInfo.h>

#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/rtps/transport/UDPTransportDescriptor.h>

#include <rsutils/string/from.h>
#include <rsutils/string/nocase.h>
#include <rsutils/json.h>


Expand Down Expand Up @@ -63,6 +69,33 @@ std::ostream & operator<<( std::ostream & os, HistoryQosPolicy const & qos )
}


std::ostream & operator<<( std::ostream & os, LivelinessQosPolicyKind kind )
{
switch( kind )
{
case eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS:
return os << "automatic";
case eprosima::fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
return os << "by-participant";
case eprosima::fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS:
return os << "by-topic";
}
return os << (int) kind;
}


std::ostream & operator<<( std::ostream & os, LivelinessQosPolicy const & qos )
{
os << qos.kind;
if( qos.lease_duration != eprosima::fastrtps::c_TimeInfinite )
os << "/" << rsutils::string::from( qos.lease_duration.to_ns() / 1e9 ) << 's';
if( qos.kind == eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS
&& qos.announcement_period != eprosima::fastrtps::c_TimeInfinite )
os << "/" << rsutils::string::from( qos.announcement_period.to_ns() / 1e9 ) << 's';
return os;
}


std::ostream & operator<<( std::ostream & os, DataSharingQosPolicy const & qos )
{
return os;
Expand All @@ -82,6 +115,38 @@ std::ostream & operator<<( std::ostream & os, RTPSEndpointQos const & qos )

namespace eprosima {
namespace fastrtps {


// Allow j["key"] = qos.lease_duration;
void to_json( nlohmann::json & j, Duration_t const & duration )
{
if( duration == c_TimeInfinite )
j = "infinite";
else if( duration == c_TimeInvalid )
j = "invalid";
else
j = realdds::time_to_double( duration );
}


// Allow j.get< eprosima::fastrtps::Duration_t >();
void from_json( nlohmann::json const & j, Duration_t & duration )
{
if( j.is_string() )
{
auto & s = rsutils::json::string_ref( j );
if( rsutils::string::nocase_equal( s, "infinite" ) )
duration = c_TimeInfinite;
else if( rsutils::string::nocase_equal( s, "invalid" ) )
duration = c_TimeInvalid;
else
throw nlohmann::json::type_error::create( 317, "unknown duration value '" + s + "'", &j );
OhadMeir marked this conversation as resolved.
Show resolved Hide resolved
}
else
duration = realdds::dds_time( j.get< double >() );
}


namespace rtps {


Expand All @@ -92,6 +157,8 @@ std::ostream & operator<<( std::ostream & os, WriterProxyData const & info )
os << /*" reliability"*/ " " << info.m_qos.m_reliability;
if( ! ( info.m_qos.m_durability == eprosima::fastdds::dds::DurabilityQosPolicy() ) )
os << /*" durability"*/ " " << info.m_qos.m_durability;
if( ! ( info.m_qos.m_liveliness == eprosima::fastdds::dds::LivelinessQosPolicy() ) )
os << " liveliness" " " << info.m_qos.m_liveliness;
os << "]";
return os;
}
Expand Down Expand Up @@ -151,6 +218,18 @@ eprosima::fastdds::dds::HistoryQosPolicyKind history_kind_from_string( std::stri
}


eprosima::fastdds::dds::LivelinessQosPolicyKind liveliness_kind_from_string( std::string const & s )
{
if( s == "automatic" )
return eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS;
if( s == "by-participant" )
return eprosima::fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
if( s == "by-topic" )
return eprosima::fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS;
DDS_THROW( runtime_error, "invalid liveliness kind '" << s << "'" );
}


eprosima::fastrtps::rtps::MemoryManagementPolicy_t history_memory_policy_from_string( std::string const & s )
{
if( s == "preallocated" )
Expand Down Expand Up @@ -211,20 +290,49 @@ void override_history_qos_from_json( eprosima::fastdds::dds::HistoryQosPolicy &
}


void override_liveliness_qos_from_json( eprosima::fastdds::dds::LivelinessQosPolicy & qos, nlohmann::json const & j )
{
if( j.is_object() )
{
if( auto kind = rsutils::json::nested( j, "kind" ) )
{
if( kind->is_string() )
qos.kind = liveliness_kind_from_string( rsutils::json::string_ref( kind ) );
else
DDS_THROW( runtime_error, "liveliness kind not a string: " << kind );
}

if( auto lease = rsutils::json::nested( j, "lease-duration" ) )
{
if( lease->is_null() )
qos.lease_duration = eprosima::fastdds::dds::LivelinessQosPolicy().lease_duration;
else
lease->get_to( qos.lease_duration );
}

if( auto announce = rsutils::json::nested( j, "announcement-period" ) )
{
if( announce->is_null() )
qos.announcement_period = eprosima::fastdds::dds::LivelinessQosPolicy().announcement_period;
else
announce->get_to( qos.announcement_period );
}
}
}


void override_data_sharing_qos_from_json( eprosima::fastdds::dds::DataSharingQosPolicy & qos, nlohmann::json const & j )
{
if( j.is_null() )
return;
if( j.is_boolean() )
{
if( rsutils::json::value< bool >( j ) )
qos.automatic();
else
qos.off();
}
else
else if( ! j.is_null() )
{
DDS_THROW( runtime_error, "data-sharing must be a boolean (off/automatic)" );
DDS_THROW( runtime_error, "data-sharing must be a boolean (off/automatic); got " << j );
}
}

Expand All @@ -242,4 +350,51 @@ void override_endpoint_qos_from_json( eprosima::fastdds::dds::RTPSEndpointQos &
}


static bool parse_ip_list( nlohmann::json const & j, std::string const & key, std::vector< std::string > * output )
{
if( auto whitelist_j = rsutils::json::nested( j, key ) )
{
if( ! whitelist_j->is_array() )
return false;
for( auto & ip : whitelist_j.get() )
{
if( ! ip.is_string() )
return false;
if( output )
output->push_back( rsutils::json::string_ref( ip ) );
}
}
return true;
}


static void override_udp_settings( eprosima::fastdds::rtps::UDPTransportDescriptor & udp, nlohmann::json const & j )
{
rsutils::json::get_ex( j, "send-buffer-size", &udp.sendBufferSize );
rsutils::json::get_ex( j, "receive-buffer-size", &udp.receiveBufferSize );
if( ! parse_ip_list( j, "whitelist", &udp.interfaceWhiteList ) )
LOG_WARNING( "invalid UDP whitelist in settings" );
}


void override_participant_qos_from_json( eprosima::fastdds::dds::DomainParticipantQos & qos, nlohmann::json const & j )
{
if( ! j.is_object() )
return;
rsutils::json::get_ex( j, "participant-id", &qos.wire_protocol().participant_id );
rsutils::json::get_ex( j, "lease-duration", &qos.wire_protocol().builtin.discovery_config.leaseDuration );

rsutils::json::get_ex( j, "use-builtin-transports", &qos.transport().use_builtin_transports );
if( auto udp_j = rsutils::json::nested( j, "udp" ) )
{
for( auto t : qos.transport().user_transports )
if( auto udp_t = std::dynamic_pointer_cast<eprosima::fastdds::rtps::UDPTransportDescriptor>(t) )
{
override_udp_settings( *udp_t, udp_j );
break;
}
}
}


} // namespace realdds
1 change: 1 addition & 0 deletions third-party/realdds/src/dds-topic-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ void dds_topic_reader::qos::override_from_json( nlohmann::json const & qos_setti
override_reliability_qos_from_json( reliability(), rsutils::json::nested( qos_settings, "reliability" ) );
override_durability_qos_from_json( durability(), rsutils::json::nested( qos_settings, "durability" ) );
override_history_qos_from_json( history(), rsutils::json::nested( qos_settings, "history" ) );
override_liveliness_qos_from_json( liveliness(), rsutils::json::nested( qos_settings, "liveliness" ) );
override_data_sharing_qos_from_json( data_sharing(), rsutils::json::nested( qos_settings, "data-sharing" ) );
override_endpoint_qos_from_json( endpoint(), rsutils::json::nested( qos_settings, "endpoint" ) );
}
Expand Down
1 change: 1 addition & 0 deletions third-party/realdds/src/dds-topic-writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void dds_topic_writer::qos::override_from_json( nlohmann::json const & qos_setti
override_reliability_qos_from_json( reliability(), rsutils::json::nested( qos_settings, "reliability" ) );
override_durability_qos_from_json( durability(), rsutils::json::nested( qos_settings, "durability" ) );
override_history_qos_from_json( history(), rsutils::json::nested( qos_settings, "history" ) );
override_liveliness_qos_from_json( liveliness(), rsutils::json::nested( qos_settings, "liveliness" ) );
override_data_sharing_qos_from_json( data_sharing(), rsutils::json::nested( qos_settings, "data-sharing" ) );
override_endpoint_qos_from_json( endpoint(), rsutils::json::nested( qos_settings, "endpoint" ) );
}
Expand Down
Loading