[vpj] Clean up PushJobSetting to remove DataWriterMRJob specific fields (#898)

"PushJobSetting" has two configs that are specific to "DataWriterMRJob". This is an anti-pattern and this commit moves them into "DataWriterMRJob"
nisargthakkar authored Mar 14, 2024
1 parent da8f900 commit c769e89
Showing 4 changed files with 17 additions and 18 deletions.
File 1 of 4:

@@ -5,15 +5,13 @@
 import com.linkedin.venice.controllerapi.StoreResponse;
 import com.linkedin.venice.etl.ETLValueSchemaTransformation;
 import com.linkedin.venice.hadoop.jobs.DataWriterComputeJob;
-import com.linkedin.venice.hadoop.mapreduce.datawriter.partition.VeniceMRPartitioner;
 import com.linkedin.venice.meta.BufferReplayPolicy;
 import com.linkedin.venice.meta.HybridStoreConfig;
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.schema.vson.VsonSchema;
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.avro.Schema;
-import org.apache.hadoop.mapred.Partitioner;
 
 
 /**
@@ -37,7 +35,6 @@ public class PushJobSetting implements Serializable {
   public String incrementalPushVersion;
   public boolean isDuplicateKeyAllowed;
   public boolean enablePushJobStatusUpload;
-  public boolean enableReducerSpeculativeExecution;
   public int controllerRetries;
   public int controllerStatusPollRetries;
   public long pollJobStatusIntervalMs;
@@ -150,7 +147,4 @@ public class PushJobSetting implements Serializable {
   public transient Version sourceKafkaInputVersionInfo;
   public CompressionStrategy sourceVersionCompressionStrategy;
   public boolean sourceVersionChunkingEnabled;
-
-  // Configs to help with testing
-  public Class<? extends Partitioner> mapReducePartitionerClass = VeniceMRPartitioner.class;
 }
File 2 of 4:

@@ -47,7 +47,6 @@
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.LEGACY_AVRO_KEY_FIELD_PROP;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.LEGACY_AVRO_VALUE_FIELD_PROP;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.MAPPER_OUTPUT_DIRECTORY;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.MAP_REDUCE_PARTITIONER_CLASS_CONFIG;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.MULTI_REGION;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.NON_CRITICAL_EXCEPTION;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.NOT_SET;
@@ -58,7 +57,6 @@
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.POLL_STATUS_RETRY_ATTEMPTS;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.POST_VALIDATION_CONSUMPTION_ENABLED;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.PUSH_JOB_STATUS_UPLOAD_ENABLE;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.REDUCER_SPECULATIVE_EXECUTION_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REWIND_EPOCH_TIME_BUFFER_IN_SECONDS_OVERRIDE;
@@ -346,8 +344,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
     pushJobSettingToReturn.isIncrementalPush = props.getBoolean(INCREMENTAL_PUSH, false);
     pushJobSettingToReturn.isDuplicateKeyAllowed = props.getBoolean(ALLOW_DUPLICATE_KEY, false);
     pushJobSettingToReturn.enablePushJobStatusUpload = props.getBoolean(PUSH_JOB_STATUS_UPLOAD_ENABLE, false);
-    pushJobSettingToReturn.enableReducerSpeculativeExecution =
-        props.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false);
     pushJobSettingToReturn.controllerRetries = props.getInt(CONTROLLER_REQUEST_RETRY_ATTEMPTS, 1);
     pushJobSettingToReturn.controllerStatusPollRetries = props.getInt(POLL_STATUS_RETRY_ATTEMPTS, 15);
     pushJobSettingToReturn.pollJobStatusIntervalMs =
@@ -474,12 +470,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
       pushJobSettingToReturn.dataWriterComputeJobClass = objectClass;
     }
 
-    // Test related configs
-    if (props.containsKey(MAP_REDUCE_PARTITIONER_CLASS_CONFIG)) {
-      pushJobSettingToReturn.mapReducePartitionerClass =
-          ReflectUtils.loadClass(props.getString(MAP_REDUCE_PARTITIONER_CLASS_CONFIG));
-    }
-
     return pushJobSettingToReturn;
   }
 
File 3 of 4:

@@ -27,7 +27,9 @@
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_SECURITY_PROTOCOL;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.KEY_FIELD_PROP;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.MAP_REDUCE_PARTITIONER_CLASS_CONFIG;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.PARTITION_COUNT;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.REDUCER_SPECULATIVE_EXECUTION_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_POLICY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.REPUSH_TTL_START_TIMESTAMP;
@@ -67,9 +69,11 @@
 import com.linkedin.venice.hadoop.mapreduce.common.JobUtils;
 import com.linkedin.venice.hadoop.mapreduce.datawriter.map.VeniceAvroMapper;
 import com.linkedin.venice.hadoop.mapreduce.datawriter.map.VeniceVsonMapper;
+import com.linkedin.venice.hadoop.mapreduce.datawriter.partition.VeniceMRPartitioner;
 import com.linkedin.venice.hadoop.mapreduce.datawriter.reduce.VeniceReducer;
 import com.linkedin.venice.hadoop.mapreduce.datawriter.task.CounterBackedMapReduceDataWriterTaskTracker;
 import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
+import com.linkedin.venice.utils.ReflectUtils;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.writer.VeniceWriter;
 import java.io.IOException;
@@ -82,6 +86,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.logging.log4j.LogManager;
@@ -95,6 +100,7 @@ public class DataWriterMRJob extends DataWriterComputeJob {
   private static final Logger LOGGER = LogManager.getLogger(DataWriterMRJob.class);
   public static final String HADOOP_PREFIX = "hadoop-conf.";
 
+  private VeniceProperties vpjProperties;
   private PushJobSetting pushJobSetting;
   private JobConf jobConf;
 
@@ -105,6 +111,7 @@ public class DataWriterMRJob extends DataWriterComputeJob {
   @Override
   public void configure(VeniceProperties props, PushJobSetting pushJobSetting) {
     this.jobConf = new JobConf();
+    this.vpjProperties = props;
     this.pushJobSetting = pushJobSetting;
     setupMRConf(jobConf, pushJobSetting, props);
   }
@@ -290,9 +297,16 @@ private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting) {
       jobConf.setCombinerKeyGroupingComparator(KafkaInputValueGroupingComparator.class);
       jobConf.setPartitionerClass(KafkaInputMRPartitioner.class);
     } else {
-      jobConf.setPartitionerClass(pushJobSetting.mapReducePartitionerClass);
+      final Class<? extends Partitioner> partitionerClass;
+      // Test related configs
+      if (vpjProperties.containsKey(MAP_REDUCE_PARTITIONER_CLASS_CONFIG)) {
+        partitionerClass = ReflectUtils.loadClass(vpjProperties.getString(MAP_REDUCE_PARTITIONER_CLASS_CONFIG));
+      } else {
+        partitionerClass = VeniceMRPartitioner.class;
+      }
+      jobConf.setPartitionerClass(partitionerClass);
     }
-    jobConf.setReduceSpeculativeExecution(pushJobSetting.enableReducerSpeculativeExecution);
+    jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false));
     int partitionCount = pushJobSetting.partitionCount * pushJobSetting.amplificationFactor;
     jobConf.setInt(PARTITION_COUNT, partitionCount);
     jobConf.setNumReduceTasks(partitionCount);
File 4 of 4:

@@ -405,6 +405,7 @@ private void logAccumulatorValues() {
     logAccumulatorValue(accumulatorsForDataWriterJob.outputRecordCounter);
     logAccumulatorValue(accumulatorsForDataWriterJob.emptyRecordCounter);
     logAccumulatorValue(accumulatorsForDataWriterJob.totalKeySizeCounter);
+    logAccumulatorValue(accumulatorsForDataWriterJob.uncompressedValueSizeCounter);
     logAccumulatorValue(accumulatorsForDataWriterJob.compressedValueSizeCounter);
     logAccumulatorValue(accumulatorsForDataWriterJob.gzipCompressedValueSizeCounter);
     logAccumulatorValue(accumulatorsForDataWriterJob.zstdCompressedValueSizeCounter);
