Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,44 +1,43 @@
package com.linkedin.metadata.dao.tracking;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;


/**
* Interface for recording per-DAO-operation latency and error metrics.
* Interface for recording per-DAO-operation latency and count metrics with dimensional attributes.
*
* <p>Implementations collect histograms (latency) and counters (operation count, error count)
* to benchmark DAO performance during the MySQL to TiDB migration evaluation.</p>
* <p>Each call records both a count (one increment) and a latency observation. Implementations
* are expected to emit one metric per dimension instead of baking dimensions into metric names,
* so consumers can aggregate or filter on individual attributes without enumerating every
* concrete metric series.
*
* <p>Follows the same pattern as {@link BaseTrackingManager} / {@link DummyTrackingManager}:
* a no-op implementation ({@code NoOpDaoBenchmarkMetrics}) lives in the kernel (datahub-gma)
* and the real Dropwizard-backed implementation lives in the service layer.</p>
* <p>A no-op implementation ({@link NoOpDaoBenchmarkMetrics}) lives in the kernel; concrete
* backends (OTEL, Dropwizard, etc.) live in the service layer.
*/
public interface BaseDaoBenchmarkMetrics {

/**
* Record the latency of a successful or failed DAO operation.
* Record a completed DAO operation with its dimensional attributes.
*
* @param operationType the DAO operation name (e.g. "add", "batchGetUnion", "list")
* @param entityType the entity type derived from the URN class (e.g. "dataset", "corpuser")
* @param latencyMs wall-clock latency in milliseconds
* @param operation pure operation name with no concatenation (e.g. {@code "add"},
* {@code "batchGetUnion"})
* @param entityType entity type derived from the URN class (e.g. {@code "dataset"})
* @param aspect aspect class simple name for per-aspect operations, or {@code null} when
* the operation is not per-aspect
* @param countBucket pre-bucketed count label ({@code "1"} through {@code "9"}, {@code "10+"})
* for batch operations, or {@code null} when there is no count dimension
* @param status outcome string, e.g. {@code "success"} or {@code "failure"}
* @param errorClass simple name of the thrown exception on failure, or {@code null} on success
* @param latencyMs wall-clock latency of the operation in milliseconds
*/
void recordOperationLatency(@Nonnull String operationType, @Nonnull String entityType, long latencyMs);
void recordOperation(@Nonnull String operation, @Nonnull String entityType,
@Nullable String aspect, @Nullable String countBucket, @Nonnull String status,
@Nullable String errorClass, long latencyMs);

/**
* Record an error that occurred during a DAO operation.
*
* @param operationType the DAO operation name (e.g. "add", "create")
* @param entityType the entity type derived from the URN class
* @param exceptionClass the simple class name of the thrown exception (e.g. "SQLException")
*/
void recordOperationError(@Nonnull String operationType, @Nonnull String entityType,
@Nonnull String exceptionClass);

/**
* Whether metrics collection is enabled. Callers may short-circuit expensive
* instrumentation when this returns {@code false}.
*
* @return true if metrics are being collected
* Whether metrics collection is enabled. Callers may short-circuit instrumentation when
* this returns {@code false}.
*/
boolean isEnabled();
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
package com.linkedin.metadata.dao.tracking;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;


/**
* A no-op implementation of {@link BaseDaoBenchmarkMetrics} that discards all metrics.
* A no-op implementation of {@link BaseDaoBenchmarkMetrics} that discards all observations.
*
* <p>Used as the default when no real metrics backend is configured.
* Follows the same pattern as {@link DummyTrackingManager}.</p>
*/
public class NoOpDaoBenchmarkMetrics implements BaseDaoBenchmarkMetrics {

@Override
public void recordOperationLatency(@Nonnull String operationType, @Nonnull String entityType, long latencyMs) {
// Do nothing
}

@Override
public void recordOperationError(@Nonnull String operationType, @Nonnull String entityType,
@Nonnull String exceptionClass) {
public void recordOperation(@Nonnull String operation, @Nonnull String entityType,
@Nullable String aspect, @Nullable String countBucket, @Nonnull String status,
@Nullable String errorClass, long latencyMs) {
// Do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public class NoOpDaoBenchmarkMetricsTest {
public void testNoOpBehavior() {
NoOpDaoBenchmarkMetrics metrics = new NoOpDaoBenchmarkMetrics();

// Should not throw
metrics.recordOperationLatency("add", "dataset", 42L);
metrics.recordOperationError("add", "dataset", "SQLException");
// Should not throw on any dimension combination
metrics.recordOperation("add", "dataset", "AspectFoo", null, "success", null, 42L);
metrics.recordOperation("batchUpsert", "corpuser", null, "3", "failure", "SQLException", 15L);

assertFalse(metrics.isEnabled());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
Expand All @@ -19,30 +20,43 @@


/**
* A decorator around {@link IEbeanLocalAccess} that records per-operation latency and error metrics
* via {@link BaseDaoBenchmarkMetrics}.
* A decorator around {@link IEbeanLocalAccess} that records per-operation latency and count
* metrics via {@link BaseDaoBenchmarkMetrics}, passing dimensions (operation, aspect, count
* bucket, status, error class) as separate attributes rather than baking them into a single
* metric name.
*
* <p>Delegates every call to the wrapped implementation, measuring wall-clock time with
* {@link System#nanoTime()}. On success, records latency; on error, records both latency and
* the exception class, then re-throws.</p>
* {@link System#nanoTime()}. On both success and failure, calls
* {@link BaseDaoBenchmarkMetrics#recordOperation} once with the observed latency, status, and
* (on failure) the thrown exception's simple class name; the exception is re-thrown unchanged.
*
* <p>When {@link BaseDaoBenchmarkMetrics#isEnabled()} returns {@code false}, delegation is
* direct with zero overhead (no timing).</p>
* direct with zero overhead (no timing).
*
* @param <URN> the URN type for this entity
*/
public class InstrumentedEbeanLocalAccess<URN extends Urn> implements IEbeanLocalAccess<URN> {

static final String STATUS_SUCCESS = "success";
static final String STATUS_FAILURE = "failure";

// 1..9 mapped to interned strings; 0 and 10+ handled separately. Avoids
// Integer.toString() allocation on the hot path.
private static final String[] COUNT_BUCKET_INTERNED =
{"1", "2", "3", "4", "5", "6", "7", "8", "9"};
private static final String BUCKET_ZERO = "0";
private static final String BUCKET_OVERFLOW = "10+";

private final IEbeanLocalAccess<URN> _delegate;
private final BaseDaoBenchmarkMetrics _metrics;
private final String _entityType;

/**
* Creates an instrumented wrapper around the given local access implementation.
*
* @param delegate the real local access implementation to wrap
* @param metrics the metrics recorder (may be a no-op)
* @param urnClass the URN class, used to derive the entity type name once at construction
* @param delegate the real local access implementation to wrap
* @param metrics the metrics recorder (may be a no-op)
* @param urnClass the URN class, used to derive the entity type name once at construction
*/
public InstrumentedEbeanLocalAccess(@Nonnull IEbeanLocalAccess<URN> delegate,
@Nonnull BaseDaoBenchmarkMetrics metrics, @Nonnull Class<URN> urnClass) {
Expand All @@ -51,6 +65,23 @@ public InstrumentedEbeanLocalAccess(@Nonnull IEbeanLocalAccess<URN> delegate,
_entityType = urnClass.getSimpleName().replace("Urn", "").toLowerCase();
}

/**
* Bucket a count value into a small fixed set of labels to keep metric cardinality bounded.
*
* <p>Returns {@code "0"} for non-positive values (defensive; batch ops shouldn't hit this),
* the interned digit string {@code "1".."9"} for 1-9, or {@code "10+"} for anything >= 10.
*/
@VisibleForTesting
static String bucketCount(int n) {
if (n <= 0) {
return BUCKET_ZERO;
}
if (n >= 10) {
return BUCKET_OVERFLOW;
}
return COUNT_BUCKET_INTERNED[n - 1];
}

@Override
public void setUrnPathExtractor(@Nonnull UrnPathExtractor<URN> urnPathExtractor) {
_delegate.setUrnPathExtractor(urnPathExtractor);
Expand All @@ -66,16 +97,17 @@ public void configureOptionalForceIndex(@Nullable String indexName,
public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode) {
return instrument("add." + aspectClass.getSimpleName(), () -> _delegate.add(urn, newValue,
aspectClass, auditStamp, ingestionTrackingContext, isTestMode));
return instrument("add", aspectClass.getSimpleName(), null,
() -> _delegate.add(urn, newValue, aspectClass, auditStamp,
ingestionTrackingContext, isTestMode));
}

@Override
public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(@Nonnull URN urn,
@Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp,
@Nullable Timestamp oldTimestamp, @Nullable IngestionTrackingContext ingestionTrackingContext,
boolean isTestMode, boolean softDeleteOverwrite) {
return instrument("addWithOptimisticLocking." + aspectClass.getSimpleName(),
return instrument("addWithOptimisticLocking", aspectClass.getSimpleName(), null,
() -> _delegate.addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp,
oldTimestamp, ingestionTrackingContext, isTestMode, softDeleteOverwrite));
}
Expand All @@ -86,7 +118,7 @@ public <ASPECT_UNION extends RecordTemplate> int create(@Nonnull URN urn,
@Nonnull List<BaseLocalDAO.AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext,
boolean isTestMode) {
return instrument("create.aspects_" + aspectValues.size(),
return instrument("create", null, bucketCount(aspectValues.size()),
() -> _delegate.create(urn, aspectValues, aspectCreateLambdas,
auditStamp, ingestionTrackingContext, isTestMode));
}
Expand All @@ -96,82 +128,87 @@ public <ASPECT_UNION extends RecordTemplate> int batchUpsert(@Nonnull URN urn,
@Nonnull List<BaseLocalDAO.AspectUpdateContext<RecordTemplate>> updateContexts,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext,
boolean isTestMode) {
return instrument("batchUpsert.aspects_" + updateContexts.size(),
() -> _delegate.batchUpsert(urn, updateContexts, auditStamp, ingestionTrackingContext, isTestMode));
return instrument("batchUpsert", null, bucketCount(updateContexts.size()),
() -> _delegate.batchUpsert(urn, updateContexts, auditStamp,
ingestionTrackingContext, isTestMode));
}

@Nonnull
@Override
public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> keys, int keysCount, int position,
boolean includeSoftDeleted, boolean isTestMode) {
return instrument("batchGetUnion.keys_" + keys.size(),
return instrument("batchGetUnion", null, bucketCount(keys.size()),
() -> _delegate.batchGetUnion(keys, keysCount, position, includeSoftDeleted, isTestMode));
}

@Override
public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) {
return instrument("softDeleteAsset", () -> _delegate.softDeleteAsset(urn, isTestMode));
return instrument("softDeleteAsset", null, null,
() -> _delegate.softDeleteAsset(urn, isTestMode));
}

@Override
public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
return instrument("readDeletionInfoBatch.urns_" + urns.size(),
public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns,
boolean isTestMode) {
return instrument("readDeletionInfoBatch", null, bucketCount(urns.size()),
() -> _delegate.readDeletionInfoBatch(urns, isTestMode));
}

@Override
public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
return instrument("batchSoftDeleteAssets.urns_" + urns.size(),
public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp,
boolean isTestMode) {
return instrument("batchSoftDeleteAssets", null, bucketCount(urns.size()),
() -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode));
}

@Override
public List<URN> listUrns(@Nullable IndexFilter indexFilter,
@Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) {
return instrument("listUrns.cursor",
return instrument("listUrns.cursor", null, null,
() -> _delegate.listUrns(indexFilter, indexSortCriterion, lastUrn, pageSize));
}

@Override
public ListResult<URN> listUrns(@Nullable IndexFilter indexFilter,
@Nullable IndexSortCriterion indexSortCriterion, int start, int pageSize) {
return instrument("listUrns.offset",
return instrument("listUrns.offset", null, null,
() -> _delegate.listUrns(indexFilter, indexSortCriterion, start, pageSize));
}

@Override
public boolean exists(@Nonnull URN urn) {
return instrument("exists", () -> _delegate.exists(urn));
return instrument("exists", null, null, () -> _delegate.exists(urn));
}

@Nonnull
@Override
public Map<String, Long> countAggregate(@Nullable IndexFilter indexFilter,
@Nonnull IndexGroupByCriterion indexGroupByCriterion) {
return instrument("countAggregate",
return instrument("countAggregate", null, null,
() -> _delegate.countAggregate(indexFilter, indexGroupByCriterion));
}

@Nonnull
@Override
public <ASPECT extends RecordTemplate> ListResult<URN> listUrns(@Nonnull Class<ASPECT> aspectClass,
int start, int pageSize) {
return instrument("listUrns", () -> _delegate.listUrns(aspectClass, start, pageSize));
return instrument("listUrns", null, null,
() -> _delegate.listUrns(aspectClass, start, pageSize));
}

@Nonnull
@Override
public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<ASPECT> aspectClass,
@Nonnull URN urn, int start, int pageSize) {
return instrument("list", () -> _delegate.list(aspectClass, urn, start, pageSize));
return instrument("list", null, null, () -> _delegate.list(aspectClass, urn, start, pageSize));
}

@Nonnull
@Override
public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<ASPECT> aspectClass,
int start, int pageSize) {
return instrument("list", () -> _delegate.list(aspectClass, start, pageSize));
return instrument("list", null, null, () -> _delegate.list(aspectClass, start, pageSize));
}

@Override
Expand All @@ -181,21 +218,26 @@ public void ensureSchemaUpToDate() {
}

/**
* Core instrumentation wrapper. When metrics are disabled, delegates directly.
* When enabled, times the operation and records latency on success, latency + error on failure.
* Core instrumentation wrapper. When metrics are disabled, delegates directly. When enabled,
* times the operation and emits one {@code recordOperation} call in the {@code finally} block
* with status and (on failure) error class populated.
*/
private <T> T instrument(@Nonnull String operationType, @Nonnull Supplier<T> supplier) {
private <T> T instrument(@Nonnull String operation, @Nullable String aspect,
@Nullable String countBucket, @Nonnull Supplier<T> supplier) {
if (!_metrics.isEnabled()) {
return supplier.get();
}
final long startNanos = System.nanoTime();
String status = STATUS_SUCCESS;
String errorClass = null;
try {
return supplier.get();
} catch (RuntimeException ex) {
_metrics.recordOperationError(operationType, _entityType, ex.getClass().getSimpleName());
status = STATUS_FAILURE;
errorClass = ex.getClass().getSimpleName();
throw ex;
} finally {
_metrics.recordOperationLatency(operationType, _entityType,
_metrics.recordOperation(operation, _entityType, aspect, countBucket, status, errorClass,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
}
}
Expand Down
Loading
Loading