Skip to content

Commit 5133fbf

Browse files
committed
Issues calling reclaimer / arbitrator APIs in single-thread execution (5790)
1 parent 2c674af commit 5133fbf

18 files changed

+486
-528
lines changed

velox/common/memory/Memory.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -129,19 +129,22 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
129129
options.checkUsageLeak = checkUsageLeak_;
130130
options.debugEnabled = debugEnabled_;
131131

132-
folly::SharedMutex::WriteHolder guard{mutex_};
133-
if (pools_.find(poolName) != pools_.end()) {
134-
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
132+
std::shared_ptr<MemoryPool> pool;
133+
{
134+
folly::SharedMutex::WriteHolder guard{mutex_};
135+
if (pools_.find(poolName) != pools_.end()) {
136+
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
137+
}
138+
pool = std::make_shared<MemoryPoolImpl>(
139+
this,
140+
poolName,
141+
MemoryPool::Kind::kAggregate,
142+
nullptr,
143+
std::move(reclaimer),
144+
poolDestructionCb_,
145+
options);
146+
pools_.emplace(poolName, pool);
135147
}
136-
auto pool = std::make_shared<MemoryPoolImpl>(
137-
this,
138-
poolName,
139-
MemoryPool::Kind::kAggregate,
140-
nullptr,
141-
std::move(reclaimer),
142-
poolDestructionCb_,
143-
options);
144-
pools_.emplace(poolName, pool);
145148
VELOX_CHECK_EQ(pool->capacity(), 0);
146149
arbitrator_->reserveMemory(pool.get(), capacity);
147150
return pool;
@@ -158,6 +161,14 @@ std::shared_ptr<MemoryPool> MemoryManager::addLeafPool(
158161
return defaultRoot_->addLeafChild(poolName, threadSafe, nullptr);
159162
}
160163

164+
uint64_t MemoryManager::shrinkPool(MemoryPool* pool, uint64_t decrementBytes) {
165+
VELOX_CHECK_NOT_NULL(pool);
166+
if (arbitrator_ == nullptr) {
167+
return pool->shrink(decrementBytes);
168+
}
169+
return arbitrator_->releaseMemory(pool, decrementBytes);
170+
}
171+
161172
bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
162173
VELOX_CHECK_NOT_NULL(pool);
163174
VELOX_CHECK_NE(pool->capacity(), kMaxMemory);
@@ -176,7 +187,7 @@ void MemoryManager::dropPool(MemoryPool* pool) {
176187
VELOX_FAIL("The dropped memory pool {} not found", pool->name());
177188
}
178189
pools_.erase(it);
179-
arbitrator_->releaseMemory(pool);
190+
arbitrator_->releaseMemory(pool, 0);
180191
}
181192

182193
MemoryPool& MemoryManager::deprecatedSharedLeafPool() {

velox/common/memory/Memory.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ class MemoryManager {
161161
const std::string& name = "",
162162
bool threadSafe = true);
163163

164+
/// Invoked to shrink a memory pool's free capacity with up to
165+
/// 'decrementBytes'.
166+
uint64_t shrinkPool(MemoryPool* pool, uint64_t decrementBytes);
167+
164168
/// Invoked to grows a memory pool's free capacity with at least
165169
/// 'incrementBytes'. The function returns true on success, otherwise false.
166170
bool growPool(MemoryPool* pool, uint64_t incrementBytes);

velox/common/memory/MemoryArbitrator.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ class NoopArbitrator : public MemoryArbitrator {
9696

9797
// Noop arbitrator has no memory capacity limit so no operation needed for
9898
// memory pool capacity release.
99-
void releaseMemory(MemoryPool* /*unused*/) override {
99+
uint64_t releaseMemory(MemoryPool* /*unused*/, uint64_t /*unused*/) override {
100100
// No-op
101+
return 0ULL;
101102
}
102103

103104
// Noop arbitrator has no memory capacity limit so no operation needed for
@@ -161,6 +162,10 @@ void MemoryArbitrator::unregisterAllFactories() {
161162
SharedArbitrator::unregisterFactory();
162163
}
163164

165+
uint64_t MemoryArbitrator::capacity() {
166+
return capacity_;
167+
}
168+
164169
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
165170
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
166171
}

velox/common/memory/MemoryArbitrator.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,10 @@ class MemoryArbitrator {
120120
/// the memory arbitration on demand when actual memory allocation happens.
121121
virtual void reserveMemory(MemoryPool* pool, uint64_t bytes) = 0;
122122

123-
/// Invoked by the memory manager to return back all the reserved memory
124-
/// capacity of a destroying memory pool.
125-
virtual void releaseMemory(MemoryPool* pool) = 0;
123+
/// Invoked by the memory manager to return back the specified amount of
124+
/// reserved memory capacity of a destroying memory pool. If 0 is specified,
125+
/// release all reserve memory. Returns the actually released amount of bytes.
126+
virtual uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) = 0;
126127

127128
/// Invoked by the memory manager to grow a memory pool's capacity.
128129
/// 'pool' is the memory pool to request to grow. 'candidates' is a list
@@ -148,6 +149,7 @@ class MemoryArbitrator {
148149
const std::vector<std::shared_ptr<MemoryPool>>& pools,
149150
uint64_t targetBytes) = 0;
150151

152+
uint64_t capacity();
151153
/// The internal execution stats of the memory arbitrator.
152154
struct Stats {
153155
/// The number of arbitration requests.

velox/common/memory/MemoryPool.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,16 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
756756
treeMemoryUsage()));
757757
}
758758

