Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_BATCH_GET_CHUNK_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_PER_RECORD_OTEL_METRICS_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES;
Expand All @@ -178,6 +179,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_INTERVAL_IN_MILLIS;
import static com.linkedin.venice.ConfigKeys.SERVER_READ_QUOTA_INITIALIZATION_FALLBACK_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_TIMESTAMP_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED;
Expand Down Expand Up @@ -590,6 +592,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean batchReportEOIPEnabled;
private final IncrementalPushStatusWriteMode incrementalPushStatusWriteMode;
private final long ingestionHeartbeatIntervalMs;
private final boolean recordLevelTimestampEnabled;
private final boolean perRecordOtelMetricsEnabled;
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
private final boolean stuckConsumerRepairEnabled;
private final int stuckConsumerRepairIntervalSecond;
Expand Down Expand Up @@ -1009,6 +1013,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
metaStoreWriterCloseConcurrency = serverProperties.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
ingestionHeartbeatIntervalMs =
serverProperties.getLong(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1));
recordLevelTimestampEnabled = serverProperties.getBoolean(SERVER_RECORD_LEVEL_TIMESTAMP_ENABLED, false);
perRecordOtelMetricsEnabled = serverProperties.getBoolean(SERVER_PER_RECORD_OTEL_METRICS_ENABLED, false);
batchReportEOIPEnabled =
serverProperties.getBoolean(SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED, false);
incrementalPushStatusWriteMode =
Expand Down Expand Up @@ -1779,6 +1785,14 @@ public long getIngestionHeartbeatIntervalMs() {
return ingestionHeartbeatIntervalMs;
}

/**
 * Whether record-level timestamp tracking is enabled on this server.
 * Configured via {@code SERVER_RECORD_LEVEL_TIMESTAMP_ENABLED}; defaults to {@code false}.
 *
 * @return {@code true} if per-record timestamp tracking is enabled
 */
public boolean isRecordLevelTimestampEnabled() {
  return this.recordLevelTimestampEnabled;
}

/**
 * Whether per-record OpenTelemetry metrics emission is enabled on this server.
 * Configured via {@code SERVER_PER_RECORD_OTEL_METRICS_ENABLED}; defaults to {@code false}.
 *
 * @return {@code true} if per-record OTel metrics are enabled
 */
public boolean isPerRecordOtelMetricsEnabled() {
  return this.perRecordOtelMetricsEnabled;
}

/**
 * Whether batch reporting of end-of-incremental-push status is enabled.
 * Configured via {@code SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED};
 * defaults to {@code false}.
 *
 * @return {@code true} if batch EOIP status reporting is enabled
 */
public boolean getBatchReportEOIPEnabled() {
  return this.batchReportEOIPEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2345,6 +2345,43 @@ protected void recordHeartbeatReceived(
}
}

/**
 * Forwards the producer timestamp of a regular (non-heartbeat) data record to the
 * heartbeat monitoring service, so record-level replication lag can be tracked per
 * leader/follower replica. The caller is responsible for checking that record-level
 * timestamp tracking is enabled before invoking this hook.
 */
@Override
protected void recordRecordReceived(
    PartitionConsumptionState partitionConsumptionState,
    DefaultPubSubMessage consumerRecord,
    String kafkaUrl) {
  HeartbeatMonitoringService monitoringService = getHeartbeatMonitoringService();
  if (monitoringService == null) {
    // Monitoring service is not wired up; nothing to record.
    return;
  }
  // Da Vinci clients use the server's own region name; servers map the source broker URL
  // to its region alias.
  String sourceRegion =
      isDaVinciClient() ? serverConfig.getRegionName() : serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl);
  long producerTimestamp = consumerRecord.getValue().getProducerMetadata().getMessageTimestamp();
  boolean replicaComplete = partitionConsumptionState.isComplete();
  int partition = partitionConsumptionState.getPartition();

  // Route the timestamp to the leader- or follower-specific tracker based on replica state.
  if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
    monitoringService
        .recordLeaderRecordTimestamp(storeName, versionNumber, partition, sourceRegion, producerTimestamp, replicaComplete);
  } else {
    monitoringService
        .recordFollowerRecordTimestamp(storeName, versionNumber, partition, sourceRegion, producerTimestamp, replicaComplete);
  }
}

