Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented computation of segment replication stats at shard level #17055

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
44a1134
Implemented computation of segment replication stats at shard level
vinaykpud Jan 19, 2025
5d138e3
Updated style checks in the test
vinaykpud Jan 19, 2025
18664d2
Updated changelog
vinaykpud Jan 19, 2025
43d0798
Merge branch 'main' into node-stats
vinaykpud Jan 19, 2025
3468164
fixed style issues
vinaykpud Jan 19, 2025
4e693a5
Fix the failing integration test
vinaykpud Jan 21, 2025
5a3d1ef
Fix stylecheck
vinaykpud Jan 21, 2025
a94240f
Fixed the comments for the initial revision
vinaykpud Jan 21, 2025
dd0406d
Updated to use System.nanoTime() for lag calculation
vinaykpud Jan 21, 2025
1104c1f
Fixed the integration test for node stats
vinaykpud Jan 22, 2025
59e2617
Merge branch 'main' into node-stats
vinaykpud Jan 22, 2025
90b96a8
Modified the version in the ReplicationCheckpoint for backward compat…
vinaykpud Jan 23, 2025
7f465a0
Added precomputation logic for the stats calculation
vinaykpud Jan 27, 2025
23cedac
Merge branch 'main' into node-stats
vinaykpud Jan 27, 2025
28f1cfc
Removed unwanted lines
vinaykpud Jan 27, 2025
f80791f
Clean up the maps when index closed
vinaykpud Jan 27, 2025
29ffb01
Added a null check for the indexshard checkpoint
vinaykpud Jan 28, 2025
4fe2f87
fix style checks
vinaykpud Jan 28, 2025
5d1180f
Merge branch 'main' into node-stats
vinaykpud Jan 28, 2025
c838033
Updated version and added bwc for RemoteSegmentMetadata
vinaykpud Feb 4, 2025
1fdd5d2
Upated the javadoc comments
vinaykpud Feb 4, 2025
73efd49
Merge branch 'main' into node-stats
vinaykpud Feb 4, 2025
85dd342
Address comments PR
vinaykpud Feb 19, 2025
7252842
Merge branch 'main' into node-stats
vinaykpud Feb 19, 2025
c993370
Removed the latestReceivedCheckpoint map from SegmentReplicationTarge…
vinaykpud Feb 20, 2025
6879f4c
Added granular locks for the concurrency of stats methods
vinaykpud Feb 24, 2025
6ee1808
Style check fixes
vinaykpud Feb 24, 2025
4e2b335
Merge branch 'main' into node-stats
vinaykpud Feb 24, 2025
e928e80
Changes to maintain atomicity
vinaykpud Feb 25, 2025
04ba008
Merge branch 'main' into node-stats
vinaykpud Feb 25, 2025
3d030d5
spotlessApply
vinaykpud Feb 25, 2025
fcc57bf
removed querying the remotestore when replication is in progress
vinaykpud Feb 26, 2025
25fd006
Merge branch 'main' into node-stats
vinaykpud Feb 26, 2025
d8585f7
spotlessApply
vinaykpud Feb 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
- [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree ([#16674](https://github.com/opensearch-project/OpenSearch/pull/16674))
- [Star Tree] [Search] Extensible design to support different query and field types ([#17137](https://github.com/opensearch-project/OpenSearch/pull/17137))
- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand All @@ -136,6 +137,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
import static org.mockito.Mockito.mock;

public class IndexShardIT extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -716,7 +718,8 @@ public static final IndexShard newIndexShard(
null,
DefaultRemoteStoreSettings.INSTANCE,
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting)
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
mock(Function.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,19 +404,17 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {

for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats();
// primary node - should hold replication statistics
// primary node - do not have any replication statistics
if (nodeStats.getNode().getName().equals(primaryNode)) {
assertTrue(replicationStats.getMaxBytesBehind() == 0);
assertTrue(replicationStats.getTotalBytesBehind() == 0);
assertTrue(replicationStats.getMaxReplicationLag() == 0);
}
// replica nodes - should hold replication statistics
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
assertTrue(replicationStats.getMaxBytesBehind() > 0);
assertTrue(replicationStats.getTotalBytesBehind() > 0);
assertTrue(replicationStats.getMaxReplicationLag() > 0);
// 2 replicas so total bytes should be double of max
assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind());
}
// replica nodes - should hold empty replication statistics
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
assertEquals(0, replicationStats.getMaxBytesBehind());
assertEquals(0, replicationStats.getTotalBytesBehind());
assertEquals(0, replicationStats.getMaxReplicationLag());
}
}
// get replication statistics at index level
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io;

/**
* Interface for factory to provide handler implementation for type {@link T}
* @param <T> The type of content to be read/written to stream
*
* @opensearch.internal
*/
public interface IndexIOStreamHandlerFactory<T> {

/**
* Implements logic to provide handler based on the stream versions
* @param version stream version
* @return Handler for reading/writing content streams to/from - {@link T}
*/
IndexIOStreamHandler<T> getHandler(int version);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,24 @@
public class VersionedCodecStreamWrapper<T> {
private static final Logger logger = LogManager.getLogger(VersionedCodecStreamWrapper.class);

// TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions
private final IndexIOStreamHandler<T> indexIOStreamHandler;
private final IndexIOStreamHandlerFactory<T> indexIOStreamHandlerFactory;
private final int minVersion;
private final int currentVersion;
private final String codec;

/**
* @param indexIOStreamHandler handler to read/write stream from T
* @param indexIOStreamHandlerFactory factory for providing handler to read/write stream from T
* @param currentVersion latest supported version of the stream
* @param codec: stream codec
*/
public VersionedCodecStreamWrapper(IndexIOStreamHandler<T> indexIOStreamHandler, int currentVersion, String codec) {
this.indexIOStreamHandler = indexIOStreamHandler;
public VersionedCodecStreamWrapper(
IndexIOStreamHandlerFactory<T> indexIOStreamHandlerFactory,
int minVersion,
int currentVersion,
String codec
) {
this.indexIOStreamHandlerFactory = indexIOStreamHandlerFactory;
this.minVersion = minVersion;
this.currentVersion = currentVersion;
this.codec = codec;
}
Expand Down Expand Up @@ -87,7 +93,7 @@ public void writeStream(IndexOutput indexOutput, T content) throws IOException {
*/
private int checkHeader(IndexInput indexInput) throws IOException {
// TODO Once versioning strategy is decided we'll add support for min/max supported versions
return CodecUtil.checkHeader(indexInput, this.codec, this.currentVersion, this.currentVersion);
return CodecUtil.checkHeader(indexInput, this.codec, minVersion, this.currentVersion);
}

/**
Expand Down Expand Up @@ -120,8 +126,7 @@ private void writeFooter(IndexOutput indexOutput) throws IOException {
* @param version stream content version
*/
private IndexIOStreamHandler<T> getHandlerForVersion(int version) {
// TODO implement factory and pick relevant handler based on version.
// It should also take into account min and max supported versions
return this.indexIOStreamHandler;
// TODO factory should also take into account min and max supported versions
return this.indexIOStreamHandlerFactory.getHandler(version);
}
}
10 changes: 7 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
Expand Down Expand Up @@ -652,7 +653,8 @@
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings,
(s) -> {}
(s) -> {},
shardId -> ReplicationStats.empty()

Check warning on line 657 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L656-L657

Added lines #L656 - L657 were not covered by tests
);
}

Expand All @@ -678,7 +680,8 @@
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator
Consumer<IndexShard> replicator,
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -740,7 +743,8 @@
remoteStoreSettings,
fileCache,
compositeIndexSettings,
replicator
replicator,
segmentReplicationStatsProvider
);
success = true;
return indexService;
Expand Down
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -235,7 +236,8 @@
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
Consumer<IndexShard> replicator,
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -322,6 +324,7 @@
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -398,7 +401,8 @@
remoteStoreSettings,
null,
null,
s -> {}
s -> {},
(shardId) -> ReplicationStats.empty()

Check warning on line 405 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L404-L405

Added lines #L404 - L405 were not covered by tests
);
}

Expand Down Expand Up @@ -694,7 +698,8 @@
recoverySettings,
remoteStoreSettings,
seedRemote,
discoveryNodes
discoveryNodes,
segmentReplicationStatsProvider
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public ReplicationStats(StreamInput in) throws IOException {
this.maxReplicationLag = in.readVLong();
}

public static ReplicationStats empty() {
return new ReplicationStats();
}

public ReplicationStats() {

}
Expand Down
23 changes: 10 additions & 13 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@
*/
private final ShardMigrationState shardMigrationState;
private DiscoveryNodes discoveryNodes;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -391,7 +392,8 @@
final RecoverySettings recoverySettings,
final RemoteStoreSettings remoteStoreSettings,
boolean seedRemote,
final DiscoveryNodes discoveryNodes
final DiscoveryNodes discoveryNodes,
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -493,6 +495,7 @@
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
this.discoveryNodes = discoveryNodes;
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -1784,7 +1787,8 @@
segmentInfos.getVersion(),
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
metadataMap,
System.nanoTime()
);
logger.trace("Recomputed ReplicationCheckpoint for shard {}", checkpoint);
return checkpoint;
Expand Down Expand Up @@ -3233,17 +3237,10 @@
}

