diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 62a63807..ab6d54a5 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -236,7 +236,7 @@ class PULSAR_PUBLIC ProducerConfiguration { /** * Set the message routing modes for partitioned topics. * - * Default: UseSinglePartition + * Default: RoundRobinDistribution * * @param PartitionsRoutingMode partition routing mode. * @return diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 4178096c..0f05c78b 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -65,17 +65,17 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() { switch (conf_.getPartitionsRoutingMode()) { + case ProducerConfiguration::UseSinglePartition: + return std::make_shared(getNumPartitions(), + conf_.getHashingScheme()); + case ProducerConfiguration::CustomPartition: + return conf_.getMessageRouterPtr(); case ProducerConfiguration::RoundRobinDistribution: + default: return std::make_shared( conf_.getHashingScheme(), conf_.getBatchingEnabled(), conf_.getBatchingMaxMessages(), conf_.getBatchingMaxAllowedSizeInBytes(), std::chrono::milliseconds(conf_.getBatchingMaxPublishDelayMs())); - case ProducerConfiguration::CustomPartition: - return conf_.getMessageRouterPtr(); - case ProducerConfiguration::UseSinglePartition: - default: - return std::make_shared(getNumPartitions(), - conf_.getHashingScheme()); } } diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index c635c48f..c3240209 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -34,7 +34,7 @@ struct ProducerConfigurationImpl { CompressionType compressionType{CompressionNone}; int maxPendingMessages{1000}; int maxPendingMessagesAcrossPartitions{50000}; - ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition}; + ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::RoundRobinDistribution}; MessageRoutingPolicyPtr messageRouter; ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash}; bool useLazyStartPartitionedProducers{false}; diff --git a/tests/ProducerConfigurationTest.cc b/tests/ProducerConfigurationTest.cc index df5867c1..a12911da 100644 --- a/tests/ProducerConfigurationTest.cc +++ b/tests/ProducerConfigurationTest.cc @@ -33,7 +33,7 @@ TEST(ProducerConfigurationTest, testDefaultConfig) { ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone); ASSERT_EQ(conf.getMaxPendingMessages(), 1000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000); - ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution); ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{}); ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash); ASSERT_EQ(conf.getBlockIfQueueFull(), false); @@ -88,6 +88,9 @@ TEST(ProducerConfigurationTest, testCustomConfig) { conf.setMaxPendingMessagesAcrossPartitions(100000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000); + conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); + conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);