Skip to content

Commit

Permalink
Enable Routing Table parallel read
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed May 21, 2024
1 parent 9982d79 commit 6d33900
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
Expand All @@ -24,10 +25,14 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader;
import org.opensearch.index.remote.RemoteStoreUtils;
Expand All @@ -36,6 +41,8 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

import java.io.*;
import java.io.Closeable;
Expand All @@ -46,6 +53,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -77,14 +85,16 @@ public class RemoteRoutingTableService implements Closeable {
private final Supplier<RepositoriesService> repositoriesService;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings) {
ClusterSettings clusterSettings, ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> writeFullRoutingTable(ClusterState clusterState, String previousClusterUUID) {
Expand Down Expand Up @@ -216,6 +226,39 @@ public RoutingTable getFullRoutingTable(long routingTableVersion, List<ClusterMe
return new RoutingTable(routingTableVersion, indicesRouting);
}

public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String clusterName,
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<RemoteIndexRoutingResult> latchedActionListener) {
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID));
return () -> readAsync(
blobContainer,
uploadedFilename,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(new RemoteIndexRoutingResult(index.getName(), response.readIndexRoutingTable(index))), latchedActionListener::onFailure)
);
}

public void readAsync(BlobContainer blobContainer, String name, ExecutorService executorService, ActionListener<IndexRoutingTableInputStreamReader> listener) throws IOException {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) {
try {
new IndexRoutingTableInputStreamReader(blobContainer.readBlob(path));
} catch (IOException e) {
logger.info("RoutingTable read failed with error: {}", e.toString());
}
return null;
}
private void deleteStaleRoutingTable(String clusterName, String clusterUUID, int manifestsToRetain) {
}

Expand All @@ -237,4 +280,21 @@ public void start() {
blobStoreRepository = (BlobStoreRepository) repository;
}

public static class RemoteIndexRoutingResult {
String indexName;
IndexRoutingTable indexRoutingTable;

public RemoteIndexRoutingResult(String indexName, IndexRoutingTable indexRoutingTable) {
this.indexName = indexName;
this.indexRoutingTable = indexRoutingTable;
}

public String getIndexName() {
return indexName;
}

public IndexRoutingTable getIndexRoutingTable() {
return indexRoutingTable;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand All @@ -34,6 +35,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
Expand Down Expand Up @@ -183,7 +185,7 @@ public RemoteClusterStateService(

if(isRemoteRoutingTableEnabled(settings)) {
this.remoteRoutingTableService = new RemoteRoutingTableService(repositoriesService,
settings, clusterSettings);
settings, clusterSettings, threadPool);
logger.info("REMOTE ROUTING ENABLED");
} else {
logger.info("REMOTE ROUTING DISABLED");
Expand Down Expand Up @@ -821,12 +823,15 @@ private ClusterState readClusterStateInParallel(
boolean readSettingsMetadata,
boolean readTemplatesMetadata,
boolean readDiscoveryNodes,
boolean readClusterBlocks
boolean readClusterBlocks,
List<UploadedIndexMetadata> indicesRoutingToRead
) throws IOException {
int totalReadTasks = indicesToRead.size() + customToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0);
int totalReadTasks = indicesToRead.size() + customToRead.size() + indicesRoutingToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0);
CountDownLatch latch = new CountDownLatch(totalReadTasks);
List<CheckedRunnable<IOException>> asyncMetadataReadActions = new ArrayList<>();
List<RemoteReadResult> readResults = new ArrayList<>();
List<RemoteRoutingTableService.RemoteIndexRoutingResult> readIndexRoutingTableResults = new ArrayList<>();
List<Exception> indexRoutingExceptionList = Collections.synchronizedList(new ArrayList<>(indicesRoutingToRead.size()));
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));

LatchedActionListener<RemoteClusterStateUtils.RemoteReadResult> listener = new LatchedActionListener<>(
Expand Down Expand Up @@ -854,6 +859,32 @@ private ClusterState readClusterStateInParallel(
);
}

LatchedActionListener<RemoteRoutingTableService.RemoteIndexRoutingResult> routingTableLatchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(
response -> {
logger.info("Successfully read cluster state component from remote");
readIndexRoutingTableResults.add(response);
},
ex -> {
logger.error("Failed to read cluster state from remote", ex);
indexRoutingExceptionList.add(ex);
}
),
latch
);

for (UploadedIndexMetadata indexRouting: indicesRoutingToRead) {
asyncMetadataReadActions.add(
remoteRoutingTableService.getAsyncIndexMetadataReadAction(
clusterName,
clusterUUID,
indexRouting.getUploadedFilename(),
new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()),
routingTableLatchedActionListener
)
);
}

