Skip to content

Commit

Permalink
Add basic read-remote-routing flow
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed May 16, 2024
1 parent 560ecb8 commit 4c33720
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
// index to IndexRoutingTable map
private final Map<String, IndexRoutingTable> indicesRouting;

private RoutingTable(long version, final Map<String, IndexRoutingTable> indicesRouting) {
public RoutingTable(long version, final Map<String, IndexRoutingTable> indicesRouting) {
this.version = version;
this.indicesRouting = Collections.unmodifiableMap(indicesRouting);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
Expand All @@ -40,6 +42,7 @@

import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -159,6 +162,23 @@ public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState
return null;
}

public RoutingTable getLatestRoutingTable(long routingTableVersion, List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingMetaData) throws IOException {
Map<String, IndexRoutingTable> indicesRouting = new HashMap<>();

for(ClusterMetadataManifest.UploadedIndexMetadata indexRoutingMetaData: indicesRoutingMetaData) {
logger.debug("Starting the read for first indexRoutingMetaData: {}", indexRoutingMetaData);
String filePath = indexRoutingMetaData.getUploadedFilePath();
BlobContainer container = blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(filePath));
InputStream inputStream = container.readBlob(indexRoutingMetaData.getIndexName());
IndexRoutingTableInputStreamReader indexRoutingTableInputStreamReader = new IndexRoutingTableInputStreamReader(inputStream);
Index index = new Index(indexRoutingMetaData.getIndexName(), indexRoutingMetaData.getIndexUUID());
IndexRoutingTable indexRouting = indexRoutingTableInputStreamReader.readIndexRoutingTable(index);
indicesRouting.put(indexRoutingMetaData.getIndexName(), indexRouting);
logger.debug("IndexRouting {}", indexRouting);
}
return new RoutingTable(routingTableVersion, indicesRouting);
}

private void deleteStaleRoutingTable(String clusterName, String clusterUUID, int manifestsToRetain) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@

package org.opensearch.gateway.remote.routingtable;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;
Expand Down Expand Up @@ -56,24 +53,20 @@ public class IndexRoutingTableInputStream extends InputStream {
private static final int BUFFER_SIZE = 8192;

private final IndexRoutingTableHeader indexRoutingTableHeader;

private final Iterator<IndexShardRoutingTable> shardIter;
private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStream.class);
private final BytesStreamOutput bytesStreamOutput;
private final BufferedChecksumStreamOutput out;

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable) throws IOException {
this(indexRoutingTable, BUFFER_SIZE);
}

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size)
throws IOException {
public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) throws IOException {
this.buf = new byte[size];
this.shardIter = indexRoutingTable.iterator();
this.indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName());
this.bytesStreamOutput = new BytesStreamOutput();
this.out = new BufferedChecksumStreamOutput(bytesStreamOutput);
logger.info("indexRoutingTable {}", indexRoutingTable.prettyPrint());

initialFill(indexRoutingTable.shards().size());
}
Expand All @@ -91,34 +84,34 @@ private void initialFill(int shardCount) throws IOException {
indexRoutingTableHeader.write(out);
out.writeVInt(shardCount);

System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length());
System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0, buf, 0, bytesStreamOutput.bytes().length());
count = bytesStreamOutput.bytes().length();
bytesStreamOutput.reset();
fill(buf);
}

private void fill(byte[] buf) throws IOException {
if (leftOverBuf != null) {
if(leftOverBuf.length > buf.length - count) {
// leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf.
if (leftOverBuf.length > buf.length - count) {
// leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in
// leftOverBuf.
System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count);
byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)];
System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count));
byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)];
System.arraycopy(leftOverBuf, buf.length - count, tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count));
leftOverBuf = tempLeftOverBuffer;
count = buf.length - count;

} else {
System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length);
count += leftOverBuf.length;
count += leftOverBuf.length;
leftOverBuf = null;
}
}

