r/tests: added test validating processing all matching batches
Signed-off-by: Michał Maślanka <[email protected]>
mmaslankaprv committed Feb 5, 2025
1 parent 7665c07 commit e67b4df
Showing 1 changed file with 98 additions and 0 deletions.
src/v/raft/tests/basic_raft_fixture_test.cc (+98 −0)
@@ -862,3 +862,101 @@ TEST_F_CORO(raft_fixture, test_no_stepdown_on_append_entries_timeout) {
ASSERT_EQ_CORO(term_before, new_leader_node.raft()->term());
ASSERT_TRUE_CORO(new_leader_node.raft()->is_leader());
}

/**
 * This synthetic test triggers a situation in which a follower receives an
 * append entries request that contains only batches already matching its
 * log. In this case the follower should reply with success so that the
 * leader can continue the recovery process.
 *
 * The test uses reply interception to 'trick' the leader into sending
 * append entries with batches that the follower already has.
 */
TEST_F_CORO(raft_fixture, test_redelivery_of_matching_logs) {
co_await create_simple_group(3);
auto leader_id = co_await wait_for_leader(10s);
model::node_id non_leader_id{0};
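    // Use a tiny recovery read size so that recovery proceeds in many small
    // append entries requests rather than a single large one.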
for (auto& [id, n] : nodes()) {
n->set_default_recovery_read_size(1);
}

for (auto& [id, _] : nodes()) {
if (id != leader_id) {
non_leader_id = id;
break;
}
}
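    // non_leader_id now refers to an arbitrary follower; this is the node
    // that will be blocked and given a divergent log below.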
auto& leader_node = node(leader_id);
/**
* Replicate data to all nodes
*/
auto r = co_await leader_node.raft()->replicate(
make_batches(200, 1, 10),
replicate_options(consistency_level::quorum_ack, 10s));
/**
* Prevent one node from receiving append entries
*/
leader_node.on_dispatch(
[non_leader_id](model::node_id id, raft::msg_type t) {
if (t == raft::msg_type::append_entries && id == non_leader_id) {
throw std::runtime_error("error");
}
return ss::now();
});
    /**
     * Replicate data while one of the nodes is blocked
     */
r = co_await leader_node.raft()->replicate(
make_batches(200, 1, 10),
replicate_options(consistency_level::quorum_ack, 10s));
ASSERT_FALSE_CORO(r.has_error());

    /**
     * Append some batches with a different term directly to the blocked
     * node's log
     */
auto log_ap = node(non_leader_id)
.raft()
->log()
->make_appender(storage::log_append_config{
.should_fsync = storage::log_append_config::fsync::no,
.io_priority = ss::default_priority_class(),
});
auto reader = model::make_fragmented_memory_record_batch_reader(
make_batches(200, 1, 10, model::term_id(2)));
co_await reader.for_each_ref(std::move(log_ap), model::no_timeout);

auto term_1_match_offset = node(non_leader_id).raft()->dirty_offset();
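    // term_1_match_offset is the follower's dirty offset after the divergent
    // append; the reply interceptor below keys off the offset just before it.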

    /**
     * Trick the leader right at the offset where the leader's and the
     * follower's logs match, so that it re-sends batches the follower
     * already has.
     */
ss::condition_variable reply_intercepted;
size_t intercept_count = 0;
leader_node.set_reply_interceptor(
[&, term_1_match_offset](reply_variant reply, model::node_id) {
return ss::visit(
std::move(reply),
[&, term_1_match_offset](append_entries_reply& a_r) {
if (
a_r.last_dirty_log_index
== model::prev_offset(term_1_match_offset)) {
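                    // Turn the reply into a failure so the leader re-sends
                    // append entries with batches the follower already has.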
a_r.result = reply_result::failure;
intercept_count++;
reply_intercepted.signal();
}
return ss::make_ready_future<reply_variant>(a_r);
},
[](auto& r) {
return ss::make_ready_future<reply_variant>(std::move(r));
});
});
    /**
     * Restore communication, wait for the interceptor to trigger several
     * times, and then verify that the leader's dirty offset gets committed
     * on all nodes.
     */
leader_node.reset_on_dispatch();
co_await reply_intercepted.wait([&] { return intercept_count > 5; });
leader_node.reset_reply_interceptor();

co_await wait_for_committed_offset(leader_node.raft()->dirty_offset(), 5s);
}
