Skip to content

Commit 2c50b4e

Browse files
authored
[FLINK-20681][yarn] Support specifying the hdfs path for yarn.ship-archives and yarn.ship-files (apache#23219)
1 parent 103de5b commit 2c50b4e

9 files changed

Lines changed: 197 additions & 66 deletions

File tree

docs/layouts/shortcodes/generated/yarn_config_configuration.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,13 @@
162162
<td><h5>yarn.ship-archives</h5></td>
163163
<td style="word-wrap: break-word;">(none)</td>
164164
<td>List&lt;String&gt;</td>
165-
<td>A semicolon-separated list of archives to be shipped to the YARN cluster. These archives will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip".</td>
165+
<td>A semicolon-separated list of archives to be shipped to the YARN cluster. These archives can come from the local path of flink client or HDFS. They will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip". For example, "/path/to/local/archive.jar;hdfs://$namenode_address/path/to/archive.jar"</td>
166166
</tr>
167167
<tr>
168168
<td><h5>yarn.ship-files</h5></td>
169169
<td style="word-wrap: break-word;">(none)</td>
170170
<td>List&lt;String&gt;</td>
171-
<td>A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.</td>
171+
<td>A semicolon-separated list of files and/or directories to be shipped to the YARN cluster. These files/directories can come from the local path of flink client or HDFS. For example, "/path/to/local/file;/path/to/local/directory;hdfs://$namenode_address/path/of/file;hdfs://$namenode_address/path/of/directory"</td>
172172
</tr>
173173
<tr>
174174
<td><h5>yarn.staging-directory</h5></td>

flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
import java.io.File;
4444
import java.time.Duration;
4545
import java.util.Arrays;
46+
import java.util.Objects;
4647
import java.util.concurrent.CompletableFuture;
48+
import java.util.stream.Collectors;
4749

4850
import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
4951
import static org.assertj.core.api.Assertions.assertThat;
@@ -81,7 +83,10 @@ private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws
8183
createYarnClusterDescriptor(configuration)) {
8284

8385
yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
84-
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
86+
yarnClusterDescriptor.addShipFiles(
87+
Arrays.stream(Objects.requireNonNull(flinkLibFolder.listFiles()))
88+
.map(file -> new Path(file.toURI()))
89+
.collect(Collectors.toList()));
8590

8691
final int masterMemory =
8792
yarnClusterDescriptor

flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@
5353
import java.util.Arrays;
5454
import java.util.Collection;
5555
import java.util.List;
56+
import java.util.Objects;
5657
import java.util.concurrent.CompletableFuture;
58+
import java.util.stream.Collectors;
5759

5860
import static org.apache.flink.yarn.util.TestUtils.getTestJarPath;
5961
import static org.assertj.core.api.Assertions.assertThat;
@@ -93,7 +95,10 @@ void testFlinkContainerMemory() throws Exception {
9395
true);
9496

9597
clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
96-
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
98+
clusterDescriptor.addShipFiles(
99+
Arrays.stream(Objects.requireNonNull(flinkLibFolder.listFiles()))
100+
.map(file -> new Path(file.toURI()))
101+
.collect(Collectors.toList()));
97102

98103
final File streamingWordCountFile = getTestJarPath("WindowJoin.jar");
99104

flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,8 @@ YarnClusterDescriptor createYarnClusterDescriptor(
404404
org.apache.flink.configuration.Configuration flinkConfiguration) {
405405
final YarnClusterDescriptor yarnClusterDescriptor =
406406
createYarnClusterDescriptorWithoutLibDir(flinkConfiguration);
407-
yarnClusterDescriptor.addShipFiles(Collections.singletonList(flinkLibFolder));
407+
yarnClusterDescriptor.addShipFiles(
408+
Collections.singletonList(new Path(flinkLibFolder.toURI())));
408409
return yarnClusterDescriptor;
409410
}
410411

flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,4 +667,12 @@ private static void validateAclString(String acl) {
667667
acl));
668668
}
669669
}
670+
671+
public static Path getPathFromLocalFile(File localFile) {
672+
return new Path(localFile.toURI());
673+
}
674+
675+
public static Path getPathFromLocalFilePathStr(String localPathStr) {
676+
return getPathFromLocalFile(new File(localPathStr));
677+
}
670678
}

flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.flink.util.Preconditions;
6565
import org.apache.flink.util.ShutdownHookUtil;
6666
import org.apache.flink.util.StringUtils;
67+
import org.apache.flink.util.function.FunctionUtils;
6768
import org.apache.flink.yarn.configuration.YarnConfigOptions;
6869
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
6970
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
@@ -72,6 +73,7 @@
7273
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
7374
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
7475

