Skip to content

Commit 49d9f12

Browse files
authored
Make MemCache / Wrapper backends thread-safe (#2112)
* Atomic vector * Lockless MemCache * Lockless threadsafe wrapper. * Bugfix * Build fix. * Fixing elo-gaining bug.
1 parent dfb8fbe commit 49d9f12

File tree

3 files changed

+134
-36
lines changed

3 files changed

+134
-36
lines changed

src/neural/memcache.cc

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "neural/memcache.h"
2929

3030
#include "neural/cache.h"
31+
#include "utils/atomic_vector.h"
3132
#include "utils/smallarray.h"
3233

3334
namespace lczero {
@@ -79,11 +80,8 @@ class MemCacheComputation : public BackendComputation {
7980
MemCacheComputation(std::unique_ptr<BackendComputation> wrapped_computation,
8081
MemCache* memcache)
8182
: wrapped_computation_(std::move(wrapped_computation)),
82-
memcache_(memcache) {
83-
keys_.reserve(memcache_->max_batch_size_);
84-
values_.reserve(memcache_->max_batch_size_);
85-
result_ptrs_.reserve(memcache_->max_batch_size_);
86-
}
83+
memcache_(memcache),
84+
entries_(memcache->max_batch_size_) {}
8785

8886
private:
8987
size_t UsedBatchSize() const override {
@@ -99,10 +97,11 @@ class MemCacheComputation : public BackendComputation {
9997
return AddInputResult::FETCHED_IMMEDIATELY;
10098
}
10199
}
102-
keys_.push_back(hash);
103-
auto value = std::make_unique<CachedValue>();
104-
value->p.reset(new float[result.p.size()]);
105-
result_ptrs_.push_back(result);
100+
size_t entry_idx = entries_.emplace_back(
101+
Entry{hash, std::make_unique<CachedValue>(), result});
102+
auto& value = entries_[entry_idx].value;
103+
value->p.reset(pos.legal_moves.empty() ? nullptr
104+
: new float[pos.legal_moves.size()]);
106105
return wrapped_computation_->AddInput(
107106
pos, EvalResultPtr{&value->q,
108107
&value->d,
@@ -112,17 +111,21 @@ class MemCacheComputation : public BackendComputation {
112111

113112
virtual void ComputeBlocking() override {
114113
wrapped_computation_->ComputeBlocking();
115-
for (size_t i = 0; i < keys_.size(); ++i) {
116-
CachedValueToEvalResult(*values_[i], result_ptrs_[i]);
117-
memcache_->cache_.Insert(keys_[i], std::move(values_[i]));
114+
for (auto& entry : entries_) {
115+
CachedValueToEvalResult(*entry.value, entry.result_ptr);
116+
memcache_->cache_.Insert(entry.key, std::move(entry.value));
118117
}
119118
}
120119

120+
struct Entry {
121+
uint64_t key;
122+
std::unique_ptr<CachedValue> value;
123+
EvalResultPtr result_ptr;
124+
};
125+
121126
std::unique_ptr<BackendComputation> wrapped_computation_;
122-
std::vector<uint64_t> keys_;
123-
std::vector<std::unique_ptr<CachedValue>> values_;
124-
std::vector<EvalResultPtr> result_ptrs_;
125127
MemCache* memcache_;
128+
AtomicVector<Entry> entries_;
126129
};
127130

128131
std::unique_ptr<BackendComputation> MemCache::CreateComputation() {
@@ -138,8 +141,11 @@ std::optional<EvalResult> MemCache::GetCachedEvaluation(
138141
result.d = lock->d;
139142
result.q = lock->q;
140143
result.m = lock->m;
141-
std::copy(lock->p.get(), lock->p.get() + pos.legal_moves.size(),
142-
result.p.begin());
144+
if (lock->p) {
145+
result.p.reserve(pos.legal_moves.size());
146+
std::copy(lock->p.get(), lock->p.get() + pos.legal_moves.size(),
147+
std::back_inserter(result.p));
148+
}
143149
return result;
144150
}
145151

src/neural/wrapper.cc

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
#include "neural/encoder.h"
3434
#include "neural/shared_params.h"
35+
#include "utils/atomic_vector.h"
3536
#include "utils/fastmath.h"
3637

3738
namespace lczero {
@@ -77,30 +78,30 @@ class NetworkAsBackend : public Backend {
7778
class NetworkAsBackendComputation : public BackendComputation {
7879
public:
7980
NetworkAsBackendComputation(NetworkAsBackend* backend)
80-
: backend_(backend), computation_(backend_->network_->NewComputation()) {
81-
results_.reserve(backend_->attrs_.maximum_batch_size);
82-
moves_.reserve(backend_->attrs_.maximum_batch_size);
83-
transforms_.reserve(backend_->attrs_.maximum_batch_size);
84-
}
81+
: backend_(backend),
82+
computation_(backend_->network_->NewComputation()),
83+
entries_(backend_->attrs_.maximum_batch_size) {}
8584

86-
size_t UsedBatchSize() const override { return computation_->GetBatchSize(); }
85+
size_t UsedBatchSize() const override { return entries_.size(); }
8786

8887
AddInputResult AddInput(const EvalPosition& pos,
8988
EvalResultPtr result) override {
9089
int transform;
91-
computation_->AddInput(EncodePositionForNN(backend_->input_format_, pos.pos,
92-
8, backend_->fill_empty_history_,
93-
&transform));
94-
results_.push_back(result);
95-
moves_.emplace_back(pos.legal_moves.begin(), pos.legal_moves.end());
96-
transforms_.push_back(transform);
90+
const size_t idx = entries_.emplace_back(Entry{
91+
.input = EncodePositionForNN(backend_->input_format_, pos.pos, 8,
92+
backend_->fill_empty_history_, &transform),
93+
.legal_moves = MoveList(pos.legal_moves.begin(), pos.legal_moves.end()),
94+
.result = result,
95+
.transform = 0});
96+
entries_[idx].transform = transform;
9797
return ENQUEUED_FOR_EVAL;
9898
}
9999

100100
void ComputeBlocking() override {
101+
for (auto& entry : entries_) computation_->AddInput(std::move(entry.input));
101102
computation_->ComputeBlocking();
102-
for (size_t i = 0; i < results_.size(); ++i) {
103-
const EvalResultPtr& result = results_[i];
103+
for (size_t i = 0; i < entries_.size(); ++i) {
104+
const EvalResultPtr& result = entries_[i].result;
104105
if (result.q) *result.q = computation_->GetQVal(i);
105106
if (result.d) *result.d = computation_->GetDVal(i);
106107
if (result.m) *result.m = computation_->GetMVal(i);
@@ -110,8 +111,8 @@ class NetworkAsBackendComputation : public BackendComputation {
110111

111112
void SoftmaxPolicy(std::span<float> dst,
112113
const NetworkComputation* computation, int idx) {
113-
const std::vector<Move>& moves = moves_[idx];
114-
const int transform = transforms_[idx];
114+
const std::vector<Move>& moves = entries_[idx].legal_moves;
115+
const int transform = entries_[idx].transform;
115116
// Copy the values to the destination array and compute the maximum.
116117
const float max_p = std::accumulate(
117118
moves.begin(), moves.end(), -std::numeric_limits<float>::infinity(),
@@ -131,11 +132,16 @@ class NetworkAsBackendComputation : public BackendComputation {
131132
}
132133

133134
private:
135+
struct Entry {
136+
InputPlanes input;
137+
MoveList legal_moves;
138+
EvalResultPtr result;
139+
int transform;
140+
};
141+
134142
NetworkAsBackend* backend_;
135143
std::unique_ptr<NetworkComputation> computation_;
136-
std::vector<std::vector<Move>> moves_;
137-
std::vector<EvalResultPtr> results_;
138-
std::vector<int> transforms_;
144+
AtomicVector<Entry> entries_;
139145
};
140146

141147
std::unique_ptr<BackendComputation> NetworkAsBackend::CreateComputation() {

src/utils/atomic_vector.h

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
  This file is part of Leela Chess Zero.
  Copyright (C) 2024 The LCZero Authors

  Leela Chess is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Leela Chess is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Leela Chess. If not, see <http://www.gnu.org/licenses/>.

  Additional permission under GNU GPL version 3 section 7

  If you modify this Program, or any covered work, by linking or
  combining it with NVIDIA Corporation's libraries from the NVIDIA CUDA
  Toolkit and the NVIDIA CUDA Deep Neural Network library (or a
  modified version of those libraries), containing parts covered by the
  terms of the respective license agreement, the licensors of this
  Program grant you additional permission to convey the resulting work.
*/

#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>

namespace lczero {

// A fixed-capacity vector whose emplace_back() is thread-safe and lock-free:
// each writer claims a distinct slot with an atomic fetch_add and constructs
// the element in place. clear() and destruction are NOT thread safe, and a
// reader must not access a slot concurrently with the writer constructing it
// (callers are expected to separate the fill phase from the read phase).
template <typename T>
class AtomicVector {
 public:
  // Allocates uninitialized, correctly aligned storage for `capacity`
  // elements. No T objects are constructed until emplace_back().
  explicit AtomicVector(size_t capacity)
      : capacity_(capacity),
        size_(0),
        data_(std::make_unique<Storage[]>(capacity)) {}

  // The vector owns raw storage; an implicit copy would alias (and
  // double-destroy) it, so copying is disabled. (The unique_ptr member
  // already suppresses the defaults; spelling it out documents intent.)
  AtomicVector(const AtomicVector&) = delete;
  AtomicVector& operator=(const AtomicVector&) = delete;

  // Destroys all constructed elements; the buffer itself is released by
  // the unique_ptr member.
  ~AtomicVector() { clear(); }

  // Thread safe. Constructs T from `args` in the next free slot and returns
  // the index of the inserted element. Exceeding capacity_ is a programming
  // error: it is only caught by the assert (no growth, no bounds check in
  // release builds).
  template <typename... Args>
  size_t emplace_back(Args&&... args) {
    const size_t idx = size_.fetch_add(1, std::memory_order_relaxed);
    assert(idx < capacity_);
    new (&data_[idx]) T(std::forward<Args>(args)...);
    return idx;
  }

  T& operator[](size_t i) {
    assert(i < size());
    return *reinterpret_cast<T*>(&data_[i]);
  }

  const T& operator[](size_t i) const {
    assert(i < size());
    return *reinterpret_cast<const T*>(&data_[i]);
  }

  size_t size() const { return size_.load(std::memory_order_relaxed); }
  size_t capacity() const { return capacity_; }

  // Not thread safe. Destroys elements in reverse order of construction and
  // resets the size to zero; capacity (and the allocation) is kept.
  void clear() {
    for (size_t i = size_.load(std::memory_order_relaxed); i-- > 0;) {
      reinterpret_cast<T*>(&data_[i])->~T();
    }
    size_.store(0, std::memory_order_relaxed);
  }

  // Iteration covers only the constructed prefix [0, size()).
  T* begin() { return reinterpret_cast<T*>(data_.get()); }
  T* end() { return begin() + size(); }
  const T* begin() const { return reinterpret_cast<const T*>(data_.get()); }
  const T* end() const { return begin() + size(); }

 private:
  // One slot of uninitialized storage with the size/alignment of T.
  using Storage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;

  const size_t capacity_;
  std::atomic<size_t> size_;  // Number of slots claimed so far.
  std::unique_ptr<Storage[]> data_;
};

}  // namespace lczero

0 commit comments

Comments
 (0)