Skip to content

Commit

Permalink
[improve] [broker] Skip split boundle if only one broker (apache#20190)
Browse files Browse the repository at this point in the history
Co-authored-by: Zixuan Liu <[email protected]>
(cherry picked from commit d135c4a)
(cherry picked from commit f3edfc5)
  • Loading branch information
poorbarcode authored and nicoloboschi committed May 15, 2023
1 parent c20bb11 commit 9c7f219
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
public void checkNamespaceBundleSplit() {

if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null
|| !pulsar.getLeaderElectionService().isLeader()) {
|| !pulsar.getLeaderElectionService().isLeader() || loadData.getBrokerData().size() <= 1) {
return;
}
final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
import org.apache.zookeeper.CreateMode;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -548,6 +550,58 @@ public void testBundlesMetrics() throws Exception {
c1.acknowledge(c1.receive());
}

// Mock another broker to make split task work.
String json =
"{"
+ "\"webServiceUrl\": \"http://127.0.0.1:0\","
+ "\"webServiceUrlTls\": \"https://127.0.0.1:0\","
+ "\"pulsarServiceUrl\": \"pulsar://127.0.0.1:0\","
+ "\"pulsarServiceUrlTls\": \"pulsar+ssl://127.0.0.1:0\","
+ "\"persistentTopicsEnabled\": true,"
+ "\"nonPersistentTopicsEnabled\": true,"
+ "\"cpu\": {"
+ "\"usage\": 0.0,"
+ "\"limit\": 1000.0"
+ "},"
+ "\"memory\": {"
+ "\"usage\": 124.1398696899414,"
+ "\"limit\": 1024.0"
+ "},"
+ "\"directMemory\": {"
+ "\"usage\": 4.0,"
+ "\"limit\": 1024.0"
+ "},"
+ "\"bandwidthIn\": {"
+ "\"usage\": -1.0,"
+ "\"limit\": -1.0"
+ "},"
+ "\"bandwidthOut\": {"
+ "\"usage\": -1.0,"
+ "\"limit\": -1.0"
+ "},"
+ "\"msgThroughputIn\": 0.0,"
+ "\"msgThroughputOut\": 0.0,"
+ "\"msgRateIn\": 0.0,"
+ "\"msgRateOut\": 0.0,"
+ "\"lastUpdate\": 1683812593521,"
+ "\"lastStats\": {},"
+ "\"numTopics\": 0,"
+ "\"numBundles\": 0,"
+ "\"numConsumers\": 0,"
+ "\"numProducers\": 0,"
+ "\"bundles\": [],"
+ "\"lastBundleGains\": [],"
+ "\"lastBundleLosses\": [],"
+ "\"brokerVersionString\": \"2.10.4\","
+ "\"protocols\": {},"
+ "\"advertisedListeners\": {},"
+ "\"maxResourceUsage\": 0.00390625,"
+ "\"loadReportType\": \"LocalBrokerData\","
+ "\"bundleStats\": {}"
+ "}";
String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
mockZooKeeper.create(mockedBroker, json.getBytes(), Collections.emptyList(), CreateMode.EPHEMERAL);

pulsar.getBrokerService().updateRates();
Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper)pulsar.getLoadManager().get();
Expand All @@ -572,6 +626,8 @@ public void testBundlesMetrics() throws Exception {
assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));

assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));

mockZooKeeper.delete(mockedBroker, 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
Expand All @@ -78,6 +79,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -762,6 +764,62 @@ public void testModularLoadManagerSplitBundle() throws Exception {
}
}

@Test(timeOut = 20000)
public void testSkipSplitBundleIfOnlyOneBroker() throws Exception {

log.info("-- Starting {} test --", methodName);
final String loadBalancerName = conf.getLoadManagerClassName();
final int defaultNumberOfNamespaceBundles = conf.getDefaultNumberOfNamespaceBundles();
final int loadBalancerNamespaceBundleMaxTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();

final String namespace = "my-property/my-ns";
final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
try {
// configure broker with ModularLoadManager.
stopBroker();
conf.setDefaultNumberOfNamespaceBundles(1);
conf.setLoadBalancerNamespaceBundleMaxTopics(1);
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
startBroker();
final ModularLoadManagerWrapper modularLoadManagerWrapper =
(ModularLoadManagerWrapper) pulsar.getLoadManager().get();
final ModularLoadManagerImpl modularLoadManager =
(ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager();

// Create one topic and trigger tasks, then verify there is only one bundle now.
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1)
.subscriptionName("my-subscriber-name").subscribe();
List<NamespaceBundle> bounldes1 = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(namespace)).getBundles();
pulsar.getBrokerService().updateRates();
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
modularLoadManager.updateAll();
assertEquals(bounldes1.size(), 1);

// Create the second topic and trigger tasks, then verify the split task will be skipped.
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2)
.subscriptionName("my-subscriber-name").subscribe();
pulsar.getBrokerService().updateRates();
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
modularLoadManager.updateAll();
List<NamespaceBundle> bounldes2 = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(namespace)).getBundles();
assertEquals(bounldes2.size(), 1);

consumer1.close();
consumer2.close();
admin.topics().delete(topicName1, false);
admin.topics().delete(topicName2, false);
} finally {
conf.setDefaultNumberOfNamespaceBundles(defaultNumberOfNamespaceBundles);
conf.setLoadBalancerNamespaceBundleMaxTopics(loadBalancerNamespaceBundleMaxTopics);
conf.setLoadManagerClassName(loadBalancerName);
}
}

@Test(timeOut = 10000)
public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {

Expand Down

0 comments on commit 9c7f219

Please sign in to comment.