diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index 15bfd62f4..377ad7c9c 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -135,7 +135,6 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( } logger::println(logger::INFO, "OneCCL (native): ccl::create_communicator(size, rank, kvs)"); logger::println(logger::INFO, "ccl::create_communicator %d ,%d", size, rank); - auto comm = ccl::create_communicator(size, rank, kvs); { std::lock_guard lock(g_mtx); g_comms.push_back(ccl::create_communicator(size, rank, kvs)); diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala index cf3a7e987..822625149 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala @@ -25,6 +25,8 @@ import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector} import org.apache.spark.ml.util.Instrumentation import org.apache.spark.sql.Dataset +import java.time.Instant + class NaiveBayesDALModel private[mllib] ( val uid: String, val pi: Vector, @@ -41,7 +43,7 @@ class NaiveBayesDALImpl(val uid: String, labelCol: String, featuresCol: String): NaiveBayesDALModel = { val sparkContext = labeledPoints.rdd.sparkContext - val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd) val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) { diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index 9262e8ac8..9defb9d29 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.Dataset import org.apache.spark.ml.tree import org.apache.spark.mllib.tree.model.ImpurityStats +import java.time.Instant import java.util import java.util.{ArrayList, Map} import scala.collection.mutable.HashMap @@ -58,7 +59,7 @@ class RandomForestClassifierDALImpl(val uid: String, val metrics_name = "RFClassifier_" + executorNum val rfcTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) - val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() // used run Random Forest unit test val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 961eaa986..576d2aa2f 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -17,6 +17,7 @@ package com.intel.oap.mllib.feature import com.intel.daal.datamanagement.data.{HomogenNumericTable, NumericTable} + import java.nio.DoubleBuffer import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils} @@ -32,6 +33,8 @@ import java.util.Arrays import com.intel.oneapi.dal.table.{Common, HomogenTable, RowAccessor} import org.apache.spark.storage.StorageLevel +import java.time.Instant + class PCADALModel private[mllib] ( val k: Int, val pc: OldDenseMatrix, @@ -48,7 +51,7 @@ class PCADALImpl(val k: Int, val metrics_name = "PCA_" + executorNum val pcaTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) - val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) pcaTimer.record("Preprocessing") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala index 7847386a5..b6ec7077d 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala @@ -26,6 +26,7 @@ import org.apache.spark.ml.recommendation.ALS.Rating import org.apache.spark.rdd.RDD import java.nio.{ByteBuffer, ByteOrder, FloatBuffer} +import java.time.Instant import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -57,7 +58,8 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], def train(): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = { val executorNum = Utils.sparkExecutorNum(data.sparkContext) val executorCores = Utils.sparkExecutorCores() - val storePath = data.sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = data.sparkContext.getConf + .get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val nFeatures = data.max()(new Ordering[Rating[ID]]() { override def compare(x: Rating[ID], y: Rating[ID]): Int = diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index c278bfb7e..f127b10a3 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -29,6 +29,8 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors import org.apache.spark.sql.Dataset import org.apache.spark.rdd.RDD +import java.time.Instant + /** * Model fitted by [[LinearRegressionDALImpl]]. @@ -74,7 +76,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, val metrics_name = "LinearRegression_" + executorNum val lrTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) - val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index eb814de9a..a7baa0365 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -25,6 +25,7 @@ import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.linalg.Matrix import org.apache.spark.sql.Dataset +import java.time.Instant import java.util import scala.collection.JavaConversions._ @@ -50,7 +51,7 @@ class RandomForestRegressorDALImpl(val uid: String, val metrics_name = "RFRegressor_" + executorNum val rfrTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) - val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) // used run Random Forest unit test val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 92d6ee1b4..3cccb1202 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -24,6 +24,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.{Matrix, Vector} import org.apache.spark.rdd.RDD +import java.time.Instant + class CorrelationDALImpl( val executorNum: Int, val executorCores: Int) @@ -34,7 +36,7 @@ class CorrelationDALImpl( val metrics_name = "Correlation_" + executorNum val corTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) - val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) corTimer.record("Preprocessing") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index 89c59a39e..c8bc25e4e 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -26,6 +26,8 @@ import org.apache.spark.rdd.RDD import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oneapi.dal.table.Common +import java.time.Instant + class SummarizerDALImpl(val executorNum: Int, val executorCores: Int) extends Serializable with Logging { @@ -35,7 +37,7 @@ class SummarizerDALImpl(val executorNum: Int, val metrics_name = "Summarizer_" + executorNum val sumTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) - val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) sumTimer.record("Preprocessing")