[vpj] Unify shared temp directories for VPJ (#1025)
Currently, VPJ uses temp directories for at least two features:
1. To store the output of the ValidateSchemaAndBuildDict mapper job, so the dictionary can be passed to the driver
2. To store schemas for TTL repush

In the future, we might add more cases where data needs to be stored in a temp directory. For
operational reasons, it is desirable to have a single temp directory shared by all VPJ jobs, and
inside it, feature-specific directories that are likewise shared by all VPJ jobs. These shared
directories will have 777 permissions so that any user can write to them. If a feature has private
data that needs restricted permissions, the feature implementation can create files or
subdirectories inside its feature directory and apply the restricted permissions to those (see the
sketch after the tree below).

After this commit, the temp directory layout will be:
.
|____<hadoop.tmp.dir> (Specified by env, or default /tmp)
| |____venice-push-job (777 permissions) - shared by all VPJ jobs
| | |____<job.execution.id>_<unique-suffix> (700 permissions) - shared by all features in this execution
| | | |____veniceMapperOutput (700 permissions)
| | | |____rmd_schemas (700 permissions)
| | | |____value_schemas (700 permissions)
| | | |____...<features added in the future> (700 permissions)
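For context, here is a minimal sketch of how such a layout can be created with the plain Hadoop FileSystem API. The class and method names (`TmpDirLayoutSketch`, `createJobTmpDir`) and the suffix handling are illustrative, not the commit's actual code:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public final class TmpDirLayoutSketch {
  /** Creates <hadoop.tmp.dir>/venice-push-job/<executionId>_<suffix> with the permissions described above. */
  public static Path createJobTmpDir(Configuration conf, String executionId, String suffix) throws IOException {
    // Shared root, writable by every user running a VPJ (777).
    Path sharedRoot = new Path(conf.get("hadoop.tmp.dir", "/tmp"), "venice-push-job");
    FileSystem fs = sharedRoot.getFileSystem(conf);
    fs.mkdirs(sharedRoot);
    // Set the mode explicitly: mkdirs(path, perm) is filtered through the client umask.
    fs.setPermission(sharedRoot, new FsPermission("777"));

    // Per-execution directory, private to this job (700); features hang their
    // subdirectories (veniceMapperOutput, rmd_schemas, ...) under it.
    Path jobTmpDir = new Path(sharedRoot, executionId + "_" + suffix);
    fs.mkdirs(jobTmpDir);
    fs.setPermission(jobTmpDir, new FsPermission("700"));
    return jobTmpDir;
  }
}
```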

* Address review comments to fix code comments
nisargthakkar authored Jun 10, 2024
1 parent d37e6a9 commit 4082a43
Showing 10 changed files with 180 additions and 149 deletions.
PushJobSetting.java
@@ -25,6 +25,10 @@ public class PushJobSetting implements Serializable {
   public String jobId;
   public String jobExecutionId;
   public String jobServerName;
+  // Path was not serializable until Hadoop 3.0.0, so we store these as Strings instead:
+  // https://issues.apache.org/jira/browse/HADOOP-13519
+  public String sharedTmpDir;
+  public String jobTmpDir;
   public boolean enableSSL;
   public Class<? extends VenicePushJob> vpjEntryClass;
   public String veniceControllerUrl;
@@ -58,7 +62,6 @@ public class PushJobSetting implements Serializable {
   public boolean compressionMetricCollectionEnabled;
   /** Refer {@link VenicePushJobConstants#USE_MAPPER_TO_BUILD_DICTIONARY} **/
   public boolean useMapperToBuildDict;
-  public String useMapperToBuildDictOutputPath;
   public boolean repushTTLEnabled;
   // specify time to drop stale records.
   public long repushTTLStartTimeMs;
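The new fields are kept as Strings because PushJobSetting is Serializable while Hadoop's Path only implements Serializable from 3.0.0 (HADOOP-13519). A small illustrative round-trip, using hypothetical stand-in fields rather than the real PushJobSetting:

```java
import org.apache.hadoop.fs.Path;

public final class TmpDirSettingSketch {
  /** Stand-ins for the new PushJobSetting fields. */
  static String sharedTmpDir;
  static String jobTmpDir;

  public static void main(String[] args) {
    // Store only String forms on the serializable setting object...
    Path jobDir = new Path("/tmp/venice-push-job/1234_ab12");
    jobTmpDir = jobDir.toString();
    sharedTmpDir = jobDir.getParent().toString();

    // ...and rehydrate a Path wherever filesystem operations need one.
    Path rehydrated = new Path(jobTmpDir);
    System.out.println(rehydrated.equals(jobDir)); // true: the round-trip is lossless
  }
}
```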
ValidateSchemaAndBuildDictMapperOutputReader.java
@@ -28,18 +28,18 @@ public class ValidateSchemaAndBuildDictMapperOutputReader implements Closeable {
   private final InputStream inputStream;
   private final DataFileStream avroDataFileStream;
   private final FileSystem fs;
-  private final String outputDir;
+  private final Path outputDir;
 
-  public ValidateSchemaAndBuildDictMapperOutputReader(String outputDir, String fileName) throws Exception {
-    Validate.notEmpty(
+  public ValidateSchemaAndBuildDictMapperOutputReader(Path outputDir, String fileName) throws Exception {
+    Validate.notNull(
         outputDir,
-        ValidateSchemaAndBuildDictMapper.class.getSimpleName() + " output directory should not be empty");
+        ValidateSchemaAndBuildDictMapper.class.getSimpleName() + " output directory should not be null");
     Validate.notEmpty(
         fileName,
         ValidateSchemaAndBuildDictMapper.class.getSimpleName() + " output fileName should not be empty");
 
     this.outputDir = outputDir;
-    Path filePath = new Path(String.format("%s/%s", outputDir, fileName));
+    Path filePath = new Path(outputDir, fileName);
 
     LOGGER.info(
         "Reading file {} to retrieve info persisted by {}",
@@ -94,7 +94,7 @@ public void close() {

     // delete the output directory: It should not affect other VPJs as this is unique
     try {
-      fs.delete(new Path(outputDir), true);
+      fs.delete(outputDir, true);
     } catch (IOException e) {
       LOGGER.error("Failed to delete directory: {}", outputDir, e);
     }
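Besides the String-to-Path switch, the reader now joins paths with the two-argument Path constructor instead of String.format. A quick illustrative comparison (the paths are made up):

```java
import org.apache.hadoop.fs.Path;

public final class PathJoinSketch {
  public static void main(String[] args) {
    Path outputDir = new Path("/tmp/venice-push-job/1234_ab12/veniceMapperOutput");
    // Before: hand-formatted concatenation of parent and child.
    Path before = new Path(String.format("%s/%s", outputDir, "output.avro"));
    // After: Path(parent, child) resolves the child against the parent,
    // with no format string to get wrong.
    Path after = new Path(outputDir, "output.avro");
    System.out.println(before.equals(after)); // true: same path either way
  }
}
```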
ValidateSchemaAndBuildDictOutputFormat.java
@@ -1,15 +1,13 @@
 package com.linkedin.venice.hadoop;
 
 import static com.linkedin.venice.hadoop.VenicePushJob.getValidateSchemaAndBuildDictionaryOutputFileNameNoExtension;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.MAPPER_OUTPUT_DIRECTORY;
 import static com.linkedin.venice.hadoop.VenicePushJobConstants.VALIDATE_SCHEMA_AND_BUILD_DICT_MAPPER_OUTPUT_DIRECTORY;
 import static org.apache.hadoop.mapreduce.MRJobConfig.ID;
 
 import java.io.IOException;
 import org.apache.avro.mapred.AvroOutputFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -30,37 +28,6 @@
 public class ValidateSchemaAndBuildDictOutputFormat extends AvroOutputFormat {
   private static final Logger LOGGER = LogManager.getLogger(ValidateSchemaAndBuildDictOutputFormat.class);
 
-  private static void createDirectoryWithPermission(FileSystem fs, Path path, String permission) throws IOException {
-    createDirectoryWithPermission(fs, path, permission, false);
-  }
-
-  private static void createDirectoryWithPermission(FileSystem fs, Path path, String permission, boolean deleteIfExists)
-      throws IOException {
-    LOGGER.info("Trying to create path {} with permission {}", path.getName(), permission);
-    boolean createPath = false;
-    // check if the path needs to be created
-    if (fs.exists(path)) {
-      if (deleteIfExists) {
-        LOGGER.info("path {} exists already, but will be deleted and recreated", path);
-        fs.delete(path, true);
-        createPath = true;
-      } else {
-        LOGGER.info("path {} exists already", path);
-      }
-    } else {
-      createPath = true;
-    }
-
-    // create if needed
-    if (createPath) {
-      LOGGER.info("Creating path {} with permission {}", path.getName(), permission);
-      fs.mkdirs(path);
-      // mkdirs(path,permission) didn't set the right permission when
-      // tested in hdfs, so splitting it like this, it works!
-      fs.setPermission(path, new FsPermission(permission));
-    }
-  }
-
   /**
    * 1. The parent directory should be accessible by every user/group (777)
    * 2. unique sub-directory for this VPJ should be accessible only by
@@ -70,21 +37,9 @@ private static void createDirectoryWithPermission(FileSystem fs, Path path, String permission, boolean deleteIfExists)
    * @param job mapred config
    * @throws IOException
    */
-  protected static void setValidateSchemaAndBuildDictionaryOutputDirPath(JobConf job) throws IOException {
-    // parent directory: Common directory under which all the different push jobs
-    // create their job specific directories.
-    String parentOutputDir = job.get(MAPPER_OUTPUT_DIRECTORY);
-    Path outputPath = new Path(parentOutputDir);
-    FileSystem fs = outputPath.getFileSystem(job);
-    createDirectoryWithPermission(fs, outputPath, "777");
-
-    // store+job specific unique directory under parent directory: already derived in VPJ driver
-    // and passed along with the format: {$storeName}-{$JOB_EXEC_ID}-{$randomUniqueString}
-    // this job creates it and VPJ driver deletes it after consuming the data in this directory
-    // in ValidateSchemaAndBuildDictMapperOutputReader. setting 700 permissions for pii.
+  protected static void setValidateSchemaAndBuildDictionaryOutputDirPath(JobConf job) {
     String fullOutputDir = job.get(VALIDATE_SCHEMA_AND_BUILD_DICT_MAPPER_OUTPUT_DIRECTORY);
-    outputPath = new Path(fullOutputDir);
-    createDirectoryWithPermission(fs, outputPath, "700");
+    Path outputPath = new Path(fullOutputDir);
 
     LOGGER.info(
         "{} Output will be stored in path: {}",
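The deleted helper's mkdirs-then-setPermission split reflects a real Hadoop behavior worth keeping in mind: FileSystem.mkdirs(Path, FsPermission) masks the requested mode with the client umask (fs.permissions.umask-mode, default 022), so asking for 777 typically yields 755. A sketch of the pattern, under that assumption, with an illustrative class and method name:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public final class ExactModeMkdirs {
  static void mkdirsWithExactMode(FileSystem fs, Path path, String octalMode) throws IOException {
    // mkdirs(path, perm) would filter perm through the client umask, so create first...
    fs.mkdirs(path);
    // ...then force the exact mode in a second call, which is not umask-filtered.
    fs.setPermission(path, new FsPermission(octalMode));
  }
}
```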