Skip to content

Commit

Permalink
Minor refactoring for read flow
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed May 30, 2024
1 parent e541318 commit e8fec03
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
Expand Down Expand Up @@ -264,6 +266,42 @@ public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, Stri
return null;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(List<String> updatedIndicesRouting, List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting) {
return updatedIndicesRouting.stream().map(idx -> {
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
}

public static List<String> getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> deletedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) {
if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is deleted
deletedIndicesRouting.add(previousIndexRouting.getIndex().getName());
}
}
return deletedIndicesRouting;
}

public static List<String> getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> updatedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) {
if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is created
updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName());
} else {
if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) {
// if the latest routing table has the same routing table as the previous routing table, then the index is not updated
continue;
}
updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName());
}
}
return updatedIndicesRouting;
}

@Override
public void close() throws IOException {
if (blobStoreRepository != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
Expand Down Expand Up @@ -83,8 +84,8 @@ public class ClusterStateDiffManifest implements ToXContentObject {
customMetadataDeleted.add(custom);
}
}
indicesRoutingUpdated = getIndicesRoutingUpdated(previousState.routingTable(), state.routingTable());
indicesRoutingDeleted = getIndicesRoutingDeleted(previousState.routingTable(), state.routingTable());
indicesRoutingUpdated = RemoteRoutingTableService.getIndicesRoutingUpdated(previousState.routingTable(), state.routingTable());
indicesRoutingDeleted = RemoteRoutingTableService.getIndicesRoutingDeleted(previousState.routingTable(), state.routingTable());
}

public ClusterStateDiffManifest(String fromStateUUID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,11 +1106,7 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());

List<UploadedIndexMetadata> updatedIndexRouting = diff.getIndicesRoutingUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndicesRouting().stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
List<UploadedIndexMetadata> updatedIndexRouting = remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(), manifest.getIndicesRouting());

Map<String, UploadedMetadataAttribute> updatedCustomMetadata = new HashMap<>();
if (diff.getCustomMetadataUpdated() != null) {
Expand Down

0 comments on commit e8fec03

Please sign in to comment.