Skip to content

Commit

Permalink
WIP Benchmark
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Mar 1, 2024
1 parent c95b507 commit 16c26f8
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 0 deletions.
1 change: 1 addition & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies {
// us to invoke the JMH uberjar as usual.
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
}
api project(":test:framework")
api "org.openjdk.jmh:jmh-core:$versions.jmh"
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh"
// Dependencies of JMH
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.benchmark.gateway.remote;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.Version;
import org.opensearch.benchmark.routing.allocation.Allocators;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.opensearch.env.Environment.PATH_HOME_SETTING;
import static org.opensearch.env.Environment.PATH_REPO_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.repositories.fs.FsRepository.REPOSITORIES_LOCATION_SETTING;

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 1)
@Measurement(iterations = 1)
@BenchmarkMode(Mode.AverageTime)
public class RemoteClusterStateBenchmark {

    /**
     * JMH benchmark measuring {@link RemoteClusterStateService#writeFullMetadata}
     * against an FS-backed repository, for cluster states of varying size.
     *
     * Each {@code @Param} value encodes "indices|aliases|templates"; the alias
     * count per index/template is drawn uniformly from [0, aliases).
     */
    @Param({
        // indices| aliases| templates|
        " 100| 1000| 1000|",
        " 200| 2000| 2000|"
    })
    public String indicesAliasesTemplates = "100|10|10";
    public int numShards = 1;
    public int numReplicas = 1;
    public int numZone = 3;

    private final String repository = "test-fs-repository";
    private RemoteClusterStateService remoteClusterStateService;
    private ClusterState initialClusterState;
    // Shared temp root for repo contents; created once per JVM fork.
    private static final Path path;
    private String randomPreviousUUID;

