
Fixed checking if append_entries_request batches are already present in follower log #25018

Open

mmaslankaprv wants to merge 7 commits into dev from fix-matching-entries-check
Conversation

mmaslankaprv
Member

@mmaslankaprv mmaslankaprv commented Feb 4, 2025

Background

When a follower receives an append entries request whose prev_log_index is smaller than its own prev_log_index, it validates whether the batches from the request match its own log (by checking each batch offset and corresponding term). If that is the case, the batches are skipped to prevent truncation of valid batches and avoid data loss.
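
A minimal sketch of this matching check, using hypothetical types and names (the real code operates on model::record_batch, model::offset and model::term_id):

#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical batch descriptor used only for this illustration.
struct batch_info {
    int64_t base_offset;
    int64_t last_offset;
    int64_t term;
};

// Walk the request batches in order and return the base offset of the first
// batch that does not already match the follower's log, or std::nullopt when
// every batch matches (nothing needs to be truncated or appended).
std::optional<int64_t> first_non_matching_offset(
  const std::vector<batch_info>& request_batches,
  const std::function<std::optional<int64_t>(int64_t)>& local_term_at) {
    for (const auto& b : request_batches) {
        auto local_term = local_term_at(b.base_offset);
        if (!local_term || *local_term != b.term) {
            return b.base_offset; // first mismatch: truncate/append from here
        }
    }
    return std::nullopt;
}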

Negative append_entries_request::prev_log_index

The validation of already matching batches was broken if they happened to be at the beginning of the log. In this case the prev_log_index is not initialised (it is negative). This case was not correctly handled by the logic that calculates the next offset when checking for matching batches.
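
As a hedged illustration of the fix (not the actual patch): an uninitialised prev_log_index is represented by a negative value, so the first offset to compare against must fall back to 0 instead of being derived from the negative sentinel.

#include <cstdint>

// Hypothetical helper: offset of the first batch to compare against the
// follower's log. A negative (uninitialised) prev_log_index means the request
// starts at the very beginning of the log, i.e. at offset 0.
int64_t first_offset_to_check(int64_t prev_log_index) {
    return prev_log_index < 0 ? int64_t{0} : prev_log_index + 1;
}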

Replying with success when all request batches match

When a follower receives an append entries request with a vector of records that are all present in its own log, and their offsets and terms match, it should reply with success and the correct last_dirty_log_index.
This way the leader, instead of moving the follower's next_offset backwards, can start the recovery process and deliver the batches which the follower is missing.
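
A simplified sketch of this reply path, with hypothetical field names (the real append_entries_reply carries more state):

#include <cstdint>

// Hypothetical, trimmed-down reply type used only for this illustration.
struct append_entries_reply {
    bool success{false};
    int64_t last_dirty_log_index{-1};
};

// When every batch in the request already matches the follower's log, reply
// with success and the last matching offset, so the leader can start recovery
// from the first batch the follower is actually missing instead of repeatedly
// moving next_offset backwards.
append_entries_reply ack_all_matching(int64_t last_matching_offset) {
    append_entries_reply reply;
    reply.success = true;
    reply.last_dirty_log_index = last_matching_offset;
    return reply;
}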

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.3.x
  • v24.2.x
  • v24.1.x

Release Notes

Bug Fixes

  • Fixes a very rare situation in which a Raft leader can enter an infinite loop while trying to recover a follower.


reply.last_dirty_log_index = adjusted_prev_log_index;
// limit the last flushed offset as the adjusted_prev_log_index
// may have not yet been flushed.
Contributor

Does it mean _flushed_offset may store an offset that has not been flushed? In this case, could you add a comment to _flushed_offset to explain what it actually denotes?

Member Author

the adjusted_prev_log_index may be greater than _flushed_offset

Contributor

yeah, but why can't we reply with _flushed_offset if it is larger than adjusted_prev_log_index?

Member Author

we do not want the leader to see a flushed offset which is larger than the last log offset.

Contributor

so is reply.last_flushed_log_index the latest flushed log index that matches with the leader's log?

Member Author

no, not really. This check is to hold an invariant of flushed_offset <= log_end_offset

Contributor

What is log_end_offset here? Is it a field of any structure?

Contributor

reply.last_flushed_log_index = std::min(adjusted_prev_log_index, _flushed_offset);

does this run a risk of last_flushed_log_index moving backwards, as seen by the leader? (which I think will have implications on leader commit index computation)

Perhaps we should just set

reply.last_flushed_log_index = _flushed_offset;

and let the _flushed_offset computation be monotonic
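
To make the trade-off discussed in this thread concrete, here is a small hypothetical sketch of the two options (field names mirror the snippet above, but this is not the actual implementation):

#include <algorithm>
#include <cstdint>

// Trimmed-down reply type for illustration only.
struct reply_t {
    int64_t last_dirty_log_index{-1};
    int64_t last_flushed_log_index{-1};
};

reply_t make_reply(int64_t adjusted_prev_log_index, int64_t flushed_offset) {
    reply_t reply;
    reply.last_dirty_log_index = adjusted_prev_log_index;
    // Option taken in the patch: clamp, so the invariant
    //   last_flushed_log_index <= last_dirty_log_index
    // holds even when flushed_offset is ahead of the index being reported.
    reply.last_flushed_log_index
      = std::min(adjusted_prev_log_index, flushed_offset);
    // Alternative raised in review:
    //   reply.last_flushed_log_index = flushed_offset;
    // relying on flushed_offset itself being monotonic.
    return reply;
}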

@mmaslankaprv mmaslankaprv marked this pull request as ready for review February 4, 2025 16:19
@vbotbuildovich
Collaborator

vbotbuildovich commented Feb 4, 2025

CI test results

test results on build#61563
test_id test_kind job_url test_status passed
rptest.tests.compaction_recovery_test.CompactionRecoveryTest.test_index_recovery ducktape https://buildkite.com/redpanda/redpanda/builds/61563#0194d20c-ff99-4b94-b2f7-a64d44ed7679 FLAKY 1/3
rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_JDBC ducktape https://buildkite.com/redpanda/redpanda/builds/61563#0194d20c-ff98-4ec0-9f56-38befe604032 FLAKY 1/2
test results on build#61682
test_id test_kind job_url test_status passed
rptest.tests.compaction_recovery_test.CompactionRecoveryTest.test_index_recovery ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dcb6-b3e7-4275-b585-63769e3a91eb FLAKY 1/3
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dc9a-225b-4365-b41c-e42b927c3e92 FLAKY 1/2
rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_HADOOP ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dcb6-b3e7-4275-b585-63769e3a91eb FLAKY 1/2
rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_JDBC ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dcb6-b3e4-449e-a254-c66f8797a6ea FLAKY 1/2
rptest.tests.datalake.custom_partitioning_test.DatalakeCustomPartitioningTest.test_basic.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_HADOOP ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dcb6-b3e5-4007-9090-5e5e97766310 FLAKY 1/2
rptest.tests.partition_movement_test.PartitionMovementTest.test_availability_when_one_node_down ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dc9a-225a-472c-827d-daaa26f07098 FLAKY 1/2
rptest.tests.scaling_up_test.ScalingUpTest.test_scaling_up_with_recovered_topic ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dcb6-b3e6-4ddf-b17d-2a71ef0b0f40 FLAKY 1/2
rptest.tests.write_caching_fi_test.WriteCachingFailureInjectionTest.test_crash_all ducktape https://buildkite.com/redpanda/redpanda/builds/61682#0194dc9a-225b-4ae0-9014-9e69b7cda65e FLAKY 1/2

private:
model::node_id _id;
model::revision_id _revision;
prefix_logger _logger;
ss::sstring _base_directory;
config::mock_property<size_t> _max_inflight_requests{16};
config::mock_property<size_t> _max_queued_bytes{1_MiB};
config::mock_property<size_t> _default_recovery_read_size{32_KiB};
Contributor

any reason we change it for existing tests?

Member Author

no particular reason, i will make sure it is the same as before

@bashtanov
Contributor

Assertions triggered in a function body are not propagated to the test itself

Why is that? Anything wrong with the macro? AFAIK it's meant to work with both gtest and boost.

Contributor

@bharathv bharathv left a comment

lgtm modulo one question, took me a bit to digest the change, had to dig up Alexey's change that added these checks. Would be nice to get a blessing from @ztlpn too.


reply.last_dirty_log_index = adjusted_prev_log_index;
// limit the last flushed offset as the adjusted_prev_log_index
// may have not yet been flushed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reply.last_flushed_log_index = std::min(adjusted_prev_log_index, _flushed_offset);

does this run a risk of last_flushed_log_index moving backwards, as seen by the leader? (which I think will have implications on leader commit index computation?

Perhaps we should just set

reply.last_flushed_log_index = _flushed_offset;

and let the _flushed_offset computation be monotonic

src/v/raft/tests/raft_fixture.cc (outdated comment, resolved)
When a follower receives an append entries request whose `prev_log_index` is
smaller than its own `prev_log_index`, it validates whether the
batches from the request match its own log (by checking each batch
offset and corresponding term). If that is the case, the batches are
skipped to prevent truncation of valid batches and avoid data loss.

The validation of already matching batches was broken if they happened
to be at the beginning of the log. In this case the `prev_log_index` is
not initialized, being negative. This case was not correctly handled by
the logic calculating the next offset when checking matching batches.

That led to a situation in which a range of batches starting at offset 0
never matched.

Fixed the issue by correctly adjusting the `prev_log_index` if it is
uninitialized.

Signed-off-by: Michał Maślanka <[email protected]>
When a follower receives an append entries request with a vector of
records that are all present in its own log, and their offsets and terms
match, it should reply with success and the correct `last_dirty_log_index`.
This way the leader, instead of moving the follower's `next_offset` backwards,
can start the recovery process and deliver the batches which the follower is
missing.

Signed-off-by: Michał Maślanka <[email protected]>
@mmaslankaprv mmaslankaprv force-pushed the fix-matching-entries-check branch from e67b4df to 12b3024 on February 6, 2025 12:49
Assertions triggered in a function body are not propagated to the test
itself. Change the method to throw an exception in case of a timeout
instead of using an assertion.

Signed-off-by: Michał Maślanka <[email protected]>
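
A sketch of what such a change can look like in plain C++; the actual fixture uses Seastar futures, so this is only illustrative:

#include <chrono>
#include <functional>
#include <stdexcept>
#include <thread>

// Instead of asserting inside the helper (which the calling test may never
// observe as a failure), throw on timeout so the error propagates to the
// test framework as a regular test failure.
void wait_until(
  const std::function<bool()>& predicate,
  std::chrono::milliseconds timeout) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!predicate()) {
        if (std::chrono::steady_clock::now() > deadline) {
            throw std::runtime_error("timed out waiting for condition");
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
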
The reply interceptor allows the test creator to modify or drop a reply
that is about to be processed by the RPC requester. This allows tests to
take more control over the Raft protocol behavior and to test some rare
edge cases which might be hard to trigger otherwise.

Signed-off-by: Michał Maślanka <[email protected]>
@mmaslankaprv mmaslankaprv force-pushed the fix-matching-entries-check branch from 12b3024 to 5c4c17b on February 6, 2025 12:57
@mmaslankaprv mmaslankaprv force-pushed the fix-matching-entries-check branch from 5c4c17b to ed45488 on February 6, 2025 17:35
Contributor

@bashtanov bashtanov left a comment

A few questions as I'm not sure I understand the test.

Comment on lines +707 to +711
std::ranges::copy(
_nodes | std::views::keys
| std::views::filter(
[leader_id](model::node_id id) { return id != leader_id; }),
std::back_inserter(followers));
Contributor

nit: use copy_if?
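
A sketch of what the suggested copy_if variant might look like, assuming the same _nodes map and followers vector as in the snippet above (requires <algorithm>, <ranges> and <iterator>):

std::ranges::copy_if(
  _nodes | std::views::keys,
  std::back_inserter(followers),
  [leader_id](model::node_id id) { return id != leader_id; });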

/**
* Recover communication and wait for the intercept to trigger
*/
new_leader_node.reset_dispatch_handlers();
Contributor

This will enable the new leader to send vote requests to the old leader. I guess it won't anyway, as it has been elected already. Do we need this?

@@ -395,7 +395,8 @@ class raft_fixture
chunked_vector<model::record_batch> make_batches(
size_t batch_count,
size_t batch_record_count,
size_t record_payload_size) {
size_t record_payload_size,
model::term_id term = model::term_id(0)) {
Contributor

did you decide to keep it just in case it is needed in future?

* Recover communication and wait for the intercept to trigger
*/
new_leader_node.reset_dispatch_handlers();
co_await reply_intercepted.wait([&] { return intercept_count > 5; });
Contributor

We don't produce anything after the second election. What are the 20+ messages that are replicated from the new leader to the old one?
