Skip to content

Commit

Permalink
update
Browse files: browse the repository at this point in the history
  • Loading branch information
minmingzhu committed Jul 31, 2024
1 parent eb494a3 commit 2ca0800
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 57 deletions.
9 changes: 0 additions & 9 deletions mllib-dal/src/main/native/CorrelationSampleImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,6 @@ void weak(sycl::queue& queue, const string& path, dal::preview::spmd::communicat
JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationSampleTrainDAL(
JNIEnv *env, jobject obj, jint rank, jint rank_count, jstring ip_port){
cout << "main:\n" << endl;
const char *str = env->GetStringUTFChars(ip_port, 0);
ccl::string ccl_ip_port(str);
const char* path = "/home/damon/storage/DataRoot/HiBench_CSV/Correlation/Input/4000000";
string pathStr;
pathStr.append(path);
auto gpus = get_gpus();

auto t1 = chrono::high_resolution_clock::now();
Expand All @@ -255,8 +249,6 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationSampleTrainDAL(
auto local_rank = getLocalRank(rank_count, rank, ccl_comm);
auto rank_id = local_rank % gpus.size();
auto device = gpus[rank_id];
cout << "RankID = " << rank << ", Running on " << device.get_info<sycl::info::device::name>() << endl;
cout << "RankID = " << rank << ", Running on " << device.get_platform().get_info<sycl::info::platform::name>() << endl;
t2 = chrono::high_resolution_clock::now();
cout << "RankID = " << rank
<< ", OneCCL create communicator took "
Expand All @@ -276,7 +268,6 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationSampleTrainDAL(
.count() /
1000
<< " secs" << endl;
weak(q, pathStr, comm);
env->ReleaseStringUTFChars(ip_port, str);
return 0;
}
4 changes: 2 additions & 2 deletions mllib-dal/src/main/native/KMeansImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,10 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
const char* cstr = env->GetStringUTFChars(breakdown_name, nullptr);
std::string c_breakdown_name(cstr);

// auto queue = getGPU(device, gpuIndices);
auto device = sycl::device(sycl::gpu_selector_v);
// auto device = sycl::device(sycl::gpu_selector_v);
sycl::queue queue{device};
ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto queue = getGPU(device, gpuIndices);
auto t1 = std::chrono::high_resolution_clock::now();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,63 +27,21 @@ import java.util.logging.{Level, Logger}


object CorrelationSample {
LibLoader.loadLibraries()
private val logger = Logger.getLogger("util.OneDAL")
private val logLevel = Level.INFO
case class Params(
input: String = null,
corrType: String = "pearson",
minPartitionNum: Int = 1,
maxPartitionBytes: String = "1g",
device: String = "HOST")

def main(args: Array[String]): Unit = {

val defaultParams = Params()

val parser = new OptionParser[Params]("Correlation") {
head("Correlation: an example app for computing correlations.")
opt[String]("corrType")
.text(s"String specifying the method to use for computing correlation. " +
s"Supported: `pearson` (default), `spearman`, default: ${defaultParams.corrType}")
.action((x, c) => c.copy(corrType = x))
opt[Int]("minPartitionNum")
.text(s"minPartitionNum, default: 1")
.action((x, c) => c.copy(minPartitionNum = x))
opt[String]("maxPartitionBytes")
.text(s"maxPartitionBytes, default: 1g")
.action((x, c) => c.copy(maxPartitionBytes = x))
opt[String]("device")
.text(s"oneapi device, default: host")
.action((x, c) => c.copy(device = x))
arg[String]("<input>")
.text("input path to labeled examples")
.required()
.action((x, c) => c.copy(input = x))
}
parser.parse(args, defaultParams) match {
case Some(params) => run(params)
case _ => sys.exit(1)
}
}

def run(params: Params): Unit = {
val spark = SparkSession
val rep_num = args(0).toInt
val spark = SparkSession
.builder
.appName(s"Correlations with $params")
.config("spark.sql.files.maxPartitionBytes", params.maxPartitionBytes)
.config("spark.sql.files.minPartitionNum", params.minPartitionNum)
.config("spark.oap.mllib.device", params.device)
.getOrCreate()
logger.info(params.toString)

import spark.implicits._
logger.info(s"loading data")
logger.info(params.input)

val data = spark.sparkContext.parallelize( 1 to 24 ).repartition(24)
val data = spark.sparkContext.parallelize( 1 to rep_num ).repartition(rep_num)
logger.info(s"getNumPartitions ${data.getNumPartitions}")

val useDevice = spark.conf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val executorNum = Utils.sparkExecutorNum(data.sparkContext)
val executorCores = Utils.sparkExecutorCores()
logger.info(s"executorNum ${executorNum}")
Expand Down

0 comments on commit 2ca0800

Please sign in to comment.