diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index e4095a84be081..6c7b94f316da2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -79,7 +79,7 @@ public class RoutingTable implements Iterable, Diffable indicesRouting; - private RoutingTable(long version, final Map indicesRouting) { + public RoutingTable(long version, final Map indicesRouting) { this.version = version; this.indicesRouting = Collections.unmodifiableMap(indicesRouting); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 230178252b81f..017ac23ce1427 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -24,9 +24,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream; +import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -40,6 +42,7 @@ import java.io.InputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -159,6 +162,23 @@ public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState return null; } + public RoutingTable getLatestRoutingTable(long routingTableVersion, List indicesRoutingMetaData) throws IOException { + Map indicesRouting = new HashMap<>(); + + for(ClusterMetadataManifest.UploadedIndexMetadata indexRoutingMetaData: indicesRoutingMetaData) { + logger.debug("Starting the read for first indexRoutingMetaData: {}", indexRoutingMetaData); + String filePath = indexRoutingMetaData.getUploadedFilePath(); + BlobContainer container = blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(filePath)); + InputStream inputStream = container.readBlob(indexRoutingMetaData.getIndexName()); + IndexRoutingTableInputStreamReader indexRoutingTableInputStreamReader = new IndexRoutingTableInputStreamReader(inputStream); + Index index = new Index(indexRoutingMetaData.getIndexName(), indexRoutingMetaData.getIndexUUID()); + IndexRoutingTable indexRouting = indexRoutingTableInputStreamReader.readIndexRoutingTable(index); + indicesRouting.put(indexRoutingMetaData.getIndexName(), indexRouting); + logger.debug("IndexRouting {}", indexRouting); + } + return new RoutingTable(routingTableVersion, indicesRouting); + } + private void deleteStaleRoutingTable(String clusterName, String clusterUUID, int manifestsToRetain) { } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java index 3c249894a2ace..d4e2594bf153c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java @@ -8,13 +8,10 @@ package org.opensearch.gateway.remote.routingtable; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesReference; import java.io.IOException; @@ -56,9 +53,7 @@ public class IndexRoutingTableInputStream extends InputStream { private static final int BUFFER_SIZE = 8192; private final IndexRoutingTableHeader indexRoutingTableHeader; - private final Iterator shardIter; - private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStream.class); private final BytesStreamOutput bytesStreamOutput; private final BufferedChecksumStreamOutput out; @@ -66,14 +61,12 @@ public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable) throws this(indexRoutingTable, BUFFER_SIZE); } - public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) - throws IOException { + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) throws IOException { this.buf = new byte[size]; this.shardIter = indexRoutingTable.iterator(); this.indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName()); this.bytesStreamOutput = new BytesStreamOutput(); this.out = new BufferedChecksumStreamOutput(bytesStreamOutput); - logger.info("indexRoutingTable {}", indexRoutingTable.prettyPrint()); initialFill(indexRoutingTable.shards().size()); } @@ -91,7 +84,7 @@ private void initialFill(int shardCount) throws IOException { indexRoutingTableHeader.write(out); out.writeVInt(shardCount); - System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length()); + System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0, buf, 0, bytesStreamOutput.bytes().length()); count = bytesStreamOutput.bytes().length(); bytesStreamOutput.reset(); fill(buf); @@ -99,17 +92,17 @@ private void initialFill(int shardCount) throws IOException { private void fill(byte[] buf) throws IOException { if (leftOverBuf != null) { - if(leftOverBuf.length > buf.length - count) { - // leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf. + if (leftOverBuf.length > buf.length - count) { + // leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in + // leftOverBuf. System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count); - byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)]; - System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count)); + byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)]; + System.arraycopy(leftOverBuf, buf.length - count, tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count)); leftOverBuf = tempLeftOverBuffer; count = buf.length - count; - } else { System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); - count += leftOverBuf.length; + count += leftOverBuf.length; leftOverBuf = null; } } @@ -117,8 +110,8 @@ private void fill(byte[] buf) throws IOException { if (count < buf.length && shardIter.hasNext()) { IndexShardRoutingTable next = shardIter.next(); IndexShardRoutingTable.Builder.writeTo(next, out); - //Add checksum for the file after all shards are done - if(!shardIter.hasNext()) { + // Add checksum for the file after all shards are done + if (!shardIter.hasNext()) { out.writeLong(out.getChecksum()); } out.flush(); @@ -132,12 +125,10 @@ private void fill(byte[] buf) throws IOException { } else { System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count); leftOverBuf = new byte[bytesRef.length() - (buf.length - count)]; - System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count)); + System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count, leftOverBuf, 0, bytesRef.length() - (buf.length - count)); count = buf.length; - } } - } private void maybeResizeAndFill() throws IOException { @@ -153,7 +144,7 @@ else if (pos >= buffer.length) { /* no room left in buffer */ markPos = -1; /* buffer got too big, invalidate mark */ pos = 0; /* drop buffer contents */ } else { /* grow buffer */ - int nsz = markLimit + 1; //NEED TO CHECK THIS + int nsz = markLimit + 1; byte[] nbuf = new byte[nsz]; System.arraycopy(buffer, 0, nbuf, 0, pos); buffer = nbuf; diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java index 847d387b228ca..e2b4f5da5f6e9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamReader.java @@ -13,19 +13,13 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.common.io.stream.BufferedChecksumStreamInput; -import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.Index; -import java.io.BufferedReader; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; public class IndexRoutingTableInputStreamReader { @@ -34,32 +28,31 @@ public class IndexRoutingTableInputStreamReader { private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class); public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException { - this.streamInput = new InputStreamStreamInput(inputStream); + streamInput = new InputStreamStreamInput(inputStream); } - public Map read() throws IOException { + public IndexRoutingTable readIndexRoutingTable(Index index) throws IOException { try { try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) { - // Read the Table Header first - IndexRoutingTableHeader.read(in); - int shards = in.readVInt(); - logger.info("Number of Index Routing Table {}", shards); - Map indicesRouting = new HashMap<>(); - for(int i=0; i indexShardRoutingTableMap = reader.read(); + IndexRoutingTable indexRoutingTable = reader.readIndexRoutingTable(metadata.index("test").getIndex()); - assertEquals(1, indexShardRoutingTableMap.size()); - assertNotNull(indexShardRoutingTableMap.get("test")); - assertEquals(2,indexShardRoutingTableMap.get("test").shards().size()); + assertEquals(1, indexRoutingTable.getShards().size()); + assertEquals(indexRoutingTable.getIndex(), metadata.index("test").getIndex()); + assertEquals(indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size(), 2); } catch (IOException e) { throw new RuntimeException(e); } }); } + public void testRoutingTableInputStreamWithInvalidIndex() { + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + AtomicInteger assertionError = new AtomicInteger(); + initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { + try { + InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables); + + IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream); + reader.readIndexRoutingTable(metadata.index("invalid-index").getIndex()); + + } catch (AssertionError e) { + assertionError.getAndIncrement(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + assertEquals(1, assertionError.get()); + } + }