Skip to content

Commit 2fca171

Browse files
committed
Added assembled_record_rmd_size_in_bytes to track the RMD size and refined the unit test. 🖇️
1 parent 6c32331 commit 2fca171

File tree

4 files changed

+46
-24
lines changed

4 files changed

+46
-24
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import com.linkedin.venice.utils.VeniceProperties;
109109
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
110110
import com.linkedin.venice.utils.lazy.Lazy;
111+
import com.linkedin.venice.writer.VeniceWriter;
111112
import it.unimi.dsi.fastutil.objects.Object2IntMap;
112113
import java.io.Closeable;
113114
import java.io.IOException;
@@ -2922,13 +2923,11 @@ private int internalProcessConsumerRecord(
29222923
return sizeOfPersistedData;
29232924
}
29242925

2925-
protected void recordWriterStats(
2926+
protected abstract void recordWriterStats(
29262927
long consumerTimestampMs,
29272928
long producerBrokerLatencyMs,
29282929
long brokerConsumerLatencyMs,
2929-
PartitionConsumptionState partitionConsumptionState) {
2930-
2931-
}
2930+
PartitionConsumptionState partitionConsumptionState);
29322931

29332932
/**
29342933
* Message validation using DIV. Leaders should pass in the validator instance from {@link LeaderFollowerStoreIngestionTask};
@@ -3205,10 +3204,14 @@ private int processKafkaDataMessage(
32053204
ChunkedValueManifest chunkedValueManifest = manifestSerializer.deserialize(
32063205
ByteUtils.extractByteArray(put.getPutValue()), // must be done before recordTransformer changes putValue
32073206
AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion());
3208-
hostLevelIngestionStats.recordAssembledValueSize(chunkedValueManifest.getSize(), currentTimeMs);
3209-
3210-
// Ideally, the record size calculation should include rmd, but it's too difficult to keep track of it
3211-
recordAssembledRecordSizeRatio(keyLen + chunkedValueManifest.getSize(), currentTimeMs);
3207+
ByteBuffer rmdPayload = put.getReplicationMetadataPayload();
3208+
boolean isValueManifest = rmdPayload != null && rmdPayload.equals(VeniceWriter.EMPTY_BYTE_BUFFER);
3209+
if (isValueManifest) {
3210+
hostLevelIngestionStats.recordAssembledValueSize(chunkedValueManifest.getSize(), currentTimeMs);
3211+
recordAssembledRecordSizeRatio(keyLen + chunkedValueManifest.getSize(), currentTimeMs);
3212+
} else { // the manifest pertains to a chunked rmd
3213+
hostLevelIngestionStats.recordAssembledRmdSize(chunkedValueManifest.getSize(), currentTimeMs);
3214+
}
32123215
} catch (VeniceException | IllegalArgumentException | AvroRuntimeException e) {
32133216
LOGGER.error("Failed to deserialize ChunkedValueManifest to record assembled value size", e);
32143217
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
*/
3434
public class HostLevelIngestionStats extends AbstractVeniceStats {
3535
public static final String ASSEMBLED_RECORD_SIZE_RATIO = "assembled_record_size_ratio";
36+
public static final String ASSEMBLED_RECORD_RMD_SIZE_IN_BYTES = "assembled_record_rmd_size_in_bytes";
3637
public static final String ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES = "assembled_record_value_size_in_bytes";
3738

3839
// The aggregated bytes ingested rate for the entire host
@@ -51,6 +52,7 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
5152
private final Sensor keySizeSensor;
5253
private final Sensor valueSizeSensor;
5354
private final Sensor assembledValueSizeSensor;
55+
private final Sensor assembledRmdSizeSensor;
5456
private final Sensor assembledRecordSizeRatioSensor;
5557
private final Sensor unexpectedMessageSensor;
5658
private final Sensor inconsistentStoreMetadataSensor;
@@ -282,6 +284,8 @@ public HostLevelIngestionStats(
282284

283285
this.assembledValueSizeSensor = registerSensor(ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES, avgAndMax());
284286

287+
this.assembledRmdSizeSensor = registerSensor(ASSEMBLED_RECORD_RMD_SIZE_IN_BYTES, avgAndMax());
288+
285289
this.assembledRecordSizeRatioSensor = registerSensor(ASSEMBLED_RECORD_SIZE_RATIO, new Max());
286290

287291
String viewTimerSensorName = "total_view_writer_latency";
@@ -471,6 +475,10 @@ public void recordAssembledValueSize(long bytes, long currentTimeMs) {
471475
assembledValueSizeSensor.record(bytes, currentTimeMs);
472476
}
473477

478+
public void recordAssembledRmdSize(long bytes, long currentTimeMs) {
479+
assembledRmdSizeSensor.record(bytes, currentTimeMs);
480+
}
481+
474482
public void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs) {
475483
assembledRecordSizeRatioSensor.record(ratio, currentTimeMs);
476484
}

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4368,11 +4368,10 @@ public void testStoreIngestionRecordTransformerError(AAConfig aaConfig) throws E
43684368

43694369
@DataProvider
43704370
public static Object[][] testAssembledValueSizeProvider() {
4371-
Set<Integer> schemaIdSet = new HashSet<>();
4372-
for (AvroProtocolDefinition avroProtocolDefinition: AvroProtocolDefinition.values()) {
4373-
avroProtocolDefinition.currentProtocolVersion.ifPresent(schemaIdSet::add);
4374-
}
4375-
return DataProviderUtils.allPermutationGenerator(AAConfig.values(), schemaIdSet.toArray());
4371+
Object[] testSchemaIds =
4372+
{ VeniceWriter.VENICE_DEFAULT_VALUE_SCHEMA_ID, AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(),
4373+
AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion() };
4374+
return DataProviderUtils.allPermutationGenerator(AAConfig.values(), testSchemaIds, DataProviderUtils.BOOLEAN);
43764375
}
43774376

43784377
/**
@@ -4383,7 +4382,7 @@ public static Object[][] testAssembledValueSizeProvider() {
43834382
* @throws Exception
43844383
*/
43854384
@Test(dataProvider = "testAssembledValueSizeProvider")
4386-
public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId) throws Exception {
4385+
public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId, boolean hasRmd) throws Exception {
43874386
int numChunks = 10;
43884387
long expectedRecordSize = (long) numChunks * ChunkingTestUtils.CHUNK_LENGTH;
43894388
PubSubTopicPartition tp = new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO);
@@ -4395,8 +4394,6 @@ public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId) th
43954394
ChunkingTestUtils.createChunkValueManifestRecord(putKeyFoo, messages.get(0), numChunks, tp);
43964395
messages.add(manifestMessage);
43974396