public ReplicationStats getReplicationStats() {
if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary()) {
final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L);
long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum();
long maxReplicationLag = stats.stream()
.mapToLong(SegmentReplicationShardStats::getCurrentReplicationLagMillis)
.max()
.orElse(0L);
return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag);
}
return new ReplicationStats();
if (indexSettings.isSegRepEnabledOrRemoteNode() && !routingEntry().primary()) {
return segmentReplicationStatsProvider.apply(shardId);

Check warning on line 3241 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L3241

Added line #L3241 was not covered by tests
}
return ReplicationStats.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandlerFactory;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -104,7 +104,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
private Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore;

private static final VersionedCodecStreamWrapper<RemoteSegmentMetadata> metadataStreamWrapper = new VersionedCodecStreamWrapper<>(
new RemoteSegmentMetadataHandler(),
new RemoteSegmentMetadataHandlerFactory(),
RemoteSegmentMetadata.VERSION_ONE,
RemoteSegmentMetadata.CURRENT_VERSION,
RemoteSegmentMetadata.METADATA_CODEC
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@
*/
@PublicApi(since = "2.6.0")
public class RemoteSegmentMetadata {

public static final int VERSION_ONE = 1;

public static final int VERSION_TWO = 2;

/**
* Latest supported version of metadata
*/
public static final int CURRENT_VERSION = 1;
public static final int CURRENT_VERSION = VERSION_TWO;
/**
* Metadata codec
*/
Expand Down Expand Up @@ -106,18 +111,30 @@ public static Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> f
);
}