76+
import org.apache.hadoop.fs.FileStatus;
7577
import org.apache.hadoop.fs.FileSystem;
7678
import org.apache.hadoop.fs.Path;
7779
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -136,6 +138,8 @@
136138
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
137139
import static org.apache.flink.util.Preconditions.checkArgument;
138140
import static org.apache.flink.util.Preconditions.checkNotNull;
141+
import static org.apache.flink.yarn.Utils.getPathFromLocalFile;
142+
import static org.apache.flink.yarn.Utils.getPathFromLocalFilePathStr;
139143
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
140144
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
141145

@@ -155,10 +159,19 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
155159
/** True if the descriptor must not shut down the YarnClient. */
156160
private final boolean sharedYarnClient;
157161

158-
/** Lazily initialized list of files to ship. */
159-
private final List<File> shipFiles = new LinkedList<>();
162+
/**
163+
* Lazily initialized list of files to ship. The path string for the files which is configured
164+
* by {@link YarnConfigOptions#SHIP_FILES} will be converted to {@link Path} with schema and
165+
* absolute path.
166+
*/
167+
private final List<Path> shipFiles = new LinkedList<>();
160168

161-
private final List<File> shipArchives = new LinkedList<>();
169+
/**
170+
* Lazily initialized list of archives to ship. The path string for the archives which is
171+
* configured by {@link YarnConfigOptions#SHIP_ARCHIVES} will be converted to {@link Path} with
172+
* schema and absolute path.
173+
*/
174+
private final List<Path> shipArchives = new LinkedList<>();
162175

163176
private final String yarnQueue;
164177

@@ -202,16 +215,25 @@ public YarnClusterDescriptor(
202215
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
203216
}
204217

