diff --git a/mllib-dal/src/main/native/CorrelationSampleImpl.cpp b/mllib-dal/src/main/native/CorrelationSampleImpl.cpp index 390b37916..4fffe4812 100644 --- a/mllib-dal/src/main/native/CorrelationSampleImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationSampleImpl.cpp @@ -27,11 +27,7 @@ #define ONEDAL_DATA_PARALLEL #endif -#include "oneapi/dal/algo/covariance.hpp" -#include "oneapi/dal/io/csv.hpp" #include "Communicator.hpp" -#include "oneapi/dal/table/row_accessor.hpp" -#include "oneapi/dal/table/common.hpp" #include "com_intel_oap_mllib_stat_CorrelationDALImpl.h" @@ -39,69 +35,7 @@ namespace dal = oneapi::dal; using namespace std; namespace fs = std::filesystem; -std::ostream &operator<<(std::ostream &stream, const oneapi::dal::table &table) { - std::cout << "output : " << std::endl; - auto arr = oneapi::dal::row_accessor(table).pull(); - const auto x = arr.get_data(); - - if (table.get_row_count() <= 10) { - for (std::int64_t i = 0; i < table.get_row_count(); i++) { - if(table.get_column_count() <= 20) { - for (std::int64_t j = 0; j < table.get_column_count(); j++) { - std::cout << std::setw(10) << std::setiosflags(std::ios::fixed) - << std::setprecision(6) << x[i * table.get_column_count() + j]; - } - std::cout << std::endl; - } else { - for (std::int64_t j = 0; j < 20; j++) { - std::cout << std::setw(10) << std::setiosflags(std::ios::fixed) - << std::setprecision(6) << x[i * table.get_column_count() + j]; - } - std::cout << std::endl; - } - } - } - else { - for (std::int64_t i = 0; i < 5; i++) { - if(table.get_column_count() <= 20) { - for (std::int64_t j = 0; j < table.get_column_count(); j++) { - std::cout << std::setw(10) << std::setiosflags(std::ios::fixed) - << std::setprecision(6) << x[i * table.get_column_count() + j]; - } - std::cout << std::endl; - } else { - for (std::int64_t j = 0; j < 20; j++) { - std::cout << std::setw(10) << std::setiosflags(std::ios::fixed) - << std::setprecision(6) << x[i * table.get_column_count() + j]; - } - std::cout << std::endl; - } - } - std::cout << "..." << (table.get_row_count() - 10) << " lines skipped..." << std::endl; - for (std::int64_t i = table.get_row_count() - 5; i < table.get_row_count(); i++) { - if(table.get_column_count() <= 20) { - for (std::int64_t j = 0; j < table.get_column_count(); j++) { - std::cout << std::setw(10) << std::setiosflags(std::ios::fixed) - << std::setprecision(6) << x[i * table.get_column_count() + j]; - } - std::cout << std::endl; - } else { - for (std::int64_t j = 0; j < 20; j++) { - std::cout << std::setw(10) << std::setiosflags(std::ios::fixed) - << std::setprecision(6) << x[i * table.get_column_count() + j]; - } - std::cout << std::endl; - } - } - } - return stream; -} - -inline bool check_file(const std::string& name) { - return std::ifstream{ name }.good(); -} - -ccl::shared_ptr_class getCclPortKvs(ccl::string ccl_ip_port){ +inline ccl::shared_ptr_class getCclPortKvs(ccl::string ccl_ip_port){ std::cout << "ccl_ip_port = " << ccl_ip_port << std::endl; auto kvs_attr = ccl::create_kvs_attr(); std::cout << "ccl_ip_port 1 "<< std::endl; @@ -114,7 +48,7 @@ ccl::shared_ptr_class getCclPortKvs(ccl::string ccl_ip_port){ return kvs; } -std::vector get_gpus() +inline std::vector get_gpus() { auto platforms = sycl::platform::get_platforms(); @@ -129,103 +63,6 @@ std::vector get_gpus() return {}; } -inline std::string get_data_path(const std::string& name) { - const std::vector paths = { "./data", "samples/oneapi/dpc/mpi/data" }; - - for (const auto& path : paths) { - const std::string try_path = path + "/" + name; - if (check_file(try_path)) { - return try_path; - } - } - - return name; -} - -std::vector get_file_path(const std::string& path) { - std::vector result; - for (auto& file : fs::directory_iterator(path)){ - if(fs::is_empty(file.path())){ - continue; - }else if(file.path().extension()==".crc" || file.path().extension()==""){ - continue; - }else{ - result.push_back(file.path()); - } - } - return result; -} - -int getLocalRank(int size, int rank, ccl::communicator& comm) -{ - /* Obtain local rank among nodes sharing the same host name */ - char zero = static_cast(0); - std::vector name(MPI_MAX_PROCESSOR_NAME + 1, zero); - int resultlen = 0; - std::string str(name.begin(), name.end()); - std::vector allNames((MPI_MAX_PROCESSOR_NAME + 1) * size, zero); - std::vector aReceiveCount(size, MPI_MAX_PROCESSOR_NAME + 1); - ccl::allgatherv((int8_t *)name.data(), name.size(), (int8_t *)allNames.data(), aReceiveCount, comm).wait(); - int localRank = 0; - for (int i = 0; i < rank; i++) - { - auto nameBegin = allNames.begin() + i * (MPI_MAX_PROCESSOR_NAME + 1); - std::string nbrName(nameBegin, nameBegin + (MPI_MAX_PROCESSOR_NAME + 1)); - if (nbrName == str) localRank++; - } - return localRank; -} - -void weak(sycl::queue& queue, const string& path, dal::preview::spmd::communicator& comm) { - - const auto cov_desc = dal::covariance::descriptor{}.set_result_options( - dal::covariance::result_options::cor_matrix | dal::covariance::result_options::means); - - auto rank_id = comm.get_rank(); - auto rank_count = comm.get_rank_count(); - - auto input_vec = get_file_path(path); - const auto train_data_file_name = get_data_path(input_vec[rank_id]); - cout <<"RankID = " << rank_id << " File name: " << train_data_file_name << endl; - auto t1 = chrono::high_resolution_clock::now(); - const auto x_train = dal::read(queue, dal::csv::data_source{ train_data_file_name }); - - auto rows = x_train.get_row_count(); - auto cols = x_train.get_column_count(); - auto size = rows * cols; - cout <<"RankID = " << rank_id << ", table size " << size << endl; - comm.barrier(); - // MPI_Barrier(MPI_COMM_WORLD); - auto t2 = chrono::high_resolution_clock::now(); - - cout <<"RankID = " << rank_id << ", loading CSV took " - << (float)chrono::duration_cast( - t2 - t1) - .count() / - 1000 - << " secs" << endl; - t1 = chrono::high_resolution_clock::now(); - const auto result = dal::preview::compute(comm, cov_desc, x_train); - t2 = chrono::high_resolution_clock::now(); - cout <<"RankID = " << rank_id << ", cov training step took " - << (float)chrono::duration_cast( - t2 - t1) - .count() / - 1000 - << " secs" << endl; - if(comm.get_rank() == 0) { - cout << "Mean:\n" << result.get_means() << endl; - cout << "Correlation:\n" << result.get_cor_matrix() << endl; - t2 = chrono::high_resolution_clock::now(); - cout <<"RankID = " << rank_id << ", training step took " - << (float)chrono::duration_cast( - t2 - t1) - .count() / - 1000 - << " secs" << endl; - } -} - JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationSampleTrainDAL( JNIEnv *env, jobject obj, jint rank, jint rank_count, jstring ip_port){ @@ -247,9 +84,6 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationSampleTrainDAL( t1 = chrono::high_resolution_clock::now(); auto kvs = getCclPortKvs(ccl_ip_port); -// auto ccl_comm = ccl::create_communicator(rank_count, rank, kvs); -// auto local_rank = getLocalRank(rank_count, rank, ccl_comm); -// auto rank_id = local_rank % gpus.size(); t2 = chrono::high_resolution_clock::now(); cout << "RankID = " << rank << ", OneCCL create kvs took "