/**
* Write always writes with the latest version of the RemoteSegmentMetadata
* @param out file output stream which will store stream content
* @throws IOException in case there is a problem writing the file
*/
public void write(IndexOutput out) throws IOException {
out.writeMapOfStrings(toMapOfStrings());
writeCheckpointToIndexOutput(replicationCheckpoint, out);
out.writeLong(segmentInfosBytes.length);
out.writeBytes(segmentInfosBytes, segmentInfosBytes.length);
}

public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
/**
* Read can happen in the upgraded version of replica which needs to support all versions of RemoteSegmentMetadata
* @param indexInput file input stream
* @param version version of the RemoteSegmentMetadata
* @return {@code RemoteSegmentMetadata}
* @throws IOException in case there is a problem reading from the file input stream
*/
public static RemoteSegmentMetadata read(IndexInput indexInput, int version) throws IOException {
Map<String, String> metadata = indexInput.readMapOfStrings();
final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = RemoteSegmentMetadata
.fromMapOfStrings(metadata);
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap);
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap, version);
int byteArraySize = (int) indexInput.readLong();
byte[] segmentInfosBytes = new byte[byteArraySize];
indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
Expand All @@ -136,11 +153,13 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicatio
out.writeLong(replicationCheckpoint.getSegmentInfosVersion());
out.writeLong(replicationCheckpoint.getLength());
out.writeString(replicationCheckpoint.getCodec());
out.writeLong(replicationCheckpoint.getCreatedTimeStamp());
}

private static ReplicationCheckpoint readCheckpointFromIndexInput(
IndexInput in,
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap,
int version
) throws IOException {
return new ReplicationCheckpoint(
new ShardId(new Index(in.readString(), in.readString()), in.readVInt()),
Expand All @@ -149,7 +168,8 @@ private static ReplicationCheckpoint readCheckpointFromIndexInput(
in.readLong(),
in.readLong(),
in.readString(),
toStoreFileMetadata(uploadedSegmentMetadataMap)
toStoreFileMetadata(uploadedSegmentMetadataMap),
version >= CURRENT_VERSION ? in.readLong() : 0
);
}

Expand Down
Loading
Loading