Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
minmingzhu committed Jul 17, 2024
1 parent bf9dd45 commit e534230
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 9 deletions.
1 change: 0 additions & 1 deletion mllib-dal/src/main/native/OneCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
}
logger::println(logger::INFO, "OneCCL (native): ccl::create_communicator(size, rank, kvs)");
logger::println(logger::INFO, "ccl::create_communicator %d ,%d", size, rank);
auto comm = ccl::create_communicator(size, rank, kvs);
{
std::lock_guard<std::mutex> lock(g_mtx);
g_comms.push_back(ccl::create_communicator(size, rank, kvs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector}
import org.apache.spark.ml.util.Instrumentation
import org.apache.spark.sql.Dataset

import java.time.Instant

class NaiveBayesDALModel private[mllib] (
val uid: String,
val pi: Vector,
Expand All @@ -41,7 +43,7 @@ class NaiveBayesDALImpl(val uid: String,
labelCol: String,
featuresCol: String): NaiveBayesDALModel = {
val sparkContext = labeledPoints.rdd.sparkContext
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()
val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.Dataset
import org.apache.spark.ml.tree
import org.apache.spark.mllib.tree.model.ImpurityStats

import java.time.Instant
import java.util
import java.util.{ArrayList, Map}
import scala.collection.mutable.HashMap
Expand Down Expand Up @@ -58,7 +59,7 @@ class RandomForestClassifierDALImpl(val uid: String,
val metrics_name = "RFClassifier_" + executorNum
val rfcTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext)
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()
// used run Random Forest unit test
val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false)
val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.intel.oap.mllib.feature

import com.intel.daal.datamanagement.data.{HomogenNumericTable, NumericTable}

import java.nio.DoubleBuffer
import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils}
Expand All @@ -32,6 +33,8 @@ import java.util.Arrays
import com.intel.oneapi.dal.table.{Common, HomogenTable, RowAccessor}
import org.apache.spark.storage.StorageLevel

import java.time.Instant

class PCADALModel private[mllib] (
val k: Int,
val pc: OldDenseMatrix,
Expand All @@ -48,7 +51,7 @@ class PCADALImpl(val k: Int,
val metrics_name = "PCA_" + executorNum
val pcaTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext)
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()
val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
pcaTimer.record("Preprocessing")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.rdd.RDD

import java.nio.{ByteBuffer, ByteOrder, FloatBuffer}
import java.time.Instant
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

Expand Down Expand Up @@ -57,7 +58,8 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
def train(): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
val executorNum = Utils.sparkExecutorNum(data.sparkContext)
val executorCores = Utils.sparkExecutorCores()
val storePath = data.sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = data.sparkContext.getConf
.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()

val nFeatures = data.max()(new Ordering[Rating[ID]]() {
override def compare(x: Rating[ID], y: Rating[ID]): Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors
import org.apache.spark.sql.Dataset
import org.apache.spark.rdd.RDD

import java.time.Instant


/**
* Model fitted by [[LinearRegressionDALImpl]].
Expand Down Expand Up @@ -74,7 +76,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
val metrics_name = "LinearRegression_" + executorNum
val lrTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext)
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()
val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)

val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.sql.Dataset

import java.time.Instant
import java.util
import scala.collection.JavaConversions._

Expand All @@ -50,7 +51,7 @@ class RandomForestRegressorDALImpl(val uid: String,
val metrics_name = "RFRegressor_" + executorNum
val rfrTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext)
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()
val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
// used run Random Forest unit test
val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.{Matrix, Vector}
import org.apache.spark.rdd.RDD

import java.time.Instant

class CorrelationDALImpl(
val executorNum: Int,
val executorCores: Int)
Expand All @@ -34,7 +36,7 @@ class CorrelationDALImpl(
val metrics_name = "Correlation_" + executorNum
val corTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext)
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()
val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
corTimer.record("Preprocessing")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.apache.spark.rdd.RDD
import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oneapi.dal.table.Common

import java.time.Instant

class SummarizerDALImpl(val executorNum: Int,
val executorCores: Int)
extends Serializable with Logging {
Expand All @@ -35,7 +37,7 @@ class SummarizerDALImpl(val executorNum: Int,
val metrics_name = "Summarizer_" + executorNum
val sumTimer = new Utils.AlgoTimeMetrics(metrics_name, sparkContext)
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath")
val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now()
val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
sumTimer.record("Preprocessing")

Expand Down

0 comments on commit e534230

Please sign in to comment.