Skip to content

Commit 9e73be3

Browse files
David NavasMarcelo Vanzin
David Navas
authored and
Marcelo Vanzin
committed
[SPARK-27726][CORE] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
The details of the PR are explored in-depth in the sub-tasks of the umbrella jira SPARK-27726. Briefly: 1. Stop issuing asynchronous requests to cleanup elements in the tracking store when a request is already pending 2. Fix a couple of thread-safety issues (mutable state and mis-ordered updates) 3. Move Summary deletion outside of Stage deletion loop like Tasks already are 4. Reimplement multi-delete in a removeAllKeys call which allows InMemoryStore to implement it in a performant manner. 5. Some generic typing and exception handling cleanup We see about five orders of magnitude improvement in the deletion code, which for us is the difference between a server that needs restarting daily, and one that is stable over weeks. Unit tests for the fire-once asynchronous code and the removeAll calls in both LevelDB and InMemoryStore are supplied. It was noted that the testing code for the LevelDB and InMemoryStore is highly repetitive, and should probably be merged, but we did not attempt that in this PR. A version of this code was run in our production 2.3.3 and we were able to sustain higher throughput without going into GC overload (which was happening on a daily basis some weeks ago). A version of this code was also put under a purpose-built Performance Suite of tests to verify performance under both types of Store implementations for both before and after code streams and for both total and partial delete cases (this code is not included in this PR). Closes apache#24616 from davidnavas/PentaBugFix. Authored-by: David Navas <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 1e0facb commit 9e73be3

File tree

11 files changed

+450
-158
lines changed

11 files changed

+450
-158
lines changed

common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java

+169-77
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@
2121
import java.util.Collection;
2222
import java.util.Collections;
2323
import java.util.Iterator;
24+
import java.util.HashSet;
2425
import java.util.List;
2526
import java.util.NoSuchElementException;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentMap;
29+
import java.util.function.BiConsumer;
30+
import java.util.function.Predicate;
2831
import java.util.stream.Collectors;
2932
import java.util.stream.Stream;
3033

3134
import com.google.common.base.Objects;
3235
import com.google.common.base.Preconditions;
33-
import com.google.common.base.Throwables;
3436

3537
import org.apache.spark.annotation.Private;
3638

