[da-vinci] Add OTel metrics for IngestionStats #2441
base: main
m-nagarajan
left a comment
Detailed PR Review: [da-vinci] Add OTel metrics for IngestionStats
Overall Assessment
This is a well-structured PR that adds OpenTelemetry (OTel) metrics infrastructure for ingestion stats. The code demonstrates good understanding of thread-safety concerns and follows existing patterns in the codebase. The test coverage is comprehensive.
Strengths
- Good Thread-Safety Patterns: The use of the volatile + immutable VersionInfo pattern for thread-safe version updates is correct and appropriate.
- Hot Path Optimization: Pre-computing local Kafka cluster IDs using IntSet.contains() for O(1) lookup instead of string comparison is a good performance optimization.
- Comprehensive Test Coverage: The test suite covers edge cases like OTel disabled scenarios, NPE prevention, and deterministic backup version selection.
- Good Code Reuse: Extracting shared utilities (OtelStatsUtils, IngestionStatsUtils) reduces duplication between Tehuti and OTel stats.
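For readers unfamiliar with the volatile + immutable holder pattern mentioned in the first bullet, here is a minimal sketch. The class names (VersionInfoSketch, VersionHolderSketch), the string role labels, and the initial (0, 0) snapshot are assumptions made for illustration; they are not the PR's actual code.

```java
// Hypothetical names; a sketch of the pattern, not the PR's actual classes.
final class VersionInfoSketch {
  // Both fields are final, so a published instance can never be seen half-updated.
  final int currentVersion;
  final int futureVersion;

  VersionInfoSketch(int currentVersion, int futureVersion) {
    this.currentVersion = currentVersion;
    this.futureVersion = futureVersion;
  }
}

final class VersionHolderSketch {
  // One volatile reference: writers swap the whole snapshot, readers see old or new, never a mix.
  private volatile VersionInfoSketch versionInfo = new VersionInfoSketch(0, 0);

  void update(int currentVersion, int futureVersion) {
    versionInfo = new VersionInfoSketch(currentVersion, futureVersion);
  }

  String classify(int version) {
    // Read the volatile field once so both comparisons use the same snapshot.
    VersionInfoSketch snapshot = versionInfo;
    if (version == snapshot.currentVersion) {
      return "CURRENT";
    }
    if (version == snapshot.futureVersion) {
      return "FUTURE";
    }
    return "BACKUP";
  }
}
```

The point of the single read in classify() is that current and future versions are always compared against a consistent pair, even if update() runs concurrently.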
Areas for Improvement
I've added specific line-level comments below. Key concerns:
- Potential race condition in AbstractVeniceAggVersionedStats.getVersionedStats()
- Some metric recordings could benefit from early OTel-disabled checks
- Minor naming and documentation improvements
Review Summary: Good overall implementation. Please address the specific comments below.
m-nagarajan
left a comment
Comment on AbstractVeniceAggVersionedStats.java
Lines 84-96 (getVersionedStats method):
The refactoring from get-then-add to computeIfAbsent is a good improvement for thread safety. However, there's a subtle concern:
The initializeVersionInfo method is called inside computeIfAbsent, which means it runs while holding a lock on the bucket in the ConcurrentHashMap. If metadataRepository.getStoreOrThrow() is slow (e.g., making a remote call), this could cause contention.
Consider if this is acceptable for your use case, or if you need to do the store lookup outside computeIfAbsent and use a two-phase initialization pattern.
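A two-phase initialization could look roughly like the sketch below. The class name TwoPhaseInitSketch and the generic slowFactory function are hypothetical stand-ins for the store lookup plus stats construction; the only library calls used are ConcurrentHashMap.get and putIfAbsent.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper; illustrates doing the slow work outside the map's bin lock.
final class TwoPhaseInitSketch<V> {
  private final ConcurrentHashMap<String, V> statsByStore = new ConcurrentHashMap<>();
  private final Function<String, V> slowFactory;

  TwoPhaseInitSketch(Function<String, V> slowFactory) {
    this.slowFactory = slowFactory;
  }

  V getOrCreate(String storeName) {
    // Fast path: no locking when the entry already exists.
    V existing = statsByStore.get(storeName);
    if (existing != null) {
      return existing;
    }
    // Phase 1: run the potentially slow lookup without holding any map lock.
    V candidate = slowFactory.apply(storeName);
    // Phase 2: publish atomically; if another thread won the race, use its value.
    V raced = statsByStore.putIfAbsent(storeName, candidate);
    return raced != null ? raced : candidate;
  }
}
```

The trade-off is that two racing threads may both run the factory and one result is discarded, so this only fits when creation has no side effects that must happen exactly once.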
Lines 118-120 (initializeVersionInfo):
Good approach to extract the initialization logic. The early check if (newCurrentVersion != versionedStats.getCurrentVersion()) is a nice optimization, though since VeniceVersionedStats is newly created, the condition will always be true. Consider removing this redundant check for clarity.
m-nagarajan
left a comment
Comment on IngestionOtelStats.java
Lines 252-280 (getVersionForRole and getTaskForRole):
The BACKUP version selection logic is deterministic (returning smallest backup version), which is good for consistency. However, the iteration over ingestionTasksByVersion.keySet() could be problematic if this collection is modified concurrently.
Consider:
// Current code iterates over keySet() while other threads may modify the map
for (Integer version: ingestionTasksByVersion.keySet()) {
  // ...
}
Since ingestionTasksByVersion is a VeniceConcurrentHashMap, the iteration is weakly consistent and will not throw ConcurrentModificationException, so it should be safe. But you might want to add a comment explaining why this is acceptable.
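As an illustration of deterministic backup selection over a concurrent map, here is a minimal sketch. The class name, the NO_VERSION sentinel, and the method signature are assumptions for the example and do not mirror the PR's getVersionForRole.

```java
import java.util.Map;

// Hypothetical helper; picks the smallest version that is neither current nor future.
final class BackupVersionSketch {
  static final int NO_VERSION = -1; // assumed sentinel for "no backup version found"

  static int smallestBackupVersion(Map<Integer, ?> tasksByVersion, int currentVersion, int futureVersion) {
    int smallest = NO_VERSION;
    // With a ConcurrentHashMap this iteration is weakly consistent: it never throws
    // ConcurrentModificationException, but may or may not reflect concurrent updates.
    for (Integer version : tasksByVersion.keySet()) {
      if (version == currentVersion || version == futureVersion) {
        continue;
      }
      if (smallest == NO_VERSION || version < smallest) {
        smallest = version;
      }
    }
    return smallest;
  }
}
```

Choosing the minimum makes the result independent of the map's iteration order, which is what keeps the selection stable across calls.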
Lines 347-358 (close method):
The close() method clears all maps but doesn't unregister the ASYNC_GAUGE metrics that were registered during construction. OpenTelemetry ASYNC_GAUGE callbacks will still be invoked after close. The callbacks will return 0/default values which may be acceptable, but consider if you need explicit metric unregistration.
Lines 425-448 (getOrCreateRtRecordsConsumedMetric and getOrCreateRtBytesConsumedMetric):
Nested computeIfAbsent calls look correct, but this creates a new MetricEntityStateTwoEnums for each unique (sourceRegion, destRegion) combination. In a multi-region setup with N regions, this could create O(N²) metric objects. Consider if this cardinality is acceptable for your metrics backend.
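To make the cardinality concern concrete, the sketch below keeps one counter per (source, destination) pair using chained computeIfAbsent calls and exposes how many distinct series exist. The class name and the use of LongAdder in place of MetricEntityStateTwoEnums are assumptions for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for per-region-pair metric state.
final class RegionPairMetricsSketch {
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, LongAdder>> bySourceThenDest =
      new ConcurrentHashMap<>();

  void record(String sourceRegion, String destRegion, long count) {
    // The second computeIfAbsent runs on the inner map after the outer call has returned.
    bySourceThenDest.computeIfAbsent(sourceRegion, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(destRegion, k -> new LongAdder())
        .add(count);
  }

  // Number of distinct (source, destination) series created so far.
  int seriesCount() {
    int total = 0;
    for (ConcurrentHashMap<String, LongAdder> inner : bySourceThenDest.values()) {
      total += inner.size();
    }
    return total;
  }
}
```

With N regions that all exchange traffic, seriesCount() grows to N * N, which is the quantity to weigh against the metrics backend's limits.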
m-nagarajan
left a comment
Comment on AggVersionedIngestionStats.java
Lines 73-76 (getIngestionOtelStats method):
Good pattern using computeIfAbsent with immediate version info initialization. However, there's a potential issue: if getCurrentVersion(k) or getFutureVersion(k) returns stale data (because the store was just added), the OtelStats will start with incorrect version info.
The onVersionInfoUpdated hook should correct this eventually, but there may be a brief window where metrics are recorded with incorrect VersionRole classification. If this is a concern, consider initializing with NON_EXISTING_VERSION and relying solely on the callback.
Lines 83-86 (recordOtelConsumptionMetrics):
This method is called on every consumption event, making it a hot path. Consider adding an early return if OTel metrics are disabled:
private void recordOtelConsumptionMetrics(String storeName, int version, ReplicaType replicaType, long bytes) {
if (!getIngestionOtelStats(storeName).emitOtelMetrics()) {
return;
}
// ... rest of method
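}
Though the underlying OTel methods likely already have such checks, an early return here skips the rest of the method when OTel is disabled. Note that getIngestionOtelStats(storeName) is itself described above as using computeIfAbsent, so this sketch still pays for that lookup; avoiding it would require a flag that can be checked without fetching the per-store stats object.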
Lines 189-206 (recordRegionHybridConsumption signature change):
The method signature change from 5 to 8 parameters is getting verbose. Consider using a parameter object or builder pattern if more parameters are needed in the future. For now, this is acceptable but worth noting.
m-nagarajan
left a comment
Comment on LeaderFollowerStoreIngestionTask.java
Lines 67-68 (computeLocalKafkaClusterIds):
The hot path optimization using IntSet.contains() for O(1) lookup is a good performance improvement. The pre-computation at construction time avoids per-record string comparisons.
Lines 96-98 (setWriteComputeFailureCode refactoring):
The change from direct field assignment to a setter method (setWriteComputeFailureCode()) is a good refactoring that enables better encapsulation. However, ensure this doesn't introduce any unexpected behavior changes, since the field writeComputeFailureCode is accessed from multiple threads. The field was non-volatile before this change and remains non-volatile, so the refactoring preserves the existing (potentially racy) behavior.
If thread-safety is important for this field, consider making it volatile or using AtomicInteger.
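The two options differ in what they guarantee, as this sketch tries to show. The class name, the NO_FAILURE value, and the split into "last" and "first" failure codes are assumptions for illustration; the actual field type and semantics in LeaderFollowerStoreIngestionTask may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder contrasting volatile with AtomicInteger.
final class FailureCodeSketch {
  static final int NO_FAILURE = 0; // assumed "no failure recorded" value

  // volatile: guarantees visibility of the latest write, enough for plain set/get.
  private volatile int lastFailureCode = NO_FAILURE;

  // AtomicInteger: needed when the update depends on the current value.
  private final AtomicInteger firstFailureCode = new AtomicInteger(NO_FAILURE);

  void setWriteComputeFailureCode(int code) {
    lastFailureCode = code;
    // Atomic check-then-set: only the first failure is retained.
    firstFailureCode.compareAndSet(NO_FAILURE, code);
  }

  int getLastFailureCode() {
    return lastFailureCode;
  }

  int getFirstFailureCode() {
    return firstFailureCode.get();
  }
}
```

If the field is only ever overwritten and read, volatile is the lighter choice; AtomicInteger pays off only for read-modify-write updates.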
Lines 61-68 (recordRegionHybridConsumptionStats):
Good use of pre-computed localKafkaClusterIds and kafkaClusterIdToAliasMap. The UNKNOWN_REGION constant is a sensible default when the cluster ID isn't found in the alias map.
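A rough sketch of the pre-computation and lookup described in the comments on computeLocalKafkaClusterIds and recordRegionHybridConsumptionStats follows. To stay self-contained it uses java.util.HashSet<Integer> as a stand-in for fastutil's IntSet, and the constructor inputs, the class name, and the "unknown" value are assumptions rather than the PR's actual signature.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; resolves locality and region alias from a Kafka cluster ID.
final class RegionLocalitySketch {
  static final String UNKNOWN_REGION = "unknown"; // assumed default value

  private final Set<Integer> localClusterIds;
  private final Map<Integer, String> clusterIdToAlias;

  RegionLocalitySketch(Map<Integer, String> clusterIdToAlias, Set<String> localAliases) {
    this.clusterIdToAlias = clusterIdToAlias;
    // Done once at construction: string comparisons happen here, not per record.
    Set<Integer> ids = new HashSet<>();
    for (Map.Entry<Integer, String> entry : clusterIdToAlias.entrySet()) {
      if (localAliases.contains(entry.getValue())) {
        ids.add(entry.getKey());
      }
    }
    this.localClusterIds = ids;
  }

  // Hot path: a single hash lookup on an integer key.
  boolean isLocal(int clusterId) {
    return localClusterIds.contains(clusterId);
  }

  // Falls back to UNKNOWN_REGION when the cluster ID has no alias.
  String regionOf(int clusterId) {
    return clusterIdToAlias.getOrDefault(clusterId, UNKNOWN_REGION);
  }
}
```

A primitive-specialized set such as IntSet additionally avoids boxing the cluster ID on each lookup, which this stand-in does not.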
m-nagarajan
left a comment
Comment on New Dimension Enums
VeniceDCREvent.java, VeniceIngestionComponent.java, VeniceRegionLocality.java:
These enums follow the established pattern for dimension enums in the codebase. Good consistency.
VeniceDestinationIngestionComponent.java vs VeniceIngestionComponent.java:
Having two separate enums (VeniceIngestionComponent and VeniceDestinationIngestionComponent) provides type safety when recording metrics with both source and destination dimensions. This is a good design choice that prevents accidentally swapping the dimensions.
However, there's some code duplication (both have LOCAL_BROKER, SOURCE_BROKER, etc.). Consider if a shared base or utility could reduce this duplication while maintaining type safety. For now, the current approach is acceptable.
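The type-safety argument can be seen in a small sketch. The enum names, the two constants (taken from the duplication noted above), and the seriesKey helper are hypothetical and only illustrate the design choice.

```java
import java.util.Locale;

// Hypothetical enums mirroring the "separate source and destination types" design.
enum SourceComponentSketch {
  LOCAL_BROKER, SOURCE_BROKER
}

enum DestinationComponentSketch {
  LOCAL_BROKER, SOURCE_BROKER
}

final class ComponentLatencySketch {
  // Distinct parameter types mean swapped arguments fail to compile.
  static String seriesKey(SourceComponentSketch source, DestinationComponentSketch destination) {
    return "source=" + source.name().toLowerCase(Locale.ROOT)
        + ",destination=" + destination.name().toLowerCase(Locale.ROOT);
  }
}
```

A call such as seriesKey(DestinationComponentSketch.LOCAL_BROKER, SourceComponentSketch.SOURCE_BROKER) would be rejected by the compiler, which a single shared enum could not guarantee.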
VeniceIngestionComponent (Lines 27-29):
The Javadoc mentions "This enum is used for both source and destination component dimensions" but getDimensionName() only returns VENICE_INGESTION_SOURCE_COMPONENT. This is inconsistent with the comment. Either update the comment or clarify the intended usage.
m-nagarajan
left a comment
Comment on AsyncMetricEntityStateOneEnum.java
Lines 77-91 (createMetricStatesForAllEnumValues):
Good approach to eagerly create metric states for all enum values. This ensures consistent behavior and avoids lazy initialization issues.
Line 76:
Map<VeniceMetricsDimensions, String> dimensionsWithEnum = new HashMap<>(baseDimensionsMap);
This creates a new HashMap for each enum value during construction. Since this only happens once at startup and not on the hot path, the overhead is acceptable. Just noting for future reference.
Lines 80-81:
The attributes are created using baseDimensionsMap (without the enum dimension) plus the enum value. Ensure this matches the behavior expected by AsyncMetricEntityStateBase.create().
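The eager per-enum-value construction discussed in these comments can be sketched as follows. The class name, the use of String keys instead of VeniceMetricsDimensions, and the lowercase enum-name convention are assumptions for the example, not the behavior of AsyncMetricEntityStateOneEnum.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical holder that precomputes one attribute map per enum constant.
final class PerEnumStatesSketch<E extends Enum<E>> {
  private final EnumMap<E, Map<String, String>> attributesByValue;

  PerEnumStatesSketch(Class<E> enumType, String enumDimensionName, Map<String, String> baseDimensions) {
    attributesByValue = new EnumMap<>(enumType);
    for (E value : enumType.getEnumConstants()) {
      // Copy the base dimensions so each enum value gets its own map and the input is not mutated.
      Map<String, String> withEnum = new HashMap<>(baseDimensions);
      withEnum.put(enumDimensionName, value.name().toLowerCase(Locale.ROOT));
      attributesByValue.put(value, Collections.unmodifiableMap(withEnum));
    }
  }

  // No lazy initialization: every constant was populated in the constructor.
  Map<String, String> attributesFor(E value) {
    return attributesByValue.get(value);
  }
}
```

Because everything is built up front, lookups after construction are read-only and need no synchronization.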
General:
This class complements the existing MetricEntityStateOneEnum for synchronous metrics. The naming follows the established pattern (Async prefix). Good consistency with the existing codebase.
m-nagarajan
left a comment
Comment on ServerMetricEntity.java
Lines 35-245 (New metric definitions):
Comprehensive set of metric definitions covering:
- Task lifecycle metrics (error count, push timeout)
- Consumption/production metrics (records, bytes)
- Latency metrics (subscribe prep, ingestion time, preprocessing, producer callback)
- DCR metrics (event count, total count)
- Batch processing metrics
- RT region-specific metrics
All metrics follow the naming convention (ingestion.*, disk.*). The dimension sets are appropriate for each metric type.
Line 100 (SERVER_METRIC_ENTITIES):
Moving SERVER_METRIC_ENTITIES from HeartbeatOtelStats to here is a good consolidation. The static collection is now defined in the entity class where it belongs.
Metric Units:
Good use of appropriate units: MILLISECOND for latencies, BYTES for data size, NUMBER for counts, RATIO for quota usage. The new RATIO unit in MetricUnit.java is appropriate for the DISK_QUOTA_USED metric.
m-nagarajan
left a comment
Comment on Utility Classes and Tests
OtelStatsUtils.java and IngestionStatsUtils.java:
Good extraction of shared utilities. The VersionInfo immutable class and classifyVersion logic are now reusable across IngestionOtelStats and HeartbeatOtelStats.
The IngestionStatsUtils class correctly centralizes the task state queries that were previously duplicated in IngestionStats.
Test Coverage:
Excellent test coverage including:
- IngestionOtelStatsTest.java (1089 lines) - Comprehensive unit tests
- AggVersionedIngestionStatsTest.java - Integration with versioned stats
- testRtMetricsNoNpeWhenOtelDisabled - Important regression test
- testDeterministicBackupVersionSelection - Tests the edge case of multiple backup versions
- Reflection-based tests for private callback methods
Test Concerns:
The tests use significant reflection to access private methods (getVersionForRole, getPushTimeoutCountForRole, etc.). While this provides good coverage, consider if these methods should be package-private with @VisibleForTesting annotation to reduce reflection usage.
LeaderFollowerStoreIngestionTaskTest.java:
The test for computeLocalKafkaClusterIds using reflection is thorough, covering:
- Normal multi-cluster case
- No local clusters
- Empty/null inputs
- Both null edge case
m-nagarajan
left a comment
Additional Observations
Backward Compatibility:
The method signature change for recordRegionHybridConsumption (adding 3 new parameters) is a breaking change for any external callers. Since this is an internal stats class, this should be fine, but ensure no external modules depend on the old signature.
Memory Considerations:
The IngestionOtelStats class creates per-store state with:
- ingestionTasksByVersion map
- pushTimeoutByVersion map
- idleTimeByVersion map
- Multiple metric entity state objects for each enum combination
For deployments with many stores, monitor memory usage after this change.
Missing Metrics:
Looking at the existing IngestionStats.java, some metrics are recorded to Tehuti but not to OTel (e.g., recordStalePartitionsWithoutIngestionTask, recordSubscribePartitionCount). This is likely intentional (phased migration), but worth documenting which metrics are included vs excluded in this PR.
Summary of Action Items
- Consider: Thread-safety of writeComputeFailureCode field access
- Consider: Adding early OTel-disabled check in hot path methods
- Clarify: Javadoc in VeniceIngestionComponent about source vs destination usage
- Monitor: Memory usage with many stores due to per-store OTel state
- Optional: Reduce reflection in tests by using package-private visibility
Overall, this is a solid implementation. The concerns above are mostly minor improvements.
Problem Statement
Venice currently uses Tehuti for metrics collection in IngestionStats. To support observability migration and enable dimensional metrics with attributes like VersionRole, ReplicaType, and region locality, we need to add OpenTelemetry (OTel) instrumentation alongside existing Tehuti metrics.
Solution
Add OTel metrics infrastructure for ingestion stats:
New OTel Stats Class (IngestionOtelStats.java)
- MetricEntityState* classes
New Async Metric Support (AsyncMetricEntityStateOneEnum.java)
New Dimension Enums
- VeniceDCREvent - DCR event types (TOMBSTONE, DUPLICATE, UPDATE_IGNORED, etc.)
- VeniceIngestionComponent - Source components (KAFKA_CONSUMER, KAFKA_PRODUCER, etc.)
- VeniceDestinationIngestionComponent - Destination components (STORAGE_ENGINE, KAFKA)
- VeniceRegionLocality - Region locality (LOCAL, REMOTE)
Shared Utilities
- OtelStatsUtils.java - VersionInfo holder, classifyVersion logic
- IngestionStatsUtils.java - Task value extraction utilities
Integration (AggVersionedIngestionStats.java)
- otelStatsMap (VeniceConcurrentHashMap) for per-store OTel stats management
- onVersionInfoUpdated() for version lifecycle updates
- cleanupVersionResources() for per-version cleanup
- handleStoreDeleted()
Hot Path Optimizations (LeaderFollowerStoreIngestionTask.java)
- IntSet.contains() for O(1) locality lookup instead of string comparison on every record
Tests
- IngestionOtelStatsTest.java - Unit tests for OTel stats class
- AggVersionedIngestionStatsTest.java - Integration with versioned stats
- OtelStatsUtilsTest.java, IngestionStatsUtilsTest.java - Utility tests
- AsyncMetricEntityStateOneEnumTest.java - Async metric support tests
- VeniceDCREventTest.java, VeniceIngestionComponentTest.java, VeniceDestinationIngestionComponentTest.java, VeniceRegionLocalityTest.java - Dimension enum tests
Suggested Review Order
- ServerMetricEntity.java - Metric entity definitions
- IngestionOtelStats.java - OTel implementation
- AggVersionedIngestionStats.java - Integration
- LeaderFollowerStoreIngestionTask.java - Hot path optimization
- IngestionOtelStatsTest.java - Tests
Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
- synchronized, RWLock) are used where needed.
  - VeniceConcurrentHashMap for otelStatsMap
  - volatile + immutable VersionInfo pattern for thread-safe version updates
- ConcurrentHashMap, CopyOnWriteArrayList).
  - VeniceConcurrentHashMap for per-store and per-version state
How was this PR tested?
- IngestionOtelStatsTest.java
- AggVersionedIngestionStatsTest.java
- OtelStatsUtilsTest.java
- IngestionStatsUtilsTest.java
- AsyncMetricEntityStateOneEnumTest.java
- VeniceDCREventTest.java, VeniceIngestionComponentTest.java, VeniceDestinationIngestionComponentTest.java, VeniceRegionLocalityTest.java
- ServerMetricEntityTest.java
- HeartbeatOtelStatsTest.java
- HeartbeatVersionedStatsTest.java
- LeaderFollowerStoreIngestionTaskTest.java
- VeniceMetricsDimensionsTest.java
Does this PR introduce any user-facing or breaking changes?