From fd009d652f7922254ccc7cc631b8df3a6b821532 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Sun, 10 Dec 2023 14:11:19 -0800
Subject: [PATCH] [SPARK-45642][CORE][SQL] Fix `FileSystem.isFile & FileSystem.isDirectory is deprecated`

### What changes were proposed in this pull request?
This PR fixes the `FileSystem.isFile & FileSystem.isDirectory is deprecated` warnings by switching the affected call sites to `FileSystem.getFileStatus`, and makes some error messages more accurate along the way, as sketched below.
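A minimal sketch of the replacement pattern, assuming nothing beyond the Hadoop `FileSystem` API (the `isFile` helper mirrors the one this patch adds to `SparkHadoopUtil`; the wrapper object `FsChecks` is a placeholder so the snippet compiles on its own):

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileSystem, Path}

object FsChecks {
  // The deprecated fs.isFile(path) silently returned false for a missing
  // path, while getFileStatus(path) throws FileNotFoundException; catching
  // it preserves the old "false when missing" behavior.
  def isFile(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.getFileStatus(path).isFile
    } catch {
      case _: FileNotFoundException => false
    }
  }
}
```

Call sites where a missing path simply means "not a file" (e.g. `HdfsUtils`, `FsHistoryProvider`) switch to this helper; call sites that can let a missing path surface as an exception (e.g. `Utils.scala`) call `fs.getFileStatus(path).isFile` directly.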
### Why are the changes needed?
- Prepare for a future Hadoop release that actually removes these deprecated methods.
- Reduce deprecation warnings.
- Make some error messages more accurate (see the sketch after this list).
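The error-message improvement comes from distinguishing "missing path" from "not a directory" when validating the `basePath` option. A sketch of that pattern follows; the `requireDirectory` helper and the `BasePathValidation` object are hypothetical names for illustration — in the patch the equivalent logic is inlined in `PartitioningAwareFileIndex`:

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileSystem, Path}

object BasePathValidation {
  // getFileStatus throws FileNotFoundException for a missing path; translating
  // it into a dedicated "not found" error avoids the previously misleading
  // "must be a directory" message for paths that do not exist at all.
  def requireDirectory(fs: FileSystem, path: Path, option: String): Unit = {
    try {
      if (!fs.getFileStatus(path).isDirectory) {
        throw new IllegalArgumentException(s"Option '$option' must be a directory")
      }
    } catch {
      case _: FileNotFoundException =>
        throw new IllegalArgumentException(s"Option '$option' not found")
    }
  }
}
```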
### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Passed GA.
- Tested manually.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43505 from panbingkun/SPARK-45642.

Authored-by: panbingkun
Signed-off-by: Dongjoon Hyun
---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala  |  9 ++++++++-
 .../spark/deploy/history/FsHistoryProvider.scala   |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |  2 +-
 .../datasources/PartitioningAwareFileIndex.scala   | 14 +++++++++++---
 .../sql/execution/streaming/FileStreamSink.scala   |  2 +-
 .../apache/spark/streaming/util/HdfsUtils.scala    |  2 +-
 7 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 50906f76b6e10..628b688dedba2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, FileNotFoundException, IOException}
 import java.net.InetAddress
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
@@ -593,4 +593,11 @@ private[spark] object SparkHadoopUtil extends Logging {
     }
   }
 
+  def isFile(fs: FileSystem, path: Path): Boolean = {
+    try {
+      fs.getFileStatus(path).isFile
+    } catch {
+      case _: FileNotFoundException => false
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 565499bb610b7..73fb0086b338c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -860,7 +860,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       try {
         // Fetch the entry first to avoid an RPC when it's already removed.
         listing.read(classOf[LogInfo], inProgressLog)
-        if (!fs.isFile(new Path(inProgressLog))) {
+        if (!SparkHadoopUtil.isFile(fs, new Path(inProgressLog))) {
           listing.synchronized {
             listing.delete(classOf[LogInfo], inProgressLog)
           }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 049999281f5bb..a074bd53d26d7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -674,7 +674,7 @@ private[spark] object Utils
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
     val dest = new File(targetDir, filename.getOrElse(path.getName))
-    if (fs.isFile(path)) {
+    if (fs.getFileStatus(path).isFile) {
       val in = fs.open(path)
       try {
         downloadFile(path.toString, in, dest, fileOverwrite)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index e235b8aeb7780..d16a15df1b5ae 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1774,7 +1774,7 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
     status
   }
 
-  override def isFile(path: Path): Boolean = super.isFile(local(path))
+  override def getFileStatus(path: Path): FileStatus = super.getFileStatus(local(path))
 
   override def globStatus(pathPattern: Path): Array[FileStatus] = {
     val newPath = new Path(pathPattern.toUri.getPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index dc41afe226b86..3efe614bcef92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -222,9 +224,15 @@ abstract class PartitioningAwareFileIndex(
     caseInsensitiveMap.get(FileIndexOptions.BASE_PATH_PARAM).map(new Path(_)) match {
       case Some(userDefinedBasePath) =>
         val fs = userDefinedBasePath.getFileSystem(hadoopConf)
-        if (!fs.isDirectory(userDefinedBasePath)) {
-          throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
-            s"must be a directory")
+        try {
+          if (!fs.getFileStatus(userDefinedBasePath).isDirectory) {
+            throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
+              s"must be a directory")
+          }
+        } catch {
+          case _: FileNotFoundException =>
+            throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
+              s"not found")
+        }
         val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
         val qualifiedBasePathStr = qualifiedBasePath.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 04a1de02ea587..ea8db3c99de92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -51,7 +51,7 @@ object FileStreamSink extends Logging {
     val hdfsPath = new Path(singlePath)
     try {
       val fs = hdfsPath.getFileSystem(hadoopConf)
-      if (fs.isDirectory(hdfsPath)) {
+      if (fs.getFileStatus(hdfsPath).isDirectory) {
         val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
         fs.exists(metadataPath)
       } else {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index ef040681adf37..703fcb5edb3ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -30,7 +30,7 @@ private[streaming] object HdfsUtils {
     val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
     val stream: FSDataOutputStream = {
-      if (dfs.isFile(dfsPath)) {
+      if (SparkHadoopUtil.isFile(dfs, dfsPath)) {
         if (conf.getBoolean("dfs.support.append", true) ||
             conf.getBoolean("hdfs.append.support", false) ||
             dfs.isInstanceOf[RawLocalFileSystem]) {