4398-
// The only expected real-life case should be when schemaId values are chunk=-10 and manifest=-20
4399-
boolean useRealSchemaId = testSchemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
44004397
runTest(Collections.singleton(PARTITION_FOO), () -> {
44014398
TestUtils.waitForNonDeterministicAssertion(
44024399
5,
@@ -4406,8 +4403,9 @@ public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId) th
44064403
for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: messages) {
44074404
try {
44084405
Put put = (Put) message.getValue().getPayloadUnion();
4409-
if (!useRealSchemaId) {
4410-
put.schemaId = testSchemaId;
4406+
if (put.schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
4407+
put.schemaId = testSchemaId; // set manifest schemaId to testSchemaId to see if metrics are still recorded
4408+
put.replicationMetadataPayload = (hasRmd) ? ByteBuffer.allocate(10) : VeniceWriter.EMPTY_BYTE_BUFFER;
44114409
}
44124410
LeaderProducedRecordContext leaderProducedRecordContext = mock(LeaderProducedRecordContext.class);
44134411
when(leaderProducedRecordContext.getMessageType()).thenReturn(MessageType.PUT);
@@ -4426,13 +4424,25 @@ public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId) th
44264424
}
44274425
}
44284426

4429-
// Verify that the size of the assembled record was recorded in the metrics only if schemaId=-20
4427+
// Verify that the assembled record metrics are recorded only if schemaId=-20, which indicates a manifest
44304428
HostLevelIngestionStats hostLevelIngestionStats = storeIngestionTaskUnderTest.hostLevelIngestionStats;
4431-
if (useRealSchemaId) {
4432-
verify(hostLevelIngestionStats, timeout(1000)).recordAssembledValueSize(eq(expectedRecordSize), anyLong());
4433-
verify(hostLevelIngestionStats, timeout(1000).times(1)).recordAssembledValueSize(anyLong(), anyLong());
4429+
if (testSchemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
4430+
if (hasRmd) { // should have metrics for rmd but not value size
4431+
verify(hostLevelIngestionStats, timeout(1000)).recordAssembledRmdSize(eq(expectedRecordSize), anyLong());
4432+
verify(hostLevelIngestionStats, timeout(1000).times(1)).recordAssembledRmdSize(anyLong(), anyLong());
4433+
verify(hostLevelIngestionStats, times(0)).recordAssembledValueSize(anyLong(), anyLong());
4434+
verify(hostLevelIngestionStats, times(0)).recordAssembledRecordSizeRatio(anyDouble(), anyLong());
4435+
} else { // should have metrics for value but not rmd size
4436+
verify(hostLevelIngestionStats, timeout(1000)).recordAssembledValueSize(eq(expectedRecordSize), anyLong());
4437+
verify(hostLevelIngestionStats, timeout(1000).times(1)).recordAssembledValueSize(anyLong(), anyLong());
4438+
verify(hostLevelIngestionStats, timeout(1000).times(1))
4439+
.recordAssembledRecordSizeRatio(anyDouble(), anyLong());
4440+
verify(hostLevelIngestionStats, times(0)).recordAssembledRmdSize(anyLong(), anyLong());
4441+
}
44344442
} else {
44354443
verify(hostLevelIngestionStats, times(0)).recordAssembledValueSize(anyLong(), anyLong());
4444+
verify(hostLevelIngestionStats, times(0)).recordAssembledRmdSize(anyLong(), anyLong());
4445+
verify(hostLevelIngestionStats, times(0)).recordAssembledRecordSizeRatio(anyDouble(), anyLong());
44364446
}
44374447
}, aaConfig);
44384448
}

