From ab55d3d89d937a4ac7971b80309f3933b380a5bb Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Mon, 5 Aug 2024 15:51:32 +0800 Subject: [PATCH] update --- .../oap/mllib/stat/CorrelationDALImpl.scala | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 922834073..a4adb0ef3 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -51,19 +51,19 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) val training_breakdown_name = "Correlation_training_breakdown_" + executorNum; -// coalescedTables.mapPartitionsWithIndex { (rank, iter) => -// logInfo(s"set ZE_AFFINITY_MASK") -// val gpuIndices = if (useDevice == "GPU") { -// val resources = TaskContext.get().resources() -// resources("gpu").addresses.map(_.toInt) -// } else { -// null -// } -// logInfo(s"set ZE_AFFINITY_MASK rank is $rank.") -// logInfo(s"gpuIndices is ${gpuIndices.mkString(", ")}.") -// OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) -// Iterator.empty -// }.count() + coalescedTables.mapPartitionsWithIndex { (rank, iter) => + logInfo(s"set ZE_AFFINITY_MASK") + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + logInfo(s"set ZE_AFFINITY_MASK rank is $rank.") + logInfo(s"gpuIndices is ${gpuIndices.mkString(", ")}.") + OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) + Iterator.empty + }.count() if (useDevice == "CPU") { coalescedTables.mapPartitionsWithIndex { (rank, table) => @@ -145,6 +145,10 @@ class CorrelationDALImpl( } + def CorrelationSampleTrainDAL(data: RDD[Vector]) = { + + } + @native private[mllib] def cCorrelationTrainDAL(rank: Int, data: Long, numRows: Long,