for (Map.Entry<String, UploadedMetadataAttribute> entry : customToRead.entrySet()) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
Expand Down Expand Up @@ -960,12 +991,20 @@ private ClusterState readClusterStateInParallel(
exceptionList.forEach(exception::addSuppressed);
throw exception;
}

if (!indexRoutingExceptionList.isEmpty()) {
RemoteStateTransferException exception = new RemoteStateTransferException("Exception during reading routing table from remote");
exceptionList.forEach(exception::addSuppressed);
throw exception;
}
ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState);
AtomicReference<DiscoveryNodes.Builder> discoveryNodesBuilder = new AtomicReference<>(DiscoveryNodes.builder());
Metadata.Builder metadataBuilder = Metadata.builder(previousState.metadata());
metadataBuilder.clusterUUID(manifest.getClusterUUID());
metadataBuilder.clusterUUIDCommitted(manifest.isClusterUUIDCommitted());
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
Map<String, IndexRoutingTable> indicesRouting = new HashMap<>();

readResults.forEach(remoteReadResult -> {
switch (remoteReadResult.getComponent()) {
case INDEX_PATH_TOKEN:
Expand Down Expand Up @@ -995,13 +1034,19 @@ private ClusterState readClusterStateInParallel(
throw new IllegalStateException("Unknown component: " + remoteReadResult.getComponent());
}
});

readIndexRoutingTableResults.forEach(remoteIndexRoutingResult -> {
indicesRouting.put(remoteIndexRoutingResult.getIndexName(), remoteIndexRoutingResult.getIndexRoutingTable());
});

metadataBuilder.indices(indexMetadataMap);
if (readDiscoveryNodes) {
clusterStateBuilder.nodes(discoveryNodesBuilder.get().localNodeId(localNodeId));
}
return clusterStateBuilder.metadata(metadataBuilder)
.version(manifest.getStateVersion())
.stateUUID(manifest.getStateUUID())
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting))
.build();
}

Expand All @@ -1019,7 +1064,8 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada
manifest.getSettingsMetadata() != null,
manifest.getTemplatesMetadata() != null,
manifest.getDiscoveryNodesMetadata() != null,
manifest.getClusterBlocksMetadata() != null
manifest.getClusterBlocksMetadata() != null,
manifest.getIndicesRouting()
);
}

Expand All @@ -1031,6 +1077,13 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());

List<UploadedIndexMetadata> updatedIndexRouting = diff.getIndicesRoutingUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndicesRouting().stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());

Map<String, UploadedMetadataAttribute> updatedCustomMetadata = new HashMap<>();
for (String customType : diff.getCustomMetadataUpdated().keySet()) {
updatedCustomMetadata.put(customType, manifest.getCustomMetadataMap().get(customType));
Expand All @@ -1047,7 +1100,8 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
diff.isSettingsMetadataUpdated(),
diff.isTemplatesMetadataUpdated(),
diff.isDiscoveryNodesUpdated(),
diff.isClusterBlocksUpdated()
diff.isClusterBlocksUpdated(),
updatedIndexRouting
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
Expand All @@ -1065,23 +1119,21 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
metadataBuilder.removeCustom(customType);
}

// Constructing Routing Table
if(remoteRoutingTableService!= null){
RoutingTable routingTable = remoteRoutingTableService.getIncrementalRoutingTable(previousState, manifest);
return clusterStateBuilder.
stateUUID(manifest.getStateUUID()).
version(manifest.getStateVersion()).
metadata(metadataBuilder).
routingTable(routingTable).
build();
HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting());

for(String indexName: diff.getIndicesRoutingDeleted()){
indexRoutingTables.remove(indexName);
}

RoutingTable routingTable = new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables);

return clusterStateBuilder.
stateUUID(manifest.getStateUUID()).
version(manifest.getStateVersion()).
metadata(metadataBuilder).
routingTable(routingTable).
build();

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) {
}
}

static class UploadedMetadataResults {
public static class UploadedMetadataResults {
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata;
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata;
Expand Down Expand Up @@ -105,7 +105,7 @@ public UploadedMetadataResults() {
}
}

static class RemoteReadResult {
public static class RemoteReadResult {
ToXContent obj;
String component;
String componentName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;

import java.util.function.Supplier;

Expand Down Expand Up @@ -55,7 +56,8 @@ public void setup() {
remoteRoutingTableService = new RemoteRoutingTableService(
repositoriesServiceSupplier,
settings,
clusterSettings
clusterSettings,
new ThreadPool(settings)
);
}

Expand All @@ -73,7 +75,8 @@ public void testFailInitializationWhenRemoteRoutingDisabled() {
() -> new RemoteRoutingTableService(
repositoriesServiceSupplier,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new ThreadPool(settings)
)
);
}
Expand Down

0 comments on commit 6d33900

Please sign in to comment.