205-
private Optional<List<File>> decodeFilesToShipToCluster(
218+
private Optional<List<Path>> decodeFilesToShipToCluster(
206219
final Configuration configuration, final ConfigOption<List<String>> configOption) {
207220
checkNotNull(configuration);
208221
checkNotNull(configOption);
209222

210-
final List<File> files =
211-
ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
223+
List<Path> files =
224+
ConfigUtils.decodeListFromConfig(
225+
configuration, configOption, this::createPathWithSchema);
212226
return files.isEmpty() ? Optional.empty() : Optional.of(files);
213227
}
214228

229+
private Path createPathWithSchema(String path) {
230+
return isWithoutSchema(new Path(path)) ? getPathFromLocalFilePathStr(path) : new Path(path);
231+
}
232+
233+
private boolean isWithoutSchema(Path path) {
234+
return StringUtils.isNullOrWhitespaceOnly(path.toUri().getScheme());
235+
}
236+
215237
private Optional<Path> getLocalFlinkDistPath(final Configuration configuration) {
216238
final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
217239
if (localJarPath != null) {
@@ -227,7 +249,7 @@ private Optional<Path> getLocalFlinkDistPath(final Configuration configuration)
227249
// flink-dist jar
228250
final String decodedPath = getDecodedJarPath();
229251
return decodedPath.endsWith(".jar")
230-
? Optional.of(new Path(new File(decodedPath).toURI()))
252+
? Optional.of(getPathFromLocalFilePathStr(decodedPath))
231253
: Optional.empty();
232254
}
233255

@@ -245,10 +267,15 @@ private String getDecodedJarPath() {
245267
}
246268

247269
@VisibleForTesting
248-
List<File> getShipFiles() {
270+
List<Path> getShipFiles() {
249271
return shipFiles;
250272
}
251273

274+
@VisibleForTesting
275+
List<Path> getShipArchives() {
276+
return shipArchives;
277+
}
278+
252279
public YarnClient getYarnClient() {
253280
return yarnClient;
254281
}
@@ -293,28 +320,31 @@ public void setLocalJarPath(Path localJarPath) {
293320
*
294321
* @param shipFiles files to ship
295322
*/
296-
public void addShipFiles(List<File> shipFiles) {
323+
public void addShipFiles(List<Path> shipFiles) {
297324
checkArgument(
298-
!isUsrLibDirIncludedInShipFiles(shipFiles),
325+
!isUsrLibDirIncludedInShipFiles(shipFiles, yarnConfiguration),
299326
"User-shipped directories configured via : %s should not include %s.",
300327
YarnConfigOptions.SHIP_FILES.key(),
301328
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
302329
this.shipFiles.addAll(shipFiles);
303330
}
304331

305-
private void addShipArchives(List<File> shipArchives) {
332+
private void addShipArchives(List<Path> shipArchives) {
306333
checkArgument(
307-
isArchiveOnlyIncludedInShipArchiveFiles(shipArchives),
334+
isArchiveOnlyIncludedInShipArchiveFiles(shipArchives, yarnConfiguration),
308335
"Directories or non-archive files are included.");
309336
this.shipArchives.addAll(shipArchives);
310337
}
311338

312-
private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) {
339+
private static boolean isArchiveOnlyIncludedInShipArchiveFiles(
340+
List<Path> shipFiles, YarnConfiguration yarnConfiguration) {
313341
long archivedFileCount =
314342
shipFiles.stream()
315-
.filter(File::isFile)
316-
.map(File::getName)
317-
.map(String::toLowerCase)
343+
.map(
344+
FunctionUtils.uncheckedFunction(
345+
path -> getFileStatus(path, yarnConfiguration)))
346+
.filter(FileStatus::isFile)
347+
.map(status -> status.getPath().getName().toLowerCase())
318348
.filter(
319349
name ->
320350
name.endsWith(".tar.gz")
@@ -327,6 +357,11 @@ private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFi
327357
return archivedFileCount == shipFiles.size();
328358
}
329359

360+
private static FileStatus getFileStatus(Path path, YarnConfiguration yarnConfiguration)
361+
throws IOException {
362+
return path.getFileSystem(yarnConfiguration).getFileStatus(path);
363+
}
364+
330365
private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {
331366

332367
if (this.flinkJarPath == null) {
@@ -837,15 +872,12 @@ private ApplicationReport startAppMaster(
837872
getFileReplication());
838873

839874
// The files need to be shipped and added to classpath.
840-
Set<File> systemShipFiles = CollectionUtil.newHashSetWithExpectedSize(shipFiles.size());
841-
for (File file : shipFiles) {
842-
systemShipFiles.add(file.getAbsoluteFile());
843-
}
875+
Set<Path> systemShipFiles = new HashSet<>(shipFiles);
844876

845877
final String logConfigFilePath =
846878
configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
847879
if (logConfigFilePath != null) {
848-
systemShipFiles.add(new File(logConfigFilePath));
880+
systemShipFiles.add(getPathFromLocalFilePathStr(logConfigFilePath));
849881
}
850882

851883
// Set-up ApplicationSubmissionContext for the application
@@ -911,31 +943,21 @@ private ApplicationReport startAppMaster(
911943
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
912944
final List<String> uploadedDependencies =
913945
fileUploader.registerMultipleLocalResources(
914-
systemShipFiles.stream()
915-
.map(e -> new Path(e.toURI()))
916-
.collect(Collectors.toSet()),
917-
Path.CUR_DIR,
918-
LocalResourceType.FILE);
946+
systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);
919947
systemClassPaths.addAll(uploadedDependencies);
920948

921949
// upload and register ship-only files
922950
// Plugin files only need to be shipped and should not be added to classpath.
923951
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
924-
Set<File> shipOnlyFiles = new HashSet<>();
952+
Set<Path> shipOnlyFiles = new HashSet<>();
925953
addPluginsFoldersToShipFiles(shipOnlyFiles);
926954
fileUploader.registerMultipleLocalResources(
927-
shipOnlyFiles.stream()
928-
.map(e -> new Path(e.toURI()))
929-
.collect(Collectors.toSet()),
930-
Path.CUR_DIR,
931-
LocalResourceType.FILE);
955+
shipOnlyFiles, Path.CUR_DIR, LocalResourceType.FILE);
932956
}
933957

934958
if (!shipArchives.isEmpty()) {
935959
fileUploader.registerMultipleLocalResources(
936-
shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
937-
Path.CUR_DIR,
938-
LocalResourceType.ARCHIVE);
960+
shipArchives, Path.CUR_DIR, LocalResourceType.ARCHIVE);
939961
}
940962

941963
// only for application mode
@@ -1760,15 +1782,15 @@ public void run() {
17601782
}
17611783

17621784
@VisibleForTesting
1763-
void addLibFoldersToShipFiles(Collection<File> effectiveShipFiles) {
1785+
void addLibFoldersToShipFiles(Collection<Path> effectiveShipFiles) {
17641786
// Add lib folder to the ship files if the environment variable is set.
17651787
// This is for convenience when running from the command-line.
17661788
// (for other files users explicitly set the ship files)
17671789
String libDir = System.getenv().get(ENV_FLINK_LIB_DIR);
17681790
if (libDir != null) {
17691791
File directoryFile = new File(libDir);
17701792
if (directoryFile.isDirectory()) {
1771-
effectiveShipFiles.add(directoryFile);
1793+
effectiveShipFiles.add(getPathFromLocalFile(directoryFile));
17721794
} else {
17731795
throw new YarnDeploymentException(
17741796
"The environment variable '"
@@ -1801,9 +1823,9 @@ void addUsrLibFolderToShipFiles(Collection<File> effectiveShipFiles) {
18011823
}
18021824

18031825
@VisibleForTesting
1804-
void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {
1826+
void addPluginsFoldersToShipFiles(Collection<Path> effectiveShipFiles) {
18051827
final Optional<File> pluginsDir = PluginConfig.getPluginsDir();
1806-
pluginsDir.ifPresent(effectiveShipFiles::add);
1828+
pluginsDir.ifPresent(dir -> effectiveShipFiles.add(getPathFromLocalFile(dir)));
18071829
}
18081830

18091831
ContainerLaunchContext setupApplicationMasterContainer(
@@ -1869,10 +1891,12 @@ private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(
18691891
return config.get(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
18701892
}
18711893

1872-
private static boolean isUsrLibDirIncludedInShipFiles(List<File> shipFiles) {
1894+
private static boolean isUsrLibDirIncludedInShipFiles(
1895+
List<Path> shipFiles, YarnConfiguration yarnConfig) {
18731896
return shipFiles.stream()
1874-
.filter(File::isDirectory)
1875-
.map(File::getName)
1897+
.map(FunctionUtils.uncheckedFunction(path -> getFileStatus(path, yarnConfig)))
1898+
.filter(FileStatus::isDirectory)
1899+
.map(status -> status.getPath().getName().toLowerCase())
18761900
.anyMatch(name -> name.equals(DEFAULT_FLINK_USR_LIB_DIR));
18771901
}
18781902

flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,17 +272,25 @@ public class YarnConfigOptions {
272272
.noDefaultValue()
273273
.withDeprecatedKeys("yarn.ship-directories")
274274
.withDescription(
275-
"A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
275+
"A semicolon-separated list of files and/or directories to be shipped to the YARN "
276+
+ "cluster. These files/directories can come from the local path of flink client "
277+
+ "or HDFS. For example, "
278+
+ "\"/path/to/local/file;/path/to/local/directory;"
279+
+ "hdfs://$namenode_address/path/of/file;"
280+
+ "hdfs://$namenode_address/path/of/directory\"");
276281

277282
public static final ConfigOption<List<String>> SHIP_ARCHIVES =
278283
key("yarn.ship-archives")
279284
.stringType()
280285
.asList()
281286
.noDefaultValue()
282287
.withDescription(
283-
"A semicolon-separated list of archives to be shipped to the YARN cluster."
284-
+ " These archives will be un-packed when localizing and they can be any of the following types: "
285-
+ "\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\".");
288+
"A semicolon-separated list of archives to be shipped to the YARN cluster. "
289+
+ "These archives can come from the local path of flink client or HDFS. "
290+
+ "They will be un-packed when localizing and they can be any of the following "
291+
+ "types: \".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\". "
292+
+ "For example, \"/path/to/local/archive.jar;"
293+
+ "hdfs://$namenode_address/path/to/archive.jar\"");
286294

287295
public static final ConfigOption<String> FLINK_DIST_JAR =
288296
key("yarn.flink-dist-jar")

flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import org.apache.flink.yarn.configuration.YarnConfigOptions;
4343
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
4444

45-
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
46-
4745
import org.apache.commons.cli.CommandLine;
4846
import org.apache.commons.cli.CommandLineParser;
4947
import org.apache.commons.cli.DefaultParser;
@@ -60,6 +58,7 @@
6058
import java.util.List;
6159
import java.util.UUID;
6260

61+
import static org.apache.flink.yarn.Utils.getPathFromLocalFile;
6362
import static org.assertj.core.api.Assertions.assertThat;
6463
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6564

@@ -565,7 +564,8 @@ void testShipFiles() throws Exception {
565564
YarnClusterDescriptor flinkYarnDescriptor =
566565
(YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
567566

568-
assertThat(flinkYarnDescriptor.getShipFiles()).isEqualTo(Lists.newArrayList(tmpFile));
567+
assertThat(flinkYarnDescriptor.getShipFiles())
568+
.containsExactly(getPathFromLocalFile(tmpFile));
569569
}
570570

571571
@Test

0 commit comments

Comments
 (0)