Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_ACTIVE_KEY_COUNT_FOR_ALL_BATCH_PUSH_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_ACTIVE_KEY_COUNT_FOR_HYBRID_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_ACTIVE_KEY_COUNT_REPLICA_CONSISTENCY_CHECK_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_MULTI_GET_LATENCY_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_READ_COMPUTE_GET_LATENCY_THRESHOLD;
Expand Down Expand Up @@ -738,6 +739,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean addRmdToBatchPushForHybridStores;
private final boolean activeKeyCountForAllBatchPushEnabled;
private final boolean activeKeyCountForHybridStoreEnabled;
private final boolean activeKeyCountReplicaConsistencyCheckEnabled;
private final int partialUpdateLargeResultLogThresholdBytes;
private final long partialUpdateAmplificationReportIntervalMs;

Expand Down Expand Up @@ -1290,6 +1292,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_ACTIVE_KEY_COUNT_FOR_ALL_BATCH_PUSH_ENABLED, false);
this.activeKeyCountForHybridStoreEnabled =
serverProperties.getBoolean(SERVER_ACTIVE_KEY_COUNT_FOR_HYBRID_STORE_ENABLED, false);
this.activeKeyCountReplicaConsistencyCheckEnabled =
serverProperties.getBoolean(SERVER_ACTIVE_KEY_COUNT_REPLICA_CONSISTENCY_CHECK_ENABLED, false);
this.partialUpdateLargeResultLogThresholdBytes =
serverProperties.getInt(PARTIAL_UPDATE_LARGE_RESULT_LOG_THRESHOLD_BYTES, 100 * 1024);
this.partialUpdateAmplificationReportIntervalMs =
Expand Down Expand Up @@ -2337,6 +2341,15 @@ public boolean isActiveKeyCountForHybridStoreEnabled() {
return activeKeyCountForHybridStoreEnabled;
}

/**
 * Reports whether the replica-consistency check is effectively active on this server.
 *
 * <p>The check flag ({@code server.active.key.count.replica.consistency.check.enabled}) is not
 * sufficient on its own: hybrid-store active-key-count tracking
 * ({@link #isActiveKeyCountForHybridStoreEnabled()}) is a prerequisite, so this getter folds both
 * settings into a single answer.
 *
 * @return {@code true} only when both the check flag and its hybrid-tracking prerequisite are on
 */
public boolean isActiveKeyCountReplicaConsistencyCheckEnabled() {
  // Without hybrid tracking the check cannot run, regardless of the check flag itself.
  if (!activeKeyCountForHybridStoreEnabled) {
    return false;
  }
  return activeKeyCountReplicaConsistencyCheckEnabled;
}

/**
 * @return {@code true} when at least one of the active-key-count tracking flags (all-batch-push or
 *         hybrid-store) is enabled
 */