internal/venice-test-common/src/main/java/com/linkedin/venice/utils/ChunkingTestUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.linkedin.venice.storage.protocol.ChunkId;
2020
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
2121
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
22+
import com.linkedin.venice.writer.VeniceWriter;
2223
import java.nio.ByteBuffer;
2324
import java.util.ArrayList;
2425

@@ -94,7 +95,7 @@ public static PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> createChunkedR
9495
put.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion();
9596
byte[] valueBytes = createChunkBytes(chunkIndex * CHUNK_LENGTH, CHUNK_LENGTH);
9697
put.putValue = prependSchemaId(valueBytes, put.schemaId);
97-
put.replicationMetadataPayload = ByteBuffer.allocate(10);
98+
put.replicationMetadataPayload = VeniceWriter.EMPTY_BYTE_BUFFER;
9899
messageEnvelope.payloadUnion = put;
99100
return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, newOffset, 0, 20);
100101
}
@@ -123,7 +124,7 @@ public static PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> createChunkVal
123124
Put put = new Put();
124125
put.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
125126
put.putValue = prependSchemaId(valueBytes, put.schemaId);
126-
put.replicationMetadataPayload = ByteBuffer.allocate(10);
127+
put.replicationMetadataPayload = VeniceWriter.EMPTY_BYTE_BUFFER;
127128
messageEnvelope.payloadUnion = put;
128129
return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, newOffset, 0, 20);
129130
}

0 commit comments

Comments
 (0)