Skip to content

Commit

Permalink
Fixes in remote store flow
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasvb90 committed Jan 8, 2025
1 parent 1575e94 commit 02453d8
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.cat.RestClusterManagerAction;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -126,19 +133,21 @@ private void verifyAfterSplit(long totalIndexedDocs, Set<String> ids, int parent
// logger.info("Shard stat after first indexing of shard " + shardStat.getShardRouting().shardId().id() + " docs: "
// + shardStat.getStats().indexing.getTotal().getIndexCount() + " seq no: " + shardStat.getSeqNoStats().getMaxSeqNo());
// }

SearchHits hits = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize((int) totalIndexedDocs)
.seqNoAndPrimaryTerm(true)
.setPreference("_primary")
.storedFields()
.execute()
.actionGet()
.getHits();

assertThat(hits.getTotalHits().value, equalTo(totalIndexedDocs));
for (String id : ids) {
// Make sure there is no duplicate doc.
assertHitCount(client().prepareSearch("test").setSize(0)
.setQuery(matchQuery("_id", id)).get(), 1);
.setQuery(matchQuery("_id", id)).setPreference("_primary").get(), 1);
}
logger.info("Shard is split successfully");
}
Expand All @@ -149,7 +158,7 @@ public void testShardSplit() throws Exception {
prepareCreate("test", Settings.builder().put("index.number_of_shards", 3)
.put("index.number_of_replicas", replicaCount)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(1500, 2400);
int numDocs = scaledRandomIntBetween(200, 500);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
waitForDocs(numDocs, indexer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,6 @@ static boolean executeBulkItemRequest(
request.ifPrimaryTerm()
);
} else {
// if (context.getBulkShardRequest().shardId().id() == 0) {
// logger.info("Executing item index");
// }
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(
version,
Expand All @@ -672,9 +669,6 @@ static boolean executeBulkItemRequest(
// logger.info("Indexing operation sequence " + result.getSeqNo() + " on shard 0.");
// }
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
// if (context.getBulkShardRequest().shardId().id() == 0) {
// logger.info("Executing bulk item mapping update");
// }
try {
primary.mapperService()
.merge(
Expand Down Expand Up @@ -716,17 +710,8 @@ public void onFailure(Exception e) {
});
return false;
} else {
// if (context.getBulkShardRequest().shardId().id() == 0) {
// logger.info("Forming bulk item result");
// }
onComplete(result, context, updateResult);
// if (context.getBulkShardRequest().shardId().id() == 0) {
// logger.info("Formed bulk item result");
// }
}
// if (context.getBulkShardRequest().shardId().id() == 0) {
// logger.info("Return true bulk item request");
// }
return true;
}

Expand Down Expand Up @@ -887,11 +872,16 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
// if (replica.routingEntry().isStartedChildReplica()) {
// logger.info("Processing seq no." + response.getResponse().getSeqNo() + " on replica child "
// + replica.shardId().id() + ", discarding " + discardOperation);
// } else if (replica.routingEntry().isSplitTarget()) {
// logger.info("Processing seq no. on child primary" + response.getResponse().getSeqNo() + " on replica child "
// + replica.shardId().id() + ", discarding " + discardOperation);
// }
// } else

}
// if (replica.routingEntry().shardId().id() > 2 && replica.routingEntry().started()) {
// logger.info("Processing seq no. on child replica" + response.getResponse().getSeqNo() + " on replica child "
// + replica.shardId().id() + ", discarding " + discardOperation);
// }
// if (replica.routingEntry().shardId().id() == 1 || replica.routingEntry().shardId().id() == 2) {
// logger.info("Processing seq no. on other child replica" + response.getResponse().getSeqNo() + " on replica child ");
// }

if (item.getPrimaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ public void onResponse(Void aVoid) {
logger.info("Updating checkpoints for id: " + id);
}
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
if (TransportShardBulkAction.debugRequest.get() && request.shardId().id() == 0) {
logger.info("Updated checkpoints for id: " + id);
}
// if (TransportShardBulkAction.debugRequest.get() && request.shardId().id() == 0) {
// logger.info("Updated checkpoints for id: " + id);
// }
} finally {
decPendingAndFinishIfNeeded();
}
Expand Down Expand Up @@ -256,9 +256,6 @@ private void performOnReplicas(
final ShardRouting primaryRouting = primary.routingEntry();

for (final ShardRouting shardRouting : replicationGroup.getReplicationTargets()) {
if ((primaryRouting.isSplitTarget() || primaryRouting.isRelocationTarget()) && !shardRouting.isSameAllocation(primaryRouting)) {
logger.info("Performing unexpectedly on routing " + shardRouting.shardId().id());
}
ReplicationProxyRequest<ReplicaRequest> proxyRequest = new Builder<ReplicaRequest>(
shardRouting,
primaryRouting,
Expand Down Expand Up @@ -466,9 +463,9 @@ private void decPendingAndFinishIfNeeded() {
if (pendingActions.decrementAndGet() == 0) {
finish();
}
if (TransportShardBulkAction.debugRequest.get() && request.shardId().id() == 0) {
logger.info("Pending action count for id " + id + " is:" + pendingActions.get() + " and list is: " + pendingActionsList);
}
// if (TransportShardBulkAction.debugRequest.get() && request.shardId().id() == 0) {
// logger.info("Pending action count for id " + id + " is:" + pendingActions.get() + " and list is: " + pendingActionsList);
// }
}

private void finish() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,23 @@ public ShardRouting activePrimary(ShardId shardId) {
return null;
}

/**
* Returns the primary child for the given shard id or <code>null</code> if
* no child is found or the parent shard is not splitting.
*/
public ShardRouting primaryChild(ShardId parentShardId, ShardId childShardId) {
for (ShardRouting shardRouting : assignedShards(parentShardId)) {
if (shardRouting.splitting()) {
for (ShardRouting childShard : shardRouting.getRecoveringChildShards()) {
if (childShard.shardId().id() == childShardId.id() && childShard.primary()) {
return childShard;
}
}
}
}
return null;
}

/**
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public boolean isThrottled() {

@Override
public IndexResult index(Index index) throws IOException {
logger.info("Replication engine: Indexing operation " + index.seqNo() + " on shard " + shardId.id());
// logger.info("Replication engine: Indexing operation " + index.seqNo() + " on shard " + shardId.id());
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translogManager.add(new Translog.Index(index, indexResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1234,9 +1234,16 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
}

private boolean isPrimaryRelocationOrChild(String allocationId) {
if (routingTable.primaryShard().splitting()) {
for (ShardRouting child : routingTable.primaryShard().getRecoveringChildShards()) {
if (child.allocationId().getId().equals(allocationId)) {
return true;
}
}
}
Optional<ShardRouting> shardRouting = routingTable.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(allocationId) || routing.isSplitTarget())
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
return shardRouting.isPresent() && shardRouting.get().primary();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,6 @@ public void relocated(
List<Releasable> releasablesOnHandoffFailures = new ArrayList<>(2);
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
TransportShardBulkAction.debugRequest.set(true);
Thread.sleep(1000);
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,11 @@ private void onSuccessfulSegmentsSync(
resetBackOffDelayIterator();
// Set the minimum sequence number for keeping translog
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
// Publishing the new checkpoint which is used for remote store + segrep indexes
checkpointPublisher.publish(indexShard, checkpoint);
// Publishing the new checkpoint which is used for remote store + segrep indexes. Skipping replication from child primary
// since it happens via parent primary on child replicas.
if (indexShard.routingEntry().isSplitTarget() == false) {
checkpointPublisher.publish(indexShard, checkpoint);
}
logger.debug("onSuccessfulSegmentsSync lastRefreshedCheckpoint={} checkpoint={}", lastRefreshedCheckpoint, checkpoint);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
if (hasRemoteSegmentStore && !indexShard.routingEntry().isSplitTarget()) {
// ToDo: This is a temporary mitigation to not fail the peer recovery flow in case there is
// an exception while downloading segments from remote store. For remote backed indexes, we
// plan to revamp this flow so that node-node segment copy will not happen.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
onSendFileStepComplete(sendFileStep, wrappedSafeCommit, releaseStore);

assert Transports.assertNotTransportThread(this + "[phase1]");
phase1(wrappedSafeCommit.get(), startingSeqNo, () -> 0, sendFileStep, true);
phase1(wrappedSafeCommit.get(), startingSeqNo, () -> 0, sendFileStep, shouldSkipCreateRetentionLeaseStep());
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,20 @@ public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryConte
throw new IllegalStateException("Allocation ID " + allocationId +
" not found in synced checkpoint states of parent shard." + sourceShard.shardId());
}
checkpointStates.put(allocationId, primaryContext.getCheckpointStates().get(allocationId));
ReplicationTracker.CheckpointState checkpointState = primaryContext.getCheckpointStates().get(allocationId);
if (sourceShard.remoteStore() != null &&
recoveryTarget.indexShard().routingEntry().allocationId().getId().equals(allocationId) == false) {
// This is needed to update replicated to false on child replicas because now there won't be any
// doc rep replication on them.
checkpointState = new ReplicationTracker.CheckpointState(
checkpointState.getLocalCheckpoint(),
checkpointState.getGlobalCheckpoint(),
true,
true,
false
);
}
checkpointStates.put(allocationId, checkpointState);
}
ReplicationTracker.PrimaryContext childPrimaryContext = new ReplicationTracker.PrimaryContext(
primaryContext.clusterStateVersion(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand All @@ -33,6 +34,7 @@
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
Expand Down Expand Up @@ -172,17 +174,27 @@ public void clusterChanged(ClusterChangedEvent event) {
for (IndexShard shard : indexService) {
if (shard.routingEntry().primary() == false && shard.routingEntry().isSplitTarget() == false) {
// for this shard look up its primary routing, if it has completed a relocation trigger replication
final String previousNode = event.previousState()
.routingTable()
.shardRoutingTable(shard.shardId())
.primaryShard()
.currentNodeId();

final String currentNode = event.state()
.routingTable()
.shardRoutingTable(shard.shardId())
.primaryShard()
.currentNodeId();
if (previousNode.equals(currentNode) == false) {

String previousNode;
try {
previousNode = event.previousState()
.routingTable()
.shardRoutingTable(shard.shardId())
.primaryShard()
.currentNodeId();
} catch (ShardNotFoundException ex) {
// This will be true when a parent shard have just split into new child shards and hence,
// new entries in routing table are only available in latest cluster state.
previousNode = null;
}

if (previousNode == null || previousNode.equals(currentNode) == false) {
processLatestReceivedCheckpoint(shard, Thread.currentThread());
}
}
Expand Down Expand Up @@ -383,7 +395,12 @@ protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaSha
if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) {
return;
}
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();
ShardRouting primaryShard;
if (replicaShard.routingEntry().isSplitTarget()) {
primaryShard = clusterService.state().getRoutingNodes().primaryChild(replicaShard.getParentShardId(), replicaShard.shardId());
} else {
primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();
}

final UpdateVisibleCheckpointRequest request = new UpdateVisibleCheckpointRequest(
replicationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
ActionListener.completeWith(listener, () -> {
logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId()));
if (request.getCheckpoint().getShardId().equals(replica.shardId())) {
replicationService.onNewCheckpoint(request.getCheckpoint(), replica);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ public synchronized void writeTo(StreamOutput out) throws IOException {

public synchronized void start() {
if (startTime != 0) {
System.out.println();
// This needs to be handled properly because another flush (external?) operation can attempt to reset it
// before first one completes.
return;
}
assert startTime == 0 : "already started";
startTime = System.currentTimeMillis();
startNanoTime = System.nanoTime();
}
Expand Down

0 comments on commit 02453d8

Please sign in to comment.