From e382726341aa153416ce43ffb411bacc5a64dfe2 Mon Sep 17 00:00:00 2001 From: Kai-Sern Lim Date: Mon, 10 Jun 2024 09:56:18 -0700 Subject: [PATCH] [server] Fully Assembled Record Size Metric (#1009) Added a new per-store metric called `assembled_record_value_size_in_bytes`, recorded at the drainer stage. It captures the original (fully assembled) size of large, chunked records by reading the size field of the `ChunkedValueManifest`. The size field is sufficient because the size of the fully assembled record is known before the `VeniceWriter` performs chunking. The metric is recorded in the drainer (which runs after any partial-update reassembly), so all large records should be covered. Moved `TestChunkingUtils` into the `venice-test-common` project as `ChunkingTestUtils`, which is hopefully a more accessible location for test utilities and can be reused by more tests in the future. Moved several helper functions from `TestKafkaTopicDumper` into `ChunkingTestUtils` so that they can be more easily reused for creating chunked `PubSubMessage` objects. Added a test util `prependSchemaId()` to help with prepending the schema id to the put value bytes, which is useful in multiple other tests. Added a unit test, `testAssembledValueSizeSensor()`, in `StoreIngestionTaskTest`, where a chunked record and its manifest are sent to the drainer and the metric is verified to be populated. Added a check inside `testLargeValues()` in `TestBatch` that the metric is populated when chunking is enabled while sending large records, and is not populated otherwise. --- .../kafka/consumer/StoreIngestionTask.java | 17 ++ .../stats/HostLevelIngestionStats.java | 15 ++ .../storage/chunking/ChunkingUtils.java | 4 +- .../ActiveActiveStoreIngestionTaskTest.java | 27 +-- .../consumer/StoreIngestionTaskTest.java | 72 ++++++ .../linkedin/venice/TestKafkaTopicDumper.java | 219 ++---------------- .../input/kafka/chunk/TestChunkAssembler.java | 4 +- .../linkedin/venice/meta/IngestionMode.java | 2 +- .../venice/writer/WriterChunkingHelper.java | 2 +- .../v-20/ChunkedValueManifest.avsc | 2 +- .../TestChunkKeyValueTransformerImpl.java | 15 +- .../venice/chunking/TestChunkingUtils.java | 35 --- .../linkedin/venice/endToEnd/TestBatch.java | 49 +++- .../utils/VeniceServerWrapper.java | 13 ++ .../venice/utils/ChunkingTestUtils.java | 184 +++++++++++++++ 15 files changed, 387 insertions(+), 273 deletions(-) delete mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkingUtils.java create mode 100644 internal/venice-test-common/src/main/java/com/linkedin/venice/utils/ChunkingTestUtils.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index e939a12ca18..7e25cb1bcf2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -87,9 +87,11 @@ import com.linkedin.venice.pubsub.manager.TopicManagerRepository; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer; import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.serializer.AvroGenericDeserializer; import 
com.linkedin.venice.serializer.FastSerializerDeserializerFactory; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.ComplementSet; @@ -139,6 +141,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.logging.log4j.LogManager; @@ -322,6 +325,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final StorageEngineBackedCompressorFactory compressorFactory; protected final Lazy compressor; protected final boolean isChunked; + protected final ChunkedValueManifestSerializer manifestSerializer; protected final PubSubTopicRepository pubSubTopicRepository; private final String[] msgForLagMeasurement; private final Runnable runnableForKillIngestionTasksForNonCurrentVersions; @@ -477,6 +481,7 @@ public StoreIngestionTask( this.compressorFactory = builder.getCompressorFactory(); this.compressor = Lazy.of(() -> compressorFactory.getCompressor(compressionStrategy, kafkaVersionTopic)); this.isChunked = version.isChunkingEnabled(); + this.manifestSerializer = new ChunkedValueManifestSerializer(true); this.msgForLagMeasurement = new String[subPartitionCount]; for (int i = 0; i < this.msgForLagMeasurement.length; i++) { this.msgForLagMeasurement[i] = kafkaVersionTopic + "_" + i; @@ -3226,6 +3231,18 @@ private int processKafkaDataMessage( // update checksum for this PUT message if needed. partitionConsumptionState.maybeUpdateExpectedChecksum(keyBytes, put); + if (metricsEnabled && recordLevelMetricEnabled.get() + && put.getSchemaId() == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { + try { + ChunkedValueManifest chunkedValueManifest = manifestSerializer.deserialize( + ByteUtils.extractByteArray(put.getPutValue()), // must be done before recordTransformer changes putValue + AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()); + hostLevelIngestionStats.recordAssembledValueSize(chunkedValueManifest.getSize(), currentTimeMs); + } catch (VeniceException | IllegalArgumentException | AvroRuntimeException e) { + LOGGER.error("Failed to deserialize ChunkedValueManifest to record assembled value size", e); + } + } + // Check if put.getSchemaId is positive, if not default to 1 int putSchemaId = put.getSchemaId() > 0 ? put.getSchemaId() : 1; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java index f24be58ccc5..ea1c1639823 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java @@ -32,6 +32,8 @@ * (3) Per store and total: The stat is registered for each store on this host and the total number for this host. 
*/ public class HostLevelIngestionStats extends AbstractVeniceStats { + public static final String ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES = "assembled_record_value_size_in_bytes"; + // The aggregated bytes ingested rate for the entire host private final LongAdderRateGauge totalBytesConsumedRate; // The aggregated records ingested rate for the entire host @@ -47,6 +49,7 @@ public class HostLevelIngestionStats extends AbstractVeniceStats { private final Sensor consumerRecordsQueuePutLatencySensor; private final Sensor keySizeSensor; private final Sensor valueSizeSensor; + private final Sensor assembledValueSizeSensor; private final Sensor unexpectedMessageSensor; private final Sensor inconsistentStoreMetadataSensor; private final Sensor ingestionFailureSensor; @@ -275,6 +278,14 @@ public HostLevelIngestionStats( new Max(), TehutiUtils.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + valueSizeSensorName)); + this.assembledValueSizeSensor = registerSensor( + ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES, + new Avg(), + new Min(), + new Max(), + TehutiUtils + .getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES)); + String viewTimerSensorName = "total_view_writer_latency"; this.viewProducerLatencySensor = registerPerStoreAndTotalSensor( viewTimerSensorName, @@ -458,6 +469,10 @@ public void recordValueSize(long bytes, long currentTimeMs) { valueSizeSensor.record(bytes, currentTimeMs); } + public void recordAssembledValueSize(long bytes, long currentTimeMs) { + assembledValueSizeSensor.record(bytes, currentTimeMs); + } + public void recordIngestionFailure() { ingestionFailureSensor.record(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java index 6f12412ebf3..3481ddc68aa 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java @@ -57,8 +57,8 @@ * b) If it is negative, then it's a {@link ChunkedValueManifest}, and we continue to the next steps. * 3. The {@link ChunkedValueManifest} is deserialized, and its chunk keys are extracted. * 4. Each chunk key is queried. - * 5. The chunks are stitched back together using the various adpater interfaces of this package, - * depending on whether it is the single get or batch get/compute path that needs to re-assembe + * 5. The chunks are stitched back together using the various adapter interfaces of this package, + * depending on whether it is the single get or batch get/compute path that needs to re-assemble * a chunked value. 
*/ public class ChunkingUtils { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index 7fed21df15d..f8102ca1cae 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -3,7 +3,6 @@ import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; -import static com.linkedin.venice.utils.ByteUtils.SIZE_OF_INT; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; @@ -81,6 +80,7 @@ import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.ChunkingTestUtils; import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.lazy.Lazy; @@ -314,11 +314,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() stringBuilder.append("abcdefghabcdefghabcdefghabcdefgh"); } String valueString = stringBuilder.toString(); - byte[] valueBytes = valueString.getBytes(); - byte[] schemaIdPrependedValueBytes = new byte[4 + valueBytes.length]; - ByteUtils.writeInt(schemaIdPrependedValueBytes, 1, 0); - System.arraycopy(valueBytes, 0, schemaIdPrependedValueBytes, 4, valueBytes.length); - ByteBuffer updatedValueBytes = ByteBuffer.wrap(schemaIdPrependedValueBytes, 4, valueBytes.length); + ByteBuffer updatedValueBytes = ChunkingTestUtils.prependSchemaId(valueString.getBytes(), 1); ByteBuffer updatedRmdBytes = ByteBuffer.wrap(new byte[] { 0xa, 0xb }); PubSubMessage consumerRecord = mock(PubSubMessage.class); when(consumerRecord.getOffset()).thenReturn(100L); @@ -485,17 +481,13 @@ public void testReadingChunkedRmdFromStorage() { chunkedValueManifest.keysWithChunkIdSuffix = new ArrayList<>(1); chunkedValueManifest.size = 8; chunkedValueManifest.keysWithChunkIdSuffix.add(chunkedKeyWithSuffix1); + int manifestSchemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); byte[] chunkedManifestBytes = chunkedValueManifestSerializer.serialize(topicName, chunkedValueManifest); - byte[] chunkedManifestWithSchemaBytes = new byte[SIZE_OF_INT + chunkedManifestBytes.length]; - System.arraycopy(chunkedManifestBytes, 0, chunkedManifestWithSchemaBytes, 4, chunkedManifestBytes.length); - ByteUtils.writeInt( - chunkedManifestWithSchemaBytes, - AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(), - 0); + chunkedManifestBytes = ChunkingTestUtils.prependSchemaId(chunkedManifestBytes, manifestSchemaId).array(); byte[] topLevelKey2 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key2); byte[] chunkedKey1InKey2 = chunkedKeyWithSuffix1.array(); - when(storageEngine.getReplicationMetadata(subPartition, topLevelKey2)).thenReturn(chunkedManifestWithSchemaBytes); + when(storageEngine.getReplicationMetadata(subPartition, topLevelKey2)).thenReturn(chunkedManifestBytes); when(storageEngine.getReplicationMetadata(subPartition, chunkedKey1InKey2)).thenReturn(chunkedValue1); List 
chunkedValues = new ArrayList<>(1); chunkedValues.add(chunkedValue1); @@ -528,17 +520,12 @@ public void testReadingChunkedRmdFromStorage() { chunkedValueManifest.keysWithChunkIdSuffix.add(chunkedKeyWithSuffix1); chunkedValueManifest.keysWithChunkIdSuffix.add(chunkedKeyWithSuffix2); chunkedManifestBytes = chunkedValueManifestSerializer.serialize(topicName, chunkedValueManifest); - chunkedManifestWithSchemaBytes = new byte[SIZE_OF_INT + chunkedManifestBytes.length]; - System.arraycopy(chunkedManifestBytes, 0, chunkedManifestWithSchemaBytes, SIZE_OF_INT, chunkedManifestBytes.length); - ByteUtils.writeInt( - chunkedManifestWithSchemaBytes, - AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(), - 0); + chunkedManifestBytes = ChunkingTestUtils.prependSchemaId(chunkedManifestBytes, manifestSchemaId).array(); byte[] topLevelKey3 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key3); byte[] chunkedKey1InKey3 = chunkedKeyWithSuffix1.array(); byte[] chunkedKey2InKey3 = chunkedKeyWithSuffix2.array(); - when(storageEngine.getReplicationMetadata(subPartition, topLevelKey3)).thenReturn(chunkedManifestWithSchemaBytes); + when(storageEngine.getReplicationMetadata(subPartition, topLevelKey3)).thenReturn(chunkedManifestBytes); when(storageEngine.getReplicationMetadata(subPartition, chunkedKey1InKey3)).thenReturn(chunkedValue1); when(storageEngine.getReplicationMetadata(subPartition, chunkedKey2InKey3)).thenReturn(chunkedValue2); when(storageEngine.multiGetReplicationMetadata(eq(subPartition), any())) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 836c5b5728c..29bcee3f8a9 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -189,6 +189,7 @@ import com.linkedin.venice.unit.matchers.NonEmptyStringMatcher; import com.linkedin.venice.utils.ByteArray; import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.ChunkingTestUtils; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.DiskUsage; import com.linkedin.venice.utils.Pair; @@ -4429,6 +4430,77 @@ public void testStoreIngestionRecordTransformerError(AAConfig aaConfig) throws E .recordTransformerError(eq(storeNameWithoutVersionInfo), anyInt(), anyDouble(), anyLong()); } + @DataProvider + public static Object[][] testAssembledValueSizeProvider() { + Set schemaIdSet = new HashSet<>(); + for (AvroProtocolDefinition avroProtocolDefinition: AvroProtocolDefinition.values()) { + avroProtocolDefinition.currentProtocolVersion.ifPresent(schemaIdSet::add); + } + return DataProviderUtils.allPermutationGenerator(AAConfig.values(), schemaIdSet.toArray()); + } + + /** + * Create messages for a chunked record (in multiple chunks) and its manifest, and verify that the drainer + * records the size of the chunked record as described in the size field of the manifest. + * Also, verify that metrics are only emitted when the correct schemaId=-20 is on the manifest message, + * and not emitted on any other invalid schemaId values. 
+ * @throws Exception + */ + @Test(dataProvider = "testAssembledValueSizeProvider") + public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId) throws Exception { + int numChunks = 10; + long expectedRecordSize = (long) numChunks * ChunkingTestUtils.CHUNK_LENGTH; + PubSubTopicPartition tp = new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO); + List> messages = new ArrayList<>(numChunks + 1); // + manifest + for (int i = 0; i < numChunks; i++) { + messages.add(ChunkingTestUtils.createChunkedRecord(putKeyFoo, 1, 1, i, 0, tp)); + } + PubSubMessage manifestMessage = + ChunkingTestUtils.createChunkValueManifestRecord(putKeyFoo, messages.get(0), numChunks, tp); + messages.add(manifestMessage); + + // The only expected real-life case should be when schemaId values are chunk=-10 and manifest=-20 + boolean useRealSchemaId = testSchemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + runTest(Collections.singleton(PARTITION_FOO), () -> { + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> assertTrue(storeIngestionTaskUnderTest.hasAnySubscription())); + + for (PubSubMessage message: messages) { + try { + Put put = (Put) message.getValue().getPayloadUnion(); + if (!useRealSchemaId) { + put.schemaId = testSchemaId; + } + LeaderProducedRecordContext leaderProducedRecordContext = mock(LeaderProducedRecordContext.class); + when(leaderProducedRecordContext.getMessageType()).thenReturn(MessageType.PUT); + when(leaderProducedRecordContext.getValueUnion()).thenReturn(put); + when(leaderProducedRecordContext.getKeyBytes()).thenReturn(putKeyFoo); + + storeIngestionTaskUnderTest.produceToStoreBufferService( + message, + leaderProducedRecordContext, + PARTITION_FOO, + localKafkaConsumerService.kafkaUrl, + System.nanoTime(), + System.currentTimeMillis()); + } catch (InterruptedException e) { + throw new VeniceException(e); + } + } + }, aaConfig); + + // Verify that the size of the assembled record was recorded in the metrics only if schemaId=-20 + HostLevelIngestionStats hostLevelIngestionStats = storeIngestionTaskUnderTest.hostLevelIngestionStats; + if (useRealSchemaId) { + verify(hostLevelIngestionStats).recordAssembledValueSize(eq(expectedRecordSize), anyLong()); + verify(hostLevelIngestionStats, times(1)).recordAssembledValueSize(anyLong(), anyLong()); + } else { + verify(hostLevelIngestionStats, times(0)).recordAssembledValueSize(anyLong(), anyLong()); + } + } + @Test public void testGetOffsetToOnlineLagThresholdPerPartition() { ReadOnlyStoreRepository storeRepository = mock(ReadOnlyStoreRepository.class); diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java index 9fe2168864b..c2a579e7069 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java @@ -1,27 +1,17 @@ package com.linkedin.venice; -import static com.linkedin.venice.kafka.protocol.enums.MessageType.DELETE; -import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT; -import static com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.linkedin.venice.chunking.TestChunkingUtils; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.controllerapi.ControllerClient; 
import com.linkedin.venice.controllerapi.MultiSchemaResponse; import com.linkedin.venice.controllerapi.SchemaResponse; import com.linkedin.venice.controllerapi.StoreResponse; -import com.linkedin.venice.kafka.protocol.Delete; -import com.linkedin.venice.kafka.protocol.GUID; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; -import com.linkedin.venice.kafka.protocol.ProducerMetadata; -import com.linkedin.venice.kafka.protocol.Put; -import com.linkedin.venice.kafka.protocol.Update; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; -import com.linkedin.venice.pubsub.ImmutablePubSubMessage; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter; @@ -29,19 +19,12 @@ import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.rmd.RmdSchemaGenerator; import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; -import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; -import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; -import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer; import com.linkedin.venice.serializer.RecordSerializer; import com.linkedin.venice.serializer.SerializerDeserializerFactory; -import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; -import com.linkedin.venice.storage.protocol.ChunkedValueManifest; -import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.ChunkingTestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.writer.update.UpdateBuilderImpl; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; import java.util.Optional; import org.apache.avro.Schema; @@ -104,39 +87,23 @@ public void testAdminToolConsumptionForChunkedData() throws IOException { false, false); - int firstChunkSegmentNumber = 1; - int firstChunkSequenceNumber = 1; - PubSubMessage pubSubMessage1 = - getChunkedRecord(serializedKey, firstChunkSegmentNumber, firstChunkSequenceNumber, 0, 0, pubSubTopicPartition); - String firstChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage1); - Assert.assertEquals( - firstChunkMetadataLog, - " ChunkMd=(type:WITH_VALUE_CHUNK, ChunkIndex: 0, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))"); - - PubSubMessage pubSubMessage2 = - getChunkedRecord(serializedKey, firstChunkSegmentNumber, firstChunkSequenceNumber, 1, 0, pubSubTopicPartition); - String secondChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage2); - Assert.assertEquals( - secondChunkMetadataLog, - " ChunkMd=(type:WITH_VALUE_CHUNK, ChunkIndex: 1, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))"); - - PubSubMessage pubSubMessage3 = - getChunkedRecord(serializedKey, firstChunkSegmentNumber, firstChunkSequenceNumber, 2, 0, pubSubTopicPartition); - String thirdChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage3); - Assert.assertEquals( - thirdChunkMetadataLog, - " ChunkMd=(type:WITH_VALUE_CHUNK, ChunkIndex: 2, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))"); + int numChunks = 3; + String metadataFormat = " ChunkMd=(type:%s, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))"; + PubSubMessage chunkMessage = null; + for (int i = 0; i < 
numChunks; i++) { + chunkMessage = ChunkingTestUtils.createChunkedRecord(serializedKey, 1, 1, i, 0, pubSubTopicPartition); + String metadataLog = kafkaTopicDumper.getChunkMetadataLog(chunkMessage); + Assert.assertEquals(metadataLog, String.format(metadataFormat, "WITH_VALUE_CHUNK, ChunkIndex: " + i)); + } - PubSubMessage pubSubMessage4 = - getChunkValueManifestRecord(serializedKey, pubSubMessage1, firstChunkSequenceNumber, pubSubTopicPartition); - String manifestChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage4); - Assert.assertEquals( - manifestChunkMetadataLog, - " ChunkMd=(type:WITH_CHUNK_MANIFEST, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))"); + PubSubMessage manifestMessage = + ChunkingTestUtils.createChunkValueManifestRecord(serializedKey, chunkMessage, numChunks, pubSubTopicPartition); + String manifestChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(manifestMessage); + Assert.assertEquals(manifestChunkMetadataLog, String.format(metadataFormat, "WITH_CHUNK_MANIFEST")); - PubSubMessage pubSubMessage5 = - getDeleteRecord(serializedKey, null, pubSubTopicPartition); - String deleteChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage5); + PubSubMessage deleteMessage = + ChunkingTestUtils.createDeleteRecord(serializedKey, null, pubSubTopicPartition); + String deleteChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(deleteMessage); Assert.assertEquals(deleteChunkMetadataLog, " ChunkMd=(type:WITH_FULL_VALUE)"); } @@ -243,7 +210,7 @@ public void testDumpDataRecord() throws IOException { byte[] serializedRmd = rmdSerializer.serialize(rmdRecord); byte[] serializedUpdate = updateSerializer.serialize(updateRecord); PubSubMessage putMessage = - getPutRecord(serializedKey, serializedValue, serializedRmd, pubSubTopicPartition); + ChunkingTestUtils.createPutRecord(serializedKey, serializedValue, serializedRmd, pubSubTopicPartition); String returnedLog = kafkaTopicDumper.buildDataRecordLog(putMessage, false); String expectedLog = String.format("Key: %s; Value: %s; Schema: %d", keyString, valueRecord, 1); Assert.assertEquals(returnedLog, expectedLog); @@ -253,7 +220,7 @@ public void testDumpDataRecord() throws IOException { // Test UPDATE PubSubMessage updateMessage = - getUpdateRecord(serializedKey, serializedUpdate, pubSubTopicPartition); + ChunkingTestUtils.createUpdateRecord(serializedKey, serializedUpdate, pubSubTopicPartition); returnedLog = kafkaTopicDumper.buildDataRecordLog(updateMessage, false); expectedLog = String.format("Key: %s; Value: %s; Schema: %d-%d", keyString, updateRecord, 1, 1); Assert.assertEquals(returnedLog, expectedLog); @@ -263,7 +230,7 @@ public void testDumpDataRecord() throws IOException { // Test DELETE with and without RMD PubSubMessage deleteMessage = - getDeleteRecord(serializedKey, serializedRmd, pubSubTopicPartition); + ChunkingTestUtils.createDeleteRecord(serializedKey, serializedRmd, pubSubTopicPartition); returnedLog = kafkaTopicDumper.buildDataRecordLog(deleteMessage, false); expectedLog = String.format("Key: %s; Value: %s; Schema: %d", keyString, null, 1); Assert.assertEquals(returnedLog, expectedLog); @@ -271,152 +238,4 @@ public void testDumpDataRecord() throws IOException { expectedLog = String.format("Key: %s; Value: %s; Schema: %d; RMD: %s", keyString, null, 1, rmdRecord); Assert.assertEquals(returnedLog, expectedLog); } - - private PubSubMessage getChunkedRecord( - byte[] serializedKey, - int firstChunkSegmentNumber, - int firstChunkSequenceNumber, - int chunkIndex, - int 
firstMessageOffset, - PubSubTopicPartition pubSubTopicPartition) { - int chunkLength = 10; - ChunkedKeySuffix chunkKeySuffix1 = - TestChunkingUtils.createChunkedKeySuffix(firstChunkSegmentNumber, firstChunkSequenceNumber, chunkIndex); - KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); - ByteBuffer chunkKeyWithSuffix1 = - keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkKeySuffix1); - KafkaKey kafkaKey = new KafkaKey(PUT, ByteUtils.extractByteArray(chunkKeyWithSuffix1)); - KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); - messageEnvelope.messageType = 0; // PUT - messageEnvelope.producerMetadata = new ProducerMetadata(); - messageEnvelope.producerMetadata.messageTimestamp = 0; - messageEnvelope.producerMetadata.segmentNumber = firstChunkSegmentNumber; - messageEnvelope.producerMetadata.messageSequenceNumber = firstChunkSequenceNumber + chunkIndex; - messageEnvelope.producerMetadata.producerGUID = new GUID(); - Put put = new Put(); - put.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(); - put.putValue = ByteBuffer.wrap(TestChunkingUtils.createChunkBytes(chunkIndex * chunkLength, chunkLength)); - messageEnvelope.payloadUnion = put; - return new ImmutablePubSubMessage<>( - kafkaKey, - messageEnvelope, - pubSubTopicPartition, - firstMessageOffset + chunkIndex, - 0, - 20); - } - - private PubSubMessage getChunkValueManifestRecord( - byte[] serializedKey, - PubSubMessage firstChunkMessage, - int numberOfChunks, - PubSubTopicPartition pubSubTopicPartition) { - int chunkLength = 10; - KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); - - byte[] chunkKeyWithSuffix = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KafkaKey kafkaKey = new KafkaKey(PUT, chunkKeyWithSuffix); - KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); - messageEnvelope.messageType = 0; // PUT - messageEnvelope.producerMetadata = new ProducerMetadata(); - messageEnvelope.producerMetadata.messageTimestamp = 0; - messageEnvelope.producerMetadata.segmentNumber = firstChunkMessage.getValue().getProducerMetadata().segmentNumber; - messageEnvelope.producerMetadata.messageSequenceNumber = - firstChunkMessage.getValue().getProducerMetadata().messageSequenceNumber + numberOfChunks; - messageEnvelope.producerMetadata.producerGUID = new GUID(); - - ChunkedValueManifestSerializer chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true); - ChunkedValueManifest manifest = new ChunkedValueManifest(); - manifest.keysWithChunkIdSuffix = new ArrayList<>(numberOfChunks); - manifest.schemaId = 1; - manifest.size = chunkLength * numberOfChunks; - - manifest.keysWithChunkIdSuffix.add(ByteBuffer.wrap(firstChunkMessage.getKey().getKey())); - - Put put = new Put(); - put.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); - put.putValue = chunkedValueManifestSerializer.serialize(manifest); - messageEnvelope.payloadUnion = put; - return new ImmutablePubSubMessage<>( - kafkaKey, - messageEnvelope, - pubSubTopicPartition, - firstChunkMessage.getOffset() + numberOfChunks, - 0, - 20); - } - - private PubSubMessage getDeleteRecord( - byte[] serializedKey, - byte[] serializedRmd, - PubSubTopicPartition pubSubTopicPartition) { - KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); - byte[] chunkKeyWithSuffix = 
keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KafkaKey kafkaKey = new KafkaKey(DELETE, chunkKeyWithSuffix); - KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); - messageEnvelope.messageType = 1; - messageEnvelope.producerMetadata = new ProducerMetadata(); - messageEnvelope.producerMetadata.messageTimestamp = 0; - messageEnvelope.producerMetadata.segmentNumber = 0; - messageEnvelope.producerMetadata.messageSequenceNumber = 0; - messageEnvelope.producerMetadata.producerGUID = new GUID(); - - Delete delete = new Delete(); - delete.schemaId = 1; - if (serializedRmd != null) { - delete.replicationMetadataPayload = ByteBuffer.wrap(serializedRmd); - delete.replicationMetadataVersionId = 1; - } - messageEnvelope.payloadUnion = delete; - return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, 20); - } - - private PubSubMessage getPutRecord( - byte[] serializedKey, - byte[] serializedValue, - byte[] serializedRmd, - PubSubTopicPartition pubSubTopicPartition) { - KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); - byte[] chunkKeyWithSuffix = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KafkaKey kafkaKey = new KafkaKey(PUT, chunkKeyWithSuffix); - KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); - messageEnvelope.messageType = PUT.getValue(); - messageEnvelope.producerMetadata = new ProducerMetadata(); - messageEnvelope.producerMetadata.messageTimestamp = 0; - messageEnvelope.producerMetadata.segmentNumber = 0; - messageEnvelope.producerMetadata.messageSequenceNumber = 0; - messageEnvelope.producerMetadata.producerGUID = new GUID(); - Put put = new Put(); - put.schemaId = 1; - put.putValue = ByteBuffer.wrap(serializedValue); - if (serializedRmd != null) { - put.replicationMetadataPayload = ByteBuffer.wrap(serializedRmd); - put.replicationMetadataVersionId = 1; - } - messageEnvelope.payloadUnion = put; - return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, serializedValue.length); - } - - private PubSubMessage getUpdateRecord( - byte[] serializedKey, - byte[] serializedValue, - PubSubTopicPartition pubSubTopicPartition) { - KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); - byte[] chunkKeyWithSuffix = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KafkaKey kafkaKey = new KafkaKey(UPDATE, chunkKeyWithSuffix); - KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); - messageEnvelope.messageType = UPDATE.getValue(); - messageEnvelope.producerMetadata = new ProducerMetadata(); - messageEnvelope.producerMetadata.messageTimestamp = 0; - messageEnvelope.producerMetadata.segmentNumber = 0; - messageEnvelope.producerMetadata.messageSequenceNumber = 0; - messageEnvelope.producerMetadata.producerGUID = new GUID(); - Update update = new Update(); - update.schemaId = 1; - update.updateValue = ByteBuffer.wrap(serializedValue); - update.updateSchemaId = 1; - - messageEnvelope.payloadUnion = update; - return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, serializedValue.length); - } } diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java index 11c727a784f..3049954d1a5 100644 --- 
a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java @@ -1,7 +1,7 @@ package com.linkedin.venice.hadoop.input.kafka.chunk; -import static com.linkedin.venice.chunking.TestChunkingUtils.createChunkBytes; -import static com.linkedin.venice.chunking.TestChunkingUtils.createChunkedKeySuffix; +import static com.linkedin.venice.utils.ChunkingTestUtils.createChunkBytes; +import static com.linkedin.venice.utils.ChunkingTestUtils.createChunkedKeySuffix; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/IngestionMode.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/IngestionMode.java index 3ec9ce5cc42..83439b0ea63 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/IngestionMode.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/IngestionMode.java @@ -38,7 +38,7 @@ private static Map getIngestionModeMap() { return intToTypeMap; } - public static IngestionMode valueOf(int value) { + public static IngestionMode valueOf(int value) throws VeniceMessageException { IngestionMode type = INTEGER_INGESTION_MODE_MAP.get(value); if (type == null) { throw new VeniceMessageException("Invalid ingestion mode: " + value); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java index 49ff957a677..240043521e9 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java @@ -32,7 +32,7 @@ public class WriterChunkingHelper { * @param isChunkAwareCallback boolean flag indicating whether to create chunk * @param sizeReport supplier function for size report. * @param maxSizeForUserPayloadPerMessageInBytes maximum size for payload in a message - * @param keyWithChunkingSuffixSerializer Chuncking suffix serializer for key + * @param keyWithChunkingSuffixSerializer Chunking suffix serializer for key * @param sendMessageFunction Pass in function for sending message * @return Chunked payload arrays and manifest. */ diff --git a/internal/venice-common/src/main/resources/avro/ChunkedValueManifest/v-20/ChunkedValueManifest.avsc b/internal/venice-common/src/main/resources/avro/ChunkedValueManifest/v-20/ChunkedValueManifest.avsc index 4f91de0d4fa..25a9bbe4360 100644 --- a/internal/venice-common/src/main/resources/avro/ChunkedValueManifest/v-20/ChunkedValueManifest.avsc +++ b/internal/venice-common/src/main/resources/avro/ChunkedValueManifest/v-20/ChunkedValueManifest.avsc @@ -1,7 +1,7 @@ { "name": "ChunkedValueManifest", "namespace": "com.linkedin.venice.storage.protocol", - "doc": "This record maintains chunking information in order to re-assemble a value value that was split in many chunks. The version of this schema is intentionally set to -1 because this is what will be used in the schema part of the value field, representing a special system-type schema, as opposed to a user-defined schema.", + "doc": "This record maintains chunking information in order to re-assemble a value that was split in many chunks. 
The version of this schema is intentionally set to -1 because this is what will be used in the schema part of the value field, representing a special system-type schema, as opposed to a user-defined schema.", "type": "record", "fields": [ { diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkKeyValueTransformerImpl.java b/internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkKeyValueTransformerImpl.java index ffba95f7b4e..dd4cd197e47 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkKeyValueTransformerImpl.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkKeyValueTransformerImpl.java @@ -1,5 +1,8 @@ package com.linkedin.venice.chunking; +import static com.linkedin.venice.utils.ChunkingTestUtils.createChunkBytes; +import static com.linkedin.venice.utils.ChunkingTestUtils.createChunkedKeySuffix; + import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.serialization.avro.ChunkedKeySuffixSerializer; import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; @@ -16,7 +19,7 @@ public class TestChunkKeyValueTransformerImpl { @Test public void testSplitCompositeKeyWithNonChunkValue() { - byte[] firstPartBytes = TestChunkingUtils.createChunkBytes(5, 1213); + byte[] firstPartBytes = createChunkBytes(5, 1213); byte[] secondPartBytes = serialize(KeyWithChunkingSuffixSerializer.NON_CHUNK_KEY_SUFFIX); byte[] compositeKey = combineParts(firstPartBytes, secondPartBytes); ChunkKeyValueTransformer chunkKeyValueTransformer = new ChunkKeyValueTransformerImpl(ChunkedKeySuffix.SCHEMA$); @@ -41,11 +44,11 @@ public void testSplitCompositeKeyWithChunkValue() { ChunkKeyValueTransformer chunkKeyValueTransformer = new ChunkKeyValueTransformerImpl(ChunkedKeySuffix.SCHEMA$); List firstParts = new ArrayList<>(); firstParts.add(KeyWithChunkingSuffixSerializer.NON_CHUNK_KEY_SUFFIX); - firstParts.add(TestChunkingUtils.createChunkedKeySuffix(12, 123, 12)); - firstParts.add(TestChunkingUtils.createChunkedKeySuffix(1212, 12331, 121213)); - firstParts.add(TestChunkingUtils.createChunkedKeySuffix(0, 0, 0)); - firstParts.add(TestChunkingUtils.createChunkedKeySuffix(Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE)); - firstParts.add(TestChunkingUtils.createChunkedKeySuffix(Integer.MIN_VALUE, Integer.MIN_VALUE, Integer.MIN_VALUE)); + firstParts.add(createChunkedKeySuffix(12, 123, 12)); + firstParts.add(createChunkedKeySuffix(1212, 12331, 121213)); + firstParts.add(createChunkedKeySuffix(0, 0, 0)); + firstParts.add(createChunkedKeySuffix(Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE)); + firstParts.add(createChunkedKeySuffix(Integer.MIN_VALUE, Integer.MIN_VALUE, Integer.MIN_VALUE)); List secondParts = new ArrayList<>(firstParts); for (int i = 0; i < firstParts.size(); i++) { diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkingUtils.java b/internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkingUtils.java deleted file mode 100644 index 2513d2f672f..00000000000 --- a/internal/venice-common/src/test/java/com/linkedin/venice/chunking/TestChunkingUtils.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.linkedin.venice.chunking; - -import com.linkedin.venice.kafka.protocol.GUID; -import com.linkedin.venice.storage.protocol.ChunkId; -import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; - - -public class TestChunkingUtils { - private TestChunkingUtils() { - // Util class - } - - 
public static byte[] createChunkBytes(int startValue, final int chunkLength) { - byte[] chunkBytes = new byte[chunkLength]; - for (int i = 0; i < chunkBytes.length; i++) { - chunkBytes[i] = (byte) startValue; - startValue++; - } - return chunkBytes; - } - - public static ChunkedKeySuffix createChunkedKeySuffix( - int firstChunkSegmentNumber, - int firstChunkSequenceNumber, - int chunkIndex) { - ChunkId chunkId = new ChunkId(); - chunkId.segmentNumber = firstChunkSegmentNumber; - chunkId.messageSequenceNumber = firstChunkSequenceNumber; - chunkId.chunkIndex = chunkIndex; - chunkId.producerGUID = new GUID(); - ChunkedKeySuffix chunkedKeySuffix = new ChunkedKeySuffix(); - chunkedKeySuffix.chunkId = chunkId; - return chunkedKeySuffix; - } -} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java index 2845c69e49d..6019758399c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java @@ -1,5 +1,6 @@ package com.linkedin.venice.endToEnd; +import static com.linkedin.davinci.stats.HostLevelIngestionStats.ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES; import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.hadoop.VenicePushJobConstants.ALLOW_DUPLICATE_KEY; import static com.linkedin.venice.hadoop.VenicePushJobConstants.COMPRESSION_METRIC_COLLECTION_ENABLED; @@ -60,11 +61,14 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.hadoop.spark.datawriter.jobs.DataWriterSparkJob; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceServerWrapper; import com.linkedin.venice.meta.BackupStrategy; import com.linkedin.venice.meta.Version; import com.linkedin.venice.read.RequestType; +import com.linkedin.venice.stats.AbstractVeniceStats; import com.linkedin.venice.system.store.MetaStoreDataType; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; +import com.linkedin.venice.tehuti.MetricsUtils; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.DictionaryUtils; import com.linkedin.venice.utils.KeyAndValueSchemas; @@ -115,6 +119,7 @@ public abstract class TestBatch { private static final Logger LOGGER = LogManager.getLogger(TestBatch.class); protected static final int TEST_TIMEOUT = 120 * Time.MS_PER_SECOND; private static final int MAX_RETRY_ATTEMPTS = 3; + protected static final int MAX_RECORD_VALUE_SIZE = 3 * 1024 * 1024; // 3 MB apiece protected static final String BASE_DATA_PATH_1 = Utils.getTempDataDirectory().getAbsolutePath(); protected static final String BASE_DATA_PATH_2 = Utils.getTempDataDirectory().getAbsolutePath(); @@ -622,6 +627,10 @@ public void testKafkaInputBatchJob() throws Exception { inputDir -> new KeyAndValueSchemas(writeSimpleAvroFileWithStringToStringSchema(inputDir)), properties -> {}, validator); + + // Since chunking was not enabled, verify that the assembled record size metrics are not collected + assertUnusedPerStoreMetrics(storeName, ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES); + // Re-push with Kafka Input testRepush(storeName, validator); } @@ -942,6 +951,28 @@ void validate( MetricsRepository metricsRepository) throws Exception; } + private List getPerStoreMetricValues(String storeName, String 
sensorName) { + String metricName = AbstractVeniceStats.getSensorFullName(storeName, sensorName); + List veniceServers = veniceCluster.getVeniceServers(); + return Arrays.asList( + MetricsUtils.getMin(metricName + ".Min", veniceServers), // default=Double.MIN_VALUE + MetricsUtils.getMax(metricName + ".Max", veniceServers), // default=Double.MAX_VALUE + MetricsUtils.getAvg(metricName + ".Avg", veniceServers)); // default=NaN + } + + private void validatePerStoreMetricsRange(String storeName, String sensorName, double minValue, double maxValue) { + getPerStoreMetricValues(storeName, sensorName).forEach(value -> { + Assert.assertTrue(value >= minValue, "Metric value expected >= " + minValue + " actual=" + value); + Assert.assertTrue(value <= maxValue, "Metric value expected <= " + maxValue + " actual=" + value); + }); + } + + private void assertUnusedPerStoreMetrics(String storeName, String sensorName) { + getPerStoreMetricValues(storeName, sensorName).forEach(value -> { + Assert.assertTrue(value == Double.MIN_VALUE || value == Double.MAX_VALUE || value.isNaN(), "Needs to be invalid"); + }); + } + @Test(timeOut = TEST_TIMEOUT) public void testLargeValues() throws Exception { try { @@ -951,7 +982,16 @@ public void testLargeValues() throws Exception { // push is expected to fail because of large values } - testStoreWithLargeValues(true); + String storeName = testStoreWithLargeValues(true); + + // Verify that after records are chunked and re-assembled, the original sizes of these records are being recorded + // to the metrics sensor, and are within the correct size range. It doesn't work in isolated ingestion mode. + if (veniceCluster.getVeniceServers().stream().noneMatch(VeniceServerWrapper::isIsolatedIngestionEnabled)) { + int minSize = 1024 * 1024; // 1MB apiece + validatePerStoreMetricsRange(storeName, ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES, minSize, MAX_RECORD_VALUE_SIZE); + } else { + assertUnusedPerStoreMetrics(storeName, ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES); + } } @Test(timeOut = TEST_TIMEOUT * 3, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) @@ -987,18 +1027,17 @@ private String testStoreWithLargeValues( boolean isChunkingAllowed, Consumer extraProps, String existingStore) throws Exception { - int maxValueSize = 3 * 1024 * 1024; // 3 MB apiece int numberOfRecords = 10; InputFileWriter inputFileWriter = inputDir -> new KeyAndValueSchemas( - writeSimpleAvroFileWithCustomSize(inputDir, numberOfRecords, 0, maxValueSize)); + writeSimpleAvroFileWithCustomSize(inputDir, numberOfRecords, 0, MAX_RECORD_VALUE_SIZE)); VPJValidator dataValidator = (avroClient, vsonClient, metricsRepository) -> { Set keys = new HashSet<>(10); // Single gets for (int i = 0; i < numberOfRecords; i++) { - int expectedSize = maxValueSize / numberOfRecords * (i + 1); + int expectedSize = MAX_RECORD_VALUE_SIZE / numberOfRecords * (i + 1); String key = Integer.toString(i); keys.add(key); char[] chars = new char[expectedSize]; @@ -1051,7 +1090,7 @@ private String testStoreWithLargeValues( Map jsonResults = (Map) vsonClient.batchGet(keys).get(); for (String key: keys) { int i = Integer.parseInt(key); - int expectedSize = maxValueSize / numberOfRecords * (i + 1); + int expectedSize = MAX_RECORD_VALUE_SIZE / numberOfRecords * (i + 1); char[] chars = new char[expectedSize]; Arrays.fill(chars, key.charAt(0)); String expectedString = new String(chars); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index 7c8ef1b4000..2d3716ce6a9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -26,6 +26,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_APPLICATION_PORT; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT; +import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE; import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.SERVER_MAX_WAIT_FOR_VERSION_INFO_MS_CONFIG; import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS; @@ -41,8 +42,10 @@ import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.exceptions.VeniceMessageException; import com.linkedin.venice.helix.AllowlistAccessor; import com.linkedin.venice.helix.ZkAllowlistAccessor; +import com.linkedin.venice.meta.IngestionMode; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.server.VeniceServer; import com.linkedin.venice.server.VeniceServerContext; @@ -394,6 +397,16 @@ public int getGrpcPort() { return serverProps.getInt(GRPC_READ_SERVER_PORT); } + public boolean isIsolatedIngestionEnabled() { + try { + return IngestionMode.valueOf( + serverProps.getString(SERVER_INGESTION_MODE, IngestionMode.BUILT_IN.toString())) == IngestionMode.ISOLATED; + } catch (VeniceMessageException e) { + LOGGER.error("Invalid ingestion mode: {}", serverProps.getString(SERVER_INGESTION_MODE)); + } + return false; + } + @Override protected void internalStart() throws Exception { if (!forkServer) { diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/ChunkingTestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/ChunkingTestUtils.java new file mode 100644 index 00000000000..0540a4fd557 --- /dev/null +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/ChunkingTestUtils.java @@ -0,0 +1,184 @@ +package com.linkedin.venice.utils; + +import static com.linkedin.venice.kafka.protocol.enums.MessageType.*; + +import com.linkedin.venice.kafka.protocol.Delete; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.Update; +import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.ImmutablePubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer; +import com.linkedin.venice.storage.protocol.ChunkId; +import 
com.linkedin.venice.storage.protocol.ChunkedKeySuffix; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; +import java.nio.ByteBuffer; +import java.util.ArrayList; + + +public final class ChunkingTestUtils { + private ChunkingTestUtils() { + // Util class + } + + public final static int CHUNK_LENGTH = 10; + public final static KeyWithChunkingSuffixSerializer KEY_WITH_CHUNKING_SUFFIX_SERIALIZER = + new KeyWithChunkingSuffixSerializer(); + + public static byte[] createChunkBytes(int startValue, final int chunkLength) { + byte[] chunkBytes = new byte[chunkLength]; + for (int i = 0; i < chunkBytes.length; i++) { + chunkBytes[i] = (byte) startValue; + startValue++; + } + return chunkBytes; + } + + public static ChunkedKeySuffix createChunkedKeySuffix( + int firstSegmentNumber, + int firstSequenceNumber, + int chunkIndex) { + ChunkId chunkId = new ChunkId(); + chunkId.segmentNumber = firstSegmentNumber; + chunkId.messageSequenceNumber = firstSequenceNumber; + chunkId.chunkIndex = chunkIndex; + chunkId.producerGUID = new GUID(); + ChunkedKeySuffix chunkedKeySuffix = new ChunkedKeySuffix(); + chunkedKeySuffix.chunkId = chunkId; + return chunkedKeySuffix; + } + + public static ByteBuffer prependSchemaId(byte[] valueBytes, int schemaId) { + ByteBuffer prependedValueBytes = ByteUtils.enlargeByteBufferForIntHeader(ByteBuffer.wrap(valueBytes)); + prependedValueBytes.putInt(0, schemaId); + return prependedValueBytes; + } + + public static KafkaMessageEnvelope createKafkaMessageEnvelope( + MessageType messageType, + int segmentNumber, + int sequenceNumber) { + KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); + messageEnvelope.messageType = messageType.getValue(); + messageEnvelope.producerMetadata = new ProducerMetadata(); + messageEnvelope.producerMetadata.messageTimestamp = 0; + messageEnvelope.producerMetadata.segmentNumber = segmentNumber; + messageEnvelope.producerMetadata.messageSequenceNumber = sequenceNumber; + messageEnvelope.producerMetadata.producerGUID = new GUID(); + return messageEnvelope; + } + + public static PubSubMessage createChunkedRecord( + byte[] serializedKey, + int firstSegmentNumber, + int firstSequenceNumber, + int chunkIndex, + int firstMessageOffset, + PubSubTopicPartition pubSubTopicPartition) { + long newOffset = firstMessageOffset + chunkIndex; + int newSequenceNumber = firstSequenceNumber + chunkIndex; + ChunkedKeySuffix chunkKeySuffix = createChunkedKeySuffix(firstSegmentNumber, firstSequenceNumber, chunkIndex); + ByteBuffer chunkKeyWithSuffix = + KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeChunkedKey(serializedKey, chunkKeySuffix); + KafkaKey kafkaKey = new KafkaKey(PUT, ByteUtils.extractByteArray(chunkKeyWithSuffix)); + KafkaMessageEnvelope messageEnvelope = createKafkaMessageEnvelope(PUT, firstSegmentNumber, newSequenceNumber); + + Put put = new Put(); + put.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(); + byte[] valueBytes = createChunkBytes(chunkIndex * CHUNK_LENGTH, CHUNK_LENGTH); + put.putValue = prependSchemaId(valueBytes, put.schemaId); + put.replicationMetadataPayload = ByteBuffer.allocate(10); + messageEnvelope.payloadUnion = put; + return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, newOffset, 0, 20); + } + + public static PubSubMessage createChunkValueManifestRecord( + byte[] serializedKey, + PubSubMessage firstMessage, + int numberOfChunks, + PubSubTopicPartition pubSubTopicPartition) { + long newOffset = firstMessage.getOffset() + numberOfChunks; + byte[] 
chunkKeyWithSuffix = KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(serializedKey); + KafkaKey kafkaKey = new KafkaKey(PUT, chunkKeyWithSuffix); + KafkaMessageEnvelope messageEnvelope = createKafkaMessageEnvelope( + PUT, + firstMessage.getValue().getProducerMetadata().segmentNumber, + firstMessage.getValue().getProducerMetadata().messageSequenceNumber + numberOfChunks); + + ChunkedValueManifestSerializer chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true); + ChunkedValueManifest manifest = new ChunkedValueManifest(); + manifest.keysWithChunkIdSuffix = new ArrayList<>(numberOfChunks); + manifest.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + manifest.size = numberOfChunks * CHUNK_LENGTH; + manifest.keysWithChunkIdSuffix.add(ByteBuffer.wrap(firstMessage.getKey().getKey())); + byte[] valueBytes = chunkedValueManifestSerializer.serialize(manifest).array(); + + Put put = new Put(); + put.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + put.putValue = prependSchemaId(valueBytes, put.schemaId); + put.replicationMetadataPayload = ByteBuffer.allocate(10); + messageEnvelope.payloadUnion = put; + return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, newOffset, 0, 20); + } + + public static PubSubMessage createDeleteRecord( + byte[] serializedKey, + byte[] serializedRmd, + PubSubTopicPartition pubSubTopicPartition) { + byte[] chunkKeyWithSuffix = KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(serializedKey); + KafkaKey kafkaKey = new KafkaKey(DELETE, chunkKeyWithSuffix); + KafkaMessageEnvelope messageEnvelope = createKafkaMessageEnvelope(DELETE, 0, 0); + + Delete delete = new Delete(); + delete.schemaId = 1; + if (serializedRmd != null) { + delete.replicationMetadataPayload = ByteBuffer.wrap(serializedRmd); + delete.replicationMetadataVersionId = 1; + } + messageEnvelope.payloadUnion = delete; + return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, 20); + } + + public static PubSubMessage createPutRecord( + byte[] serializedKey, + byte[] serializedValue, + byte[] serializedRmd, + PubSubTopicPartition pubSubTopicPartition) { + byte[] chunkKeyWithSuffix = KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(serializedKey); + KafkaKey kafkaKey = new KafkaKey(PUT, chunkKeyWithSuffix); + KafkaMessageEnvelope messageEnvelope = createKafkaMessageEnvelope(PUT, 0, 0); + + Put put = new Put(); + put.schemaId = 1; + put.putValue = ByteBuffer.wrap(serializedValue); + if (serializedRmd != null) { + put.replicationMetadataPayload = ByteBuffer.wrap(serializedRmd); + put.replicationMetadataVersionId = 1; + } + messageEnvelope.payloadUnion = put; + return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, serializedValue.length); + } + + public static PubSubMessage createUpdateRecord( + byte[] serializedKey, + byte[] serializedValue, + PubSubTopicPartition pubSubTopicPartition) { + byte[] chunkKeyWithSuffix = KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(serializedKey); + KafkaKey kafkaKey = new KafkaKey(UPDATE, chunkKeyWithSuffix); + KafkaMessageEnvelope messageEnvelope = createKafkaMessageEnvelope(UPDATE, 0, 0); + + Update update = new Update(); + update.schemaId = 1; + update.updateValue = ByteBuffer.wrap(serializedValue); + update.updateSchemaId = 1; + messageEnvelope.payloadUnion = update; + return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, 
pubSubTopicPartition, 1, 0, serializedValue.length); + } +}
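
For context, below is a minimal, self-contained sketch of the 4-byte schema-id header convention that the new `prependSchemaId()` test util produces and that the drainer-side manifest check in `processKafkaDataMessage()` relies on. It uses the plain JDK only; the class and constant names are hypothetical, and the -20 manifest schema id is an assumption based on the v-20 schema directory above, not Venice API code.

import java.nio.ByteBuffer;

/**
 * Standalone sketch (hypothetical names, not the Venice API) of the schema-id
 * header convention: the serialized value is prefixed with an int schema id,
 * and a negative id such as -20 (assumed here for CHUNKED_VALUE_MANIFEST)
 * marks a system-schema record rather than a user-schema value.
 */
public final class SchemaIdHeaderSketch {
  static final int CHUNKED_VALUE_MANIFEST_SCHEMA_ID = -20; // assumed protocol version

  // Grow the value by 4 bytes and write the schema id in front of the payload,
  // mirroring what the prependSchemaId() test util is meant to produce.
  static ByteBuffer prependSchemaId(byte[] valueBytes, int schemaId) {
    ByteBuffer withHeader = ByteBuffer.allocate(Integer.BYTES + valueBytes.length);
    withHeader.putInt(schemaId);
    withHeader.put(valueBytes);
    withHeader.flip(); // position 0, limit = header + payload
    return withHeader;
  }

  // Mirror of the drainer-side guard: only treat the record as a manifest when
  // the header carries the manifest schema id; the Avro payload follows the header.
  static boolean isChunkedValueManifest(ByteBuffer value) {
    return value.remaining() >= Integer.BYTES
        && value.getInt(value.position()) == CHUNKED_VALUE_MANIFEST_SCHEMA_ID;
  }

  public static void main(String[] args) {
    ByteBuffer value = prependSchemaId(new byte[] { 1, 2, 3 }, CHUNKED_VALUE_MANIFEST_SCHEMA_ID);
    System.out.println(isChunkedValueManifest(value)); // prints "true"
  }
}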