    static {
        try {
            path = Files.createTempDirectory("test");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the cluster state (indices, aliases, templates, nodes), wires up an
     * FS repository and starts the {@link RemoteClusterStateService} under test.
     */
    @Setup
    public void setup() throws IOException {
        final String[] params = indicesAliasesTemplates.split("\\|");
        final int numIndices = toInt(params[0]);
        final int numAliases = toInt(params[1]);
        final int numTemplates = toInt(params[2]);

        Random random = new Random();
        Path repoPath = Files.createDirectory(path.resolve(repository));
        Settings settings = Settings.builder()
            .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
            .put(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, repository)
            .put(PATH_HOME_SETTING.getKey(), path)
            .putList(PATH_REPO_SETTING.getKey(), Collections.singletonList(path.toString()))
            .put(REPOSITORIES_LOCATION_SETTING.getKey(), repoPath)
            .build();

        // Number of nodes doesn't matter in this benchmark as DiscoveryNodes object is not uploaded to remote
        int numNodes = random.nextInt(200) + 1;
        DiscoveryNodes.Builder nb = setUpClusterNodes(numNodes);

        String clusterManagerNodeId = "node_s_" + (random.nextInt(numNodes) + 1);
        nb.clusterManagerNodeId(clusterManagerNodeId);
        nb.localNodeId(clusterManagerNodeId);

        Metadata.Builder mb = Metadata.builder();
        // Populate index metadata with a random number of aliases per index.
        for (int i = 0; i < numIndices; i++) {
            int aliases = random.nextInt(numAliases);
            IndexMetadata.Builder indexMetadata = IndexMetadata.builder("index_" + i)
                .settings(Settings.builder().put("index.version.created", Version.CURRENT))
                .numberOfShards(numShards)
                .numberOfReplicas(numReplicas);

            for (int j = 0; j < aliases; j++) {
                // %d placeholders (not SLF4J-style {}) so each alias gets a unique name;
                // with "{}" every iteration produced the same literal and overwrote the previous alias.
                AliasMetadata.Builder aliasMetadata = AliasMetadata.builder(String.format(Locale.ROOT, "alias_%d_index_%d", j, i));
                indexMetadata.putAlias(aliasMetadata);
            }

            mb.put(indexMetadata);
        }

        // Populate template metadata, again with unique alias names per template.
        for (int i = 0; i < numTemplates; i++) {
            IndexTemplateMetadata.Builder indexTemplate = IndexTemplateMetadata.builder("template_" + i);
            indexTemplate.patterns(Collections.singletonList("index_" + i));
            int aliases = random.nextInt(numAliases);
            for (int j = 0; j < aliases; j++) {
                AliasMetadata.Builder aliasMetadata = AliasMetadata.builder(
                    String.format(Locale.ROOT, "alias_%d_template_%d", j, i)
                );
                indexTemplate.putAlias(aliasMetadata);
            }
            mb.put(indexTemplate);
        }

        // TODO: populate CoordinationMetadata (term, voting config exclusions) once it is
        // relevant to the upload path being measured.

        Metadata metadata = mb.build();

        initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(settings))
            .metadata(metadata)
            .nodes(nb)
            .build();

        randomPreviousUUID = UUID.randomUUID().toString();

        String clusterManagerNode = initialClusterState.nodes().getClusterManagerNodeId();

        ClusterService clusterService = new ClusterService(
            settings,
            new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
            null
        );
        ThreadPool threadPool = new ThreadPool(settings);
        CapturingTransport capturingTransport = new CapturingTransport();
        DiscoveryNode localNode = initialClusterState.nodes().get(clusterManagerNode);
        TransportService transportService = capturingTransport.createTransportService(
            Settings.EMPTY,
            threadPool,
            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
            x -> localNode,
            null,
            Collections.emptySet(),
            NoopTracer.INSTANCE
        );

        RepositoriesService repositoriesService = new RepositoriesService(
            settings,
            clusterService,
            transportService,
            Collections.emptyMap(),
            Collections.emptyMap(),
            threadPool
        );

        Map<String, Repository> repositoryMap = new HashMap<>();

        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        Repository repo = new FsRepository(
            new RepositoryMetadata(repository, FsRepository.TYPE, settings),
            new Environment(settings, path),
            null,
            new ClusterService(
                settings,
                clusterSettings,
                threadPool
            ),
            null
        );
        repo.start();
        repositoryMap.put(repository, repo);
        repositoriesService.updateRepositoriesMap(repositoryMap);
        remoteClusterStateService = new RemoteClusterStateService(
            clusterManagerNode,
            () -> repositoriesService,
            settings,
            clusterSettings,
            () -> 0L,  // fixed relative-time supplier; wall clock is irrelevant here
            threadPool
        );
        remoteClusterStateService.start();
    }

    /** Measured operation: one full cluster-state metadata upload to the FS repository. */
    @Benchmark
    public void testFullMetadataUpload() throws IOException {
        remoteClusterStateService.writeFullMetadata(initialClusterState, randomPreviousUUID);
    }

    /**
     * Creates {@code nodes} discovery nodes spread across {@link #numZone} zones,
     * each carrying the remote-store repository attributes expected by the service.
     */
    private DiscoveryNodes.Builder setUpClusterNodes(int nodes) {
        DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
        for (int i = 1; i <= nodes; i++) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("zone", "zone_" + (i % numZone));
            attributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, repository);
            attributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, repository);
            attributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, repository);
            attributes.put(
                String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, repository) + "location",
                path.toAbsolutePath().toString()
            );
            nb.add(Allocators.newNode("node_s_" + i, attributes));
        }
        return nb;
    }

    /** Parses one whitespace-padded segment of the {@code @Param} string. */
    private int toInt(String v) {
        return Integer.parseInt(v.trim());
    }

    /** Removes the temp repository tree so repeated forks start clean. */
    @TearDown
    public void tearDown() throws IOException {
        deleteFolder(path.toFile());
    }

    // Recursively deletes a directory tree. listFiles() returns null for
    // non-directories or on I/O error, so guard before iterating (the old
    // List.of(listFiles()) would NPE in that case).
    private static void deleteFolder(File path) {
        File[] files = path.listFiles();
        if (files != null) {
            for (File file : files) {
                if (file.isDirectory()) {
                    deleteFolder(file);
                } else {
                    file.delete();
                }
            }
        }
        path.delete();
    }
}

0 comments on commit 16c26f8

Please sign in to comment.