-
-
Notifications
You must be signed in to change notification settings - Fork 8.8k
Optimization of data initialization for large sparse datasets #11390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 18 commits
11d4b63
4589533
3464f8c
e211ab9
9221573
0a793e3
e249a3b
1be6f5d
396f4b3
55a89d7
8a15c70
085627f
b1e714f
70fd6bc
edef9e7
606c537
6f885b0
c0dbd7e
1cb3693
560a67a
0ac338e
61b3878
98ef541
2b090e6
9f5ba75
8cdd7db
15aa65b
58920a0
0b7037a
820b79a
6d553fb
ea5c4fe
7dc15f2
390efd1
3250cc0
a8df33b
0fcd8a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| #include <cstdint> // for uint8_t | ||
| #include <limits> | ||
| #include <memory> | ||
| #include <vector> | ||
| #include <type_traits> // for enable_if_t, is_same_v, is_signed_v | ||
|
|
||
| #include "../data/adapter.h" | ||
|
|
@@ -113,18 +114,18 @@ class DenseColumnIter : public Column<BinIdxT> { | |
| private: | ||
| using Base = Column<BinIdxT>; | ||
| /* flags for missing values in dense columns */ | ||
| LBitField32 missing_flags_; | ||
| Span<uint8_t> missing_flags_; | ||
| size_t feature_offset_; | ||
|
|
||
| public: | ||
| explicit DenseColumnIter(common::Span<const BinIdxT> index, bst_bin_t index_base, | ||
| LBitField32 missing_flags, size_t feature_offset) | ||
| Span<uint8_t> missing_flags, size_t feature_offset) | ||
| : Base{index, index_base}, missing_flags_{missing_flags}, feature_offset_{feature_offset} {} | ||
| DenseColumnIter(DenseColumnIter const&) = delete; | ||
| DenseColumnIter(DenseColumnIter&&) = default; | ||
|
|
||
| [[nodiscard]] bool IsMissing(size_t ridx) const { | ||
| return missing_flags_.Check(feature_offset_ + ridx); | ||
| return missing_flags_[feature_offset_ + ridx]; | ||
| } | ||
|
|
||
| bst_bin_t operator[](size_t ridx) const { | ||
|
|
@@ -148,15 +149,11 @@ class ColumnMatrix { | |
| * @brief A bit set for indicating whether an element in a dense column is missing. | ||
| */ | ||
| struct MissingIndicator { | ||
| using BitFieldT = LBitField32; | ||
| using T = typename BitFieldT::value_type; | ||
|
|
||
| BitFieldT missing; | ||
| RefResourceView<T> storage; | ||
| static_assert(std::is_same_v<T, std::uint32_t>); | ||
| Span<uint8_t> missing; | ||
| RefResourceView<uint8_t> storage; | ||
|
|
||
| template <typename U> | ||
| [[nodiscard]] std::enable_if_t<!std::is_signed_v<U>, U> static InitValue(bool init) { | ||
| [[nodiscard]] std::enable_if_t<!std::is_signed_v<U>, U> static InitValue(uint8_t init) { | ||
| return init ? ~U{0} : U{0}; | ||
| } | ||
|
|
||
|
|
@@ -166,45 +163,40 @@ class ColumnMatrix { | |
| * @param init Initialize the indicator to true or false. | ||
| */ | ||
| MissingIndicator(std::size_t n_elements, bool init) { | ||
| auto m_size = missing.ComputeStorageSize(n_elements); | ||
| storage = common::MakeFixedVecWithMalloc(m_size, InitValue<T>(init)); | ||
| storage = common::MakeFixedVecWithMalloc(n_elements, static_cast<uint8_t>(init)); | ||
| this->InitView(); | ||
| } | ||
| /** @brief Set the i^th element to be a valid element (instead of missing). */ | ||
| void SetValid(typename LBitField32::index_type i) { missing.Clear(i); } | ||
| void SetValid(size_t i) { missing[i] = 0; } | ||
| /** @brief assign the storage to the view. */ | ||
| void InitView() { | ||
| missing = LBitField32{Span{storage.data(), static_cast<size_t>(storage.size())}}; | ||
| missing = Span{storage.data(), static_cast<size_t>(storage.size())}; | ||
| } | ||
|
|
||
| void GrowTo(std::size_t n_elements, bool init) { | ||
| CHECK(storage.Resource()->Type() == ResourceHandler::kMalloc) | ||
| << "[Internal Error]: Cannot grow the vector when external memory is used."; | ||
| auto m_size = missing.ComputeStorageSize(n_elements); | ||
| CHECK_GE(m_size, storage.size()); | ||
| if (m_size == storage.size()) { | ||
| CHECK_GE(n_elements, storage.size()); | ||
| if (n_elements == storage.size()) { | ||
| return; | ||
| } | ||
| // grow the storage | ||
| auto resource = std::dynamic_pointer_cast<common::MallocResource>(storage.Resource()); | ||
| CHECK(resource); | ||
| resource->Resize(m_size * sizeof(T), InitValue<std::byte>(init)); | ||
| storage = RefResourceView<T>{resource->DataAs<T>(), m_size, resource}; | ||
| resource->Resize(n_elements * sizeof(uint8_t), InitValue<std::byte>(init)); | ||
| storage = RefResourceView<uint8_t>{resource->DataAs<uint8_t>(), n_elements, resource}; | ||
|
|
||
| this->InitView(); | ||
| } | ||
| }; | ||
|
|
||
| void InitStorage(GHistIndexMatrix const& gmat, double sparse_threshold); | ||
| void InitStorage(GHistIndexMatrix const& gmat, double sparse_threshold, int n_threads); | ||
|
|
||
| template <typename ColumnBinT, typename BinT, typename RIdx> | ||
| void SetBinSparse(BinT bin_id, RIdx rid, bst_feature_t fid, ColumnBinT* local_index) { | ||
| if (type_[fid] == kDenseColumn) { | ||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||
| begin[rid] = bin_id - index_base_[fid]; | ||
| // not thread-safe with bit field. | ||
| // FIXME(jiamingy): We can directly assign kMissingId to the index to avoid missing | ||
| // flags. | ||
| missing_.SetValid(feature_offsets_[fid] + rid); | ||
| } else { | ||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||
|
|
@@ -214,15 +206,28 @@ class ColumnMatrix { | |
| } | ||
| } | ||
|
|
||
| template <typename ColumnBinT, typename BinT, typename RIdx> | ||
| void SetBinSparse(BinT bin_id, RIdx rid, bst_feature_t fid, ColumnBinT* local_index, size_t nnz) { | ||
| if (type_[fid] == kDenseColumn) { | ||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||
| begin[rid] = bin_id - index_base_[fid]; | ||
|
||
| missing_.SetValid(feature_offsets_[fid] + rid); | ||
| } else { | ||
| ColumnBinT* begin = &local_index[feature_offsets_[fid]]; | ||
| begin[nnz] = bin_id - index_base_[fid]; | ||
| row_ind_[feature_offsets_[fid] + nnz] = rid; | ||
| } | ||
| } | ||
|
|
||
| public: | ||
| // get number of features | ||
| [[nodiscard]] bst_feature_t GetNumFeature() const { | ||
| return static_cast<bst_feature_t>(type_.size()); | ||
| } | ||
|
|
||
| ColumnMatrix() = default; | ||
| ColumnMatrix(GHistIndexMatrix const& gmat, double sparse_threshold) { | ||
| this->InitStorage(gmat, sparse_threshold); | ||
| ColumnMatrix(GHistIndexMatrix const& gmat, double sparse_threshold, int n_threads = 1) { | ||
|
||
| this->InitStorage(gmat, sparse_threshold, n_threads); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -232,7 +237,7 @@ class ColumnMatrix { | |
| void InitFromSparse(SparsePage const& page, const GHistIndexMatrix& gmat, double sparse_threshold, | ||
| int32_t n_threads) { | ||
| auto batch = data::SparsePageAdapterBatch{page.GetView()}; | ||
| this->InitStorage(gmat, sparse_threshold); | ||
| this->InitStorage(gmat, sparse_threshold, n_threads); | ||
| // ignore base row id here as we always has one column matrix for each sparse page. | ||
| this->PushBatch(n_threads, batch, std::numeric_limits<float>::quiet_NaN(), gmat, 0); | ||
| } | ||
|
|
@@ -283,7 +288,7 @@ class ColumnMatrix { | |
| SetIndexNoMissing(base_rowid, gmat.index.data<RowBinIdxT>(), size, n_features, n_threads); | ||
| }); | ||
| } else { | ||
| SetIndexMixedColumns(base_rowid, batch, gmat, missing); | ||
| SetIndexMixedColumns(base_rowid, batch, gmat, missing, n_threads); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -349,7 +354,7 @@ class ColumnMatrix { | |
| */ | ||
| template <typename Batch> | ||
| void SetIndexMixedColumns(size_t base_rowid, Batch const& batch, const GHistIndexMatrix& gmat, | ||
| float missing) { | ||
| float missing, int n_threads) { | ||
| auto n_features = gmat.Features(); | ||
|
|
||
| missing_.GrowTo(feature_offsets_[n_features], true); | ||
|
|
@@ -366,19 +371,69 @@ class ColumnMatrix { | |
| using ColumnBinT = decltype(t); | ||
| ColumnBinT* local_index = reinterpret_cast<ColumnBinT*>(index_.data()); | ||
| size_t const batch_size = batch.Size(); | ||
| size_t k{0}; | ||
| for (size_t rid = 0; rid < batch_size; ++rid) { | ||
| auto line = batch.GetLine(rid); | ||
| for (size_t i = 0; i < line.Size(); ++i) { | ||
| auto coo = line.GetElement(i); | ||
| if (is_valid(coo)) { | ||
| auto fid = coo.column_idx; | ||
| const uint32_t bin_id = row_index[k]; | ||
| SetBinSparse(bin_id, rid + base_rowid, fid, local_index); | ||
| ++k; | ||
|
|
||
| dmlc::OMPException exc; | ||
trivialfis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| std::vector<size_t> n_elements((n_threads + 1) * n_features, 0); | ||
| std::vector<size_t> k_offsets(n_threads + 1, 0); | ||
razdoburdin marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| size_t block_size = DivRoundUp(batch_size, n_threads); | ||
| #pragma omp parallel num_threads(n_threads) | ||
| { | ||
| exc.Run([&, is_valid]() { | ||
| int tid = omp_get_thread_num(); | ||
| size_t begin = block_size * tid; | ||
| size_t end = std::min(begin + block_size, batch_size); | ||
| for (size_t rid = begin; rid < end; ++rid) { | ||
| const auto& line = batch.GetLine(rid); | ||
| for (size_t i = 0; i < line.Size(); ++i) { | ||
| auto coo = line.GetElement(i); | ||
| if (is_valid(coo)) { | ||
| auto fid = coo.column_idx; | ||
| if ((type_[fid] != kDenseColumn)) { | ||
| n_elements[(tid + 1) * n_features + fid] += 1; | ||
| } | ||
| k_offsets[tid + 1] += 1; | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| exc.Rethrow(); | ||
|
|
||
| ParallelFor(n_features, n_threads, [&](auto fid) { | ||
| n_elements[fid] += num_nonzeros_[fid]; | ||
| for (int tid = 0; tid < n_threads; ++tid) { | ||
| n_elements[(tid + 1) * n_features + fid] += | ||
| n_elements[tid * n_features + fid]; | ||
| } | ||
| num_nonzeros_[fid] = n_elements[n_threads * n_features + fid]; | ||
| }); | ||
| std::partial_sum(k_offsets.cbegin(), k_offsets.cend(), k_offsets.begin()); | ||
|
|
||
| #pragma omp parallel num_threads(n_threads) | ||
| { | ||
| std::vector<size_t> nnz_offsets(n_features, 0); | ||
razdoburdin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| exc.Run([&, is_valid, base_rowid, row_index]() { | ||
| int tid = omp_get_thread_num(); | ||
| size_t begin = block_size * tid; | ||
| size_t end = std::min(begin + block_size, batch_size); | ||
| size_t k = 0; | ||
| for (size_t rid = begin; rid < end; ++rid) { | ||
| const auto& line = batch.GetLine(rid); | ||
| for (size_t i = 0; i < line.Size(); ++i) { | ||
| auto coo = line.GetElement(i); | ||
| if (is_valid(coo)) { | ||
| auto fid = coo.column_idx; | ||
| const uint32_t bin_id = row_index[k_offsets[tid] + k]; | ||
| size_t nnz = n_elements[tid * n_features + fid] + nnz_offsets[fid]; | ||
razdoburdin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| SetBinSparse(bin_id, rid + base_rowid, fid, local_index, nnz); | ||
| ++k; | ||
| nnz_offsets[fid] += (type_[fid] != kDenseColumn); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| exc.Rethrow(); | ||
| }); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,6 +164,28 @@ template <typename T> | |
| return ref; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Make a fixed size `RefResourceView` with malloc resource. | ||
| * Use n_threads to initialize the storage | ||
| */ | ||
| template <typename T> | ||
| [[nodiscard]] RefResourceView<T> MakeFixedVecWithMalloc(std::size_t n_elements, T const& init, | ||
| int n_threads) { | ||
| auto resource = std::make_shared<common::MallocResource>(n_elements * sizeof(T)); | ||
| auto ref = RefResourceView{resource->DataAs<T>(), n_elements, resource}; | ||
|
|
||
| size_t block_size = n_elements / n_threads + (n_elements % n_threads > 0); | ||
| #pragma omp parallel num_threads(n_threads) | ||
|
||
| { | ||
| int tid = omp_get_thread_num(); | ||
| auto begin = tid * block_size; | ||
| auto end = std::min((tid + 1) * block_size, n_elements); | ||
| auto size = end > begin ? end - begin : 0; | ||
| std::fill_n(ref.data() + begin, size, init); | ||
| } | ||
| return ref; | ||
| } | ||
|
|
||
| template <typename T> | ||
| class ReallocVector : public RefResourceView<T> { | ||
| static_assert(!std::is_reference_v<T>); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -121,7 +121,7 @@ class GHistIndexMatrix { | |
|
|
||
| auto n_bins_total = cut.TotalBins(); | ||
| const size_t n_index = row_ptr[rbegin + batch.Size()]; // number of entries in this page | ||
| ResizeIndex(n_index, isDense_); | ||
| ResizeIndex(n_index, isDense_, n_threads); | ||
| if (isDense_) { | ||
| index.SetBinOffset(cut.Ptrs()); | ||
| } | ||
|
|
@@ -142,7 +142,7 @@ class GHistIndexMatrix { | |
| } | ||
|
|
||
| // The function is only created to avoid using the column matrix in the header. | ||
| void ResizeColumns(double sparse_thresh); | ||
| void ResizeColumns(double sparse_thresh, int n_threads); | ||
|
|
||
| public: | ||
| /** @brief row pointer to rows by element position */ | ||
|
|
@@ -224,7 +224,7 @@ class GHistIndexMatrix { | |
|
|
||
| if (rbegin + batch.Size() == n_samples_total) { | ||
| // finished | ||
| this->ResizeColumns(sparse_thresh); | ||
| this->ResizeColumns(sparse_thresh, ctx->Threads()); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -233,7 +233,7 @@ class GHistIndexMatrix { | |
| void PushAdapterBatchColumns(Context const* ctx, Batch const& batch, float missing, | ||
| size_t rbegin); | ||
|
|
||
| void ResizeIndex(const size_t n_index, const bool isDense); | ||
| void ResizeIndex(const size_t n_index, const bool isDense, int n_threads = 1); | ||
|
||
|
|
||
| void GetFeatureCounts(size_t* counts) const { | ||
| auto nfeature = cut.Ptrs().size() - 1; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this function still used now that we have a new
SetBinSparse?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original SetBinSparse is also used