public boolean isAnyActiveKeyCountTrackingEnabled() {
  if (activeKeyCountForAllBatchPushEnabled) {
    return true;
  }
  return activeKeyCountForHybridStoreEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ public class ActiveActiveStoreIngestionTask extends LeaderFollowerStoreIngestion
/** Single-byte payload of the key-count signal header that marks a key creation ({@code +1}). */
static final byte KEY_CREATED_SIGNAL_VALUE = 1;
/** Single-byte payload of the key-count signal header that marks a key deletion ({@code -1}). */
static final byte KEY_DELETED_SIGNAL_VALUE = -1;
/** Single-byte payload of the key-count signal header that carries an invalidation signal ({@code 0}). */
static final byte KEY_COUNT_INVALIDATE_SIGNAL_VALUE = 0;
static final PubSubMessageHeader KEY_CREATED_SIGNAL =
new PubSubMessageHeader(StoreIngestionTask.KEY_COUNT_SIGNAL_HEADER, new byte[] { KEY_CREATED_SIGNAL_VALUE });
static final PubSubMessageHeader KEY_DELETED_SIGNAL =
new PubSubMessageHeader(StoreIngestionTask.KEY_COUNT_SIGNAL_HEADER, new byte[] { KEY_DELETED_SIGNAL_VALUE });
static final PubSubMessageHeader KEY_CREATED_SIGNAL = new PubSubMessageHeader(
PubSubMessageHeaders.VENICE_KEY_COUNT_SIGNAL_HEADER,
new byte[] { KEY_CREATED_SIGNAL_VALUE });
static final PubSubMessageHeader KEY_DELETED_SIGNAL = new PubSubMessageHeader(
PubSubMessageHeaders.VENICE_KEY_COUNT_SIGNAL_HEADER,
new byte[] { KEY_DELETED_SIGNAL_VALUE });
static final PubSubMessageHeader KEY_COUNT_INVALIDATE_SIGNAL = new PubSubMessageHeader(
StoreIngestionTask.KEY_COUNT_SIGNAL_HEADER,
PubSubMessageHeaders.VENICE_KEY_COUNT_SIGNAL_HEADER,
new byte[] { KEY_COUNT_INVALIDATE_SIGNAL_VALUE });

/** RMD bytes (ts=0) with superset schema ID prepended. For chunk manifests (superset schema ID known at construction). */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
package com.linkedin.davinci.kafka.consumer;

import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_ACTIVE_KEY_COUNT_INVALIDATION_REASON;

import com.linkedin.venice.stats.dimensions.VeniceDimensionInterface;
import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;


/**
* Reasons for invalidating a partition's active-key-count tracking. Each value carries a message
* template (some with a {@code %d} placeholder for runtime detail like the offending signal value
* or header length) that becomes the prefix of the operator-visible ERROR log emitted by
* {@link StoreIngestionTask#invalidateActiveKeyCount}.
*
* <p>Also serves as an OTel dimension on
* {@link com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity#ACTIVE_KEY_COUNT_INVALIDATION}.
* The Tehuti sensor remains a flat total.
*/
public enum ActiveKeyCountInvalidationReason {
public enum ActiveKeyCountInvalidationReason implements VeniceDimensionInterface {
/** Follower received {@code kcs=-1} but its count was already zero (count drifted). */
FOLLOWER_DECREMENT_UNDERFLOW("Decrement underflow on follower from kcs=-1"),
/** Follower received {@code kcs=0} (leader-propagated invalidation signal). */
LEADER_PROPAGATED_INVALIDATION("Leader propagated invalidation signal"),
/** Follower received a single-byte {@code kcs} value outside the {-1, 0, +1} contract. */
CORRUPT_KCS_SIGNAL_VALUE("Unexpected kcs signal value %d"),
/** Follower received a multi-byte {@code kcs} header (corrupt or future producer). */
CORRUPT_MULTI_BYTE_KCS_SIGNAL("Unexpected multi-byte kcs signal (length=%d)"),
CORRUPT_KEY_COUNT_SIGNAL_HEADER_VALUE("Unexpected kcs signal value %d"),
/** Follower received a {@code kcs} header with the wrong byte length (expected 1; corrupt or future producer). */
CORRUPT_KEY_COUNT_SIGNAL_HEADER_LENGTH("Unexpected kcs header length=%d"),
/** Leader detected an underflow during DCR (count was zero but a delete was processed). */
LEADER_DCR_UNDERFLOW("Decrement underflow on leader during DCR"),
/**
Expand All @@ -23,19 +33,40 @@ public enum ActiveKeyCountInvalidationReason {
* The transient I/O failure must not stop ingestion or leave the active count in a wrong state —
* invalidate so we stop publishing a stale value.
*/
KEY_EXISTS_FAILURE("RocksDB value column family lookup failed");
KEY_EXISTS_FAILURE("RocksDB value column family lookup failed"),
/** Follower received an {@code lkc} header with the wrong byte length (expected 8; corrupt or future producer). */
CORRUPT_LEADER_KEY_COUNT_HEADER_LENGTH("Unexpected lkc header length=%d");

private final String messageTemplate;
private final boolean templateWithExtraData;

/**
 * @param messageTemplate message text for this reason; if it contains a {@code %d} placeholder the
 *        reason is flagged as requiring runtime detail, which decides which {@code getMessage}
 *        overload callers must use
 */
ActiveKeyCountInvalidationReason(String messageTemplate) {
  // Placeholder presence is computed once here so the getMessage overloads only check a flag.
  this.templateWithExtraData = messageTemplate.contains("%d");
  this.messageTemplate = messageTemplate;
}

/**
 * Returns the message for a reason whose template needs no runtime detail.
 *
 * @return the message template, verbatim
 * @throws IllegalStateException if this reason's template contains a {@code %d} placeholder, in
 *         which case {@link #getMessage(int)} must be used instead
 */
String getMessage() {
  if (!templateWithExtraData) {
    return messageTemplate;
  }
  // Returning the raw template here would leak an unformatted "%d" to the caller.
  throw new IllegalStateException(
      "Reason " + name() + " carries a '%d' placeholder; caller must use getMessage(int detail).");
}

/**
 * Returns the message for a reason whose template carries a {@code %d} placeholder, with the
 * placeholder filled in.
 *
 * @param detail value substituted for the {@code %d} placeholder
 * @return the formatted message
 * @throws IllegalStateException if this reason's template has no placeholder, in which case the
 *         no-arg {@link #getMessage()} must be used instead
 */
String getMessage(int detail) {
  if (templateWithExtraData) {
    return String.format(messageTemplate, detail);
  }
  // Formatting a placeholder-free template would silently drop the detail value.
  throw new IllegalStateException(
      "Reason " + name() + " has no '%d' placeholder; caller must use the no-arg getMessage().");
}

/**
 * Returns the metrics dimension under which this enum's values are reported.
 *
 * <p>All instances of this enum share the same dimension name.
 * Refer to {@link VeniceDimensionInterface#getDimensionName()} for more details.
 *
 * @return always {@code VENICE_ACTIVE_KEY_COUNT_INVALIDATION_REASON}
 */
@Override
public VeniceMetricsDimensions getDimensionName() {
return VENICE_ACTIVE_KEY_COUNT_INVALIDATION_REASON;
}
}
Loading