@@ -170,6 +170,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_BATCH_GET_CHUNK_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_PER_RECORD_OTEL_METRICS_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES;
@@ -178,6 +179,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_INTERVAL_IN_MILLIS;
import static com.linkedin.venice.ConfigKeys.SERVER_READ_QUOTA_INITIALIZATION_FALLBACK_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_TIMESTAMP_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED;
@@ -590,6 +592,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean batchReportEOIPEnabled;
private final IncrementalPushStatusWriteMode incrementalPushStatusWriteMode;
private final long ingestionHeartbeatIntervalMs;
private final boolean recordLevelTimestampEnabled;
private final boolean perRecordOtelMetricsEnabled;
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
private final boolean stuckConsumerRepairEnabled;
private final int stuckConsumerRepairIntervalSecond;
@@ -1009,6 +1013,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
metaStoreWriterCloseConcurrency = serverProperties.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
ingestionHeartbeatIntervalMs =
serverProperties.getLong(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1));
recordLevelTimestampEnabled = serverProperties.getBoolean(SERVER_RECORD_LEVEL_TIMESTAMP_ENABLED, false);
perRecordOtelMetricsEnabled = serverProperties.getBoolean(SERVER_PER_RECORD_OTEL_METRICS_ENABLED, false);
batchReportEOIPEnabled =
serverProperties.getBoolean(SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED, false);
incrementalPushStatusWriteMode =
@@ -1779,6 +1785,14 @@ public long getIngestionHeartbeatIntervalMs() {
return ingestionHeartbeatIntervalMs;
}

public boolean isRecordLevelTimestampEnabled() {
return recordLevelTimestampEnabled;
}

public boolean isPerRecordOtelMetricsEnabled() {
return perRecordOtelMetricsEnabled;
}

public boolean getBatchReportEOIPEnabled() {
return batchReportEOIPEnabled;
}
@@ -25,6 +25,7 @@
import com.linkedin.davinci.listener.response.NoOpReadResponseStats;
import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper;
import com.linkedin.davinci.schema.merge.MergeRecordHelper;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatKey;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatLagMonitorAction;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageService;
@@ -2316,32 +2317,55 @@ protected void recordHeartbeatReceived(
PartitionConsumptionState partitionConsumptionState,
DefaultPubSubMessage consumerRecord,
String kafkaUrl) {
HeartbeatMonitoringService hbService = getHeartbeatMonitoringService();
if (hbService == null) {
// Not enabled!
return;
}
String region;
long timestamp = consumerRecord.getValue().getProducerMetadata().getMessageTimestamp();
boolean isComplete = partitionConsumptionState.isComplete();
if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
region = getServerConfig().getKafkaClusterUrlToAliasMap().get(kafkaUrl);
HeartbeatKey cachedKey = partitionConsumptionState.getOrCreateCachedHeartbeatKey(region);
hbService.recordLeaderHeartbeat(cachedKey, getStoreName(), getVersionNumber(), region, timestamp, isComplete);
} else {
/**
* For Da Vinci there is no kafkaUrl mapping configured, so we should refer to the local region name set up in the
* Venice server config. This is consistent with the heartbeat lag calculation for the ready-to-serve check.
*/
region = isDaVinciClient()
? getServerConfig().getRegionName()
: getServerConfig().getKafkaClusterUrlToAliasMap().get(kafkaUrl);
HeartbeatKey cachedKey = partitionConsumptionState.getOrCreateCachedHeartbeatKey(region);
hbService.recordFollowerHeartbeat(cachedKey, getStoreName(), getVersionNumber(), region, timestamp, isComplete);
}
}

/**
* Records a regular data record's timestamp with the heartbeat monitoring service.
* Called only when record-level timestamp tracking is enabled (the caller checks the flag).
*/
@Override
protected void trackRecordReceived(
PartitionConsumptionState partitionConsumptionState,
DefaultPubSubMessage consumerRecord,
String pubSubUrl) {
HeartbeatMonitoringService hbService = getHeartbeatMonitoringService();
if (hbService == null) {
return;
}
String region =
isDaVinciClient() ? serverConfig.getRegionName() : serverConfig.getKafkaClusterUrlToAliasMap().get(pubSubUrl);
long messageTimestamp = consumerRecord.getValue().getProducerMetadata().getMessageTimestamp();
boolean isComplete = partitionConsumptionState.isComplete();
HeartbeatKey cachedKey = partitionConsumptionState.getOrCreateCachedHeartbeatKey(region);

if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
hbService.recordLeaderRecordTimestamp(cachedKey, storeName, versionNumber, region, messageTimestamp, isComplete);
} else {
hbService
.recordFollowerRecordTimestamp(cachedKey, storeName, versionNumber, region, messageTimestamp, isComplete);
}
}

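The receiving side in HeartbeatMonitoringService is not part of this diff; only the recordLeaderRecordTimestamp / recordFollowerRecordTimestamp signatures are visible above. A minimal sketch of what the recording path presumably does, assuming a flattened per-replica timestamp map (the field name and method body below are assumptions):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: keep the newest data-record timestamp per (store, version,
// partition, region) so a reporter can later emit (now - timestamp) as record-level lag.
private final ConcurrentMap<HeartbeatKey, Long> leaderRecordTimestamps = new ConcurrentHashMap<>();

public void recordLeaderRecordTimestamp(
    HeartbeatKey key,
    String storeName,
    int version,
    String region,
    long timestamp,
    boolean isComplete) {
  // merge() keeps the max, so a late-arriving older record never moves the clock backwards
  leaderRecordTimestamps.merge(key, timestamp, Math::max);
}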
@@ -5,6 +5,7 @@
import com.linkedin.davinci.compression.KeyUrnCompressor;
import com.linkedin.davinci.compression.UrnDictV1;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatKey;
import com.linkedin.davinci.utils.ByteArrayKey;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.GUID;
@@ -31,6 +32,7 @@
import com.linkedin.venice.writer.VeniceWriter;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -280,6 +282,12 @@ enum LatchStatus {
*/
private boolean hasResubscribedAfterBootstrapAsCurrentVersion;

/**
* Cached HeartbeatKey references keyed by region, populated on first use (typically during lag monitor setup).
* Eliminates HeartbeatKey creation and hash computation on the per-record recording path.
*/
private Map<String, HeartbeatKey> cachedHeartbeatKeys;

public PartitionConsumptionState(
PubSubTopicPartition partitionReplica,
OffsetRecord offsetRecord,
@@ -1133,4 +1141,20 @@ public boolean hasResubscribedAfterBootstrapAsCurrentVersion() {
public void setHasResubscribedAfterBootstrapAsCurrentVersion(boolean hasResubscribedAfterBootstrapAsCurrentVersion) {
this.hasResubscribedAfterBootstrapAsCurrentVersion = hasResubscribedAfterBootstrapAsCurrentVersion;
}

/**
* Get or create a cached HeartbeatKey for the given region.
* Derives storeName/version from the partition replica topic name.
*/
public HeartbeatKey getOrCreateCachedHeartbeatKey(String region) {
if (cachedHeartbeatKeys == null) {
cachedHeartbeatKeys = new HashMap<>(3);
}
return cachedHeartbeatKeys.computeIfAbsent(region, r -> {
String topicName = partitionReplica.getTopicName();
String storeName = Version.parseStoreFromKafkaTopicName(topicName);
int version = Version.parseVersionFromKafkaTopicName(topicName);
return new HeartbeatKey(storeName, version, getPartition(), r);
});
}
}
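As a usage note for the cache above: after the first record from a given region, the lookup is a single HashMap hit returning the same key instance, so the per-record path does no topic-name parsing and no allocation (pcs below is a hypothetical PartitionConsumptionState instance):

HeartbeatKey first = pcs.getOrCreateCachedHeartbeatKey("dc-0"); // miss: parses topic name, builds key
HeartbeatKey again = pcs.getOrCreateCachedHeartbeatKey("dc-0"); // hit: same instance returned
assert first == again; // no per-record allocation or parsing

The initial capacity of 3 presumably reflects the small number of source regions a replica typically consumes from.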
@@ -403,6 +403,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final PubSubTopicRepository pubSubTopicRepository;
private final String[] msgForLagMeasurement;
protected final AtomicBoolean recordLevelMetricEnabled;
protected final boolean recordLevelTimestampEnabled;
protected final boolean isGlobalRtDivEnabled;
protected volatile VersionRole versionRole;
protected volatile PartitionReplicaIngestionContext.WorkloadType workloadType;
@@ -670,6 +671,7 @@ public StoreIngestionTask(
this.recordLevelMetricEnabled = new AtomicBoolean(
serverConfig.isRecordLevelMetricWhenBootstrappingCurrentVersionEnabled()
|| !this.isCurrentVersion.getAsBoolean());
this.recordLevelTimestampEnabled = serverConfig.isRecordLevelTimestampEnabled();
this.isGlobalRtDivEnabled = version.isGlobalRtDivEnabled();
if (!this.recordLevelMetricEnabled.get()) {
LOGGER.info("Disabled record-level metric when ingesting current version: {}", kafkaVersionTopic);
@@ -2570,6 +2572,10 @@ public PartitionConsumptionState getPartitionConsumptionState(int partitionId) {
return partitionConsumptionStateMap.get(partitionId);
}

public Collection<PartitionConsumptionState> getPartitionConsumptionStates() {
return partitionConsumptionStateMap.values();
}

public boolean hasAnyPartitionConsumptionState(Predicate<PartitionConsumptionState> pcsPredicate) {
for (Map.Entry<Integer, PartitionConsumptionState> partitionToConsumptionState: partitionConsumptionStateMap
.entrySet()) {
@@ -2817,6 +2823,17 @@ protected void recordHeartbeatReceived(
// No Op
}

/**
* Hook for recording regular data record timestamps. Override in subclasses that support
* record-level timestamp tracking.
*/
protected void trackRecordReceived(
PartitionConsumptionState partitionConsumptionState,
DefaultPubSubMessage consumerRecord,
String pubSubUrl) {
// No Op
}

boolean shouldSendGlobalRtDiv(DefaultPubSubMessage record, PartitionConsumptionState pcs, String brokerUrl) {
if (!isGlobalRtDivEnabled() || record.getKey().isControlMessage()) {
return false;
@@ -3546,6 +3563,16 @@ private int internalProcessConsumerRecord(
versionNumber,
LatencyUtils.getElapsedTimeFromNSToMS(beforeProcessingRecordTimestampNs),
currentTimeMs);
if (recordLevelTimestampEnabled) {
try {
trackRecordReceived(partitionConsumptionState, consumerRecord, kafkaUrl);
} catch (Exception e) {
LOGGER.error(
"Failed to record regular record timestamp for replica {}: ",
partitionConsumptionState.getReplicaId(),
e);
}
}
}
} catch (DuplicateDataException e) {
divErrorMetricCallback.accept(e);
@@ -33,6 +33,22 @@ public enum ServerMetricEntity implements ModuleMetricEntityInterface {
VENICE_VERSION_ROLE,
VENICE_REPLICA_TYPE,
VENICE_REPLICA_STATE)
),

/**
* Record-level replication delay: Tracks nearline replication lag for regular data records in milliseconds.
* Only populated when record-level timestamp tracking is enabled.
*/
INGESTION_RECORD_DELAY(
"ingestion.replication.record.delay", MetricType.HISTOGRAM, MetricUnit.MILLISECOND,
"Nearline ingestion record-level replication lag",
setOf(
VENICE_STORE_NAME,
VENICE_CLUSTER_NAME,
VENICE_REGION_NAME,
VENICE_VERSION_ROLE,
VENICE_REPLICA_TYPE,
VENICE_REPLICA_STATE)
);

private final MetricEntity metricEntity;
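The emission site for INGESTION_RECORD_DELAY is not shown in this diff; the recorded value is presumably the wall-clock gap from the producer timestamp captured in trackRecordReceived, along these lines (a sketch, not the actual reporter code):

// Hypothetical: nearline lag in ms between ingestion time and the producer's message timestamp.
long recordDelayMs = System.currentTimeMillis() - messageTimestamp;
// recordDelayMs would be recorded into the INGESTION_RECORD_DELAY histogram with the
// store/cluster/region/role/type/state dimensions declared above.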
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.Utils;


/**
* Composite key for the flattened heartbeat timestamp map
*/
public final class HeartbeatKey {
final String storeName;
final int version;
final int partition;
final String region;
private final int hashCode;

public HeartbeatKey(String storeName, int version, int partition, String region) {
this.storeName = storeName;
this.version = version;
this.partition = partition;
this.region = region;
// Manual hash computation avoids Objects.hash() varargs Object[] allocation and Integer autoboxing
int h = storeName.hashCode();
h = 31 * h + version;
h = 31 * h + partition;
h = 31 * h + region.hashCode();
this.hashCode = h;
}

String getReplicaId() {
return Utils.getReplicaId(Version.composeKafkaTopic(storeName, version), partition);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof HeartbeatKey)) {
return false;
}
HeartbeatKey that = (HeartbeatKey) o;
return version == that.version && partition == that.partition && storeName.equals(that.storeName)
&& region.equals(that.region);
}

@Override
public int hashCode() {
return hashCode;
}

@Override
public String toString() {
return getReplicaId() + "-" + region;
}
}
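To make the cached hash concrete: every map operation on the hot path costs a field read plus equality probes, with no allocation (a sketch; flattenedTimestamps and the key values are hypothetical):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

ConcurrentMap<HeartbeatKey, Long> flattenedTimestamps = new ConcurrentHashMap<>();
HeartbeatKey cachedKey = new HeartbeatKey("store", 1, 0, "dc-0"); // hash computed once, here
// Each merge() reads the precomputed hashCode field; an Objects.hash(...) based hashCode()
// would instead allocate a varargs Object[] and box both ints on every invocation.
flattenedTimestamps.merge(cachedKey, System.currentTimeMillis(), Math::max);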