Skip to content

Commit

Permalink
Revert ensureAsyncFetchStorePrimaryRecency update
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Apr 23, 2024
1 parent 947899c commit ee22c65
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,6 @@ public synchronized void clearShard(ShardId shardId) {
this.cache.deleteShard(shardId);
}

/**
* Clears the cached data held for a single shard on a single node.
* The call is synchronized on this instance and delegates to
* {@code cache.cleanCacheForNodeForShardId(nodeId, shardId)}.
*
* @param nodeId id of the node whose cached data for the shard should be cleared.
* @param shardId id of the shard whose cached data should be cleared on that node.
*/
public synchronized void clearCache(String nodeId, ShardId shardId) {
this.cache.cleanCacheForNodeForShardId(nodeId, shardId);
}

/**
* Cache implementation for transport actions that return a batch of shard-related data in the response.
* Stores node-level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
Expand Down Expand Up @@ -148,14 +138,6 @@ public void deleteShard(ShardId shardId) {
}
}

/**
* Clears the cached data of one shard for one node in the batch cache.
* <p>
* If the shard is not tracked in {@code shardIdToArray} nothing happens. Otherwise its
* shard-to-index mapping is removed and the slot at that index is cleared in the entry of
* the given node. A node that has no entry in the cache is ignored instead of failing with
* a {@link NullPointerException}, which matches the non-batch cache where removing an
* unknown node is a no-op.
*
* @param nodeId id of the node whose entry should have the shard cleared
* @param shardId id of the shard to clear
*/
@Override
public void cleanCacheForNodeForShardId(String nodeId, ShardId shardId) {
    if (shardIdToArray.containsKey(shardId) == false) {
        return; // shard is not part of this batch cache, nothing to clear
    }
    // NOTE(review): the mapping is keyed by shard only, so removing it here affects the lookup of
    // this shard for every node, not just nodeId - confirm this is intended.
    Integer shardIdIndex = shardIdToArray.remove(shardId);
    // The node may not have been initialised in the cache yet; skip it rather than dereference null.
    if (cache.containsKey(nodeId)) {
        cache.get(nodeId).clearShard(shardIdIndex);
    }
}

@Override
public void initData(DiscoveryNode node) {
cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,6 @@ public void deleteShard(ShardId shardId) {
cache.clear(); // single shard cache can clear the full map
}

/**
* Drops the cached entry of the given node. The shard id is not consulted here:
* the whole entry stored under {@code nodeId} is removed.
*
* @param nodeId id of the node whose cache entry is removed
* @param shardId unused by this implementation
*/
@Override
public void cleanCacheForNodeForShardId(String nodeId, ShardId shardId) {
cache.remove(nodeId); // non batch cache only has one entry per node
}

/**
* A node entry, holding the state of the fetched data for a specific shard
* for a given node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ protected AsyncShardFetchCache(Logger logger, String type) {
*/
abstract void deleteShard(ShardId shardId);

/**
* Clears the cached data for the given shard on the given node.
*
* @param nodeId id of the node whose cached data should be cleared
* @param shardId id of the shard to clear for that node
*/
abstract void cleanCacheForNodeForShardId(String nodeId, ShardId shardId);

/**
* Returns the number of fetches that are currently ongoing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
// drops out, we fetch the shard data, then some indexing happens and then the node rejoins the cluster again. There are other
// ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a primary failure) but
// making the wrong decision here is not catastrophic so we only need to cover the common case.

logger.trace(
() -> new ParameterizedMessage(
"new nodes {} found, clearing primary async-fetch-store cache",
Expand All @@ -422,15 +423,18 @@ private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {

/**
* Clears cached async-fetch data for the active primaries of the shards in the given batch.
*
* @param batch batch whose shards are resolved to their active primaries and whose fetcher is cleared
* @param allocation allocation whose routing nodes are used to look up each active primary
*/
private static void clearCacheForBatchPrimary(ShardsBatch batch, RoutingAllocation allocation) {
// We need to clear the cache for the primary shard to ensure we do not cancel recoveries based on excessively
// stale data. We do this by clearing the cache on the nodes that hold the active primaries of the replicas in
// the current batch.
// This flow could be optimized to clear only the cache entry of the primary shard, but currently a fetch always
// retrieves data for the complete node. Clearing a single shard would also require a new fetch flow that fetches
// the data for just that shard on the node and fills it into our cache.
// Opened issue #13352 - to track the improvement
List<ShardRouting> primaries = batch.getBatchedShards()
.stream()
.map(allocation.routingNodes()::activePrimary)
// entries for which activePrimary returned null are dropped
.filter(Objects::nonNull)
.collect(Collectors.toList());
AsyncShardBatchFetch<? extends BaseNodeResponse, ?> fetch = batch.getAsyncFetcher();
// NOTE(review): a per-(node, shard) clear and a whole-node clear are both issued for every primary; the
// node-level clear presumably subsumes the shard-level one - confirm whether both calls are intended.
primaries.forEach(shardRouting -> fetch.clearCache(shardRouting.currentNodeId(), shardRouting.shardId()));
primaries.forEach(shardRouting -> fetch.clearCacheForNode(shardRouting.currentNodeId()));
}

private boolean hasNewNodes(DiscoveryNodes nodes) {
Expand Down

0 comments on commit ee22c65

Please sign in to comment.