@Override
protected Iterable<DefaultPubSubMessage> validateAndFilterOutDuplicateMessagesFromLeaderTopic(
Iterable<DefaultPubSubMessage> records,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final PubSubTopicRepository pubSubTopicRepository;
private final String[] msgForLagMeasurement;
protected final AtomicBoolean recordLevelMetricEnabled;
protected final boolean recordLevelTimestampEnabled;
protected final boolean isGlobalRtDivEnabled;
protected volatile VersionRole versionRole;
protected volatile PartitionReplicaIngestionContext.WorkloadType workloadType;
Expand Down Expand Up @@ -670,6 +671,7 @@ public StoreIngestionTask(
this.recordLevelMetricEnabled = new AtomicBoolean(
serverConfig.isRecordLevelMetricWhenBootstrappingCurrentVersionEnabled()
|| !this.isCurrentVersion.getAsBoolean());
this.recordLevelTimestampEnabled = serverConfig.isRecordLevelTimestampEnabled();
this.isGlobalRtDivEnabled = version.isGlobalRtDivEnabled();
if (!this.recordLevelMetricEnabled.get()) {
LOGGER.info("Disabled record-level metric when ingesting current version: {}", kafkaVersionTopic);
Expand Down Expand Up @@ -2817,6 +2819,17 @@ protected void recordHeartbeatReceived(
// No Op
}

/**
 * Hook invoked for each regular data record when record-level timestamp tracking is
 * enabled (the caller guards on that flag). The base implementation is a no-op;
 * subclasses that support record-level timestamp tracking override this to report the
 * record's timestamp to the heartbeat monitoring service.
 *
 * @param partitionConsumptionState consumption state of the partition the record belongs to
 * @param consumerRecord the consumed pub-sub message
 * @param kafkaUrl URL of the pub-sub cluster the record was consumed from
 */
protected void recordRecordReceived(
    PartitionConsumptionState partitionConsumptionState,
    DefaultPubSubMessage consumerRecord,
    String kafkaUrl) {
  // No Op
}

boolean shouldSendGlobalRtDiv(DefaultPubSubMessage record, PartitionConsumptionState pcs, String brokerUrl) {
if (!isGlobalRtDivEnabled() || record.getKey().isControlMessage()) {
return false;
Expand Down Expand Up @@ -3539,6 +3552,14 @@ private int internalProcessConsumerRecord(
partitionConsumptionState,
leaderProducedRecordContext,
currentTimeMs);
// Record regular record timestamp for heartbeat monitoring if enabled
if (recordLevelTimestampEnabled) {
try {
recordRecordReceived(partitionConsumptionState, consumerRecord, kafkaUrl);
} catch (Exception e) {
LOGGER.error("Failed to record regular record timestamp: ", e);
}
}
}
if (recordLevelMetricEnabled.get()) {
versionedIngestionStats.recordConsumedRecordEndToEndProcessingLatency(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ public enum ServerMetricEntity implements ModuleMetricEntityInterface {
VENICE_VERSION_ROLE,
VENICE_REPLICA_TYPE,
VENICE_REPLICA_STATE)
),

/**
 * Record-level replication delay: tracks nearline replication lag for regular data
 * records, in milliseconds, as a histogram. Only populated when record-level timestamp
 * tracking is enabled on the server.
 */
INGESTION_RECORD_DELAY(
    "ingestion.replication.record.delay", MetricType.HISTOGRAM, MetricUnit.MILLISECOND,
    "Nearline ingestion record-level replication lag",
    setOf(
        VENICE_STORE_NAME,
        VENICE_CLUSTER_NAME,
        VENICE_REGION_NAME,
        VENICE_VERSION_ROLE,
        VENICE_REPLICA_TYPE,
        VENICE_REPLICA_STATE)
);

private final MetricEntity metricEntity;
Expand Down
Loading
Loading