if (count < buf.length && shardIter.hasNext()) {
IndexShardRoutingTable next = shardIter.next();
IndexShardRoutingTable.Builder.writeTo(next, out);
//Add checksum for the file after all shards are done
if(!shardIter.hasNext()) {
// Add checksum for the file after all shards are done
if (!shardIter.hasNext()) {
out.writeLong(out.getChecksum());
}
out.flush();
Expand All @@ -132,12 +125,10 @@ private void fill(byte[] buf) throws IOException {
} else {
System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count);
leftOverBuf = new byte[bytesRef.length() - (buf.length - count)];
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count));
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count, leftOverBuf, 0, bytesRef.length() - (buf.length - count));
count = buf.length;

}
}

}

private void maybeResizeAndFill() throws IOException {
Expand All @@ -153,7 +144,7 @@ else if (pos >= buffer.length) { /* no room left in buffer */
markPos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else { /* grow buffer */
int nsz = markLimit + 1; //NEED TO CHECK THIS
int nsz = markLimit + 1;
byte[] nbuf = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
buffer = nbuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,13 @@
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;

import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexRoutingTableInputStreamReader {

Expand All @@ -34,32 +28,31 @@ public class IndexRoutingTableInputStreamReader {
private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class);

public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException {
this.streamInput = new InputStreamStreamInput(inputStream);
streamInput = new InputStreamStreamInput(inputStream);
}

public Map<String, IndexShardRoutingTable> read() throws IOException {
public IndexRoutingTable readIndexRoutingTable(Index index) throws IOException {
try {
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) {
// Read the Table Header first
IndexRoutingTableHeader.read(in);
int shards = in.readVInt();
logger.info("Number of Index Routing Table {}", shards);
Map<String, IndexShardRoutingTable> indicesRouting = new HashMap<>();
for(int i=0; i<shards; i++)
{
// Read the Table Header first and confirm the index
IndexRoutingTableHeader indexRoutingTableHeader = IndexRoutingTableHeader.read(in);
assert indexRoutingTableHeader.getIndexName().equals(index.getName());

int numberOfShardRouting = in.readVInt();
logger.debug("Number of Index Routing Table {}", numberOfShardRouting);
IndexRoutingTable.Builder indicesRoutingTable = IndexRoutingTable.builder(index);
for (int idx = 0; idx < numberOfShardRouting; idx++) {
IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
logger.info("Index Shard Routing Table reading {}", indexShardRoutingTable);
indicesRouting.put(indexShardRoutingTable.getShardId().getIndexName(), indexShardRoutingTable);
logger.debug("Index Shard Routing Table reading {}", indexShardRoutingTable);
indicesRoutingTable.addIndexShard(indexShardRoutingTable);

}
verifyCheckSum(in);
// Return indices Routing table
return indicesRouting;
return indicesRoutingTable.build();
}
} catch (EOFException e) {
throw new IOException("Indices Routing table is corrupted", e);
}

}

private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,21 @@

package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.seqno.ReplicationTrackerTestCase;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexRoutingTableInputStreamTests extends OpenSearchTestCase {

public void testRoutingTableInputStream(){
public void testRoutingTableInputStream() {
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
Expand All @@ -47,15 +34,40 @@ public void testRoutingTableInputStream(){
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
Map<String, IndexShardRoutingTable> indexShardRoutingTableMap = reader.read();
IndexRoutingTable indexRoutingTable = reader.readIndexRoutingTable(metadata.index("test").getIndex());

assertEquals(1, indexShardRoutingTableMap.size());
assertNotNull(indexShardRoutingTableMap.get("test"));
assertEquals(2,indexShardRoutingTableMap.get("test").shards().size());
assertEquals(1, indexRoutingTable.getShards().size());
assertEquals(indexRoutingTable.getIndex(), metadata.index("test").getIndex());
assertEquals(indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size(), 2);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

public void testRoutingTableInputStreamWithInvalidIndex() {
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
AtomicInteger assertionError = new AtomicInteger();
initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> {
try {
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
reader.readIndexRoutingTable(metadata.index("invalid-index").getIndex());

} catch (AssertionError e) {
assertionError.getAndIncrement();
} catch (IOException e) {
throw new RuntimeException(e);
}
});

assertEquals(1, assertionError.get());
}

}

0 comments on commit 4c33720

Please sign in to comment.