diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index d81f6949f43f7..293ff2760f07b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -700,7 +700,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle public void checkNamespaceBundleSplit() { if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null - || !pulsar.getLeaderElectionService().isLeader()) { + || !pulsar.getLeaderElectionService().isLeader() || loadData.getBrokerData().size() <= 1) { return; } final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 473adc26daab1..103b8f94df1de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -36,6 +36,7 @@ import java.text.NumberFormat; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -74,6 +75,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.compaction.Compactor; +import org.apache.zookeeper.CreateMode; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -548,6 +550,58 @@ public void testBundlesMetrics() throws Exception { c1.acknowledge(c1.receive()); } + // Mock another broker to make split task work. + String json = + "{" + + "\"webServiceUrl\": \"http://127.0.0.1:0\"," + + "\"webServiceUrlTls\": \"https://127.0.0.1:0\"," + + "\"pulsarServiceUrl\": \"pulsar://127.0.0.1:0\"," + + "\"pulsarServiceUrlTls\": \"pulsar+ssl://127.0.0.1:0\"," + + "\"persistentTopicsEnabled\": true," + + "\"nonPersistentTopicsEnabled\": true," + + "\"cpu\": {" + + "\"usage\": 0.0," + + "\"limit\": 1000.0" + + "}," + + "\"memory\": {" + + "\"usage\": 124.1398696899414," + + "\"limit\": 1024.0" + + "}," + + "\"directMemory\": {" + + "\"usage\": 4.0," + + "\"limit\": 1024.0" + + "}," + + "\"bandwidthIn\": {" + + "\"usage\": -1.0," + + "\"limit\": -1.0" + + "}," + + "\"bandwidthOut\": {" + + "\"usage\": -1.0," + + "\"limit\": -1.0" + + "}," + + "\"msgThroughputIn\": 0.0," + + "\"msgThroughputOut\": 0.0," + + "\"msgRateIn\": 0.0," + + "\"msgRateOut\": 0.0," + + "\"lastUpdate\": 1683812593521," + + "\"lastStats\": {}," + + "\"numTopics\": 0," + + "\"numBundles\": 0," + + "\"numConsumers\": 0," + + "\"numProducers\": 0," + + "\"bundles\": []," + + "\"lastBundleGains\": []," + + "\"lastBundleLosses\": []," + + "\"brokerVersionString\": \"2.10.4\"," + + "\"protocols\": {}," + + "\"advertisedListeners\": {}," + + "\"maxResourceUsage\": 0.00390625," + + "\"loadReportType\": \"LocalBrokerData\"," + + "\"bundleStats\": {}" + + "}"; + String mockedBroker = "/loadbalance/brokers/127.0.0.1:0"; + mockZooKeeper.create(mockedBroker, json.getBytes(), Collections.emptyList(), CreateMode.EPHEMERAL); + pulsar.getBrokerService().updateRates(); Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0)); ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper)pulsar.getLoadManager().get(); @@ -572,6 +626,8 @@ public void testBundlesMetrics() throws Exception { assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage")); assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count")); + + mockZooKeeper.delete(mockedBroker, 0); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 3e252d0d9be09..601f7690fbc82 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -66,6 +66,7 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -78,6 +79,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -762,6 +764,62 @@ public void testModularLoadManagerSplitBundle() throws Exception { } } + @Test(timeOut = 20000) + public void testSkipSplitBundleIfOnlyOneBroker() throws Exception { + + log.info("-- Starting {} test --", methodName); + final String loadBalancerName = conf.getLoadManagerClassName(); + final int defaultNumberOfNamespaceBundles = conf.getDefaultNumberOfNamespaceBundles(); + final int loadBalancerNamespaceBundleMaxTopics = conf.getLoadBalancerNamespaceBundleMaxTopics(); + + final String namespace = "my-property/my-ns"; + final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); + final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); + try { + // configure broker with ModularLoadManager. + stopBroker(); + conf.setDefaultNumberOfNamespaceBundles(1); + conf.setLoadBalancerNamespaceBundleMaxTopics(1); + conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + startBroker(); + final ModularLoadManagerWrapper modularLoadManagerWrapper = + (ModularLoadManagerWrapper) pulsar.getLoadManager().get(); + final ModularLoadManagerImpl modularLoadManager = + (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager(); + + // Create one topic and trigger tasks, then verify there is only one bundle now. + Consumer consumer1 = pulsarClient.newConsumer().topic(topicName1) + .subscriptionName("my-subscriber-name").subscribe(); + List bounldes1 = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getBundles(NamespaceName.get(namespace)).getBundles(); + pulsar.getBrokerService().updateRates(); + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper(); + modularLoadManager.updateAll(); + assertEquals(bounldes1.size(), 1); + + // Create the second topic and trigger tasks, then verify the split task will be skipped. + Consumer consumer2 = pulsarClient.newConsumer().topic(topicName2) + .subscriptionName("my-subscriber-name").subscribe(); + pulsar.getBrokerService().updateRates(); + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper(); + modularLoadManager.updateAll(); + List bounldes2 = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getBundles(NamespaceName.get(namespace)).getBundles(); + assertEquals(bounldes2.size(), 1); + + consumer1.close(); + consumer2.close(); + admin.topics().delete(topicName1, false); + admin.topics().delete(topicName2, false); + } finally { + conf.setDefaultNumberOfNamespaceBundles(defaultNumberOfNamespaceBundles); + conf.setLoadBalancerNamespaceBundleMaxTopics(loadBalancerNamespaceBundleMaxTopics); + conf.setLoadManagerClassName(loadBalancerName); + } + } + @Test(timeOut = 10000) public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {