[vpj] Add Spark as a compute engine for DataWriter job (#851)
This PR adds Spark as an alternate compute engine for running the Data Writer job. It does not yet handle KIF (Kafka Input Format): the code is written so that it falls back to the MapReduce implementation whenever KIF is used. KIF support will be addressed in a follow-up PR.

To use the new Spark-based flow, set the following VPJ config (a combined example follows the config list below):
"data.writer.compute.job.class = com.linkedin.venice.hadoop.spark.datawriter.jobs.DataWriterSparkJob"

VPJ has the following new configs:
1. "venice.spark.cluster": This will configure the spark.master config.
    * Refer to https://spark.apache.org/docs/latest/submitting-applications.html#master-urls for the supported values
2. "venice.spark.session.conf.*"
    * Configs with this prefix will be set when building the spark session
    * These will get applied to all Spark jobs that get triggered as a part of VPJ
    * It can be used to configure arbitrary cluster properties
3. "spark.data.writer.conf.*"
    * Configs with this prefix will be set when building the data writer spark job and passed as job properties
    * These will only get applied on the DataWriter Spark jobs
    * It is useful when there are custom input formats which need additional configs to be able to read the data
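For illustration only, here is a minimal sketch of how these properties could be assembled on the client side. The property keys are the ones documented above; the cluster URL and the specific Spark settings are hypothetical placeholders, not recommendations from this PR.

import java.util.Properties;

public final class SparkDataWriterConfigExample {
  public static Properties buildVpjProps() {
    Properties props = new Properties();
    // Opt into the Spark implementation of the data writer job.
    props.setProperty(
        "data.writer.compute.job.class",
        "com.linkedin.venice.hadoop.spark.datawriter.jobs.DataWriterSparkJob");
    // Equivalent of spark.master; "local[*]" is only a placeholder for local testing.
    props.setProperty("venice.spark.cluster", "local[*]");
    // Applied to every Spark session built by VPJ.
    props.setProperty("venice.spark.session.conf.spark.executor.memory", "4g");
    // Applied only to the data writer Spark job, e.g. configs a custom input format needs.
    props.setProperty("spark.data.writer.conf.spark.sql.files.maxPartitionBytes", "134217728");
    return props;
  }
}

These properties would be supplied to VPJ alongside the usual push job configs.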
nisargthakkar authored Feb 22, 2024
1 parent 3eff563 commit d14722a
Showing 53 changed files with 2,947 additions and 123 deletions.
16 changes: 16 additions & 0 deletions build.gradle
@@ -50,9 +50,13 @@ def pulsarGroup = 'org.apache.pulsar'
def pulsarVersion = '2.10.3'
def alpnAgentVersion = '2.0.10'
def hadoopVersion = '2.10.2'
def apacheSparkVersion = '3.1.3'

ext.libraries = [
alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}",
avro: "org.apache.avro:avro:${avroVersion}",
avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",
@@ -368,6 +372,18 @@ subprojects {
tasks.withType(Test) {
mustRunAfter tasks.withType(SpotBugsTask)

// For Spark to run on Java 17
jvmArgs "-XX:+IgnoreUnrecognizedVMOptions"
jvmArgs "--add-opens=java.base/java.nio=ALL-UNNAMED"
jvmArgs "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
jvmArgs "--add-opens=java.base/java.lang=ALL-UNNAMED"
jvmArgs "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
jvmArgs "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED"

if (JavaVersion.current() == JavaVersion.VERSION_11) {
jvmArgs "-Dio.netty.tryReflectionSetAccessible=true"
}

forkEvery = Integer.valueOf(System.getProperty('forkEvery', '0'))
maxParallelForks = Integer.valueOf(System.getProperty('maxParallelForks', '4'))
minHeapSize = System.getProperty('minHeapSize', '1g')
30 changes: 29 additions & 1 deletion clients/venice-push-job/build.gradle
@@ -27,7 +27,35 @@ dependencies {
exclude group: 'javax.servlet'
}

implementation project(':clients:venice-thin-client') // Needed by the Post Bulk-load Analysis Job
implementation (libraries.apacheSparkAvro) {
// Spark 3.1 depends on Avro 1.8.2 - which uses avro-mapred with the hadoop2 classifier. Starting from Avro 1.9
// onwards, avro-mapred is no longer published with a hadoop2 classifier, but Gradle still looks for one.
exclude group: 'org.apache.avro', module: 'avro-mapred'

// Spark 3.3 depends on hadoop-client-runtime and hadoop-client-api, which are shaded jars that were added in Hadoop 3.0.3
exclude group: 'org.apache.hadoop', module: 'hadoop-client-runtime'
exclude group: 'org.apache.hadoop', module: 'hadoop-client-api'
}
implementation (libraries.apacheSparkCore) {
// Spark 3.1 depends on Avro 1.8.2 - which uses avro-mapred with the hadoop2 classifier. Starting from Avro 1.9
// onwards, avro-mapred is no longer published with a hadoop2 classifier, but Gradle still looks for one.
exclude group: 'org.apache.avro', module: 'avro-mapred'

// Spark 3.3 depends on hadoop-client-runtime and hadoop-client-api, which are shaded jars that were added in Hadoop 3.0.3
exclude group: 'org.apache.hadoop', module: 'hadoop-client-runtime'
exclude group: 'org.apache.hadoop', module: 'hadoop-client-api'
}
implementation (libraries.apacheSparkSql) {
// Spark 3.1 depends on Avro 1.8.2 - which uses avro-mapred with the hadoop2 classifier. Starting from Avro 1.9
// onwards, avro-mapred is no longer published with a hadoop2 classifier, but Gradle still looks for one.
exclude group: 'org.apache.avro', module: 'avro-mapred'

// Spark 3.3 depends on hadoop-client-runtime and hadoop-client-api, which are shaded jars that were added in Hadoop 3.0.3
exclude group: 'org.apache.hadoop', module: 'hadoop-client-runtime'
exclude group: 'org.apache.hadoop', module: 'hadoop-client-api'
}

implementation project(':clients:venice-thin-client') // Needed by the KME SchemaReader

implementation libraries.commonsIo
implementation libraries.fastUtil
DefaultInputDataInfoProvider.java
@@ -59,7 +59,7 @@ public class DefaultInputDataInfoProvider implements InputDataInfoProvider {
*/
private final Lazy<ExecutorService> hdfsExecutorService;

DefaultInputDataInfoProvider(PushJobSetting pushJobSetting, VeniceProperties props) {
public DefaultInputDataInfoProvider(PushJobSetting pushJobSetting, VeniceProperties props) {
this.pushJobSetting = pushJobSetting;
this.props = props;
this.hdfsExecutorService =
InputDataInfoProvider.java
@@ -16,19 +16,22 @@ public interface InputDataInfoProvider extends Closeable {
/**
* A POJO that contains input data information (schema information and input data file size)
*/
Logger LOGGER = LogManager.getLogger(InputDataInfoProvider.class);

class InputDataInfo {
private long inputFileDataSizeInBytes;
private final int numInputFiles;
private final boolean hasRecords;
private final long inputModificationTime;

InputDataInfo(long inputFileDataSizeInBytes, int numInputFiles, boolean hasRecords, long inputModificationTime) {
public InputDataInfo(
long inputFileDataSizeInBytes,
int numInputFiles,
boolean hasRecords,
long inputModificationTime) {
this(inputFileDataSizeInBytes, numInputFiles, hasRecords, inputModificationTime, true);
}

InputDataInfo(
public InputDataInfo(
long inputFileDataSizeInBytes,
int numInputFiles,
boolean hasRecords,
@@ -78,6 +81,9 @@ public long getInputModificationTime() {
* @param recordIterator The data accessor of input records.
*/
static void loadZstdTrainingSamples(VeniceRecordIterator recordIterator, PushJobZstdConfig pushJobZstdConfig) {
// It would be preferable to make this a "private static final" field at the class level, but that is not
// possible because "InputDataInfoProvider" is an interface.
final Logger logger = LogManager.getLogger(InputDataInfo.class);
int fileSampleSize = 0;
while (recordIterator.next()) {
if (recordIterator.getCurrentKey() == null) {
@@ -93,7 +99,7 @@ static void loadZstdTrainingSamples(VeniceRecordIterator recordIterator, PushJob
// At least 1 sample per file should be added until the max sample size is reached
if (fileSampleSize > 0) {
if (fileSampleSize + value.length > pushJobZstdConfig.getMaxBytesPerFile()) {
LOGGER.debug(
logger.debug(
"Read {} to build dictionary. Reached limit per file of {}.",
ByteUtils.generateHumanReadableByteCountString(fileSampleSize),
ByteUtils.generateHumanReadableByteCountString(pushJobZstdConfig.getMaxBytesPerFile()));
@@ -103,7 +109,7 @@ static void loadZstdTrainingSamples(VeniceRecordIterator recordIterator, PushJob

// addSample returns false when the data read no longer fits in the 'sample' buffer limit
if (!pushJobZstdConfig.getZstdDictTrainer().addSample(value)) {
LOGGER.debug(
logger.debug(
"Read {} to build dictionary. Reached sample limit of {}.",
ByteUtils.generateHumanReadableByteCountString(fileSampleSize),
ByteUtils.generateHumanReadableByteCountString(pushJobZstdConfig.getMaxSampleSize()));
@@ -114,7 +120,7 @@ static void loadZstdTrainingSamples(VeniceRecordIterator recordIterator, PushJob
pushJobZstdConfig.incrCollectedNumberOfSamples();
}

LOGGER.debug(
logger.debug(
"Read {} to build dictionary. Reached EOF.",
ByteUtils.generateHumanReadableByteCountString(fileSampleSize));
}
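Both the DefaultInputDataInfoProvider constructor and the InputDataInfo constructors above are widened to public, presumably so the new Spark code in a different package can construct them directly. Below is a minimal hypothetical sketch; the package of DefaultInputDataInfoProvider is assumed to match InputDataInfoProvider's package seen elsewhere in this diff, and the numeric values are placeholders.

import com.linkedin.venice.hadoop.DefaultInputDataInfoProvider;
import com.linkedin.venice.hadoop.InputDataInfoProvider;
import com.linkedin.venice.hadoop.InputDataInfoProvider.InputDataInfo;
import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.utils.VeniceProperties;

public final class InputDataInfoUsageSketch {
  static InputDataInfo describeStaticInput(PushJobSetting pushJobSetting, VeniceProperties props) throws Exception {
    // InputDataInfoProvider extends Closeable, so try-with-resources handles cleanup.
    try (InputDataInfoProvider provider = new DefaultInputDataInfoProvider(pushJobSetting, props)) {
      // A caller could ask the provider to validate the input here; for this sketch we simply
      // build an InputDataInfo by hand using the now-public four-argument constructor.
      return new InputDataInfo(
          1024L, // inputFileDataSizeInBytes (placeholder)
          1, // numInputFiles (placeholder)
          true, // hasRecords
          System.currentTimeMillis()); // inputModificationTime
    }
  }
}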
VenicePushJob.java
@@ -236,7 +236,7 @@ public class VenicePushJob implements AutoCloseable {
AvroProtocolDefinition.PUSH_JOB_DETAILS.getSerializer();

private InputStorageQuotaTracker inputStorageQuotaTracker;
private PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;

/**
@@ -246,7 +246,7 @@ public class VenicePushJob implements AutoCloseable {
* 3. Negative enums are error scenarios (Can be user or system errors)
*/
public enum PushJobCheckpoints {
INITIALIZE_PUSH_JOB(0), NEW_VERSION_CREATED(1), START_MAP_REDUCE_JOB(2), MAP_REDUCE_JOB_COMPLETED(3),
INITIALIZE_PUSH_JOB(0), NEW_VERSION_CREATED(1), START_DATA_WRITER_JOB(2), DATA_WRITER_JOB_COMPLETED(3),
START_JOB_STATUS_POLLING(4), JOB_STATUS_POLLING_COMPLETED(5), START_VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB(6),
VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED(7), QUOTA_EXCEEDED(-1), WRITE_ACL_FAILED(-2),
DUP_KEY_WITH_DIFF_VALUE(-3), INPUT_DATA_SCHEMA_VALIDATION_FAILED(-4),
@@ -1003,34 +1003,38 @@ private void validateInputDataSchema(String inputDataSchemaString) {
}

void runJobAndUpdateStatus() {
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.START_MAP_REDUCE_JOB);
LOGGER.info("Configuring data writer job");
dataWriterComputeJob = getDataWriterComputeJob();
dataWriterComputeJob.configure(props, pushJobSetting);
LOGGER.info("Triggering data writer job");
dataWriterComputeJob.runJob();
if (dataWriterComputeJob.getStatus() != ComputeJob.Status.SUCCEEDED) {
if (!pushJobSetting.isSourceKafka) {
try {
checkLastModificationTimeAndLog();
} catch (IOException e) {
LOGGER.warn("Failed to check last modification time of input file", e);
try {
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.START_DATA_WRITER_JOB);
LOGGER.info("Configuring data writer job");
dataWriterComputeJob = getDataWriterComputeJob();
dataWriterComputeJob.configure(props, pushJobSetting);
LOGGER.info("Triggering data writer job");
dataWriterComputeJob.runJob();
if (dataWriterComputeJob.getStatus() != ComputeJob.Status.SUCCEEDED) {
if (!pushJobSetting.isSourceKafka) {
try {
checkLastModificationTimeAndLog();
} catch (IOException e) {
LOGGER.warn("Failed to check last modification time of input file", e);
}
}
Throwable t = dataWriterComputeJob.getFailureReason();
if (t == null) {
throw new VeniceException(
"Data writer job failed unexpectedly with status: " + dataWriterComputeJob.getStatus());
} else {
throwVeniceException(t);
}
}
Throwable t = dataWriterComputeJob.getFailureReason();
if (t == null) {
throw new VeniceException(
"Data writer job failed unexpectedly with status: " + dataWriterComputeJob.getStatus());
} else {
throwVeniceException(t);
}
} else {
String errorMessage = updatePushJobDetailsWithJobDetails(dataWriterComputeJob.getTaskTracker());
if (errorMessage != null) {
throw new VeniceException(errorMessage);
String errorMessage = updatePushJobDetailsWithJobDetails(dataWriterComputeJob.getTaskTracker());
if (errorMessage != null) {
throw new VeniceException(errorMessage);
}
}
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.DATA_WRITER_JOB_COMPLETED);
} finally {
Utils.closeQuietlyWithErrorLogged(dataWriterComputeJob);
}
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.MAP_REDUCE_JOB_COMPLETED);
}

private void runValidateSchemaAndBuildDictJobAndUpdateStatus(JobConf conf) throws Exception {
@@ -1646,7 +1650,7 @@ private void updatePushJobDetailsWithDataWriterTracker() {
// size of the Zstd with Dict compressed data
pushJobDetails.totalZstdWithDictCompressedValueBytes = taskTracker.getTotalZstdCompressedValueSize();
LOGGER.info(
"pushJobDetails MR Counters: " + "\n\tTotal number of records: {} " + "\n\tSize of keys: {} "
"Data writer job summary: " + "\n\tTotal number of records: {} " + "\n\tSize of keys: {} "
+ "\n\tsize of uncompressed value: {} " + "\n\tConfigured value Compression Strategy: {} "
+ "\n\tFinal data size stored in Venice based on this compression strategy: {} "
+ "\n\tCompression Metrics collection is: {} ",
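With this change the data writer engine is pluggable via the data.writer.compute.job.class config. The PR's actual wiring behind getDataWriterComputeJob() is not shown in this excerpt; the following is only a hypothetical sketch of how a configured class name could be resolved into a DataWriterComputeJob instance with plain reflection, assuming the implementation exposes a no-arg constructor.

import com.linkedin.venice.hadoop.jobs.DataWriterComputeJob;

public final class DataWriterJobFactorySketch {
  // Hypothetical helper, not the code from this PR.
  static DataWriterComputeJob instantiate(String jobClassName) {
    try {
      Class<? extends DataWriterComputeJob> jobClass =
          Class.forName(jobClassName).asSubclass(DataWriterComputeJob.class);
      // Assumes the configured implementation has a public no-arg constructor.
      return jobClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate data writer compute job: " + jobClassName, e);
    }
  }
}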
IdentityVeniceRecordReader.java
@@ -0,0 +1,44 @@
package com.linkedin.venice.hadoop.input.recordreader.avro;

import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
import com.linkedin.venice.hadoop.input.recordreader.AbstractVeniceRecordReader;
import com.linkedin.venice.utils.ByteUtils;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;


/**
* A record reader that returns the input key and value as is.
*/
public class IdentityVeniceRecordReader extends AbstractVeniceRecordReader<ByteBuffer, ByteBuffer> {
private static final IdentityVeniceRecordReader INSTANCE = new IdentityVeniceRecordReader();

private IdentityVeniceRecordReader() {
final Schema BYTES_SCHEMA = Schema.create(Schema.Type.BYTES);
configure(BYTES_SCHEMA, BYTES_SCHEMA);
}

public static IdentityVeniceRecordReader getInstance() {
return INSTANCE;
}

@Override
public Object getAvroKey(ByteBuffer keyBytes, ByteBuffer valueBytes) {
throw new VeniceUnsupportedOperationException("getAvroKey in IdentityVeniceRecordReader");
}

@Override
public byte[] getKeyBytes(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
return ByteUtils.extractByteArray(keyBuffer);
}

@Override
public Object getAvroValue(ByteBuffer keyBytes, ByteBuffer valueBytes) {
throw new VeniceUnsupportedOperationException("getAvroValue in IdentityVeniceRecordReader");
}

@Override
public byte[] getValueBytes(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
return ByteUtils.extractByteArray(valueBuffer);
}
}
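A small usage sketch for the new reader: it hands back the raw key and value bytes unchanged, while the Avro accessors throw by design. The wrapper class and sample bytes below are illustrative only.

import com.linkedin.venice.hadoop.input.recordreader.avro.IdentityVeniceRecordReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class IdentityRecordReaderExample {
  public static void main(String[] args) {
    IdentityVeniceRecordReader reader = IdentityVeniceRecordReader.getInstance();
    ByteBuffer key = ByteBuffer.wrap("some-key".getBytes(StandardCharsets.UTF_8));
    ByteBuffer value = ByteBuffer.wrap("some-value".getBytes(StandardCharsets.UTF_8));
    // The reader is an identity mapping: the bytes come back as-is.
    byte[] keyBytes = reader.getKeyBytes(key, value);
    byte[] valueBytes = reader.getValueBytes(key, value);
    System.out.println(keyBytes.length + " key bytes, " + valueBytes.length + " value bytes");
    // getAvroKey / getAvroValue throw VeniceUnsupportedOperationException by design.
  }
}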
DataWriterComputeJob.java
@@ -1,9 +1,15 @@
package com.linkedin.venice.hadoop.jobs;

import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputRecordReader;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -14,6 +20,20 @@
public abstract class DataWriterComputeJob implements ComputeJob {
private static final Logger LOGGER = LogManager.getLogger(DataWriterComputeJob.class);

/**
* Pass-through the properties whose names start with:
* <ul>
* <li> {@link VeniceWriter#VENICE_WRITER_CONFIG_PREFIX} </li>
* <li> {@link ConfigKeys#KAFKA_CONFIG_PREFIX} </li>
* <li> {@link KafkaInputRecordReader#KIF_RECORD_READER_KAFKA_CONFIG_PREFIX} </li>
* </ul>
**/
public static final List<String> PASS_THROUGH_CONFIG_PREFIXES = Collections.unmodifiableList(
Arrays.asList(
VeniceWriter.VENICE_WRITER_CONFIG_PREFIX,
ConfigKeys.KAFKA_CONFIG_PREFIX,
KafkaInputRecordReader.KIF_RECORD_READER_KAFKA_CONFIG_PREFIX));

private Status jobStatus = Status.NOT_STARTED;
private Throwable failureReason = null;

@@ -98,7 +118,7 @@ public void configure(VeniceProperties properties) {
LOGGER.warn("Data writer compute job needs additional configs to be configured.");
}

public abstract PushJobSetting getPushJobSetting();
protected abstract PushJobSetting getPushJobSetting();

public abstract void configure(VeniceProperties props, PushJobSetting pushJobSetting);

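The new public PASS_THROUGH_CONFIG_PREFIXES list lets any compute-engine implementation forward user-supplied Venice-writer and Kafka configs to its tasks. Below is a hypothetical sketch of such forwarding; the helper class and the use of plain java.util.Properties are illustrative assumptions, not code from this PR.

import com.linkedin.venice.hadoop.jobs.DataWriterComputeJob;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public final class PassThroughConfigSketch {
  // Collect the configs whose names start with one of the pass-through prefixes.
  static Map<String, String> extractPassThroughConfigs(Properties pushJobProps) {
    Map<String, String> passThrough = new HashMap<>();
    for (String name : pushJobProps.stringPropertyNames()) {
      for (String prefix : DataWriterComputeJob.PASS_THROUGH_CONFIG_PREFIXES) {
        if (name.startsWith(prefix)) {
          passThrough.put(name, pushJobProps.getProperty(name));
          break;
        }
      }
    }
    return passThrough;
  }
}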
JobUtils.java
@@ -1,6 +1,5 @@
package com.linkedin.venice.hadoop.mapreduce.common;

import static com.linkedin.venice.hadoop.InputDataInfoProvider.LOGGER;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.COMPRESSION_METRIC_COLLECTION_ENABLED;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.SSL_CONFIGURATOR_CLASS_CONFIG;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.SSL_KEY_PASSWORD_PROPERTY_NAME;
@@ -21,9 +20,13 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public final class JobUtils {
private static final Logger LOGGER = LogManager.getLogger(JobUtils.class);

private JobUtils() {
}
