From f4dced525449e958dae8dc78a94df7b73a70df1d Mon Sep 17 00:00:00 2001
From: Nisarg Thakkar
Date: Tue, 19 Mar 2024 12:01:59 -0700
Subject: [PATCH] [vpj] Cleanup dictionary fetching logic from various sources (#903)

Currently, VPJ either builds a compression dictionary or fetches one from
various sources to optimize compression ratios. However, this logic was
somewhat convoluted, and there was room to streamline the implementation.

This commit splits "VenicePushJob#getCompressionDictionary()" into two
functions: "VenicePushJob#getCompressionDictionary()" and
"VenicePushJob#fetchOrBuildCompressionDictionary()".
"VenicePushJob#fetchOrBuildCompressionDictionary()" decides how to obtain the
compression dictionary based on various store and job properties, while
"VenicePushJob#getCompressionDictionary()" unifies the error handling and the
fallback to synthetic dictionaries when the compression strategy is
"ZSTD_WITH_DICT".

Previously, when use.mapper.to.build.dictionary was false and the dictionary
build failed, the push job threw an exception (even for "ZSTD_WITH_DICT").
This change handles that case gracefully and falls back to a dictionary built
from synthetic data instead.
---
 .../linkedin/venice/hadoop/VenicePushJob.java | 188 +++++++++---------
 .../AbstractInputRecordProcessor.java         |  22 +-
 2 files changed, 101 insertions(+), 109 deletions(-)

diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
index 9337ac2ff2c..5c9704c622b 100755
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
@@ -1127,7 +1127,7 @@ protected static boolean shouldBuildZstdCompressionDictionary(
     if (!inputFileHasRecords) {
       if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
         LOGGER.info(
-            "compression strategy is {} with no input records: dictionary will be generated from synthetic data or current version data for hybrid stores",
+            "Compression strategy is {} with no input records. A dictionary will be generated from synthetic data or current version data for hybrid stores",
             pushJobSetting.storeCompressionStrategy);
       } else {
         LOGGER.info("No compression dictionary will be generated as there are no records");
@@ -1425,8 +1425,37 @@ private ControllerClient getControllerClient(
   }
 
   private Optional<ByteBuffer> getCompressionDictionary() throws VeniceException {
-    ByteBuffer compressionDictionary = null;
+    if (!pushJobSetting.isZstdDictCreationRequired) {
+      return Optional.empty();
+    }
+
+    ByteBuffer compressionDictionary;
+    try {
+      compressionDictionary = fetchOrBuildCompressionDictionary();
+    } catch (Exception e) {
+      LOGGER.warn("Failed to fetch or build compression dictionary", e);
+      compressionDictionary = null;
+    }
+
+    if (compressionDictionary != null && compressionDictionary.remaining() > 0) {
+      pushJobSetting.isZstdDictCreationSuccess = true;
+      return Optional.of(compressionDictionary);
+    }
+
+    if (pushJobSetting.storeCompressionStrategy != CompressionStrategy.ZSTD_WITH_DICT) {
+      LOGGER.info(
+          "No dictionary fetched. But since the compression strategy is not {}, it is not required",
+          CompressionStrategy.ZSTD_WITH_DICT);
+      pushJobSetting.isZstdDictCreationSuccess = false;
+      return Optional.empty();
+    }
+
+    LOGGER.info("No dictionary fetched. 
Creating a default dictionary since compression strategy is ZSTD_WITH_DICT"); + pushJobSetting.isZstdDictCreationSuccess = true; + return Optional.of(emptyPushZstdDictionary.get()); + } + private ByteBuffer fetchOrBuildCompressionDictionary() throws VeniceException { // Prepare the param builder, which can be used by different scenarios. KafkaInputDictTrainer.ParamBuilder paramBuilder = new KafkaInputDictTrainer.ParamBuilder() .setKeySchema(AvroCompatibilityHelper.toParsingForm(pushJobSetting.storeKeySchema)) @@ -1438,20 +1467,16 @@ private Optional getCompressionDictionary() throws VeniceException { .setDictSampleSize( props.getInt(COMPRESSION_DICTIONARY_SAMPLE_SIZE, DEFAULT_COMPRESSION_DICTIONARY_SAMPLE_SIZE)); if (pushJobSetting.isSourceKafka) { - /** - * Currently KIF repush will always build a dict in Azkaban Job driver if necessary. - */ - boolean rebuildDict = pushJobSetting.kafkaInputBuildNewDictEnabled; paramBuilder.setSourceVersionChunkingEnabled(pushJobSetting.sourceKafkaInputVersionInfo.isChunkingEnabled()); - // Repush + // Currently, KIF repush will always build a dict in Azkaban Job driver if necessary. if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) { - if (rebuildDict) { + if (pushJobSetting.kafkaInputBuildNewDictEnabled) { LOGGER.info("Rebuild a new Zstd dictionary from the input topic: {}", pushJobSetting.kafkaInputTopic); paramBuilder.setKafkaInputBroker(pushJobSetting.kafkaInputBrokerUrl) .setTopicName(pushJobSetting.kafkaInputTopic) .setSourceVersionCompressionStrategy(pushJobSetting.sourceKafkaInputVersionInfo.getCompressionStrategy()); KafkaInputDictTrainer dictTrainer = new KafkaInputDictTrainer(paramBuilder.build()); - compressionDictionary = ByteBuffer.wrap(dictTrainer.trainDict()); + return ByteBuffer.wrap(dictTrainer.trainDict()); } else { LOGGER.info("Reading Zstd dictionary from input topic: {}", pushJobSetting.kafkaInputTopic); // set up ssl properties and kafka consumer properties @@ -1460,104 +1485,70 @@ private Optional getCompressionDictionary() throws VeniceException { kafkaConsumerProperties.putAll(this.sslProperties.get()); } kafkaConsumerProperties.setProperty(KAFKA_BOOTSTRAP_SERVERS, pushJobSetting.kafkaInputBrokerUrl); - compressionDictionary = DictionaryUtils + return DictionaryUtils .readDictionaryFromKafka(pushJobSetting.kafkaInputTopic, new VeniceProperties(kafkaConsumerProperties)); } } + LOGGER.info( + "No dictionary will be fetched for repush workloads with compression strategy: {}", + CompressionStrategy.ZSTD_WITH_DICT); + return null; + } - return Optional.ofNullable(compressionDictionary); - } else { - if (pushJobSetting.isZstdDictCreationRequired) { - if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT - && !inputDataInfo.hasRecords()) { - /** - * Special handling for empty push with ZSTD_WITH_DICT: This compression strategy needs a dictionary even if - * there is no input data, so we generate a dictionary either based on synthetic data or from the current version. - */ - if (pushJobSetting.hybridStoreConfig != null) { - /** - * For hybrid store: Push Job will try to train a dict based on the records of the current version, and - * it won't work for the very first version, and the following versions will work. 
- */ - LOGGER.info( - "compression strategy is {} for hybrid store with no input records: Attempt to generate dictionary from current version data", - pushJobSetting.storeCompressionStrategy); - String storeName = getPushJobSetting().storeName; - try { - // Get the latest version - RepushInfoResponse repushInfoResponse = ControllerClient.retryableRequest( - controllerClient, - pushJobSetting.controllerRetries, - c -> c.getRepushInfo(storeName, Optional.empty())); - if (repushInfoResponse.isError()) { - throw new VeniceException( - "Could not get repush info for store " + storeName + " with error: " - + repushInfoResponse.getError()); - } - int sourceVersion = repushInfoResponse.getRepushInfo().getVersion().getNumber(); - String sourceTopicName = Version.composeKafkaTopic(storeName, sourceVersion); - String sourceKafkaUrl = repushInfoResponse.getRepushInfo().getKafkaBrokerUrl(); - LOGGER.info( - "Rebuild a new Zstd dictionary from the source topic: {} in Kafka: {}", - sourceTopicName, - sourceKafkaUrl); - paramBuilder.setKafkaInputBroker(repushInfoResponse.getRepushInfo().getKafkaBrokerUrl()) - .setTopicName(sourceTopicName) - .setSourceVersionCompressionStrategy( - repushInfoResponse.getRepushInfo().getVersion().getCompressionStrategy()); - KafkaInputDictTrainer dictTrainer = new KafkaInputDictTrainer(paramBuilder.build()); - compressionDictionary = ByteBuffer.wrap(dictTrainer.trainDict()); - - return Optional.of(compressionDictionary); - } catch (Exception e) { - LOGGER.warn( - "Encountered an exception when trying to build a dict from an existing version for an empty push to a hybrid store: " - + storeName + ", so the push job will use a default dict", - e); - } - } + if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT && !inputDataInfo.hasRecords()) { + LOGGER.info("Compression strategy is {} with no input records", pushJobSetting.storeCompressionStrategy); - /** - * For Batch only store or first push to a hybrid store: Build dictionary based on synthetic data - */ - LOGGER.info( - "compression strategy is {} with no input records: Generating dictionary from synthetic data", - pushJobSetting.storeCompressionStrategy); - compressionDictionary = emptyPushZstdDictionary.get(); - return Optional.of(compressionDictionary); - } + if (pushJobSetting.hybridStoreConfig == null) { + return null; + } - if (!pushJobSetting.useMapperToBuildDict) { - compressionDictionary = ByteBuffer.wrap(getInputDataInfoProvider().trainZstdDictionary()); - pushJobSetting.isZstdDictCreationSuccess = true; - } else { - if (pushJobSetting.isZstdDictCreationSuccess) { - LOGGER.info( - "Retrieving the Zstd dictionary trained by {}", - ValidateSchemaAndBuildDictMapper.class.getSimpleName()); - compressionDictionary = validateSchemaAndBuildDictMapperOutput.getZstdDictionary(); - } else { - if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) { - // This should not happen - String err = "Dictionary creation failed for the configured ZSTD compression type"; - LOGGER.error(err); - throw new VeniceException(err); - } // else case: Dictionary creation failed, but it was not needed for the push job to succeed - } - } + /** + * Special handling for empty push with ZSTD_WITH_DICT. This compression strategy needs a dictionary even if + * there is no input data, so we try to generate a dictionary from the current version. Note that it won't work + * for the very first version, and the following versions will work. 
+ */ + LOGGER.info("Since this is a hybrid store, attempting to generate dictionary from current version data"); + String storeName = getPushJobSetting().storeName; + + // Get the latest version + RepushInfoResponse repushInfoResponse = ControllerClient.retryableRequest( + controllerClient, + pushJobSetting.controllerRetries, + c -> c.getRepushInfo(storeName, Optional.empty())); + + if (repushInfoResponse.isError()) { + LOGGER.warn("Could not get repush info for store {} with error: {}", storeName, repushInfoResponse.getError()); + return null; } - if (compressionDictionary != null) { - LOGGER.info("Zstd dictionary size = {} bytes", compressionDictionary.limit()); - } else { + + int sourceVersion = repushInfoResponse.getRepushInfo().getVersion().getNumber(); + String sourceTopicName = Version.composeKafkaTopic(storeName, sourceVersion); + String sourceKafkaUrl = repushInfoResponse.getRepushInfo().getKafkaBrokerUrl(); + LOGGER.info( + "Rebuild a new Zstd dictionary from the source topic: {} in Kafka: {}", + sourceTopicName, + sourceKafkaUrl); + paramBuilder.setKafkaInputBroker(repushInfoResponse.getRepushInfo().getKafkaBrokerUrl()) + .setTopicName(sourceTopicName) + .setSourceVersionCompressionStrategy( + repushInfoResponse.getRepushInfo().getVersion().getCompressionStrategy()); + KafkaInputDictTrainer dictTrainer = new KafkaInputDictTrainer(paramBuilder.build()); + return ByteBuffer.wrap(dictTrainer.trainDict()); + } + + if (!pushJobSetting.useMapperToBuildDict) { + return ByteBuffer.wrap(getInputDataInfoProvider().trainZstdDictionary()); + } else { + // In case of pushJobSetting.useMapperToBuildDict job, the dictionary will already have been generated + if (pushJobSetting.isZstdDictCreationSuccess) { LOGGER.info( - "No Compression dictionary is generated with the compression strategy {} " - + "and compressionMetricCollectionEnabled is {}", - pushJobSetting.storeCompressionStrategy, - (pushJobSetting.compressionMetricCollectionEnabled ? 
"Enabled" : "Disabled")); + "Retrieving the Zstd dictionary trained by {}", + ValidateSchemaAndBuildDictMapper.class.getSimpleName()); + return validateSchemaAndBuildDictMapperOutput.getZstdDictionary(); } - - return Optional.ofNullable(compressionDictionary); } + + return null; } private void throwVeniceException(Throwable e) throws VeniceException { @@ -2153,7 +2144,8 @@ void createNewStoreVersion( Optional dictionary; if (askControllerToSendControlMessage) { - dictionary = optionalCompressionDictionary.map(ByteBuffer::array).map(EncodingUtils::base64EncodeToString); + dictionary = + optionalCompressionDictionary.map(ByteUtils::extractByteArray).map(EncodingUtils::base64EncodeToString); } else { dictionary = Optional.empty(); } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractInputRecordProcessor.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractInputRecordProcessor.java index db45efb1f79..ffaf2bcfd49 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractInputRecordProcessor.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractInputRecordProcessor.java @@ -48,7 +48,7 @@ public abstract class AbstractInputRecordProcessor exten private CompressionStrategy compressionStrategy; private boolean compressionMetricCollectionEnabled; private CompressorFactory compressorFactory; - private VeniceCompressor[] compressor; + private VeniceCompressor[] compressors; protected AbstractVeniceRecordReader veniceRecordReader; private static final byte[] EMPTY_BYTES = new byte[0]; @@ -121,11 +121,11 @@ protected boolean process( // Compress and save the details based on the configured compression strategy: This should not fail byte[] finalRecordValue; try { - finalRecordValue = compressor[compressionStrategy.getValue()].compress(recordValue); + finalRecordValue = compressors[compressionStrategy.getValue()].compress(recordValue); } catch (IOException e) { throw new VeniceException( "Caught an IO exception while trying to to use compression strategy: " - + compressor[compressionStrategy.getValue()].getCompressionStrategy().name(), + + compressors[compressionStrategy.getValue()].getCompressionStrategy().name(), e); } // record the final stored value length @@ -137,13 +137,13 @@ protected boolean process( // Compress based on all compression strategies to collect metrics byte[] compressedRecordValue; for (CompressionStrategy compressionStrategy: CompressionStrategy.values()) { - if (compressionStrategy != NO_OP && compressor[compressionStrategy.getValue()] != null) { + if (compressionStrategy != NO_OP && compressors[compressionStrategy.getValue()] != null) { if (compressionStrategy == this.compressionStrategy) { // Extra check to not redo compression compressedRecordValue = finalRecordValue; } else { try { - compressedRecordValue = compressor[compressionStrategy.getValue()].compress(recordValue); + compressedRecordValue = compressors[compressionStrategy.getValue()].compress(recordValue); } catch (IOException e) { LOGGER.warn( "Compression to collect metrics failed for compression strategy: {}", @@ -187,7 +187,7 @@ protected void configureTask(VeniceProperties props) { } // init compressor array - this.compressor = new VeniceCompressor[CompressionStrategy.getCompressionStrategyTypesArrayLength()]; + this.compressors = new VeniceCompressor[CompressionStrategy.getCompressionStrategyTypesArrayLength()]; 
setupCompression(props); } @@ -224,13 +224,13 @@ private void setupCompression(VeniceProperties props) { switch (compressionStrategy) { case NO_OP: case GZIP: - this.compressor[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy); + this.compressors[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy); break; case ZSTD_WITH_DICT: if (isZstdDictCreationRequired && isZstdDictCreationSuccess) { // case 1a - this.compressor[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props); + this.compressors[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props); } // else: case 1b or 1c break; @@ -248,11 +248,11 @@ private void setupCompression(VeniceProperties props) { if (compressionStrategy == ZSTD_WITH_DICT) { if (isZstdDictCreationRequired && isZstdDictCreationSuccess) { // case 2a - this.compressor[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props); + this.compressors[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props); } // else: case 2b } else { // case 2c - this.compressor[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy); + this.compressors[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy); } } } @@ -271,7 +271,7 @@ private VeniceCompressor getZstdCompressor(VeniceProperties props) { ByteBuffer compressionDictionary = readDictionaryFromKafka(topicName, props); int compressionLevel = props.getInt(ZSTD_COMPRESSION_LEVEL, Zstd.maxCompressionLevel()); - if (compressionDictionary != null && compressionDictionary.limit() > 0) { + if (compressionDictionary != null && compressionDictionary.remaining() > 0) { return compressorFactory.createVersionSpecificCompressorIfNotExist( ZSTD_WITH_DICT, topicName,