Skip to content

Commit

Permalink
[SPARK-45642][CORE][SQL] Fix `FileSystem.isFile & FileSystem.isDirect…
Browse files Browse the repository at this point in the history
…ory is deprecated`

### What changes were proposed in this pull request?
The pr aims to fix `FileSystem.isFile & FileSystem.isDirectory is deprecated` & make some error message prompts more accurate.

### Why are the changes needed?
- Prepare for a future Hadoop release that actually removes these deprecated methods.
- Reduce warning messages.
- Make some error messages more accurate.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA.
- Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#43505 from panbingkun/SPARK-45642.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
panbingkun authored and dongjoon-hyun committed Dec 10, 2023
1 parent 7a43de1 commit fd009d6
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, FileNotFoundException, IOException}
import java.net.InetAddress
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
Expand Down Expand Up @@ -593,4 +593,11 @@ private[spark] object SparkHadoopUtil extends Logging {
}
}

/**
 * Non-deprecated replacement for `FileSystem.isFile`: returns true when `path`
 * exists on `fs` and its status reports a regular file, and false when the
 * path does not exist. Only `FileNotFoundException` is treated as "missing";
 * any other I/O failure still propagates to the caller.
 */
def isFile(fs: FileSystem, path: Path): Boolean =
  try fs.getFileStatus(path).isFile
  catch {
    case _: FileNotFoundException => false
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
try {
// Fetch the entry first to avoid an RPC when it's already removed.
listing.read(classOf[LogInfo], inProgressLog)
if (!fs.isFile(new Path(inProgressLog))) {
if (!SparkHadoopUtil.isFile(fs, new Path(inProgressLog))) {
listing.synchronized {
listing.delete(classOf[LogInfo], inProgressLog)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ private[spark] object Utils
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
}
val dest = new File(targetDir, filename.getOrElse(path.getName))
if (fs.isFile(path)) {
if (fs.getFileStatus(path).isFile) {
val in = fs.open(path)
try {
downloadFile(path.toString, in, dest, fileOverwrite)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1774,7 +1774,7 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
status
}

override def isFile(path: Path): Boolean = super.isFile(local(path))
override def getFileStatus(path: Path): FileStatus = super.getFileStatus(local(path))

override def globStatus(pathPattern: Path): Array[FileStatus] = {
val newPath = new Path(pathPattern.toUri.getPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources

import java.io.FileNotFoundException

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -222,9 +224,15 @@ abstract class PartitioningAwareFileIndex(
caseInsensitiveMap.get(FileIndexOptions.BASE_PATH_PARAM).map(new Path(_)) match {
case Some(userDefinedBasePath) =>
val fs = userDefinedBasePath.getFileSystem(hadoopConf)
if (!fs.isDirectory(userDefinedBasePath)) {
throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
s"must be a directory")
try {
if (!fs.getFileStatus(userDefinedBasePath).isDirectory) {
throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
s"must be a directory")
}
} catch {
case _: FileNotFoundException =>
throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
s"not found")
}
val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
val qualifiedBasePathStr = qualifiedBasePath.toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object FileStreamSink extends Logging {
val hdfsPath = new Path(singlePath)
try {
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.isDirectory(hdfsPath)) {
if (fs.getFileStatus(hdfsPath).isDirectory) {
val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
fs.exists(metadataPath)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[streaming] object HdfsUtils {
val dfs = getFileSystemForPath(dfsPath, conf)
// If the file exists and we have append support, append instead of creating a new file
val stream: FSDataOutputStream = {
if (dfs.isFile(dfsPath)) {
if (SparkHadoopUtil.isFile(dfs, dfsPath)) {
if (conf.getBoolean("dfs.support.append", true) ||
conf.getBoolean("hdfs.append.support", false) ||
dfs.isInstanceOf[RawLocalFileSystem]) {
Expand Down

0 comments on commit fd009d6

Please sign in to comment.