Skip to content

Commit 260fc22

Browse files
committed
Address review comments
1 parent eacdc79 commit 260fc22

File tree

6 files changed

+50
-34
lines changed

6 files changed

+50
-34
lines changed

tiledb/sm/query/readers/filtered_data.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,14 @@ class FilteredData {
329329
/* ********************************* */
330330

331331
/**
332-
* Get the fixed filtered data for the result tile.
332+
* Get a pointer to the fixed filtered data for the result tile and a future
333+
* which signals when the data is valid.
333334
*
334335
* @param fragment Fragment metadata for the tile.
335336
* @param rt Result tile.
336337
* @return Fixed filtered data pointer.
337338
*/
338-
inline std::tuple<void*, ThreadPool::SharedTask> fixed_filtered_data(
339+
inline std::pair<void*, ThreadPool::SharedTask> fixed_filtered_data(
339340
const FragmentMetadata* fragment, const ResultTile* rt) {
340341
auto offset{
341342
fragment->loaded_metadata()->file_offset(name_, rt->tile_idx())};
@@ -346,13 +347,14 @@ class FilteredData {
346347
}
347348

348349
/**
349-
* Get the var filtered data for the result tile.
350+
* Get a pointer to the var filtered data for the result tile and a future
351+
* which signals when the data is valid.
350352
*
351353
* @param fragment Fragment metadata for the tile.
352354
* @param rt Result tile.
353355
* @return Var filtered data pointer.
354356
*/
355-
inline std::tuple<void*, ThreadPool::SharedTask> var_filtered_data(
357+
inline std::pair<void*, ThreadPool::SharedTask> var_filtered_data(
356358
const FragmentMetadata* fragment, const ResultTile* rt) {
357359
if (!var_sized_) {
358360
return {nullptr, ThreadPool::SharedTask()};
@@ -367,13 +369,14 @@ class FilteredData {
367369
}
368370

369371
/**
370-
* Get the nullable filtered data for the result tile.
372+
* Get a pointer to the nullable filtered data for the result tile and a
373+
* future which signals when the data is valid.
371374
*
372375
* @param fragment Fragment metadata for the tile.
373376
* @param rt Result tile.
374377
* @return Nullable filtered data pointer.
375378
*/
376-
inline std::tuple<void*, ThreadPool::SharedTask> nullable_filtered_data(
379+
inline std::pair<void*, ThreadPool::SharedTask> nullable_filtered_data(
377380
const FragmentMetadata* fragment, const ResultTile* rt) {
378381
if (!nullable_) {
379382
return {nullptr, ThreadPool::SharedTask()};

tiledb/sm/query/readers/reader_base.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ void ReaderBase::read_tiles(
747747
// 'TileData' objects should be returned by this function and passed into
748748
// 'unfilter_tiles' so that the filter pipeline can stop using the
749749
// 'ResultTile' object to get access to the filtered data.
750-
std::tuple<void*, ThreadPool::SharedTask> n = {
750+
std::pair<void*, ThreadPool::SharedTask> n = {
751751
nullptr, ThreadPool::SharedTask()};
752752
ResultTile::TileData tile_data{
753753
val_only ? n :
@@ -929,9 +929,6 @@ Status ReaderBase::unfilter_tiles(
929929

930930
for (size_t i = 0; i < num_tiles; i++) {
931931
auto result_tile = result_tiles[i];
932-
// if (skip_field(result_tile->frag_idx(), name)) {
933-
// continue;
934-
// }
935932
ThreadPool::SharedTask task =
936933
resources_.compute_tp().execute([name,
937934
validity_only,

tiledb/sm/query/readers/reader_base.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -543,31 +543,31 @@ class ReaderBase : public StrategyBase {
543543

544544
/**
545545
* Concurrently executes across each name in `names` and each result tile
546-
* in 'result_tiles'.
546+
* in 'result_tiles'. Attaches a future to each result_tile that is signaling
547+
* when reading the corresponding data from disk is done.
547548
*
548549
* This must be the entry point for reading attribute tiles because it
549550
* generates stats for reading attributes.
550551
*
551552
* @param names The attribute names.
552553
* @param result_tiles The retrieved tiles will be stored inside the
553554
* `ResultTile` instances in this vector.
554-
* @return Filtered data blocks.
555555
*/
556556
void read_attribute_tiles(
557557
const std::vector<NameToLoad>& names,
558558
const std::vector<ResultTile*>& result_tiles) const;
559559

560560
/**
561561
* Concurrently executes across each name in `names` and each result tile
562-
* in 'result_tiles'.
562+
* in 'result_tiles'. Attaches a future to each result_tile that is signaling
563+
* when reading the corresponding data from disk is done.
563564
*
564565
* This must be the entry point for reading coordinate tiles because it
565566
* generates stats for reading coordinates.
566567
*
567568
* @param names The coordinate/dimension names.
568569
* @param result_tiles The retrieved tiles will be stored inside the
569570
* `ResultTile` instances in this vector.
570-
* @return Filtered data blocks.
571571
*/
572572
void read_coordinate_tiles(
573573
const std::vector<std::string>& names,
@@ -578,13 +578,13 @@ class ReaderBase : public StrategyBase {
578578
* in the appropriate result tile.
579579
*
580580
* Concurrently executes across each name in `names` and each result tile
581-
* in 'result_tiles'.
581+
* in 'result_tiles'. Attaches a future to each result_tile that is signaling
582+
* when reading the corresponding data from disk is done.
582583
*
583584
* @param names The field names.
584585
* @param result_tiles The retrieved tiles will be stored inside the
585586
* `ResultTile` instances in this vector.
586587
* @param validity_only Is the field read for validity only.
587-
* @return Filtered data blocks.
588588
*/
589589
void read_tiles(
590590
const std::vector<NameToLoad>& names,

tiledb/sm/query/readers/result_tile.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,13 @@ ResultTile::~ResultTile() {
9595
try {
9696
// Wait for all tasks to be done
9797
wait_all_attrs();
98+
} catch (...) {
99+
}
100+
101+
try {
102+
// Wait for all tasks to be done
98103
wait_all_coords();
99104
} catch (...) {
100-
return;
101105
}
102106
}
103107

tiledb/sm/query/readers/result_tile.h

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -214,16 +214,16 @@ class ResultTile {
214214
/* CONSTRUCTORS & DESTRUCTORS */
215215
/* ********************************* */
216216
TileData(
217-
std::tuple<void*, ThreadPool::SharedTask> fixed_filtered_data,
218-
std::tuple<void*, ThreadPool::SharedTask> var_filtered_data,
219-
std::tuple<void*, ThreadPool::SharedTask> validity_filtered_data,
217+
std::pair<void*, ThreadPool::SharedTask> fixed_filtered_data,
218+
std::pair<void*, ThreadPool::SharedTask> var_filtered_data,
219+
std::pair<void*, ThreadPool::SharedTask> validity_filtered_data,
220220
shared_ptr<FilteredData> filtered_data)
221-
: fixed_filtered_data_(std::get<0>(fixed_filtered_data))
222-
, var_filtered_data_(std::get<0>(var_filtered_data))
223-
, validity_filtered_data_(std::get<0>(validity_filtered_data))
224-
, fixed_filtered_data_task_(std::get<1>(fixed_filtered_data))
225-
, var_filtered_data_task_(std::get<1>(var_filtered_data))
226-
, validity_filtered_data_task_(std::get<1>(validity_filtered_data))
221+
: fixed_filtered_data_(fixed_filtered_data.first)
222+
, var_filtered_data_(var_filtered_data.first)
223+
, validity_filtered_data_(validity_filtered_data.first)
224+
, fixed_filtered_data_task_(fixed_filtered_data.second)
225+
, var_filtered_data_task_(var_filtered_data.second)
226+
, validity_filtered_data_task_(validity_filtered_data.second)
227227
, filtered_data_(std::move(filtered_data)) {
228228
}
229229

@@ -232,16 +232,21 @@ class ResultTile {
232232
if (fixed_filtered_data_task_.valid()) {
233233
auto st = fixed_filtered_data_task_.wait();
234234
}
235+
} catch (...) {
236+
}
235237

238+
try {
236239
if (var_filtered_data_task_.valid()) {
237240
auto st = var_filtered_data_task_.wait();
238241
}
242+
} catch (...) {
243+
}
239244

245+
try {
240246
if (validity_filtered_data_task_.valid()) {
241247
auto st = validity_filtered_data_task_.wait();
242248
}
243249
} catch (...) {
244-
return;
245250
}
246251
}
247252

@@ -285,7 +290,7 @@ class ResultTile {
285290
}
286291

287292
/** Clear the held filtered data. */
288-
inline void clear_filtered_data() {
293+
inline void release_filtered_data() {
289294
filtered_data_ = nullptr;
290295
}
291296

tiledb/sm/query/readers/sparse_global_order_reader.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,12 @@ bool SparseGlobalOrderReader<BitmapType>::add_next_cell_to_queue(
781781
// Try to find a new tile.
782782
if (result_tiles_it[frag_idx] != result_tiles[frag_idx].end()) {
783783
// Find a cell in the current result tile.
784+
785+
// This enforces all the coords unfiltering results to be available before
786+
// taking the lock on tile_queue_mutex_. This is to avoid a deadlock where
787+
// a lock is held forever while waiting for a result to be available,
788+
// while the next scheduled task is deadlocking on that lock
789+
rc.tile_->wait_all_coords();
784790
rc = GlobalOrderResultCoords(&*result_tiles_it[frag_idx], 0);
785791

786792
// All tiles should at least have one cell available.
@@ -815,12 +821,6 @@ bool SparseGlobalOrderReader<BitmapType>::add_next_cell_to_queue(
815821
return true;
816822
}
817823

818-
// This enforces all the coords unfiltering results to be available before
819-
// taking the lock on tile_queue_mutex_. This is to avoid a deadlock where a
820-
// lock is held forever while waiting for a result to be available, while
821-
// the next scheduled task is deadlocking on that lock
822-
rc.tile_->wait_all_coords();
823-
824824
std::unique_lock<std::mutex> ul(tile_queue_mutex_);
825825

826826
// Add all the cells in this tile with the same coordinates as this cell
@@ -945,6 +945,13 @@ SparseGlobalOrderReader<BitmapType>::merge_result_cell_slabs(
945945
read_state_.frag_idx()[f].cell_idx_ :
946946
0;
947947
GlobalOrderResultCoords rc(&*(rt_it[f]), cell_idx);
948+
// This enforces all the coords unfiltering results to be available
949+
// before taking the lock on tile_queue_mutex_. This is to avoid a
950+
// deadlock where a lock is held forever while waiting for a result to
951+
// be available, while the next scheduled task is deadlocking on that
952+
// lock
953+
rc.tile_->wait_all_coords();
954+
948955
bool res = add_next_cell_to_queue(
949956
rc, rt_it, result_tiles, tile_queue, to_delete);
950957
{

0 commit comments

Comments
 (0)