diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 15e7d6f8869d..d66e87db64bb 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -129,19 +129,22 @@ std::shared_ptr MemoryManager::addRootPool( options.checkUsageLeak = checkUsageLeak_; options.debugEnabled = debugEnabled_; - folly::SharedMutex::WriteHolder guard{mutex_}; - if (pools_.find(poolName) != pools_.end()) { - VELOX_FAIL("Duplicate root pool name found: {}", poolName); + std::shared_ptr pool; + { + folly::SharedMutex::WriteHolder guard{mutex_}; + if (pools_.find(poolName) != pools_.end()) { + VELOX_FAIL("Duplicate root pool name found: {}", poolName); + } + pool = std::make_shared( + this, + poolName, + MemoryPool::Kind::kAggregate, + nullptr, + std::move(reclaimer), + poolDestructionCb_, + options); + pools_.emplace(poolName, pool); } - auto pool = std::make_shared( - this, - poolName, - MemoryPool::Kind::kAggregate, - nullptr, - std::move(reclaimer), - poolDestructionCb_, - options); - pools_.emplace(poolName, pool); VELOX_CHECK_EQ(pool->capacity(), 0); arbitrator_->reserveMemory(pool.get(), capacity); return pool; @@ -158,6 +161,14 @@ std::shared_ptr MemoryManager::addLeafPool( return defaultRoot_->addLeafChild(poolName, threadSafe, nullptr); } +uint64_t MemoryManager::shrinkPool(MemoryPool* pool, uint64_t decrementBytes) { + VELOX_CHECK_NOT_NULL(pool); + if (arbitrator_ == nullptr) { + return pool->shrink(decrementBytes); + } + return arbitrator_->releaseMemory(pool, decrementBytes); +} + bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) { VELOX_CHECK_NOT_NULL(pool); VELOX_CHECK_NE(pool->capacity(), kMaxMemory); @@ -176,7 +187,7 @@ void MemoryManager::dropPool(MemoryPool* pool) { VELOX_FAIL("The dropped memory pool {} not found", pool->name()); } pools_.erase(it); - arbitrator_->releaseMemory(pool); + arbitrator_->releaseMemory(pool, 0); } MemoryPool& MemoryManager::deprecatedSharedLeafPool() { diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index 3a8c35a312b1..13624d771747 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -161,6 +161,10 @@ class MemoryManager { const std::string& name = "", bool threadSafe = true); + /// Invoked to shrink a memory pool's free capacity with up to + /// 'decrementBytes'. + uint64_t shrinkPool(MemoryPool* pool, uint64_t decrementBytes); + /// Invoked to grows a memory pool's free capacity with at least /// 'incrementBytes'. The function returns true on success, otherwise false. bool growPool(MemoryPool* pool, uint64_t incrementBytes); diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index 8c6e4d31f6e1..b7e652eebf5c 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -96,8 +96,9 @@ class NoopArbitrator : public MemoryArbitrator { // Noop arbitrator has no memory capacity limit so no operation needed for // memory pool capacity release. - void releaseMemory(MemoryPool* /*unused*/) override { + uint64_t releaseMemory(MemoryPool* /*unused*/, uint64_t /*unused*/) override { // No-op + return 0ULL; } // Noop arbitrator has no memory capacity limit so no operation needed for @@ -161,6 +162,10 @@ void MemoryArbitrator::unregisterAllFactories() { SharedArbitrator::unregisterFactory(); } +uint64_t MemoryArbitrator::capacity() { + return capacity_; +} + std::unique_ptr MemoryReclaimer::create() { return std::unique_ptr(new MemoryReclaimer()); } diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index f000e4da23ae..08ae3f949828 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -120,9 +120,10 @@ class MemoryArbitrator { /// the memory arbitration on demand when actual memory allocation happens. virtual void reserveMemory(MemoryPool* pool, uint64_t bytes) = 0; - /// Invoked by the memory manager to return back all the reserved memory - /// capacity of a destroying memory pool. - virtual void releaseMemory(MemoryPool* pool) = 0; + /// Invoked by the memory manager to return back the specified amount of + /// reserved memory capacity of a destroying memory pool. If 0 is specified, + /// release all reserve memory. Returns the actually released amount of bytes. + virtual uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) = 0; /// Invoked by the memory manager to grow a memory pool's capacity. /// 'pool' is the memory pool to request to grow. 'candidates' is a list @@ -148,6 +149,7 @@ class MemoryArbitrator { const std::vector>& pools, uint64_t targetBytes) = 0; + uint64_t capacity(); /// The internal execution stats of the memory arbitrator. struct Stats { /// The number of arbitration requests. diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp index 6ef34c5b0e04..1ab2597ec34d 100644 --- a/velox/common/memory/MemoryPool.cpp +++ b/velox/common/memory/MemoryPool.cpp @@ -756,6 +756,16 @@ bool MemoryPoolImpl::incrementReservationThreadSafe( treeMemoryUsage())); } +uint64_t MemoryPoolImpl::shrinkManaged( + MemoryPool* requestor, + uint64_t targetBytes) { + if (parent_ != nullptr) { + return parent_->shrinkManaged(requestor, targetBytes); + } + VELOX_CHECK_NULL(parent_); + return manager_->shrinkPool(requestor, targetBytes); +}; + bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) { std::lock_guard l(mutex_); return maybeIncrementReservationLocked(size); diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index 68c7ade6a4ca..f46d3bc626f6 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -363,6 +363,12 @@ class MemoryPool : public std::enable_shared_from_this { /// without actually freeing the used memory. virtual uint64_t freeBytes() const = 0; + /// Try shrinking up to the specified amount of free memory via memory + /// manager. + virtual uint64_t shrinkManaged( + MemoryPool* requestor, + uint64_t targetBytes = 0) = 0; + /// Invoked to free up to the specified amount of free memory by reducing /// this memory pool's capacity without actually freeing any used memory. The /// function returns the actually freed memory capacity in bytes. If @@ -625,6 +631,8 @@ class MemoryPoolImpl : public MemoryPool { uint64_t reclaim(uint64_t targetBytes) override; + uint64_t shrinkManaged(MemoryPool* requestor, uint64_t targetBytes) override; + uint64_t shrink(uint64_t targetBytes = 0) override; uint64_t grow(uint64_t bytes) noexcept override; diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index dee7ff980fbf..fbf02bbfe2a9 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -163,10 +163,11 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) { pool->grow(reserveBytes); } -void SharedArbitrator::releaseMemory(MemoryPool* pool) { +uint64_t SharedArbitrator::releaseMemory(MemoryPool* pool, uint64_t bytes) { std::lock_guard l(mutex_); - const uint64_t freedBytes = pool->shrink(0); + const uint64_t freedBytes = pool->shrink(bytes); incrementFreeCapacityLocked(freedBytes); + return freedBytes; } std::vector SharedArbitrator::getCandidateStats( @@ -246,10 +247,7 @@ bool SharedArbitrator::ensureCapacity( if (checkCapacityGrowth(*requestor, targetBytes)) { return true; } - const uint64_t reclaimedBytes = reclaim(requestor, targetBytes); - // NOTE: return the reclaimed bytes back to the arbitrator and let the memory - // arbitration process to grow the requestor's memory capacity accordingly. - incrementFreeCapacity(reclaimedBytes); + reclaim(requestor, targetBytes); // Check if the requestor has been aborted in reclaim operation above. if (requestor->aborted()) { ++numFailures_; @@ -294,32 +292,34 @@ bool SharedArbitrator::arbitrateMemory( const uint64_t growTarget = std::min( maxGrowBytes(*requestor), std::max(memoryPoolTransferCapacity_, targetBytes)); - uint64_t freedBytes = decrementFreeCapacity(growTarget); - if (freedBytes >= targetBytes) { - requestor->grow(freedBytes); - return true; - } - VELOX_CHECK_LT(freedBytes, growTarget); + uint64_t unusedFreedBytes = decrementFreeCapacity(growTarget); auto freeGuard = folly::makeGuard([&]() { // Returns the unused freed memory capacity back to the arbitrator. - if (freedBytes > 0) { - incrementFreeCapacity(freedBytes); + if (unusedFreedBytes > 0) { + incrementFreeCapacity(unusedFreedBytes); } }); - freedBytes += - reclaimFreeMemoryFromCandidates(candidates, growTarget - freedBytes); - if (freedBytes >= targetBytes) { - const uint64_t bytesToGrow = std::min(growTarget, freedBytes); - requestor->grow(bytesToGrow); - freedBytes -= bytesToGrow; + if (unusedFreedBytes >= targetBytes) { + requestor->grow(unusedFreedBytes); + unusedFreedBytes = 0; + return true; + } + VELOX_CHECK_LT(unusedFreedBytes, growTarget); + + reclaimFreeMemoryFromCandidates(candidates, growTarget - unusedFreedBytes); + unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes); + if (unusedFreedBytes >= targetBytes) { + requestor->grow(unusedFreedBytes); + unusedFreedBytes = 0; return true; } - VELOX_CHECK_LT(freedBytes, growTarget); - freedBytes += reclaimUsedMemoryFromCandidates( - requestor, candidates, growTarget - freedBytes); + VELOX_CHECK_LT(unusedFreedBytes, growTarget); + reclaimUsedMemoryFromCandidates( + requestor, candidates, growTarget - unusedFreedBytes); + unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes); if (requestor->aborted()) { ++numFailures_; VELOX_MEM_POOL_ABORTED("The requestor pool has been aborted."); @@ -327,18 +327,22 @@ bool SharedArbitrator::arbitrateMemory( VELOX_CHECK(!requestor->aborted()); - if (freedBytes < targetBytes) { + if (unusedFreedBytes < targetBytes) { VELOX_MEM_LOG(WARNING) << "Failed to arbitrate sufficient memory for memory pool " << requestor->name() << ", request " << succinctBytes(targetBytes) - << ", only " << succinctBytes(freedBytes) + << ", only " << succinctBytes(unusedFreedBytes) << " has been freed, Arbitrator state: " << toString(); return false; } - const uint64_t bytesToGrow = std::min(freedBytes, growTarget); - requestor->grow(bytesToGrow); - freedBytes -= bytesToGrow; + if (unusedFreedBytes > growTarget) { + requestor->grow(growTarget); + unusedFreedBytes -= growTarget; + return true; + } + requestor->grow(unusedFreedBytes); + unusedFreedBytes = 0; return true; } @@ -359,7 +363,9 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates( if (bytesToShrink <= 0) { break; } - freedBytes += candidate.pool->shrink(bytesToShrink); + uint64_t shrunk = candidate.pool->shrink(bytesToShrink); + incrementFreeCapacity(shrunk); + freedBytes += shrunk; if (freedBytes >= targetBytes) { break; } @@ -399,6 +405,7 @@ uint64_t SharedArbitrator::reclaim( uint64_t freedBytes{0}; try { freedBytes = pool->shrink(targetBytes); + incrementFreeCapacity(freedBytes); if (freedBytes < targetBytes) { pool->reclaim(targetBytes - freedBytes); } @@ -408,7 +415,7 @@ uint64_t SharedArbitrator::reclaim( abort(pool, std::current_exception()); // Free up all the free capacity from the aborted pool as the associated // query has failed at this point. - pool->shrink(); + incrementFreeCapacity(pool->shrink()); } const uint64_t newCapacity = pool->capacity(); VELOX_CHECK_GE(oldCapacity, newCapacity); diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 1ce67853618f..0a329e1d63af 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -43,7 +43,7 @@ class SharedArbitrator : public MemoryArbitrator { void reserveMemory(MemoryPool* pool, uint64_t /*unused*/) final; - void releaseMemory(MemoryPool* pool) final; + uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) final; bool growMemory( MemoryPool* pool, diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 259ab44d9780..f386b83cc9b2 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -157,13 +157,9 @@ class FakeTestArbitrator : public MemoryArbitrator { .memoryPoolTransferCapacity = config.memoryPoolTransferCapacity, .retryArbitrationFailure = config.retryArbitrationFailure}) {} - void reserveMemory(MemoryPool* pool, uint64_t bytes) override { - VELOX_NYI(); - } + void reserveMemory(MemoryPool* pool, uint64_t bytes) override{VELOX_NYI()} - void releaseMemory(MemoryPool* pool) override { - VELOX_NYI(); - } + uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) override{VELOX_NYI()} std::string kind() const override { return "USER"; diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index af7cd52053cc..e4436f3dfe6f 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -125,7 +125,7 @@ class FakeTestArbitrator : public MemoryArbitrator { VELOX_NYI(); } - void releaseMemory(MemoryPool* pool) override { + uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) override { VELOX_NYI(); } @@ -209,9 +209,6 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) { options.allocator = allocator.get(); options.capacity = kCapacity; options.arbitratorKind = arbitratorKind_; - // The arbitrator capacity will be overridden by the memory manager's - // capacity. - options.capacity = options.capacity; const uint64_t initialPoolCapacity = options.capacity / 32; options.memoryPoolInitCapacity = initialPoolCapacity; MemoryManager manager{options}; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 332821f11e1d..571dae6971d5 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -307,7 +307,7 @@ class MockMemoryOperator { for (const auto& allocation : allocationsToFree) { pool_->free(allocation.buffer, allocation.size); } - return pool_->shrink(targetBytes); + return pool_->shrinkManaged(pool, targetBytes); } void abort(MemoryPool* pool) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 7b0f178b5c75..c0e3f1b0e500 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -67,11 +67,13 @@ class QueryCtx { return cache_; } - folly::Executor* executor() const { - if (executor_ != nullptr) { - return executor_; - } - auto executor = executorKeepalive_.get(); + bool isExecutorSupplied() const { + auto executor = executor0(); + return executor != nullptr; + } + + folly::Executor* FOLLY_NONNULL executor() const { + auto executor = executor0(); VELOX_CHECK(executor, "Executor was not supplied."); return executor; } @@ -130,6 +132,14 @@ class QueryCtx { } } + folly::Executor* executor0() const { + if (executor_ != nullptr) { + return executor_; + } + auto executor = executorKeepalive_.get(); + return executor; + } + const std::string queryId_; std::unordered_map> connectorConfigs_; diff --git a/velox/dwio/dwrf/test/WriterFlushTest.cpp b/velox/dwio/dwrf/test/WriterFlushTest.cpp index 6e45907e7e34..aba03a948335 100644 --- a/velox/dwio/dwrf/test/WriterFlushTest.cpp +++ b/velox/dwio/dwrf/test/WriterFlushTest.cpp @@ -208,6 +208,10 @@ class MockMemoryPool : public velox::memory::MemoryPool { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); } + uint64_t shrinkManaged(MemoryPool* /*unused*/, uint64_t /*unused*/) override { + VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); + } + uint64_t grow(uint64_t /*unused*/) noexcept override { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); } diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 8d622c56edc7..481a1bfdffdd 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -216,6 +216,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { hashBits = HashBitRange(startBit, startBit + spillConfig.joinPartitionBits); } + spillFinished_ = false; spiller_ = std::make_unique( Spiller::Type::kHashJoinBuild, table_->rows(), @@ -238,6 +239,11 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { spillChildVectors_.resize(tableType_->size()); } +void HashBuild::finishSpill(SpillPartitionSet& partitionSet) { + spiller_->finishSpill(partitionSet); + spillFinished_ = true; +} + bool HashBuild::isInputFromSpill() const { return spillInputReader_ != nullptr; } @@ -787,13 +793,13 @@ bool HashBuild::finishHashBuild() { VELOX_CHECK_NOT_NULL(build->table_); otherTables.push_back(std::move(build->table_)); if (build->spiller_ != nullptr) { - build->spiller_->finishSpill(spillPartitions); + build->finishSpill(spillPartitions); build->recordSpillStats(); } } if (spiller_ != nullptr) { - spiller_->finishSpill(spillPartitions); + finishSpill(spillPartitions); recordSpillStats(); // Remove the spilled partitions which are empty so as we don't need to @@ -1078,12 +1084,13 @@ void HashBuild::reclaim(uint64_t /*unused*/) { // NOTE: a hash build operator is reclaimable if it is in the middle of table // build processing and is not under non-reclaimable execution section. if ((state_ != State::kRunning && state_ != State::kWaitForBuild) || - nonReclaimableSection_) { + nonReclaimableSection_ || spillFinished_) { // TODO: add stats to record the non-reclaimable case and reduce the log // frequency if it is too verbose. LOG(WARNING) << "Can't reclaim from hash build operator, state_[" << stateName(state_) << "], nonReclaimableSection_[" - << nonReclaimableSection_ << "], " << toString(); + << nonReclaimableSection_ << "], spillFinished_[" + << spillFinished_ << "], " << toString(); return; } diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 8df3ec8944f7..2725e68133c1 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -133,6 +133,9 @@ class HashBuild final : public Operator { // in memory, then we will recursively spill part(s) of its data on disk. void setupSpiller(SpillPartition* spillPartition = nullptr); + // Finalize the spiller and return the spilled partitions. + void finishSpill(SpillPartitionSet& partitionSet); + // Invoked when either there is no more input from the build source or from // the spill input reader during the restoring. void noMoreInputInternal(); @@ -293,6 +296,7 @@ class HashBuild final : public Operator { uint64_t numSpillRows_{0}; uint64_t numSpillBytes_{0}; + bool spillFinished_{true}; std::unique_ptr spiller_; // Used to read input from previously spilled data for restoring. diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 32f29b42d131..f8665435f5e5 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -563,7 +563,7 @@ uint64_t Operator::MemoryReclaimer::reclaim( "facebook::velox::exec::Operator::MemoryReclaimer::reclaim", pool); op_->reclaim(targetBytes); - return pool->shrink(targetBytes); + return pool->shrinkManaged(pool, targetBytes); } void Operator::MemoryReclaimer::abort( diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 61f68d75f367..3ad08313ed80 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -470,12 +470,6 @@ RowVectorPtr Task::next(ContinueFuture* future) { drivers.reserve(numDriversUngrouped_); createSplitGroupStateLocked(kUngroupedGroupId); createDriversLocked(self, kUngroupedGroupId, drivers); - if (self->pool_->stats().currentBytes != 0) { - VELOX_FAIL( - "Unexpected memory pool allocations during task[{}] driver initialization: {}", - self->taskId_, - self->pool_->treeMemoryUsage()); - } drivers_ = std::move(drivers); } @@ -671,12 +665,6 @@ void Task::start( drivers.reserve(self->numDriversUngrouped_); self->createSplitGroupStateLocked(kUngroupedGroupId); self->createDriversLocked(self, kUngroupedGroupId, drivers); - if (self->pool_->stats().currentBytes != 0) { - VELOX_FAIL( - "Unexpected memory pool allocations during task[{}] driver initialization: {}", - self->taskId_, - self->pool_->treeMemoryUsage()); - } // Prevent the connecting structures from being cleaned up before all // split groups are finished during the grouped execution mode. @@ -756,9 +744,16 @@ void Task::resume(std::shared_ptr self) { continue; } VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated()); - if (!driver->state().hasBlockingFuture) { + if (!driver->state().hasBlockingFuture && + driver->task()->queryCtx()->isExecutorSupplied()) { // Do not continue a Driver that is blocked on external // event. The Driver gets enqueued by the promise realization. + // + // Do not continue the driver if no executor is supplied, + // Since it's likely that we are in single-thread execution. + // + // 2023/07.13 Hongze: Is there a way to hide the execution model + // (single or async) from here? Driver::enqueue(driver); } } diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 06dd1706a179..c9294247b0c7 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -33,7 +33,155 @@ using namespace facebook::velox; using namespace facebook::velox::common::testutil; namespace facebook::velox::exec::test { -namespace { + +class TaskTest : public HiveConnectorTestBase { + protected: + static std::pair, std::vector> + executeSingleThreaded( + core::PlanFragment plan, + const std::unordered_map>& + filePaths = {}) { + auto task = Task::create( + "single.execution.task.0", plan, 0, std::make_shared()); + + for (const auto& [nodeId, paths] : filePaths) { + for (const auto& path : paths) { + task->addSplit(nodeId, exec::Split(makeHiveConnectorSplit(path))); + } + task->noMoreSplits(nodeId); + } + + VELOX_CHECK(task->supportsSingleThreadedExecution()); + + vector_size_t numRows = 0; + std::vector results; + for (;;) { + auto result = task->next(); + if (!result) { + break; + } + + for (auto& child : result->children()) { + child->loadedVector(); + } + results.push_back(result); + numRows += result->size(); + } + + VELOX_CHECK(waitForTaskCompletion(task.get())); + + auto planNodeStats = toPlanStats(task->taskStats()); + VELOX_CHECK(planNodeStats.count(plan.planNode->id())); + VELOX_CHECK_EQ(numRows, planNodeStats.at(plan.planNode->id()).outputRows); + VELOX_CHECK_EQ( + results.size(), planNodeStats.at(plan.planNode->id()).outputVectors); + + return {task, results}; + } +}; + +TEST_F(TaskTest, wrongPlanNodeForSplit) { + auto connectorSplit = std::make_shared( + "test", + "file:/tmp/abc", + facebook::velox::dwio::common::FileFormat::DWRF, + 0, + 100); + + auto plan = PlanBuilder() + .tableScan(ROW({"a", "b"}, {INTEGER(), DOUBLE()})) + .project({"a * a", "b + b"}) + .planFragment(); + + auto task = Task::create( + "task-1", + std::move(plan), + 0, + std::make_shared(driverExecutor_.get())); + + // Add split for the source node. + task->addSplit("0", exec::Split(folly::copy(connectorSplit))); + + // Add an empty split. + task->addSplit("0", exec::Split()); + + // Try to add split for a non-source node. + auto errorMessage = + "Splits can be associated only with leaf plan nodes which require splits. Plan node ID 1 doesn't refer to such plan node."; + VELOX_ASSERT_THROW( + task->addSplit("1", exec::Split(folly::copy(connectorSplit))), + errorMessage) + + VELOX_ASSERT_THROW( + task->addSplitWithSequence( + "1", exec::Split(folly::copy(connectorSplit)), 3), + errorMessage) + + VELOX_ASSERT_THROW(task->setMaxSplitSequenceId("1", 9), errorMessage) + + VELOX_ASSERT_THROW(task->noMoreSplits("1"), errorMessage) + + VELOX_ASSERT_THROW(task->noMoreSplitsForGroup("1", 5), errorMessage) + + // Try to add split for non-existent node. + errorMessage = + "Splits can be associated only with leaf plan nodes which require splits. Plan node ID 12 doesn't refer to such plan node."; + VELOX_ASSERT_THROW( + task->addSplit("12", exec::Split(folly::copy(connectorSplit))), + errorMessage) + + VELOX_ASSERT_THROW( + task->addSplitWithSequence( + "12", exec::Split(folly::copy(connectorSplit)), 3), + errorMessage) + + VELOX_ASSERT_THROW(task->setMaxSplitSequenceId("12", 9), errorMessage) + + VELOX_ASSERT_THROW(task->noMoreSplits("12"), errorMessage) + + VELOX_ASSERT_THROW(task->noMoreSplitsForGroup("12", 5), errorMessage) + + // Try to add split for a Values source node. + plan = + PlanBuilder() + .values({makeRowVector(ROW({"a", "b"}, {INTEGER(), DOUBLE()}), 10)}) + .project({"a * a", "b + b"}) + .planFragment(); + + auto valuesTask = Task::create( + "task-2", + std::move(plan), + 0, + std::make_shared(driverExecutor_.get())); + errorMessage = + "Splits can be associated only with leaf plan nodes which require splits. Plan node ID 0 doesn't refer to such plan node."; + VELOX_ASSERT_THROW( + valuesTask->addSplit("0", exec::Split(folly::copy(connectorSplit))), + errorMessage) +} + +TEST_F(TaskTest, duplicatePlanNodeIds) { + auto plan = PlanBuilder() + .tableScan(ROW({"a", "b"}, {INTEGER(), DOUBLE()})) + .hashJoin( + {"a"}, + {"a1"}, + PlanBuilder() + .tableScan(ROW({"a1", "b1"}, {INTEGER(), DOUBLE()})) + .planNode(), + "", + {"b", "b1"}) + .planFragment(); + + VELOX_ASSERT_THROW( + Task::create( + "task-1", + std::move(plan), + 0, + std::make_shared(driverExecutor_.get())), + "Plan node IDs must be unique. Found duplicate ID: 0.") +} + // A test join node whose build is skewed in terms of process time. The driver // id 0 processes slower than other drivers if paralelism greater than 1 class TestSkewedJoinNode : public core::PlanNode { @@ -174,433 +322,76 @@ class TestSkewedJoinProbe : public exec::Operator { int32_t operatorId, exec::DriverCtx* driverCtx, std::shared_ptr joinNode) - : Operator( - driverCtx, - nullptr, - operatorId, - joinNode->id(), - "CustomJoinProbe") {} - - bool needsInput() const override { - return !finished_; - } - - void addInput(RowVectorPtr /* input */) override {} - - RowVectorPtr getOutput() override { - finished_ = true; - return nullptr; - } - - exec::BlockingReason isBlocked(ContinueFuture* future) override { - auto joinBridge = operatorCtx_->task()->getCustomJoinBridge( - operatorCtx_->driverCtx()->splitGroupId, planNodeId()); - auto buildFinished = - std::dynamic_pointer_cast(joinBridge) - ->buildFinishedOrFuture(future); - if (!buildFinished.has_value()) { - return exec::BlockingReason::kWaitForJoinBuild; - } - return exec::BlockingReason::kNotBlocked; - } - - bool isFinished() override { - return finished_; - } - - private: - bool finished_{false}; -}; - -class TestSkewedJoinBridgeTranslator - : public exec::Operator::PlanNodeTranslator { - std::unique_ptr toOperator( - exec::DriverCtx* ctx, - int32_t id, - const core::PlanNodePtr& node) override { - if (auto joinNode = - std::dynamic_pointer_cast(node)) { - return std::make_unique(id, ctx, joinNode); - } - return nullptr; - } - - std::unique_ptr toJoinBridge( - const core::PlanNodePtr& node) override { - if (std::dynamic_pointer_cast(node)) { - return std::make_unique(); - } - return nullptr; - } - - exec::OperatorSupplier toOperatorSupplier( - const core::PlanNodePtr& node) override { - if (auto joinNode = - std::dynamic_pointer_cast(node)) { - return [joinNode](int32_t operatorId, exec::DriverCtx* ctx) { - return std::make_unique(operatorId, ctx, joinNode); - }; - } - return nullptr; - } -}; - -class ExternalBlocker { - public: - folly::SemiFuture continueFuture() { - if (isBlocked_) { - auto [promise, future] = makeVeloxContinuePromiseContract(); - continuePromise_ = std::move(promise); - return std::move(future); - } - return folly::SemiFuture(); - } - - void unblock() { - if (isBlocked_) { - continuePromise_.setValue(); - isBlocked_ = false; - } - } - - void block() { - isBlocked_ = true; - } - - bool isBlocked() const { - return isBlocked_; - } - - private: - bool isBlocked_ = false; - folly::Promise continuePromise_; -}; - -// A test node that normally just re-project/passthrough the output from input -// When the node is blocked by external even (via externalBlocker), the operator -// will signal kBlocked. The pipeline can ONLY proceed again when it is -// unblocked externally. -class TestExternalBlockableNode : public core::PlanNode { - public: - TestExternalBlockableNode( - const core::PlanNodeId& id, - core::PlanNodePtr source, - std::shared_ptr externalBlocker) - : PlanNode(id), - sources_{std::move(source)}, - externalBlocker_(std::move(externalBlocker)) {} - - const RowTypePtr& outputType() const override { - return sources_[0]->outputType(); - } - - const std::vector& sources() const override { - return sources_; - } - - std::string_view name() const override { - return "external blocking node"; - } - - ExternalBlocker* externalBlocker() const { - return externalBlocker_.get(); - } - - private: - void addDetails(std::stringstream& /* stream */) const override {} - - std::vector sources_; - std::shared_ptr externalBlocker_; -}; - -class TestExternalBlockableOperator : public exec::Operator { - public: - TestExternalBlockableOperator( - int32_t operatorId, - exec::DriverCtx* driverCtx, - std::shared_ptr node) - : Operator( - driverCtx, - node->outputType(), - operatorId, - node->id(), - "ExternalBlockable"), - externalBlocker_(node->externalBlocker()) {} - - bool needsInput() const override { - return !noMoreInput_; - } - - void addInput(RowVectorPtr input) override { - input_ = std::move(input); - } - - RowVectorPtr getOutput() override { - // If this operator is signaled to be blocked externally - if (externalBlocker_->isBlocked()) { - continueFuture_ = externalBlocker_->continueFuture(); - return nullptr; - } - auto output = std::move(input_); - input_ = nullptr; - return output; - } - - exec::BlockingReason isBlocked(ContinueFuture* future) override { - if (continueFuture_.valid()) { - *future = std::move(continueFuture_); - return exec::BlockingReason::kWaitForConsumer; - } - return exec::BlockingReason::kNotBlocked; - } - - bool isFinished() override { - return noMoreInput_; - } - - private: - RowVectorPtr input_; - ExternalBlocker* externalBlocker_; - folly::SemiFuture continueFuture_; -}; - -class TestExternalBlockableTranslator - : public exec::Operator::PlanNodeTranslator { - std::unique_ptr toOperator( - exec::DriverCtx* ctx, - int32_t id, - const core::PlanNodePtr& node) override { - if (auto castedNode = - std::dynamic_pointer_cast(node)) { - return std::make_unique( - id, ctx, castedNode); - } - return nullptr; - } -}; - -// A test node creates operator that allocate memory from velox memory pool on -// construction. -class TestBadMemoryNode : public core::PlanNode { - public: - TestBadMemoryNode(const core::PlanNodeId& id, core::PlanNodePtr source) - : PlanNode(id), sources_{std::move(source)} {} - - const RowTypePtr& outputType() const override { - return sources_[0]->outputType(); - } - - const std::vector& sources() const override { - return sources_; - } - - std::string_view name() const override { - return "bad memory node"; - } - - private: - void addDetails(std::stringstream& /* stream */) const override {} - - std::vector sources_; -}; - -class TestBadMemoryOperator : public exec::Operator { - public: - TestBadMemoryOperator( - int32_t operatorId, - exec::DriverCtx* driverCtx, - std::shared_ptr node) - : Operator( - driverCtx, - node->outputType(), - operatorId, - node->id(), - "BadMemory") { - pool()->allocateNonContiguous(1, allocation_); - } - - bool needsInput() const override { - return !noMoreInput_; - } - - void addInput(RowVectorPtr /*unused*/) override {} - - RowVectorPtr getOutput() override { - return nullptr; - } - - exec::BlockingReason isBlocked(ContinueFuture* /*unused*/) override { - return exec::BlockingReason::kNotBlocked; - } - - bool isFinished() override { - return noMoreInput_; - } - - private: - memory::Allocation allocation_; -}; - -class TestBadMemoryTranslator : public exec::Operator::PlanNodeTranslator { - std::unique_ptr toOperator( - exec::DriverCtx* ctx, - int32_t id, - const core::PlanNodePtr& node) override { - if (auto castedNode = - std::dynamic_pointer_cast(node)) { - return std::make_unique(id, ctx, castedNode); - } - return nullptr; - } -}; -} // namespace -class TaskTest : public HiveConnectorTestBase { - protected: - static std::pair, std::vector> - executeSingleThreaded( - core::PlanFragment plan, - const std::unordered_map>& - filePaths = {}) { - auto task = Task::create( - "single.execution.task.0", plan, 0, std::make_shared()); - - for (const auto& [nodeId, paths] : filePaths) { - for (const auto& path : paths) { - task->addSplit(nodeId, exec::Split(makeHiveConnectorSplit(path))); - } - task->noMoreSplits(nodeId); - } - - VELOX_CHECK(task->supportsSingleThreadedExecution()); - - vector_size_t numRows = 0; - std::vector results; - for (;;) { - auto result = task->next(); - if (!result) { - break; - } - - for (auto& child : result->children()) { - child->loadedVector(); - } - results.push_back(result); - numRows += result->size(); - } - - VELOX_CHECK(waitForTaskCompletion(task.get())); - - auto planNodeStats = toPlanStats(task->taskStats()); - VELOX_CHECK(planNodeStats.count(plan.planNode->id())); - VELOX_CHECK_EQ(numRows, planNodeStats.at(plan.planNode->id()).outputRows); - VELOX_CHECK_EQ( - results.size(), planNodeStats.at(plan.planNode->id()).outputVectors); - - return {task, results}; - } -}; - -TEST_F(TaskTest, wrongPlanNodeForSplit) { - auto connectorSplit = std::make_shared( - "test", - "file:/tmp/abc", - facebook::velox::dwio::common::FileFormat::DWRF, - 0, - 100); - - auto plan = PlanBuilder() - .tableScan(ROW({"a", "b"}, {INTEGER(), DOUBLE()})) - .project({"a * a", "b + b"}) - .planFragment(); - - auto task = Task::create( - "task-1", - std::move(plan), - 0, - std::make_shared(driverExecutor_.get())); - - // Add split for the source node. - task->addSplit("0", exec::Split(folly::copy(connectorSplit))); - - // Add an empty split. - task->addSplit("0", exec::Split()); - - // Try to add split for a non-source node. - auto errorMessage = - "Splits can be associated only with leaf plan nodes which require splits. Plan node ID 1 doesn't refer to such plan node."; - VELOX_ASSERT_THROW( - task->addSplit("1", exec::Split(folly::copy(connectorSplit))), - errorMessage) - - VELOX_ASSERT_THROW( - task->addSplitWithSequence( - "1", exec::Split(folly::copy(connectorSplit)), 3), - errorMessage) - - VELOX_ASSERT_THROW(task->setMaxSplitSequenceId("1", 9), errorMessage) - - VELOX_ASSERT_THROW(task->noMoreSplits("1"), errorMessage) - - VELOX_ASSERT_THROW(task->noMoreSplitsForGroup("1", 5), errorMessage) + : Operator( + driverCtx, + nullptr, + operatorId, + joinNode->id(), + "CustomJoinProbe") {} - // Try to add split for non-existent node. - errorMessage = - "Splits can be associated only with leaf plan nodes which require splits. Plan node ID 12 doesn't refer to such plan node."; - VELOX_ASSERT_THROW( - task->addSplit("12", exec::Split(folly::copy(connectorSplit))), - errorMessage) + bool needsInput() const override { + return !finished_; + } - VELOX_ASSERT_THROW( - task->addSplitWithSequence( - "12", exec::Split(folly::copy(connectorSplit)), 3), - errorMessage) + void addInput(RowVectorPtr /* input */) override {} - VELOX_ASSERT_THROW(task->setMaxSplitSequenceId("12", 9), errorMessage) + RowVectorPtr getOutput() override { + finished_ = true; + return nullptr; + } - VELOX_ASSERT_THROW(task->noMoreSplits("12"), errorMessage) + exec::BlockingReason isBlocked(ContinueFuture* future) override { + auto joinBridge = operatorCtx_->task()->getCustomJoinBridge( + operatorCtx_->driverCtx()->splitGroupId, planNodeId()); + auto buildFinished = + std::dynamic_pointer_cast(joinBridge) + ->buildFinishedOrFuture(future); + if (!buildFinished.has_value()) { + return exec::BlockingReason::kWaitForJoinBuild; + } + return exec::BlockingReason::kNotBlocked; + } - VELOX_ASSERT_THROW(task->noMoreSplitsForGroup("12", 5), errorMessage) + bool isFinished() override { + return finished_; + } - // Try to add split for a Values source node. - plan = - PlanBuilder() - .values({makeRowVector(ROW({"a", "b"}, {INTEGER(), DOUBLE()}), 10)}) - .project({"a * a", "b + b"}) - .planFragment(); + private: + bool finished_{false}; +}; - auto valuesTask = Task::create( - "task-2", - std::move(plan), - 0, - std::make_shared(driverExecutor_.get())); - errorMessage = - "Splits can be associated only with leaf plan nodes which require splits. Plan node ID 0 doesn't refer to such plan node."; - VELOX_ASSERT_THROW( - valuesTask->addSplit("0", exec::Split(folly::copy(connectorSplit))), - errorMessage) -} +class TestSkewedJoinBridgeTranslator + : public exec::Operator::PlanNodeTranslator { + std::unique_ptr toOperator( + exec::DriverCtx* ctx, + int32_t id, + const core::PlanNodePtr& node) override { + if (auto joinNode = + std::dynamic_pointer_cast(node)) { + return std::make_unique(id, ctx, joinNode); + } + return nullptr; + } -TEST_F(TaskTest, duplicatePlanNodeIds) { - auto plan = PlanBuilder() - .tableScan(ROW({"a", "b"}, {INTEGER(), DOUBLE()})) - .hashJoin( - {"a"}, - {"a1"}, - PlanBuilder() - .tableScan(ROW({"a1", "b1"}, {INTEGER(), DOUBLE()})) - .planNode(), - "", - {"b", "b1"}) - .planFragment(); + std::unique_ptr toJoinBridge( + const core::PlanNodePtr& node) override { + if (std::dynamic_pointer_cast(node)) { + return std::make_unique(); + } + return nullptr; + } - VELOX_ASSERT_THROW( - Task::create( - "task-1", - std::move(plan), - 0, - std::make_shared(driverExecutor_.get())), - "Plan node IDs must be unique. Found duplicate ID: 0.") -} + exec::OperatorSupplier toOperatorSupplier( + const core::PlanNodePtr& node) override { + if (auto joinNode = + std::dynamic_pointer_cast(node)) { + return [joinNode](int32_t operatorId, exec::DriverCtx* ctx) { + return std::make_unique(operatorId, ctx, joinNode); + }; + } + return nullptr; + } +}; // This test simulates the following execution sequence that potentially can // cause a deadlock: @@ -832,6 +623,140 @@ TEST_F(TaskTest, singleThreadedCrossJoin) { } } +class ExternalBlocker { + public: + folly::SemiFuture continueFuture() { + if (isBlocked_) { + auto [promise, future] = makeVeloxContinuePromiseContract(); + continuePromise_ = std::move(promise); + return std::move(future); + } + return folly::SemiFuture(); + } + + void unblock() { + if (isBlocked_) { + continuePromise_.setValue(); + isBlocked_ = false; + } + } + + void block() { + isBlocked_ = true; + } + + bool isBlocked() const { + return isBlocked_; + } + + private: + bool isBlocked_ = false; + folly::Promise continuePromise_; +}; + +// A test node that normally just re-project/passthrough the output from input +// When the node is blocked by external even (via externalBlocker), the operator +// will signal kBlocked. The pipeline can ONLY proceed again when it is +// unblocked externally. +class TestExternalBlockableNode : public core::PlanNode { + public: + TestExternalBlockableNode( + const core::PlanNodeId& id, + core::PlanNodePtr source, + std::shared_ptr externalBlocker) + : PlanNode(id), + sources_{std::move(source)}, + externalBlocker_(std::move(externalBlocker)) {} + + const RowTypePtr& outputType() const override { + return sources_[0]->outputType(); + } + + const std::vector& sources() const override { + return sources_; + } + + std::string_view name() const override { + return "external blocking node"; + } + + ExternalBlocker* externalBlocker() const { + return externalBlocker_.get(); + } + + private: + void addDetails(std::stringstream& /* stream */) const override {} + + std::vector sources_; + std::shared_ptr externalBlocker_; +}; + +class TestExternalBlockableOperator : public exec::Operator { + public: + TestExternalBlockableOperator( + int32_t operatorId, + exec::DriverCtx* driverCtx, + std::shared_ptr node) + : Operator( + driverCtx, + node->outputType(), + operatorId, + node->id(), + "ExternalBlockable"), + externalBlocker_(node->externalBlocker()) {} + + bool needsInput() const override { + return !noMoreInput_; + } + + void addInput(RowVectorPtr input) override { + input_ = std::move(input); + } + + RowVectorPtr getOutput() override { + // If this operator is signaled to be blocked externally + if (externalBlocker_->isBlocked()) { + continueFuture_ = externalBlocker_->continueFuture(); + return nullptr; + } + auto output = std::move(input_); + input_ = nullptr; + return output; + } + + exec::BlockingReason isBlocked(ContinueFuture* future) override { + if (continueFuture_.valid()) { + *future = std::move(continueFuture_); + return exec::BlockingReason::kWaitForConsumer; + } + return exec::BlockingReason::kNotBlocked; + } + + bool isFinished() override { + return noMoreInput_; + } + + private: + RowVectorPtr input_; + ExternalBlocker* externalBlocker_; + folly::SemiFuture continueFuture_; +}; + +class TestExternalBlockableTranslator + : public exec::Operator::PlanNodeTranslator { + std::unique_ptr toOperator( + exec::DriverCtx* ctx, + int32_t id, + const core::PlanNodePtr& node) override { + if (auto castedNode = + std::dynamic_pointer_cast(node)) { + return std::make_unique( + id, ctx, castedNode); + } + return nullptr; + } +}; + TEST_F(TaskTest, singleThreadedExecutionExternalBlockable) { exec::Operator::registerOperator( std::make_unique()); @@ -1275,31 +1200,4 @@ DEBUG_ONLY_TEST_F(TaskTest, raceBetweenTaskPauseAndTerminate) { taskThread.join(); } -TEST_F(TaskTest, driverCreationMemoryAllocationCheck) { - exec::Operator::registerOperator(std::make_unique()); - auto data = makeRowVector({ - makeFlatVector(1'000, [](auto row) { return row; }), - }); - auto plan = - PlanBuilder() - .values({data}) - .addNode([&](std::string id, core::PlanNodePtr input) mutable { - return std::make_shared(id, input); - }) - .planFragment(); - for (bool singleThreadExecution : {false, true}) { - SCOPED_TRACE(fmt::format("singleThreadExecution: ", singleThreadExecution)); - auto badTask = Task::create( - "driverCreationMemoryAllocationCheck", - plan, - 0, - std::make_shared()); - if (singleThreadExecution) { - VELOX_ASSERT_THROW( - Task::start(badTask, 1), "Unexpected memory pool allocations"); - } else { - VELOX_ASSERT_THROW(badTask->next(), "Unexpected memory pool allocations"); - } - } -} } // namespace facebook::velox::exec::test