[vpj] Cleanup dictionary fetching logic from various sources (#903)

Currently, VPJ either builds a dictionary or fetches it from various sources to improve compression ratios. However, this logic was somewhat convoluted, and there was room to streamline the implementation. This commit splits "VenicePushJob#getCompressionDictionary()" into two functions: "VenicePushJob#getCompressionDictionary()" and "VenicePushJob#fetchOrBuildCompressionDictionary()". "VenicePushJob#fetchOrBuildCompressionDictionary()" handles the logic of how to obtain the compression dictionary based on various store and job properties, while "VenicePushJob#getCompressionDictionary()" unifies the error handling and the fallback to synthetic dictionaries when the compression strategy is "ZSTD_WITH_DICT".

Previously, when use.mapper.to.build.dictionary was false and the dictionary build failed, VPJ would throw exceptions (even for "ZSTD_WITH_DICT"). This change handles that case gracefully and falls back to a dictionary built on synthetic data instead; the resulting control flow is sketched below.
nisargthakkar authored Mar 19, 2024
1 parent f0725b4 commit f4dced5
Showing 2 changed files with 101 additions and 109 deletions.
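
For orientation before reading the diff, here is a condensed sketch of the refactored control flow, assembled from the added lines below. Field and method names are taken from the diff; the signature and surrounding class are simplified for illustration, so this is a sketch rather than the full implementation.

    // Condensed sketch of the new flow (names from the diff; not the full implementation).
    private Optional<ByteBuffer> getCompressionDictionary() {
      if (!pushJobSetting.isZstdDictCreationRequired) {
        return Optional.empty();
      }

      ByteBuffer dictionary;
      try {
        // Trains a dictionary, reads one from Kafka, or returns null,
        // depending on store and job properties.
        dictionary = fetchOrBuildCompressionDictionary();
      } catch (Exception e) {
        LOGGER.warn("Failed to fetch or build compression dictionary", e);
        dictionary = null;
      }

      if (dictionary != null && dictionary.remaining() > 0) {
        pushJobSetting.isZstdDictCreationSuccess = true;
        return Optional.of(dictionary);
      }

      if (pushJobSetting.storeCompressionStrategy != CompressionStrategy.ZSTD_WITH_DICT) {
        // Other strategies do not require a dictionary, so a missing one is acceptable.
        pushJobSetting.isZstdDictCreationSuccess = false;
        return Optional.empty();
      }

      // ZSTD_WITH_DICT always needs a dictionary: fall back to one built on
      // synthetic data instead of failing the push job.
      pushJobSetting.isZstdDictCreationSuccess = true;
      return Optional.of(emptyPushZstdDictionary.get());
    }
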
VenicePushJob.java
@@ -1127,7 +1127,7 @@ protected static boolean shouldBuildZstdCompressionDictionary(
if (!inputFileHasRecords) {
if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
LOGGER.info(
"compression strategy is {} with no input records: dictionary will be generated from synthetic data or current version data for hybrid stores",
"Compression strategy is {} with no input records. A dictionary will be generated from synthetic data or current version data for hybrid stores",
pushJobSetting.storeCompressionStrategy);
} else {
LOGGER.info("No compression dictionary will be generated as there are no records");
@@ -1425,8 +1425,37 @@ private ControllerClient getControllerClient(
}

private Optional<ByteBuffer> getCompressionDictionary() throws VeniceException {
ByteBuffer compressionDictionary = null;
if (!pushJobSetting.isZstdDictCreationRequired) {
return Optional.empty();
}

ByteBuffer compressionDictionary;
try {
compressionDictionary = fetchOrBuildCompressionDictionary();
} catch (Exception e) {
LOGGER.warn("Failed to fetch or build compression dictionary", e);
compressionDictionary = null;
}

if (compressionDictionary != null && compressionDictionary.remaining() > 0) {
pushJobSetting.isZstdDictCreationSuccess = true;
return Optional.of(compressionDictionary);
}

if (pushJobSetting.storeCompressionStrategy != CompressionStrategy.ZSTD_WITH_DICT) {
LOGGER.info(
"No dictionary fetched. But since the compression strategy is not {}, it is not required",
CompressionStrategy.ZSTD_WITH_DICT);
pushJobSetting.isZstdDictCreationSuccess = false;
return Optional.empty();
}

LOGGER.info("No dictionary fetched. Creating a default dictionary since compression strategy is ZSTD_WITH_DICT");
pushJobSetting.isZstdDictCreationSuccess = true;
return Optional.of(emptyPushZstdDictionary.get());
}

private ByteBuffer fetchOrBuildCompressionDictionary() throws VeniceException {
// Prepare the param builder, which can be used by different scenarios.
KafkaInputDictTrainer.ParamBuilder paramBuilder = new KafkaInputDictTrainer.ParamBuilder()
.setKeySchema(AvroCompatibilityHelper.toParsingForm(pushJobSetting.storeKeySchema))
@@ -1438,20 +1467,16 @@ private Optional<ByteBuffer> getCompressionDictionary() throws VeniceException {
.setDictSampleSize(
props.getInt(COMPRESSION_DICTIONARY_SAMPLE_SIZE, DEFAULT_COMPRESSION_DICTIONARY_SAMPLE_SIZE));
if (pushJobSetting.isSourceKafka) {
/**
* Currently KIF repush will always build a dict in Azkaban Job driver if necessary.
*/
boolean rebuildDict = pushJobSetting.kafkaInputBuildNewDictEnabled;
paramBuilder.setSourceVersionChunkingEnabled(pushJobSetting.sourceKafkaInputVersionInfo.isChunkingEnabled());
// Repush
// Currently, KIF repush will always build a dict in Azkaban Job driver if necessary.
if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
if (rebuildDict) {
if (pushJobSetting.kafkaInputBuildNewDictEnabled) {
LOGGER.info("Rebuild a new Zstd dictionary from the input topic: {}", pushJobSetting.kafkaInputTopic);
paramBuilder.setKafkaInputBroker(pushJobSetting.kafkaInputBrokerUrl)
.setTopicName(pushJobSetting.kafkaInputTopic)
.setSourceVersionCompressionStrategy(pushJobSetting.sourceKafkaInputVersionInfo.getCompressionStrategy());
KafkaInputDictTrainer dictTrainer = new KafkaInputDictTrainer(paramBuilder.build());
compressionDictionary = ByteBuffer.wrap(dictTrainer.trainDict());
return ByteBuffer.wrap(dictTrainer.trainDict());
} else {
LOGGER.info("Reading Zstd dictionary from input topic: {}", pushJobSetting.kafkaInputTopic);
// set up ssl properties and kafka consumer properties
@@ -1460,104 +1485,70 @@ private Optional<ByteBuffer> getCompressionDictionary() throws VeniceException {
kafkaConsumerProperties.putAll(this.sslProperties.get());
}
kafkaConsumerProperties.setProperty(KAFKA_BOOTSTRAP_SERVERS, pushJobSetting.kafkaInputBrokerUrl);
compressionDictionary = DictionaryUtils
return DictionaryUtils
.readDictionaryFromKafka(pushJobSetting.kafkaInputTopic, new VeniceProperties(kafkaConsumerProperties));
}
}
LOGGER.info(
"No dictionary will be fetched for repush workloads with compression strategy: {}",
CompressionStrategy.ZSTD_WITH_DICT);
return null;
}

return Optional.ofNullable(compressionDictionary);
} else {
if (pushJobSetting.isZstdDictCreationRequired) {
if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT
&& !inputDataInfo.hasRecords()) {
/**
* Special handling for empty push with ZSTD_WITH_DICT: This compression strategy needs a dictionary even if
* there is no input data, so we generate a dictionary either based on synthetic data or from the current version.
*/
if (pushJobSetting.hybridStoreConfig != null) {
/**
* For hybrid store: Push Job will try to train a dict based on the records of the current version, and
* it won't work for the very first version, and the following versions will work.
*/
LOGGER.info(
"compression strategy is {} for hybrid store with no input records: Attempt to generate dictionary from current version data",
pushJobSetting.storeCompressionStrategy);
String storeName = getPushJobSetting().storeName;
try {
// Get the latest version
RepushInfoResponse repushInfoResponse = ControllerClient.retryableRequest(
controllerClient,
pushJobSetting.controllerRetries,
c -> c.getRepushInfo(storeName, Optional.empty()));
if (repushInfoResponse.isError()) {
throw new VeniceException(
"Could not get repush info for store " + storeName + " with error: "
+ repushInfoResponse.getError());
}
int sourceVersion = repushInfoResponse.getRepushInfo().getVersion().getNumber();
String sourceTopicName = Version.composeKafkaTopic(storeName, sourceVersion);
String sourceKafkaUrl = repushInfoResponse.getRepushInfo().getKafkaBrokerUrl();
LOGGER.info(
"Rebuild a new Zstd dictionary from the source topic: {} in Kafka: {}",
sourceTopicName,
sourceKafkaUrl);
paramBuilder.setKafkaInputBroker(repushInfoResponse.getRepushInfo().getKafkaBrokerUrl())
.setTopicName(sourceTopicName)
.setSourceVersionCompressionStrategy(
repushInfoResponse.getRepushInfo().getVersion().getCompressionStrategy());
KafkaInputDictTrainer dictTrainer = new KafkaInputDictTrainer(paramBuilder.build());
compressionDictionary = ByteBuffer.wrap(dictTrainer.trainDict());

return Optional.of(compressionDictionary);
} catch (Exception e) {
LOGGER.warn(
"Encountered an exception when trying to build a dict from an existing version for an empty push to a hybrid store: "
+ storeName + ", so the push job will use a default dict",
e);
}
}
if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT && !inputDataInfo.hasRecords()) {
LOGGER.info("Compression strategy is {} with no input records", pushJobSetting.storeCompressionStrategy);

/**
* For Batch only store or first push to a hybrid store: Build dictionary based on synthetic data
*/
LOGGER.info(
"compression strategy is {} with no input records: Generating dictionary from synthetic data",
pushJobSetting.storeCompressionStrategy);
compressionDictionary = emptyPushZstdDictionary.get();
return Optional.of(compressionDictionary);
}
if (pushJobSetting.hybridStoreConfig == null) {
return null;
}

if (!pushJobSetting.useMapperToBuildDict) {
compressionDictionary = ByteBuffer.wrap(getInputDataInfoProvider().trainZstdDictionary());
pushJobSetting.isZstdDictCreationSuccess = true;
} else {
if (pushJobSetting.isZstdDictCreationSuccess) {
LOGGER.info(
"Retrieving the Zstd dictionary trained by {}",
ValidateSchemaAndBuildDictMapper.class.getSimpleName());
compressionDictionary = validateSchemaAndBuildDictMapperOutput.getZstdDictionary();
} else {
if (pushJobSetting.storeCompressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
// This should not happen
String err = "Dictionary creation failed for the configured ZSTD compression type";
LOGGER.error(err);
throw new VeniceException(err);
} // else case: Dictionary creation failed, but it was not needed for the push job to succeed
}
}
/**
* Special handling for empty push with ZSTD_WITH_DICT. This compression strategy needs a dictionary even if
* there is no input data, so we try to generate a dictionary from the current version. Note that it won't work
* for the very first version, and the following versions will work.
*/
LOGGER.info("Since this is a hybrid store, attempting to generate dictionary from current version data");
String storeName = getPushJobSetting().storeName;

// Get the latest version
RepushInfoResponse repushInfoResponse = ControllerClient.retryableRequest(
controllerClient,
pushJobSetting.controllerRetries,
c -> c.getRepushInfo(storeName, Optional.empty()));

if (repushInfoResponse.isError()) {
LOGGER.warn("Could not get repush info for store {} with error: {}", storeName, repushInfoResponse.getError());
return null;
}
if (compressionDictionary != null) {
LOGGER.info("Zstd dictionary size = {} bytes", compressionDictionary.limit());
} else {

int sourceVersion = repushInfoResponse.getRepushInfo().getVersion().getNumber();
String sourceTopicName = Version.composeKafkaTopic(storeName, sourceVersion);
String sourceKafkaUrl = repushInfoResponse.getRepushInfo().getKafkaBrokerUrl();
LOGGER.info(
"Rebuild a new Zstd dictionary from the source topic: {} in Kafka: {}",
sourceTopicName,
sourceKafkaUrl);
paramBuilder.setKafkaInputBroker(repushInfoResponse.getRepushInfo().getKafkaBrokerUrl())
.setTopicName(sourceTopicName)
.setSourceVersionCompressionStrategy(
repushInfoResponse.getRepushInfo().getVersion().getCompressionStrategy());
KafkaInputDictTrainer dictTrainer = new KafkaInputDictTrainer(paramBuilder.build());
return ByteBuffer.wrap(dictTrainer.trainDict());
}

if (!pushJobSetting.useMapperToBuildDict) {
return ByteBuffer.wrap(getInputDataInfoProvider().trainZstdDictionary());
} else {
// In case of pushJobSetting.useMapperToBuildDict job, the dictionary will already have been generated
if (pushJobSetting.isZstdDictCreationSuccess) {
LOGGER.info(
"No Compression dictionary is generated with the compression strategy {} "
+ "and compressionMetricCollectionEnabled is {}",
pushJobSetting.storeCompressionStrategy,
(pushJobSetting.compressionMetricCollectionEnabled ? "Enabled" : "Disabled"));
"Retrieving the Zstd dictionary trained by {}",
ValidateSchemaAndBuildDictMapper.class.getSimpleName());
return validateSchemaAndBuildDictMapperOutput.getZstdDictionary();
}

return Optional.ofNullable(compressionDictionary);
}

return null;
}

private void throwVeniceException(Throwable e) throws VeniceException {
@@ -2153,7 +2144,8 @@ void createNewStoreVersion(

Optional<String> dictionary;
if (askControllerToSendControlMessage) {
dictionary = optionalCompressionDictionary.map(ByteBuffer::array).map(EncodingUtils::base64EncodeToString);
dictionary =
optionalCompressionDictionary.map(ByteUtils::extractByteArray).map(EncodingUtils::base64EncodeToString);
} else {
dictionary = Optional.empty();
}
AbstractInputRecordProcessor.java
@@ -48,7 +48,7 @@ public abstract class AbstractInputRecordProcessor<INPUT_KEY, INPUT_VALUE> exten
private CompressionStrategy compressionStrategy;
private boolean compressionMetricCollectionEnabled;
private CompressorFactory compressorFactory;
private VeniceCompressor[] compressor;
private VeniceCompressor[] compressors;

protected AbstractVeniceRecordReader<INPUT_KEY, INPUT_VALUE> veniceRecordReader;
private static final byte[] EMPTY_BYTES = new byte[0];
@@ -121,11 +121,11 @@ protected boolean process(
// Compress and save the details based on the configured compression strategy: This should not fail
byte[] finalRecordValue;
try {
finalRecordValue = compressor[compressionStrategy.getValue()].compress(recordValue);
finalRecordValue = compressors[compressionStrategy.getValue()].compress(recordValue);
} catch (IOException e) {
throw new VeniceException(
"Caught an IO exception while trying to to use compression strategy: "
+ compressor[compressionStrategy.getValue()].getCompressionStrategy().name(),
+ compressors[compressionStrategy.getValue()].getCompressionStrategy().name(),
e);
}
// record the final stored value length
@@ -137,13 +137,13 @@ protected boolean process(
// Compress based on all compression strategies to collect metrics
byte[] compressedRecordValue;
for (CompressionStrategy compressionStrategy: CompressionStrategy.values()) {
if (compressionStrategy != NO_OP && compressor[compressionStrategy.getValue()] != null) {
if (compressionStrategy != NO_OP && compressors[compressionStrategy.getValue()] != null) {
if (compressionStrategy == this.compressionStrategy) {
// Extra check to not redo compression
compressedRecordValue = finalRecordValue;
} else {
try {
compressedRecordValue = compressor[compressionStrategy.getValue()].compress(recordValue);
compressedRecordValue = compressors[compressionStrategy.getValue()].compress(recordValue);
} catch (IOException e) {
LOGGER.warn(
"Compression to collect metrics failed for compression strategy: {}",
@@ -187,7 +187,7 @@ protected void configureTask(VeniceProperties props) {
}

// init compressor array
this.compressor = new VeniceCompressor[CompressionStrategy.getCompressionStrategyTypesArrayLength()];
this.compressors = new VeniceCompressor[CompressionStrategy.getCompressionStrategyTypesArrayLength()];
setupCompression(props);
}

@@ -224,13 +224,13 @@ private void setupCompression(VeniceProperties props) {
switch (compressionStrategy) {
case NO_OP:
case GZIP:
this.compressor[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy);
this.compressors[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy);
break;

case ZSTD_WITH_DICT:
if (isZstdDictCreationRequired && isZstdDictCreationSuccess) {
// case 1a
this.compressor[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props);
this.compressors[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props);
} // else: case 1b or 1c
break;

@@ -248,11 +248,11 @@ private void setupCompression(VeniceProperties props) {
if (compressionStrategy == ZSTD_WITH_DICT) {
if (isZstdDictCreationRequired && isZstdDictCreationSuccess) {
// case 2a
this.compressor[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props);
this.compressors[ZSTD_WITH_DICT.getValue()] = getZstdCompressor(props);
} // else: case 2b
} else {
// case 2c
this.compressor[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy);
this.compressors[compressionStrategy.getValue()] = compressorFactory.getCompressor(compressionStrategy);
}
}
}
@@ -271,7 +271,7 @@ private VeniceCompressor getZstdCompressor(VeniceProperties props) {
ByteBuffer compressionDictionary = readDictionaryFromKafka(topicName, props);
int compressionLevel = props.getInt(ZSTD_COMPRESSION_LEVEL, Zstd.maxCompressionLevel());

if (compressionDictionary != null && compressionDictionary.limit() > 0) {
if (compressionDictionary != null && compressionDictionary.remaining() > 0) {
return compressorFactory.createVersionSpecificCompressorIfNotExist(
ZSTD_WITH_DICT,
topicName,