759+
uint64_t MemoryPoolImpl::shrinkManaged(
760+
MemoryPool* requestor,
761+
uint64_t targetBytes) {
762+
if (parent_ != nullptr) {
763+
return parent_->shrinkManaged(requestor, targetBytes);
764+
}
765+
VELOX_CHECK_NULL(parent_);
766+
return manager_->shrinkPool(requestor, targetBytes);
767+
};
768+
759769
bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {
760770
std::lock_guard<std::mutex> l(mutex_);
761771
return maybeIncrementReservationLocked(size);

velox/common/memory/MemoryPool.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,12 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
363363
/// without actually freeing the used memory.
364364
virtual uint64_t freeBytes() const = 0;
365365

366+
/// Try shrinking up to the specified amount of free memory via memory
367+
/// manager.
368+
virtual uint64_t shrinkManaged(
369+
MemoryPool* requestor,
370+
uint64_t targetBytes = 0) = 0;
371+
366372
/// Invoked to free up to the specified amount of free memory by reducing
367373
/// this memory pool's capacity without actually freeing any used memory. The
368374
/// function returns the actually freed memory capacity in bytes. If
@@ -625,6 +631,8 @@ class MemoryPoolImpl : public MemoryPool {
625631

626632
uint64_t reclaim(uint64_t targetBytes) override;
627633

634+
uint64_t shrinkManaged(MemoryPool* requestor, uint64_t targetBytes) override;
635+
628636
uint64_t shrink(uint64_t targetBytes = 0) override;
629637

630638
uint64_t grow(uint64_t bytes) noexcept override;

velox/common/memory/SharedArbitrator.cpp

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,11 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) {
163163
pool->grow(reserveBytes);
164164
}
165165

166-
void SharedArbitrator::releaseMemory(MemoryPool* pool) {
166+
uint64_t SharedArbitrator::releaseMemory(MemoryPool* pool, uint64_t bytes) {
167167
std::lock_guard<std::mutex> l(mutex_);
168-
const uint64_t freedBytes = pool->shrink(0);
168+
const uint64_t freedBytes = pool->shrink(bytes);
169169
incrementFreeCapacityLocked(freedBytes);
170+
return freedBytes;
170171
}
171172

172173
std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats(
@@ -246,10 +247,7 @@ bool SharedArbitrator::ensureCapacity(
246247
if (checkCapacityGrowth(*requestor, targetBytes)) {
247248
return true;
248249
}
249-
const uint64_t reclaimedBytes = reclaim(requestor, targetBytes);
250-
// NOTE: return the reclaimed bytes back to the arbitrator and let the memory
251-
// arbitration process to grow the requestor's memory capacity accordingly.
252-
incrementFreeCapacity(reclaimedBytes);
250+
reclaim(requestor, targetBytes);
253251
// Check if the requestor has been aborted in reclaim operation above.
254252
if (requestor->aborted()) {
255253
++numFailures_;
@@ -294,51 +292,57 @@ bool SharedArbitrator::arbitrateMemory(
294292
const uint64_t growTarget = std::min(
295293
maxGrowBytes(*requestor),
296294
std::max(memoryPoolTransferCapacity_, targetBytes));
297-
uint64_t freedBytes = decrementFreeCapacity(growTarget);
298-
if (freedBytes >= targetBytes) {
299-
requestor->grow(freedBytes);
300-
return true;
301-
}
302-
VELOX_CHECK_LT(freedBytes, growTarget);
295+
uint64_t unusedFreedBytes = decrementFreeCapacity(growTarget);
303296

304297
auto freeGuard = folly::makeGuard([&]() {
305298
// Returns the unused freed memory capacity back to the arbitrator.
306-
if (freedBytes > 0) {
307-
incrementFreeCapacity(freedBytes);
299+
if (unusedFreedBytes > 0) {
300+
incrementFreeCapacity(unusedFreedBytes);
308301
}
309302
});
310303

311-
freedBytes +=
312-
reclaimFreeMemoryFromCandidates(candidates, growTarget - freedBytes);
313-
if (freedBytes >= targetBytes) {
314-
const uint64_t bytesToGrow = std::min(growTarget, freedBytes);
315-
requestor->grow(bytesToGrow);
316-
freedBytes -= bytesToGrow;
304+
if (unusedFreedBytes >= targetBytes) {
305+
requestor->grow(unusedFreedBytes);
306+
unusedFreedBytes = 0;
307+
return true;
308+
}
309+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
310+
311+
reclaimFreeMemoryFromCandidates(candidates, growTarget - unusedFreedBytes);
312+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
313+
if (unusedFreedBytes >= targetBytes) {
314+
requestor->grow(unusedFreedBytes);
315+
unusedFreedBytes = 0;
317316
return true;
318317
}
319318

320-
VELOX_CHECK_LT(freedBytes, growTarget);
321-
freedBytes += reclaimUsedMemoryFromCandidates(
322-
requestor, candidates, growTarget - freedBytes);
319+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
320+
reclaimUsedMemoryFromCandidates(
321+
requestor, candidates, growTarget - unusedFreedBytes);
322+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
323323
if (requestor->aborted()) {
324324
++numFailures_;
325325
VELOX_MEM_POOL_ABORTED("The requestor pool has been aborted.");
326326
}
327327

328328
VELOX_CHECK(!requestor->aborted());
329329

330-
if (freedBytes < targetBytes) {
330+
if (unusedFreedBytes < targetBytes) {
331331
VELOX_MEM_LOG(WARNING)
332332
<< "Failed to arbitrate sufficient memory for memory pool "
333333
<< requestor->name() << ", request " << succinctBytes(targetBytes)
334-
<< ", only " << succinctBytes(freedBytes)
334+
<< ", only " << succinctBytes(unusedFreedBytes)
335335
<< " has been freed, Arbitrator state: " << toString();
336336
return false;
337337
}
338338

339-
const uint64_t bytesToGrow = std::min(freedBytes, growTarget);
340-
requestor->grow(bytesToGrow);
341-
freedBytes -= bytesToGrow;
339+
if (unusedFreedBytes > growTarget) {
340+
requestor->grow(growTarget);
341+
unusedFreedBytes -= growTarget;
342+
return true;
343+
}
344+
requestor->grow(unusedFreedBytes);
345+
unusedFreedBytes = 0;
342346
return true;
343347
}
344348

@@ -359,7 +363,9 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
359363
if (bytesToShrink <= 0) {
360364
break;
361365
}
362-
freedBytes += candidate.pool->shrink(bytesToShrink);
366+
uint64_t shrunk = candidate.pool->shrink(bytesToShrink);
367+
incrementFreeCapacity(shrunk);
368+
freedBytes += shrunk;
363369
if (freedBytes >= targetBytes) {
364370
break;
365371
}
@@ -399,6 +405,7 @@ uint64_t SharedArbitrator::reclaim(
399405
uint64_t freedBytes{0};
400406
try {
401407
freedBytes = pool->shrink(targetBytes);
408+
incrementFreeCapacity(freedBytes);
402409
if (freedBytes < targetBytes) {
403410
pool->reclaim(targetBytes - freedBytes);
404411
}
@@ -408,7 +415,7 @@ uint64_t SharedArbitrator::reclaim(
408415
abort(pool, std::current_exception());
409416
// Free up all the free capacity from the aborted pool as the associated
410417
// query has failed at this point.
411-
pool->shrink();
418+
incrementFreeCapacity(pool->shrink());
412419
}
413420
const uint64_t newCapacity = pool->capacity();
414421
VELOX_CHECK_GE(oldCapacity, newCapacity);

velox/common/memory/SharedArbitrator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class SharedArbitrator : public MemoryArbitrator {
4343

4444
void reserveMemory(MemoryPool* pool, uint64_t /*unused*/) final;
4545

46-
void releaseMemory(MemoryPool* pool) final;
46+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) final;
4747

4848
bool growMemory(
4949
MemoryPool* pool,

velox/common/memory/tests/MemoryArbitratorTest.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,9 @@ class FakeTestArbitrator : public MemoryArbitrator {
157157
.memoryPoolTransferCapacity = config.memoryPoolTransferCapacity,
158158
.retryArbitrationFailure = config.retryArbitrationFailure}) {}
159159

160-
void reserveMemory(MemoryPool* pool, uint64_t bytes) override {
161-
VELOX_NYI();
162-
}
160+
void reserveMemory(MemoryPool* pool, uint64_t bytes) override{VELOX_NYI()}
163161

164-
void releaseMemory(MemoryPool* pool) override {
165-
VELOX_NYI();
166-
}
162+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) override{VELOX_NYI()}
167163

168164
std::string kind() const override {
169165
return "USER";

velox/common/memory/tests/MemoryManagerTest.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class FakeTestArbitrator : public MemoryArbitrator {
125125
VELOX_NYI();
126126
}
127127

128-
void releaseMemory(MemoryPool* pool) override {
128+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) override {
129129
VELOX_NYI();
130130
}
131131

@@ -209,9 +209,6 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) {
209209
options.allocator = allocator.get();
210210
options.capacity = kCapacity;
211211
options.arbitratorKind = arbitratorKind_;
212-
// The arbitrator capacity will be overridden by the memory manager's
213-
// capacity.
214-
options.capacity = options.capacity;
215212
const uint64_t initialPoolCapacity = options.capacity / 32;
216213
options.memoryPoolInitCapacity = initialPoolCapacity;
217214
MemoryManager manager{options};

velox/common/memory/tests/MockSharedArbitratorTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ class MockMemoryOperator {
307307
for (const auto& allocation : allocationsToFree) {
308308
pool_->free(allocation.buffer, allocation.size);
309309
}
310-
return pool_->shrink(targetBytes);
310+
return pool_->shrinkManaged(pool, targetBytes);
311311
}
312312

313313
void abort(MemoryPool* pool) {

velox/core/QueryCtx.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,13 @@ class QueryCtx {
6767
return cache_;
6868
}
6969

70-
folly::Executor* executor() const {
71-
if (executor_ != nullptr) {
72-
return executor_;
73-
}
74-
auto executor = executorKeepalive_.get();
70+
bool isExecutorSupplied() const {
71+
auto executor = executor0();
72+
return executor != nullptr;
73+
}
74+
75+
folly::Executor* FOLLY_NONNULL executor() const {
76+
auto executor = executor0();
7577
VELOX_CHECK(executor, "Executor was not supplied.");
7678
return executor;
7779
}
@@ -130,6 +132,14 @@ class QueryCtx {
130132
}
131133
}
132134

135+
folly::Executor* executor0() const {
136+
if (executor_ != nullptr) {
137+
return executor_;
138+
}
139+
auto executor = executorKeepalive_.get();
140+
return executor;
141+
}
142+
133143
const std::string queryId_;
134144

135145
std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs_;

velox/dwio/dwrf/test/WriterFlushTest.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@ class MockMemoryPool : public velox::memory::MemoryPool {
208208
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
209209
}
210210

211+
uint64_t shrinkManaged(MemoryPool* /*unused*/, uint64_t /*unused*/) override {
212+
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
213+
}
214+
211215
uint64_t grow(uint64_t /*unused*/) noexcept override {
212216
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
213217
}

0 commit comments

Comments
 (0)