Skip to content

Commit 795e151

Browse files
committed
fix
1 parent 6008ef3 commit 795e151

File tree

1 file changed

+10
-11
lines changed

1 file changed

+10
-11
lines changed

jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import java.util.concurrent.Executors
2222

2323
import scala.collection.mutable
2424
import scala.collection.mutable.ArrayBuffer
25-
import scala.concurrent.{ExecutionContext, Future}
25+
import scala.concurrent.{Await, ExecutionContext, Future}
26+
import scala.concurrent.duration.DurationInt
2627

2728
import ai.rapids.cudf._
2829
import org.apache.commons.logging.LogFactory
@@ -145,19 +146,17 @@ private[spark] class DiskExternalMemoryIterator(val parent: String) extends Exte
145146
}
146147

147148
private def checkAndWaitCachingDone(path: String): Unit = {
148-
var count = 1
149+
val futureOpt = taskFutures.get(path)
150+
if (futureOpt.isEmpty) {
151+
throw new RuntimeException(s"Failed to find the caching process for $path")
152+
}
149153
// Wait 6s to check if the caching is done.
150154
// TODO, make it configurable
151-
while (count < 120) {
152-
val futureOpt = taskFutures.get(path)
153-
val exist = new File(path).exists()
154-
if (futureOpt.isDefined && futureOpt.get.isCompleted && exist) {
155-
return
156-
}
157-
count += 1
158-
Thread.sleep(50)
155+
// If timeout, it's going to throw exception
156+
val success = Await.result(futureOpt.get, 6.seconds)
157+
if (!success) { // Failed to cache
158+
throw new RuntimeException(s"Failed to cache table to $path")
159159
}
160-
throw new RuntimeException(s"The cache file $path does not exist")
161160
}
162161

163162
/**

0 commit comments

Comments
 (0)