@@ -43,7 +45,7 @@
4345
public class InMemoryStore implements KVStore {
4446

4547
private Object metadata;
46-
private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
48+
private InMemoryLists inMemoryLists = new InMemoryLists();
4749

4850
@Override
4951
public <T> T getMetadata(Class<T> klass) {
@@ -57,13 +59,13 @@ public void setMetadata(Object value) {
5759

5860
@Override
5961
public long count(Class<?> type) {
60-
InstanceList list = data.get(type);
62+
InstanceList<?> list = inMemoryLists.get(type);
6163
return list != null ? list.size() : 0;
6264
}
6365

6466
@Override
6567
public long count(Class<?> type, String index, Object indexedValue) throws Exception {
66-
InstanceList list = data.get(type);
68+
InstanceList<?> list = inMemoryLists.get(type);
6769
int count = 0;
6870
Object comparable = asKey(indexedValue);
6971
KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
@@ -77,45 +79,51 @@ public long count(Class<?> type, String index, Object indexedValue) throws Excep
7779

7880
@Override
7981
public <T> T read(Class<T> klass, Object naturalKey) {
80-
InstanceList list = data.get(klass);
81-
Object value = list != null ? list.get(naturalKey) : null;
82+
InstanceList<T> list = inMemoryLists.get(klass);
83+
T value = list != null ? list.get(naturalKey) : null;
8284
if (value == null) {
8385
throw new NoSuchElementException();
8486
}
85-
return klass.cast(value);
87+
return value;
8688
}
8789

8890
@Override
8991
public void write(Object value) throws Exception {
90-
InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
91-
try {
92-
return new InstanceList(key);
93-
} catch (Exception e) {
94-
throw Throwables.propagate(e);
95-
}
96-
});
97-
list.put(value);
92+
inMemoryLists.write(value);
9893
}
9994

10095
@Override
10196
public void delete(Class<?> type, Object naturalKey) {
102-
InstanceList list = data.get(type);
97+
InstanceList<?> list = inMemoryLists.get(type);
10398
if (list != null) {
10499
list.delete(naturalKey);
105100
}
106101
}
107102

108103
@Override
109104
public <T> KVStoreView<T> view(Class<T> type){
110-
InstanceList list = data.get(type);
111-
return list != null ? list.view(type)
112-
: new InMemoryView<>(type, Collections.<T>emptyList(), null);
105+
InstanceList<T> list = inMemoryLists.get(type);
106+
return list != null ? list.view() : emptyView();
113107
}
114108

115109
@Override
116110
public void close() {
117111
metadata = null;
118-
data.clear();
112+
inMemoryLists.clear();
113+
}
114+
115+
@Override
116+
public <T> boolean removeAllByIndexValues(
117+
Class<T> klass,
118+
String index,
119+
Collection<?> indexValues) {
120+
InstanceList<T> list = inMemoryLists.get(klass);
121+
122+
if (list != null) {
123+
return list.countingRemoveAllByIndexValues(index, indexValues) > 0;
124+
} else {
125+
return false;
126+
}
119127
}
120128

121129
@SuppressWarnings("unchecked")
@@ -126,64 +134,150 @@ private static Comparable<Object> asKey(Object in) {
126134
return (Comparable<Object>) in;
127135
}
128136

129-
private static class InstanceList {
137+
@SuppressWarnings("unchecked")
138+
private static <T> KVStoreView<T> emptyView() {
139+
return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
140+
}
141+
142+
/**
143+
* Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
144+
* class of type T to an InstanceList of type T.
145+
*/
146+
private static class InMemoryLists {
147+
private final ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
148+
149+
@SuppressWarnings("unchecked")
150+
public <T> InstanceList<T> get(Class<T> type) {
151+
return (InstanceList<T>) data.get(type);
152+
}
153+
154+
@SuppressWarnings("unchecked")
155+
public <T> void write(T value) throws Exception {
156+
InstanceList<T> list =
157+
(InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
158+
list.put(value);
159+
}
160+
161+
public void clear() {
162+
data.clear();
163+
}
164+
}
165+
166+
private static class InstanceList<T> {
167+
168+
/**
169+
* A BiConsumer to control multi-entity removal. We use this in a forEach rather than an
170+
* iterator because there is a bug in jdk8 which affects remove() on all concurrent map
171+
* iterators. https://bugs.openjdk.java.net/browse/JDK-8078645
172+
*/
173+
private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
174+
private final ConcurrentMap<Comparable<Object>, T> data;
175+
private final Predicate<? super T> filter;
176+
177+
/**
178+
* Keeps a count of the number of elements removed. This count is not currently surfaced
179+
* to clients of KVStore as Java's generic removeAll() construct returns only a boolean,
180+
* but I found it handy to have the count of elements removed while debugging; a count being
181+
* no more complicated than a boolean, I've retained that behavior here, even though there
182+
* is no current requirement.
183+
*/
184+
private int count = 0;
185+
186+
CountingRemoveIfForEach(
187+
ConcurrentMap<Comparable<Object>, T> data,
188+
Predicate<? super T> filter) {
189+
this.data = data;
190+
this.filter = filter;
191+
}
192+
193+
@Override
194+
public void accept(Comparable<Object> key, T value) {
195+
if (filter.test(value)) {
196+
if (data.remove(key, value)) {
197+
count++;
198+
}
199+
}
200+
}
201+
202+
public int count() { return count; }
203+
}
130204

131205
private final KVTypeInfo ti;
132206
private final KVTypeInfo.Accessor naturalKey;
133-
private final ConcurrentMap<Comparable<Object>, Object> data;
134-
135-
private int size;
207+
private final ConcurrentMap<Comparable<Object>, T> data;
136208

137-
private InstanceList(Class<?> type) throws Exception {
138-
this.ti = new KVTypeInfo(type);
209+
private InstanceList(Class<?> klass) {
210+
this.ti = new KVTypeInfo(klass);
139211
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
140212
this.data = new ConcurrentHashMap<>();
141-
this.size = 0;
142213
}
143214

144215
KVTypeInfo.Accessor getIndexAccessor(String indexName) {
145216
return ti.getAccessor(indexName);
146217
}
147218

148-
public Object get(Object key) {
219+
int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
220+
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
221+
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
222+
223+
data.forEach(callback);
224+
return callback.count();
225+
}
226+
227+
public T get(Object key) {
149228
return data.get(asKey(key));
150229
}
151230

152-
public void put(Object value) throws Exception {
153-
Preconditions.checkArgument(ti.type().equals(value.getClass()),
154-
"Unexpected type: %s", value.getClass());
155-
if (data.put(asKey(naturalKey.get(value)), value) == null) {
156-
size++;
157-
}
231+
public void put(T value) throws Exception {
232+
data.put(asKey(naturalKey.get(value)), value);
158233
}
159234

160235
public void delete(Object key) {
161-
if (data.remove(asKey(key)) != null) {
162-
size--;
163-
}
236+
data.remove(asKey(key));
164237
}
165238

166239
public int size() {
167-
return size;
240+
return data.size();
168241
}
169242

170-
@SuppressWarnings("unchecked")
171-
public <T> InMemoryView<T> view(Class<T> type) {
172-
Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
173-
Collection<T> all = (Collection<T>) data.values();
174-
return new InMemoryView<>(type, all, ti);
243+
public InMemoryView<T> view() {
244+
return new InMemoryView<>(data.values(), ti);
245+
}
246+
247+
private static <T> Predicate<? super T> getPredicate(
248+
KVTypeInfo.Accessor getter,
249+
Collection<?> values) {
250+
if (Comparable.class.isAssignableFrom(getter.getType())) {
251+
HashSet<?> set = new HashSet<>(values);
252+
253+
return (value) -> set.contains(indexValueForEntity(getter, value));
254+
} else {
255+
HashSet<Comparable> set = new HashSet<>(values.size());
256+
for (Object key : values) {
257+
set.add(asKey(key));
258+
}
259+
return (value) -> set.contains(asKey(indexValueForEntity(getter, value)));
260+
}
175261
}
176262

263+
private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object entity) {
264+
try {
265+
return getter.get(entity);
266+
} catch (ReflectiveOperationException e) {
267+
throw new RuntimeException(e);
268+
}
269+
}
177270
}
178271

179272
private static class InMemoryView<T> extends KVStoreView<T> {
273+
private static final InMemoryView<?> EMPTY_VIEW =
274+
new InMemoryView<>(Collections.emptyList(), null);
180275

181276
private final Collection<T> elements;
182277
private final KVTypeInfo ti;
183278
private final KVTypeInfo.Accessor natural;
184279

185-
InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
186-
super(type);
280+
InMemoryView(Collection<T> elements, KVTypeInfo ti) {
187281
this.elements = elements;
188282
this.ti = ti;
189283
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
@@ -195,34 +289,32 @@ public Iterator<T> iterator() {
195289
return new InMemoryIterator<>(elements.iterator());
196290
}
197291

198-
try {
199-
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
200-
int modifier = ascending ? 1 : -1;
292+
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
293+
int modifier = ascending ? 1 : -1;
201294

202-
final List<T> sorted = copyElements();
203-
Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
204-
Stream<T> stream = sorted.stream();
295+
final List<T> sorted = copyElements();
296+
sorted.sort((e1, e2) -> modifier * compare(e1, e2, getter));
297+
Stream<T> stream = sorted.stream();
205298

206-
if (first != null) {
207-
stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
208-
}
209-
210-
if (last != null) {
211-
stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
212-
}
299+
if (first != null) {
300+
Comparable<?> firstKey = asKey(first);
301+
stream = stream.filter(e -> modifier * compare(e, getter, firstKey) >= 0);
302+
}
213303

214-
if (skip > 0) {
215-
stream = stream.skip(skip);
216-
}
304+
if (last != null) {
305+
Comparable<?> lastKey = asKey(last);
306+
stream = stream.filter(e -> modifier * compare(e, getter, lastKey) <= 0);
307+
}
217308

218-
if (max < sorted.size()) {
219-
stream = stream.limit((int) max);
220-
}
309+
if (skip > 0) {
310+
stream = stream.skip(skip);
311+
}
221312

222-
return new InMemoryIterator<>(stream.iterator());
223-
} catch (Exception e) {
224-
throw Throwables.propagate(e);
313+
if (max < sorted.size()) {
314+
stream = stream.limit((int) max);
225315
}
316+
317+
return new InMemoryIterator<>(stream.iterator());
226318
}
227319

228320
/**
@@ -232,9 +324,10 @@ private List<T> copyElements() {
232324
if (parent != null) {
233325
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
234326
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
327+
Comparable<?> parentKey = asKey(parent);
235328

236329
return elements.stream()
237-
.filter(e -> compare(e, parentGetter, parent) == 0)
330+
.filter(e -> compare(e, parentGetter, parentKey) == 0)
238331
.collect(Collectors.toList());
239332
} else {
240333
return new ArrayList<>(elements);
@@ -243,24 +336,23 @@ private List<T> copyElements() {
243336

244337
private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
245338
try {
246-
int diff = compare(e1, getter, getter.get(e2));
339+
int diff = compare(e1, getter, asKey(getter.get(e2)));
247340
if (diff == 0 && getter != natural) {
248-
diff = compare(e1, natural, natural.get(e2));
341+
diff = compare(e1, natural, asKey(natural.get(e2)));
249342
}
250343
return diff;
251-
} catch (Exception e) {
252-
throw Throwables.propagate(e);
344+
} catch (ReflectiveOperationException e) {
345+
throw new RuntimeException(e);
253346
}
254347
}
255348

256-
private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
349+
private int compare(T e1, KVTypeInfo.Accessor getter, Comparable<?> v2) {
257350
try {
258-
return asKey(getter.get(e1)).compareTo(asKey(v2));
259-
} catch (Exception e) {
260-
throw Throwables.propagate(e);
351+
return asKey(getter.get(e1)).compareTo(v2);
352+
} catch (ReflectiveOperationException e) {
353+
throw new RuntimeException(e);
261354
}
262355
}
263-
264356
}
265357

266358
private static class InMemoryIterator<T> implements KVStoreIterator<T> {

0 commit comments

Comments
 (0)