[server] Fully Assembled Record Size Metric (#1009)
Added a new per-store metric called `assembled_record_value_size_in_bytes`, recorded at the drainer stage. It captures the assembled (original) size of large, chunked records by reading the size field of the `ChunkedValueManifest`. The size field is sufficient because the size of the fully assembled record is known before the `VeniceWriter` performs chunking. The metric is recorded in the drainer (which runs after any partial-update reassembly work), so all large records should be covered.
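
For illustration, here is a minimal sketch of the drainer-side check this commit adds (a simplified restatement of the `processKafkaDataMessage()` change shown in the diff below, not the exact production code; it omits the error handling, and the package paths for `Put` and `HostLevelIngestionStats` are assumed):

```java
import com.linkedin.davinci.stats.HostLevelIngestionStats; // assumed package path
import com.linkedin.venice.kafka.protocol.Put; // assumed package path
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ByteUtils;

final class AssembledSizeSketch {
  private static final int MANIFEST_SCHEMA_ID =
      AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
  private static final ChunkedValueManifestSerializer MANIFEST_SERIALIZER =
      new ChunkedValueManifestSerializer(true);

  /** Record the fully assembled value size if this PUT carries a ChunkedValueManifest. */
  static void maybeRecordAssembledSize(Put put, HostLevelIngestionStats stats, long nowMs) {
    if (put.getSchemaId() != MANIFEST_SCHEMA_ID) {
      return; // regular value or an individual chunk: no manifest, nothing to record
    }
    // The manifest's size field is the original, pre-chunking value size in bytes.
    ChunkedValueManifest manifest = MANIFEST_SERIALIZER
        .deserialize(ByteUtils.extractByteArray(put.getPutValue()), MANIFEST_SCHEMA_ID);
    stats.recordAssembledValueSize(manifest.getSize(), nowMs);
  }
}
```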

Moved `TestChunkingUtils` (now `ChunkingTestUtils`) into the `venice-test-common` project, which is hopefully a more accessible home for test utilities and can serve more tests in the future. Moved several helper functions from `TestKafkaTopicDumper` into it, in the hope that they can be more easily reused for creating chunked `PubSubMessage` objects. Added a test util `prependSchemaId()` to help with prepending the schema id to the put value bytes, which is useful in several other tests.
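
A plausible shape for the new `prependSchemaId()` helper, reconstructed from the call sites in this diff (the actual implementation in `venice-test-common` may differ): it writes the 4-byte schema id header, copies the value bytes after it, and returns a `ByteBuffer` positioned at the value region so that `array()` still exposes the schema-id-prefixed bytes.

```java
import com.linkedin.venice.utils.ByteUtils;

import java.nio.ByteBuffer;

public final class ChunkingTestUtilsSketch {
  /** Prefix the serialized value bytes with the 4-byte schema id header. */
  public static ByteBuffer prependSchemaId(byte[] valueBytes, int schemaId) {
    byte[] prefixed = new byte[ByteUtils.SIZE_OF_INT + valueBytes.length];
    ByteUtils.writeInt(prefixed, schemaId, 0); // schema id at offset 0
    System.arraycopy(valueBytes, 0, prefixed, ByteUtils.SIZE_OF_INT, valueBytes.length);
    // The buffer is positioned at the value region, while array() exposes the full prefixed bytes.
    return ByteBuffer.wrap(prefixed, ByteUtils.SIZE_OF_INT, valueBytes.length);
  }
}
```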

Added a unit test, `testAssembledValueSizeSensor()`, in `TestStoreIngestionTask`, where a chunked record and its manifest are sent to the drainer and the metric is verified to be populated. Added a check inside `testLargeValues()` in `TestBatch` that the metric is populated when chunking is enabled while sending large records, and that it is not populated otherwise.
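
For reference, the TestBatch-style assertion might look roughly like the sketch below. This is illustrative only: the `.<storeName>--<metric>.<stat>` metric-name format and the Tehuti `MetricsRepository` lookup are assumptions, and the actual check in `testLargeValues()` may be wired differently.

```java
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;

public final class AssembledSizeMetricAssertion {
  /** Assert that the per-store assembled-size metric is populated only when chunking is enabled. */
  public static void assertAssembledValueSizeMetric(
      MetricsRepository metricsRepository,
      String storeName,
      boolean chunkingEnabled) {
    // Assumed metric name format: ".<storeName>--assembled_record_value_size_in_bytes.Max"
    Metric max = metricsRepository.metrics().get("." + storeName + "--assembled_record_value_size_in_bytes.Max");
    if (chunkingEnabled) {
      assertNotNull(max, "Assembled record size sensor should be registered");
      assertTrue(max.value() > 0, "Assembled record size should be recorded when sending large values");
    } else {
      // Without chunking there is no ChunkedValueManifest, so nothing should be recorded.
      assertTrue(max == null || !(max.value() > 0), "Metric should not be populated without chunking");
    }
  }
}
```
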
KaiSernLim authored Jun 10, 2024
1 parent 8e78b81 commit e382726
Showing 15 changed files with 387 additions and 273 deletions.
@@ -87,9 +87,11 @@
import com.linkedin.venice.pubsub.manager.TopicManagerRepository;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.ComplementSet;
@@ -139,6 +141,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.logging.log4j.LogManager;
@@ -322,6 +325,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final StorageEngineBackedCompressorFactory compressorFactory;
protected final Lazy<VeniceCompressor> compressor;
protected final boolean isChunked;
protected final ChunkedValueManifestSerializer manifestSerializer;
protected final PubSubTopicRepository pubSubTopicRepository;
private final String[] msgForLagMeasurement;
private final Runnable runnableForKillIngestionTasksForNonCurrentVersions;
@@ -477,6 +481,7 @@ public StoreIngestionTask(
this.compressorFactory = builder.getCompressorFactory();
this.compressor = Lazy.of(() -> compressorFactory.getCompressor(compressionStrategy, kafkaVersionTopic));
this.isChunked = version.isChunkingEnabled();
this.manifestSerializer = new ChunkedValueManifestSerializer(true);
this.msgForLagMeasurement = new String[subPartitionCount];
for (int i = 0; i < this.msgForLagMeasurement.length; i++) {
this.msgForLagMeasurement[i] = kafkaVersionTopic + "_" + i;
@@ -3226,6 +3231,18 @@ private int processKafkaDataMessage(
// update checksum for this PUT message if needed.
partitionConsumptionState.maybeUpdateExpectedChecksum(keyBytes, put);

if (metricsEnabled && recordLevelMetricEnabled.get()
&& put.getSchemaId() == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
try {
ChunkedValueManifest chunkedValueManifest = manifestSerializer.deserialize(
ByteUtils.extractByteArray(put.getPutValue()), // must be done before recordTransformer changes putValue
AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion());
hostLevelIngestionStats.recordAssembledValueSize(chunkedValueManifest.getSize(), currentTimeMs);
} catch (VeniceException | IllegalArgumentException | AvroRuntimeException e) {
LOGGER.error("Failed to deserialize ChunkedValueManifest to record assembled value size", e);
}
}

// Check if put.getSchemaId is positive, if not default to 1
int putSchemaId = put.getSchemaId() > 0 ? put.getSchemaId() : 1;

@@ -32,6 +32,8 @@
* (3) Per store and total: The stat is registered for each store on this host and the total number for this host.
*/
public class HostLevelIngestionStats extends AbstractVeniceStats {
public static final String ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES = "assembled_record_value_size_in_bytes";

// The aggregated bytes ingested rate for the entire host
private final LongAdderRateGauge totalBytesConsumedRate;
// The aggregated records ingested rate for the entire host
@@ -47,6 +49,7 @@ public class HostLevelIngestionStats {
private final Sensor consumerRecordsQueuePutLatencySensor;
private final Sensor keySizeSensor;
private final Sensor valueSizeSensor;
private final Sensor assembledValueSizeSensor;
private final Sensor unexpectedMessageSensor;
private final Sensor inconsistentStoreMetadataSensor;
private final Sensor ingestionFailureSensor;
@@ -275,6 +278,14 @@ public HostLevelIngestionStats(
new Max(),
TehutiUtils.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + valueSizeSensorName));

this.assembledValueSizeSensor = registerSensor(
ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES,
new Avg(),
new Min(),
new Max(),
TehutiUtils
.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + ASSEMBLED_RECORD_VALUE_SIZE_IN_BYTES));

String viewTimerSensorName = "total_view_writer_latency";
this.viewProducerLatencySensor = registerPerStoreAndTotalSensor(
viewTimerSensorName,
@@ -458,6 +469,10 @@ public void recordValueSize(long bytes, long currentTimeMs) {
valueSizeSensor.record(bytes, currentTimeMs);
}

public void recordAssembledValueSize(long bytes, long currentTimeMs) {
assembledValueSizeSensor.record(bytes, currentTimeMs);
}

public void recordIngestionFailure() {
ingestionFailureSensor.record();
}
@@ -57,8 +57,8 @@
* b) If it is negative, then it's a {@link ChunkedValueManifest}, and we continue to the next steps.
* 3. The {@link ChunkedValueManifest} is deserialized, and its chunk keys are extracted.
* 4. Each chunk key is queried.
* 5. The chunks are stitched back together using the various adpater interfaces of this package,
* depending on whether it is the single get or batch get/compute path that needs to re-assembe
* 5. The chunks are stitched back together using the various adapter interfaces of this package,
* depending on whether it is the single get or batch get/compute path that needs to re-assemble
* a chunked value.
*/
public class ChunkingUtils {
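
The `ChunkingUtils` Javadoc above describes the chunked-value read path. For orientation, here is a rough sketch of steps 3–5 (deserialize the manifest, fetch each chunk, stitch them back together). It is not the actual implementation: it ignores the per-chunk schema-id headers and compression that the real path handles, uses an illustrative `chunkLookup` function in place of this package's adapter interfaces, and assumes the standard Avro-generated accessors on `ChunkedValueManifest`.

```java
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;

import java.nio.ByteBuffer;
import java.util.function.Function;

final class ChunkStitchingSketch {
  /** Reassemble a chunked value from its manifest, given a chunk-key -> chunk-bytes lookup. */
  static byte[] stitchChunks(ChunkedValueManifest manifest, Function<ByteBuffer, byte[]> chunkLookup) {
    byte[] assembled = new byte[manifest.getSize()]; // size = fully assembled value size in bytes
    int offset = 0;
    for (ByteBuffer chunkKey: manifest.getKeysWithChunkIdSuffix()) { // step 3: chunk keys from the manifest
      byte[] chunk = chunkLookup.apply(chunkKey); // step 4: query each chunk key
      System.arraycopy(chunk, 0, assembled, offset, chunk.length); // step 5: stitch chunks back together
      offset += chunk.length;
    }
    return assembled;
  }
}
```
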
@@ -3,7 +3,6 @@
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
import static com.linkedin.venice.utils.ByteUtils.SIZE_OF_INT;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -81,6 +80,7 @@
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.ChunkingTestUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
@@ -314,11 +314,7 @@ public void testLeaderCanSendValueChunksIntoDrainer()
stringBuilder.append("abcdefghabcdefghabcdefghabcdefgh");
}
String valueString = stringBuilder.toString();
byte[] valueBytes = valueString.getBytes();
byte[] schemaIdPrependedValueBytes = new byte[4 + valueBytes.length];
ByteUtils.writeInt(schemaIdPrependedValueBytes, 1, 0);
System.arraycopy(valueBytes, 0, schemaIdPrependedValueBytes, 4, valueBytes.length);
ByteBuffer updatedValueBytes = ByteBuffer.wrap(schemaIdPrependedValueBytes, 4, valueBytes.length);
ByteBuffer updatedValueBytes = ChunkingTestUtils.prependSchemaId(valueString.getBytes(), 1);
ByteBuffer updatedRmdBytes = ByteBuffer.wrap(new byte[] { 0xa, 0xb });
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord = mock(PubSubMessage.class);
when(consumerRecord.getOffset()).thenReturn(100L);
@@ -485,17 +481,13 @@ public void testReadingChunkedRmdFromStorage() {
chunkedValueManifest.keysWithChunkIdSuffix = new ArrayList<>(1);
chunkedValueManifest.size = 8;
chunkedValueManifest.keysWithChunkIdSuffix.add(chunkedKeyWithSuffix1);
int manifestSchemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
byte[] chunkedManifestBytes = chunkedValueManifestSerializer.serialize(topicName, chunkedValueManifest);
byte[] chunkedManifestWithSchemaBytes = new byte[SIZE_OF_INT + chunkedManifestBytes.length];
System.arraycopy(chunkedManifestBytes, 0, chunkedManifestWithSchemaBytes, 4, chunkedManifestBytes.length);
ByteUtils.writeInt(
chunkedManifestWithSchemaBytes,
AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(),
0);
chunkedManifestBytes = ChunkingTestUtils.prependSchemaId(chunkedManifestBytes, manifestSchemaId).array();
byte[] topLevelKey2 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key2);
byte[] chunkedKey1InKey2 = chunkedKeyWithSuffix1.array();

when(storageEngine.getReplicationMetadata(subPartition, topLevelKey2)).thenReturn(chunkedManifestWithSchemaBytes);
when(storageEngine.getReplicationMetadata(subPartition, topLevelKey2)).thenReturn(chunkedManifestBytes);
when(storageEngine.getReplicationMetadata(subPartition, chunkedKey1InKey2)).thenReturn(chunkedValue1);
List<byte[]> chunkedValues = new ArrayList<>(1);
chunkedValues.add(chunkedValue1);
@@ -528,17 +520,12 @@ public void testReadingChunkedRmdFromStorage() {
chunkedValueManifest.keysWithChunkIdSuffix.add(chunkedKeyWithSuffix1);
chunkedValueManifest.keysWithChunkIdSuffix.add(chunkedKeyWithSuffix2);
chunkedManifestBytes = chunkedValueManifestSerializer.serialize(topicName, chunkedValueManifest);
chunkedManifestWithSchemaBytes = new byte[SIZE_OF_INT + chunkedManifestBytes.length];
System.arraycopy(chunkedManifestBytes, 0, chunkedManifestWithSchemaBytes, SIZE_OF_INT, chunkedManifestBytes.length);
ByteUtils.writeInt(
chunkedManifestWithSchemaBytes,
AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(),
0);
chunkedManifestBytes = ChunkingTestUtils.prependSchemaId(chunkedManifestBytes, manifestSchemaId).array();
byte[] topLevelKey3 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key3);
byte[] chunkedKey1InKey3 = chunkedKeyWithSuffix1.array();
byte[] chunkedKey2InKey3 = chunkedKeyWithSuffix2.array();

when(storageEngine.getReplicationMetadata(subPartition, topLevelKey3)).thenReturn(chunkedManifestWithSchemaBytes);
when(storageEngine.getReplicationMetadata(subPartition, topLevelKey3)).thenReturn(chunkedManifestBytes);
when(storageEngine.getReplicationMetadata(subPartition, chunkedKey1InKey3)).thenReturn(chunkedValue1);
when(storageEngine.getReplicationMetadata(subPartition, chunkedKey2InKey3)).thenReturn(chunkedValue2);
when(storageEngine.multiGetReplicationMetadata(eq(subPartition), any()))
@@ -189,6 +189,7 @@
import com.linkedin.venice.unit.matchers.NonEmptyStringMatcher;
import com.linkedin.venice.utils.ByteArray;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.ChunkingTestUtils;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.DiskUsage;
import com.linkedin.venice.utils.Pair;
@@ -4429,6 +4430,77 @@ public void testStoreIngestionRecordTransformerError(AAConfig aaConfig) throws E
.recordTransformerError(eq(storeNameWithoutVersionInfo), anyInt(), anyDouble(), anyLong());
}

@DataProvider
public static Object[][] testAssembledValueSizeProvider() {
Set<Integer> schemaIdSet = new HashSet<>();
for (AvroProtocolDefinition avroProtocolDefinition: AvroProtocolDefinition.values()) {
avroProtocolDefinition.currentProtocolVersion.ifPresent(schemaIdSet::add);
}
return DataProviderUtils.allPermutationGenerator(AAConfig.values(), schemaIdSet.toArray());
}

/**
* Create messages for a chunked record (in multiple chunks) and its manifest, and verify that the drainer
* records the size of the chunked record as described in the size field of the manifest.
* Also, verify that metrics are only emitted when the correct schemaId=-20 is on the manifest message,
* and not emitted on any other invalid schemaId values.
* @throws Exception
*/
@Test(dataProvider = "testAssembledValueSizeProvider")
public void testAssembledValueSizeSensor(AAConfig aaConfig, int testSchemaId) throws Exception {
int numChunks = 10;
long expectedRecordSize = (long) numChunks * ChunkingTestUtils.CHUNK_LENGTH;
PubSubTopicPartition tp = new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO);
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> messages = new ArrayList<>(numChunks + 1); // + manifest
for (int i = 0; i < numChunks; i++) {
messages.add(ChunkingTestUtils.createChunkedRecord(putKeyFoo, 1, 1, i, 0, tp));
}
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> manifestMessage =
ChunkingTestUtils.createChunkValueManifestRecord(putKeyFoo, messages.get(0), numChunks, tp);
messages.add(manifestMessage);

// The only expected real-life case should be when schemaId values are chunk=-10 and manifest=-20
boolean useRealSchemaId = testSchemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
runTest(Collections.singleton(PARTITION_FOO), () -> {
TestUtils.waitForNonDeterministicAssertion(
5,
TimeUnit.SECONDS,
() -> assertTrue(storeIngestionTaskUnderTest.hasAnySubscription()));

for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: messages) {
try {
Put put = (Put) message.getValue().getPayloadUnion();
if (!useRealSchemaId) {
put.schemaId = testSchemaId;
}
LeaderProducedRecordContext leaderProducedRecordContext = mock(LeaderProducedRecordContext.class);
when(leaderProducedRecordContext.getMessageType()).thenReturn(MessageType.PUT);
when(leaderProducedRecordContext.getValueUnion()).thenReturn(put);
when(leaderProducedRecordContext.getKeyBytes()).thenReturn(putKeyFoo);

storeIngestionTaskUnderTest.produceToStoreBufferService(
message,
leaderProducedRecordContext,
PARTITION_FOO,
localKafkaConsumerService.kafkaUrl,
System.nanoTime(),
System.currentTimeMillis());
} catch (InterruptedException e) {
throw new VeniceException(e);
}
}
}, aaConfig);

// Verify that the size of the assembled record was recorded in the metrics only if schemaId=-20
HostLevelIngestionStats hostLevelIngestionStats = storeIngestionTaskUnderTest.hostLevelIngestionStats;
if (useRealSchemaId) {
verify(hostLevelIngestionStats).recordAssembledValueSize(eq(expectedRecordSize), anyLong());
verify(hostLevelIngestionStats, times(1)).recordAssembledValueSize(anyLong(), anyLong());
} else {
verify(hostLevelIngestionStats, times(0)).recordAssembledValueSize(anyLong(), anyLong());
}
}

@Test
public void testGetOffsetToOnlineLagThresholdPerPartition() {
ReadOnlyStoreRepository storeRepository = mock(ReadOnlyStoreRepository.class);