Skip to content

Commit 617432e

Browse files
authored
Ensure BatchCursorFlux does not drop an error
The BatchCursor class can be closed by the BatchCursorFlux class but there is no signalling for the BatchCursor onNext publisher. This leads to a next() called after the cursor was closed error and that breaks the reactive streams spec as the error ends up being dropped. JAVA-4849
1 parent d5fb38f commit 617432e

File tree

3 files changed

+83
-5
lines changed

3 files changed

+83
-5
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursor.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import reactor.core.publisher.Mono;
2121

2222
import java.util.List;
23+
import java.util.function.Supplier;
2324

24-
import static com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.sinkToCallback;
2525

2626
/**
2727
* <p>This class is not part of the public API and may be removed or changed at any time</p>
@@ -35,7 +35,22 @@ public BatchCursor(final AsyncBatchCursor<T> wrapped) {
3535
}
3636

3737
public Publisher<List<T>> next() {
38-
return Mono.create(sink -> wrapped.next(sinkToCallback(sink)));
38+
return next(() -> false);
39+
}
40+
41+
public Publisher<List<T>> next(final Supplier<Boolean> hasBeenCancelled) {
42+
return Mono.create(sink -> wrapped.next(
43+
(result, t) -> {
44+
if (!hasBeenCancelled.get()) {
45+
if (t != null) {
46+
sink.error(t);
47+
} else if (result == null) {
48+
sink.success();
49+
} else {
50+
sink.success(result);
51+
}
52+
}
53+
}));
3954
}
4055

4156
public void setBatchSize(final int batchSize) {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private void recurseCursor(){
8484
sink.complete();
8585
} else {
8686
batchCursor.setBatchSize(calculateBatchSize(sink.requestedFromDownstream()));
87-
Mono.from(batchCursor.next())
87+
Mono.from(batchCursor.next(() -> sink.isCancelled()))
8888
.doOnCancel(this::closeCursor)
8989
.doOnError((e) -> {
9090
try {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/internal/BatchCursorFluxTest.java

+65-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
package com.mongodb.reactivestreams.client.internal;
1717

1818
import com.mongodb.MongoClientSettings;
19+
import com.mongodb.MongoCursorNotFoundException;
1920
import com.mongodb.client.model.changestream.ChangeStreamDocument;
2021
import com.mongodb.client.result.InsertOneResult;
2122
import com.mongodb.event.CommandEvent;
23+
import com.mongodb.event.CommandStartedEvent;
2224
import com.mongodb.internal.connection.TestCommandListener;
2325
import com.mongodb.reactivestreams.client.FindPublisher;
2426
import com.mongodb.reactivestreams.client.MongoClient;
@@ -35,13 +37,17 @@
3537
import org.junit.jupiter.api.AfterEach;
3638
import org.junit.jupiter.api.BeforeEach;
3739
import org.junit.jupiter.api.DisplayName;
40+
import org.junit.jupiter.api.Test;
3841
import org.junit.jupiter.api.extension.ExtendWith;
3942
import org.mockito.Mock;
4043
import org.mockito.junit.jupiter.MockitoExtension;
4144
import org.reactivestreams.Publisher;
45+
import reactor.core.publisher.Flux;
46+
import reactor.core.publisher.Hooks;
4247
import reactor.core.publisher.Mono;
4348

4449
import java.util.List;
50+
import java.util.concurrent.atomic.AtomicBoolean;
4551
import java.util.stream.Collectors;
4652
import java.util.stream.IntStream;
4753

@@ -56,13 +62,12 @@
5662
import static java.util.Collections.singletonList;
5763
import static org.junit.jupiter.api.Assertions.assertAll;
5864
import static org.junit.jupiter.api.Assertions.assertEquals;
65+
import static org.junit.jupiter.api.Assertions.assertFalse;
5966
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
6067
import static org.junit.jupiter.api.Assertions.assertNotNull;
6168
import static org.junit.jupiter.api.Assumptions.assumeTrue;
6269
import static org.mockito.Mockito.when;
6370

64-
import org.junit.jupiter.api.Test;
65-
6671
@ExtendWith(MockitoExtension.class)
6772
public class BatchCursorFluxTest {
6873

@@ -299,6 +304,64 @@ void changeStreamPublisherCompletesAfterDroppingCollection() {
299304
subscriber.assertNoErrors();
300305
}
301306

307+
@Test
308+
@DisplayName("Ensure BatchCursor does not drop an error")
309+
public void testBatchCursorDoesNotDropAnError() {
310+
try {
311+
AtomicBoolean errorDropped = new AtomicBoolean();
312+
Hooks.onErrorDropped(t -> errorDropped.set(true));
313+
Mono.from(collection.insertMany(createDocs(200))).block();
314+
315+
Flux.fromStream(IntStream.range(1, 200).boxed())
316+
.flatMap(i ->
317+
Flux.fromIterable(asList(1, 2))
318+
.flatMap(x -> Flux.from(collection.find()))
319+
.take(1)
320+
321+
)
322+
.collectList()
323+
.block(TIMEOUT_DURATION);
324+
325+
assertFalse(errorDropped.get());
326+
} finally {
327+
Hooks.resetOnErrorDropped();
328+
}
329+
}
330+
331+
@Test
332+
@DisplayName("Ensure BatchCursor reports cursor errors")
333+
@SuppressWarnings("OptionalGetWithoutIsPresent")
334+
public void testBatchCursorReportsCursorErrors() {
335+
List<Document> docs = createDocs(200);
336+
Mono.from(collection.insertMany(docs)).block(TIMEOUT_DURATION);
337+
338+
TestSubscriber<Document> subscriber = new TestSubscriber<>();
339+
FindPublisher<Document> findPublisher = collection.find().batchSize(50);
340+
findPublisher.subscribe(subscriber);
341+
assertCommandNames(emptyList());
342+
343+
subscriber.requestMore(100);
344+
subscriber.assertReceivedOnNext(docs.subList(0, 100));
345+
assertCommandNames(asList("find", "getMore"));
346+
347+
BsonDocument getMoreCommand = commandListener.getCommandStartedEvents().stream()
348+
.filter(e -> e.getCommandName().equals("getMore"))
349+
.map(e -> ((CommandStartedEvent) e).getCommand())
350+
.findFirst()
351+
.get();
352+
353+
Mono.from(client.getDatabase(getDefaultDatabaseName()).runCommand(
354+
new BsonDocument("killCursors", new BsonString(collection.getNamespace().getCollectionName()))
355+
.append("cursors", new BsonArray(singletonList(getMoreCommand.getNumber("getMore"))))
356+
)).block(TIMEOUT_DURATION);
357+
358+
subscriber.requestMore(200);
359+
List<Throwable> onErrorEvents = subscriber.getOnErrorEvents();
360+
subscriber.assertTerminalEvent();
361+
assertEquals(1, onErrorEvents.size());
362+
assertEquals(MongoCursorNotFoundException.class, onErrorEvents.get(0).getClass());
363+
}
364+
302365
private void assertCommandNames(final List<String> commandNames) {
303366
assertIterableEquals(commandNames,
304367
commandListener.getCommandStartedEvents().stream().map(CommandEvent::getCommandName).collect(Collectors.toList()));

0 commit comments

Comments
 (0)