Skip to content

Commit

Permalink
Revert "[improve] [broker] Skip split boundle if only one broker (apa…
Browse files Browse the repository at this point in the history
…che#20190)"

This reverts commit 7da7873.

(cherry picked from commit 14dd76f)
  • Loading branch information
poorbarcode authored and nicoloboschi committed May 15, 2023
1 parent 0e039e8 commit c20bb11
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
public void checkNamespaceBundleSplit() {

if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null
|| !pulsar.getLeaderElectionService().isLeader() || loadData.getBrokerData().size() <= 1) {
|| !pulsar.getLeaderElectionService().isLeader()) {
return;
}
final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -75,7 +74,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
import org.apache.zookeeper.CreateMode;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -550,10 +548,6 @@ public void testBundlesMetrics() throws Exception {
c1.acknowledge(c1.receive());
}

// Mock another broker to make split task work.
String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
mockZooKeeper.create(mockedBroker, new byte[]{0}, Collections.emptyList(), CreateMode.EPHEMERAL);

pulsar.getBrokerService().updateRates();
Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper)pulsar.getLoadManager().get();
Expand All @@ -577,9 +571,7 @@ public void testBundlesMetrics() throws Exception {
assertTrue(metrics.containsKey("pulsar_lb_bandwidth_in_usage"));
assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));

assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));

mockZooKeeper.delete(mockedBroker, 0);
assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
Expand All @@ -79,7 +78,6 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -764,62 +762,6 @@ public void testModularLoadManagerSplitBundle() throws Exception {
}
}

@Test(timeOut = 20000)
public void testSkipSplitBundleIfOnlyOneBroker() throws Exception {

log.info("-- Starting {} test --", methodName);
final String loadBalancerName = conf.getLoadManagerClassName();
final int defaultNumberOfNamespaceBundles = conf.getDefaultNumberOfNamespaceBundles();
final int loadBalancerNamespaceBundleMaxTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();

final String namespace = "my-property/my-ns";
final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
try {
// configure broker with ModularLoadManager.
stopBroker();
conf.setDefaultNumberOfNamespaceBundles(1);
conf.setLoadBalancerNamespaceBundleMaxTopics(1);
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
startBroker();
final ModularLoadManagerWrapper modularLoadManagerWrapper =
(ModularLoadManagerWrapper) pulsar.getLoadManager().get();
final ModularLoadManagerImpl modularLoadManager =
(ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager();

// Create one topic and trigger tasks, then verify there is only one bundle now.
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1)
.subscriptionName("my-subscriber-name").subscribe();
List<NamespaceBundle> bounldes1 = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(namespace)).getBundles();
pulsar.getBrokerService().updateRates();
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
modularLoadManager.updateAll();
assertEquals(bounldes1.size(), 1);

// Create the second topic and trigger tasks, then verify the split task will be skipped.
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2)
.subscriptionName("my-subscriber-name").subscribe();
pulsar.getBrokerService().updateRates();
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
modularLoadManager.updateAll();
List<NamespaceBundle> bounldes2 = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(namespace)).getBundles();
assertEquals(bounldes2.size(), 1);

consumer1.close();
consumer2.close();
admin.topics().delete(topicName1, false);
admin.topics().delete(topicName2, false);
} finally {
conf.setDefaultNumberOfNamespaceBundles(defaultNumberOfNamespaceBundles);
conf.setLoadBalancerNamespaceBundleMaxTopics(loadBalancerNamespaceBundleMaxTopics);
conf.setLoadManagerClassName(loadBalancerName);
}
}

@Test(timeOut = 10000)
public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {

Expand Down

0 comments on commit c20bb11

Please sign in to comment.