Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/mongocxx/include/mongocxx/v1/change_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,35 @@ class change_stream {
///
MONGOCXX_ABI_EXPORT_CDECL(bsoncxx::v1::stdx::optional<bsoncxx::v1::document::view>) get_resume_token() const;

///
/// Obtain the next event document.
///
/// This function blocks until an event document is available by repeatedly advancing the underlying cursor.
///
/// @note Calling `this->begin()` after `this->next()` does not advance the underlying cursor state.
///
/// @warning Invalidates all views to the current event document and resume token.
///
/// @throws mongocxx::v1::server_error when a server-side error is encountered and a raw server error is available.
/// @throws mongocxx::v1::exception for all other runtime errors.
///
MONGOCXX_ABI_EXPORT_CDECL(bsoncxx::v1::document::view) next();

///
/// Try to obtain the next event document.
///
/// This function does not block: the underlying cursor is advanced only once.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Advancing the cursor may block (for up to maxAwaitTimeMS). try_next gives users an option to exit early and/or save the current resume token.

Suggested change
/// This function does not block: the underlying cursor is advanced only once.
/// This function does not block indefinitely: the underlying cursor is advanced only once.

/// An empty optional is returned when no event document is currently available.
///
/// @note Calling `this->begin()` after `this->try_next()` does not advance the underlying cursor state.
///
/// @warning Invalidates all views to the current event document and resume token.
///
/// @throws mongocxx::v1::server_error when a server-side error is encountered and a raw server error is available.
/// @throws mongocxx::v1::exception for all other runtime errors.
///
MONGOCXX_ABI_EXPORT_CDECL(bsoncxx::v1::stdx::optional<bsoncxx::v1::document::view>) try_next();

class internal;

private:
Expand Down Expand Up @@ -389,7 +418,7 @@ class change_stream::iterator {
/// Advance the underlying cursor to obtain the next event document.
/// Compare equal to the end iterator when there are no event documents available.
///
/// @note Pre-increment and post-increment are equivalent.
/// @note Pre-increment, post-increment, and @ref v1::change_stream::next() are equivalent.
///
/// @warning Invalidates all views to the current event document and resume token.
///
Expand Down
20 changes: 20 additions & 0 deletions src/mongocxx/lib/mongocxx/v1/change_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,26 @@ bsoncxx::v1::stdx::optional<bsoncxx::v1::document::view> change_stream::get_resu
return ret;
}

bsoncxx::v1::document::view change_stream::next() {
while (true) {
internal::advance_iterator(*this);

if (impl::with(this)->_state == state::has_doc) {
return impl::with(this)->_doc;
}
}
}

bsoncxx::v1::stdx::optional<bsoncxx::v1::document::view> change_stream::try_next() {
internal::advance_iterator(*this);

if (impl::with(this)->_state == state::has_doc) {
return impl::with(this)->_doc;
}

return {};
}

change_stream::change_stream(void* impl) : _impl{impl} {}

change_stream change_stream::internal::make(mongoc_change_stream_t* stream) {
Expand Down
132 changes: 121 additions & 11 deletions src/mongocxx/test/v1/change_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <mongocxx/test/private/scoped_bson.hh>

#include <chrono>
#include <functional>
#include <utility>

#include <bsoncxx/private/bson.hh>
Expand All @@ -36,6 +37,7 @@
#include <bsoncxx/test/system_error.hh>

#include <catch2/catch_test_macros.hpp>
#include <catch2/generators/catch_generators.hpp>
#include <catch2/matchers/catch_matchers_exception.hpp>
#include <catch2/matchers/catch_matchers_string.hpp>

Expand Down Expand Up @@ -256,30 +258,86 @@ TEST_CASE("begin", "[mongocxx][v1][change_stream]") {
->interpose([&](mongoc_change_stream_t* stream, bson_t const** bson) -> bool {
(void)bson;
CHECK(reinterpret_cast<test_data_type*>(stream) == &data);
++data.next_count;
data.doc = scoped_bson{BCON_NEW("next", BCON_INT32(data.next_count))};
*bson = data.doc.bson();
return true;

// Return up to three documents for increment assertions.
if (data.next_count < 3) {
++data.next_count;
data.doc = scoped_bson{BCON_NEW("next", BCON_INT32(data.next_count))};
*bson = data.doc.bson();
return true;
}

// Allow final increments to obtain no event document.
else {
++data.next_count; // 4: try_next(), 5: next().
return false;
}
})
.forever();

auto change_stream_error_document = libmongoc::change_stream_error_document.create_instance();
change_stream_error_document
->interpose([&](mongoc_change_stream_t const* stream, bson_error_t* err, bson_t const** bson) -> bool {
(void)(stream);
(void)err;
(void)bson;
return false;
CHECK(stream);
REQUIRE(err != nullptr);
REQUIRE(bson != nullptr);

// Allow final `.try_next()` to return a null optional.
if (data.next_count < 5) {
return false;
}

// Allow final `.next()` to return via exception.
else {
bson_set_error(err, 0, 123, "advance failure");
err->reserved = 2u; // MONGOC_ERROR_CATEGORY
return true;
}
})
.forever();

SECTION("iteration") {
// `change_stream::next()`, `change_stream::try_next()`, and `iterator::operator++()` are equivalent.
enum struct next_type {
no,
next,
try_next,
};

auto const use_next = GENERATE(next_type::no, next_type::next, next_type::try_next);
CAPTURE(use_next);

auto stream = change_stream::internal::make(reinterpret_cast<mongoc_change_stream_t*>(&data));

CHECK(change_stream::internal::can_get_more(stream));
CHECK(data.next_count == 0);

auto iter = stream.begin();
change_stream::iterator iter;

switch (use_next) {
case next_type::no: {
iter = stream.begin();
} break;

case next_type::next: {
auto const doc = stream.next();
CHECK(doc == scoped_bson{R"({"next": 1})"}.value());

iter = stream.begin(); // `next()` behaves like `begin()` (consecutive calls).
} break;

case next_type::try_next: {
auto const doc_opt = stream.try_next();
CHECK(doc_opt == scoped_bson{R"({"next": 1})"}.value());

iter = stream.begin(); // `try_next()` behaves like `begin()` (consecutive calls).
} break;

default: {
FAIL("should not reach this point");
} break;
}

REQUIRE(iter != stream.end());
CHECK(change_stream::internal::has_doc(stream));
CHECK(change_stream::internal::doc(stream) == *iter);
Expand All @@ -288,7 +346,24 @@ TEST_CASE("begin", "[mongocxx][v1][change_stream]") {
CHECK(data.next_count == 1);
CHECK(data.error_document_count == 0);

CHECK_NOTHROW(++iter);
switch (use_next) {
case next_type::no: {
CHECK_NOTHROW(++iter); // Pre-increment and post-increment are equivalent.
} break;

case next_type::next: {
CHECK(stream.next() == scoped_bson{R"({"next": 2})"}.value());
} break;

case next_type::try_next: {
CHECK(stream.try_next() == scoped_bson{R"({"next": 2})"}.value());
} break;

default: {
FAIL("should not reach this point");
} break;
}

REQUIRE(iter != stream.end());
CHECK(change_stream::internal::has_doc(stream));
CHECK(change_stream::internal::doc(stream) == *iter);
Expand All @@ -297,14 +372,49 @@ TEST_CASE("begin", "[mongocxx][v1][change_stream]") {
CHECK(data.next_count == 2);
CHECK(data.error_document_count == 0);

CHECK_NOTHROW(iter++);
switch (use_next) {
case next_type::no: {
CHECK_NOTHROW(iter++); // Pre-increment and post-increment are equivalent.
} break;

case next_type::next: {
CHECK(stream.next() == scoped_bson{R"({"next": 3})"}.value());
} break;

case next_type::try_next: {
CHECK(stream.try_next() == scoped_bson{R"({"next": 3})"}.value());
} break;

default: {
FAIL("should not reach this point");
} break;
}

REQUIRE(iter != stream.end());
CHECK(change_stream::internal::has_doc(stream));
CHECK(change_stream::internal::doc(stream) == *iter);
CHECK(iter->find("next") != iter->end());
CHECK((*iter)["next"].get_int32().value == 3);
CHECK(data.next_count == 3);
CHECK(data.error_document_count == 0);

switch (use_next) {
case next_type::no: {
CHECK(++iter == stream.end());
} break;

case next_type::next: {
CHECK_THROWS_WITH(stream.next(), Catch::Matchers::ContainsSubstring("advance failure"));
} break;

case next_type::try_next: {
CHECK(stream.try_next() == bsoncxx::v1::stdx::nullopt);
} break;

default: {
FAIL("should not reach this point");
} break;
}
}

SECTION("equality") {
Expand Down