Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ private PubSubMessageProcessedResult processActiveActiveMessage(

if (mergeConflictResult.isUpdateIgnored()) {
hostLevelIngestionStats.recordUpdateIgnoredDCR();
aggVersionedIngestionStats.recordUpdateIgnoredDCR(storeName, versionNumber);
return new PubSubMessageProcessedResult(
new MergeConflictResultWrapper(
mergeConflictResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -176,6 +178,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
private static final Logger LOGGER = LogManager.getLogger(LeaderFollowerStoreIngestionTask.class);
public static final String GLOBAL_RT_DIV_KEY_PREFIX = "GLOBAL_RT_DIV_KEY.";
static final long VIEW_WRITER_CLOSE_TIMEOUT_IN_MS = 60000; // 60s
private static final String UNKNOWN_REGION = "unknown";

/**
* The new leader will stay inactive (not switch to any new topic or produce anything) for
Expand Down Expand Up @@ -203,6 +206,9 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
protected Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriter;
protected final Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterForRealTime;
protected final Int2ObjectMap<String> kafkaClusterIdToUrlMap;
protected final Int2ObjectMap<String> kafkaClusterIdToAliasMap;
protected final String localRegionName;
protected final IntSet localKafkaClusterIds;
protected final Map<String, byte[]> globalRtDivKeyBytesCache;
private volatile long dataRecoveryCompletionTimeLagThresholdInMs = 0;

Expand Down Expand Up @@ -303,6 +309,9 @@ public LeaderFollowerStoreIngestionTask(
}

this.kafkaClusterIdToUrlMap = serverConfig.getKafkaClusterIdToUrlMap();
this.kafkaClusterIdToAliasMap = serverConfig.getKafkaClusterIdToAliasMap();
this.localRegionName = serverConfig.getRegionName();
this.localKafkaClusterIds = computeLocalKafkaClusterIds(kafkaClusterIdToAliasMap, localRegionName);
if (builder.getVeniceViewWriterFactory() != null && !store.getViewConfigs().isEmpty()
&& !store.isFlinkVeniceViewsEnabled()) {
viewWriters = builder.getVeniceViewWriterFactory()
Expand Down Expand Up @@ -2159,12 +2168,37 @@ protected final void recordAssembledRecordSizeRatio(double ratio, long currentTi

private void recordRegionHybridConsumptionStats(int kafkaClusterId, int producedRecordSize, long currentTimeMs) {
if (kafkaClusterId >= 0) {
versionedIngestionStats
.recordRegionHybridConsumption(storeName, versionNumber, kafkaClusterId, producedRecordSize, currentTimeMs);
String sourceRegion = kafkaClusterIdToAliasMap.getOrDefault(kafkaClusterId, UNKNOWN_REGION);
boolean isLocalRegion = localKafkaClusterIds.contains(kafkaClusterId);
versionedIngestionStats.recordRegionHybridConsumption(
storeName,
versionNumber,
kafkaClusterId,
producedRecordSize,
currentTimeMs,
sourceRegion,
localRegionName,
isLocalRegion);
hostLevelIngestionStats.recordTotalRegionHybridBytesConsumed(kafkaClusterId, producedRecordSize, currentTimeMs);
}
}

/**
* Pre-computes the set of kafka cluster IDs that correspond to the local region.
* This allows O(1) lookup on the hot path instead of string comparison.
*/
private static IntSet computeLocalKafkaClusterIds(Int2ObjectMap<String> clusterIdToAlias, String localRegion) {
IntSet localIds = new IntOpenHashSet();
if (localRegion != null && clusterIdToAlias != null) {
for (Int2ObjectMap.Entry<String> entry: clusterIdToAlias.int2ObjectEntrySet()) {
if (localRegion.equals(entry.getValue())) {
localIds.add(entry.getIntKey());
}
}
}
return localIds;
}

@Override
protected boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState) {
return isHybridMode() && (isDaVinciClient || partitionConsumptionState.getLeaderFollowerState().equals(STANDBY));
Expand Down Expand Up @@ -3198,7 +3232,7 @@ private PubSubMessageProcessedResult processMessage(
hostLevelIngestionStats
.recordWriteComputeUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(writeComputeStartTimeInNS));
} catch (Exception e) {
writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_UPDATE_FAILURE.code;
setWriteComputeFailureCode(StatsErrorCode.WRITE_COMPUTE_UPDATE_FAILURE.code);
throw new RuntimeException(e);
}

Expand Down Expand Up @@ -3714,7 +3748,7 @@ GenericRecord readStoredValueRecord(
hostLevelIngestionStats
.recordWriteComputeLookUpLatency(LatencyUtils.getElapsedTimeFromNSToMS(lookupStartTimeInNS));
} catch (Exception e) {
writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code;
setWriteComputeFailureCode(StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code);
throw e;
}
} else {
Expand All @@ -3729,7 +3763,7 @@ GenericRecord readStoredValueRecord(
storeDeserializerCache.getDeserializer(transientRecord.getValueSchemaId(), readerValueSchemaID),
compressor.get());
} catch (Exception e) {
writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code;
setWriteComputeFailureCode(StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code);
throw e;
}
if (manifestContainer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

protected int writeComputeFailureCode = 0;

void setWriteComputeFailureCode(int code) {
this.writeComputeFailureCode = code;
}

private final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer;

// Do not convert it to a local variable because it is used in test.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,53 @@ protected void registerConditionalStats(String storeName) {
private VeniceVersionedStats<STATS, STATS_REPORTER> getVersionedStats(String storeName) {
VeniceVersionedStats<STATS, STATS_REPORTER> stats = aggStats.get(storeName);
if (stats == null) {
stats = addStore(storeName);
Store store = metadataRepository.getStoreOrThrow(storeName);
updateStatsVersionInfo(store.getName(), store.getVersions(), store.getCurrentVersion());
// Use computeIfAbsent to atomically add the store and initialize version info.
// This prevents duplicate updateStatsVersionInfo calls when multiple threads
// try to get stats for a new store concurrently.
stats = aggStats.computeIfAbsent(storeName, s -> {
VeniceVersionedStats<STATS, STATS_REPORTER> newStats =
new VeniceVersionedStats<>(metricsRepository, s, statsInitiator, reporterSupplier);
Store store = metadataRepository.getStoreOrThrow(s);
// Initialize version info inside computeIfAbsent to ensure atomicity
initializeVersionInfo(newStats, store);
return newStats;
});
}
return stats;
}

/**
* Initializes version info for a newly created VeniceVersionedStats.
* This is called within computeIfAbsent to ensure atomic initialization.
*/
private void initializeVersionInfo(VeniceVersionedStats<STATS, STATS_REPORTER> versionedStats, Store store) {
int newCurrentVersion = store.getCurrentVersion();
if (newCurrentVersion != versionedStats.getCurrentVersion()) {
versionedStats.setCurrentVersion(newCurrentVersion);
}

List<Version> existingVersions = store.getVersions();
int futureVersion = NON_EXISTING_VERSION;
for (Version version: existingVersions) {
int versionNum = version.getNumber();
versionedStats.addVersion(versionNum);

VersionStatus status = version.getStatus();
if (status == VersionStatus.STARTED || status == VersionStatus.PUSHED) {
if (futureVersion < versionNum) {
futureVersion = versionNum;
}
}
}

if (futureVersion != versionedStats.getFutureVersion()) {
versionedStats.setFutureVersion(futureVersion);
}

// Notify subclasses that version info has been initialized
onVersionInfoUpdated(store.getName(), versionedStats.getCurrentVersion(), versionedStats.getFutureVersion());
}

protected VeniceVersionedStats<STATS, STATS_REPORTER> addStore(String storeName) {
return aggStats.computeIfAbsent(
storeName,
Expand All @@ -111,7 +151,10 @@ protected void updateStatsVersionInfo(String storeName, List<Version> existingVe
versionedStats.getAllVersionNumbers()
.stream()
.filter(versionNum -> !existingVersionNumbers.contains(versionNum) && versionNum != NON_EXISTING_VERSION)
.forEach(versionedStats::removeVersion);
.forEach(versionNum -> {
versionedStats.removeVersion(versionNum);
cleanupVersionResources(storeName, versionNum);
});

int futureVersion = NON_EXISTING_VERSION;
for (Version version: existingVersions) {
Expand Down Expand Up @@ -205,4 +248,16 @@ protected void updateTotalStats(String storeName) {
protected void onVersionInfoUpdated(String storeName, int currentVersion, int futureVersion) {
// no-op by default
}

/**
* Hook method for subclasses to clean up their own version-specific resources
* (e.g., OTel stats) when a version is removed. This is called after the internal
* versioned stats have been removed via {@link VeniceVersionedStats#removeVersion}.
*
* @param storeName The store whose version was removed
* @param version The version number that was removed
*/
protected void cleanupVersionResources(String storeName, int version) {
// no-op by default
}
}
Loading