KAFKA-20489: Add enable.transactional.statestores#22141
Conversation
bbejeck
left a comment
There was a problem hiding this comment.
Thanks for the PR @nicktelford - overall looks good. Can we add some tests?
| true, | ||
| Importance.LOW, | ||
| ENABLE_METRICS_PUSH_DOC) | ||
| .define(TRANSACTIONAL_STATE_STORES_CONFIG, |
There was a problem hiding this comment.
I think we want some sort of check in StreamsConfig that will throw if users set TRANSACTIONAL_STATE_STORES_CONFIG with out setting PROCESSING_GUARANTEE to EXACTLY_ONCE_V2
There was a problem hiding this comment.
You can enable it even under ALOS. Doing so should yield improved throughput for RocksDB stores, because writes are buffered in a WriteBatch, instead of writing to the underlying RocksDB database directly, which incurs locking overhead on every write.
|
Kicked off a system test - will post the results here later |
bbejeck
left a comment
There was a problem hiding this comment.
Thanks for the PR @nicktelford LGTM can we get some coverage for the new behavior?
Maybe something like this:
StateManagerUtilTest: the wipe branch with txn=true and with/without corrupted storesProcessorStateManagerTest: hasCorruptedStores(), the new initializeStoreOffsets opt-out branchStreamTaskTest: the new postCommit / EOS commit-interval pathTaskManagerTest: no verification of the new markChangelogAsCorrupted call inside closeDirtyAndRevive
Introduces the `enable.transactional.statestores` config (default false). When enabled, uncommitted writes are held in an in-memory buffer per store and are not flushed to the underlying base store until the Kafka transaction commits, making staged writes invisible to IQ reads at the committed isolation level until the containing commit completes. The config is threaded through TopologyConfig, TaskConfig, task creators, and ProcessorStateManager. When transactional stores are active, the EOS state wipe on unclean shutdown is suppressed: since uncommitted data never reaches the base store, there is nothing to wipe; corruption is handled explicitly via markChangelogAsCorrupted. StreamTask.postCommit is extended to flush the pending write buffer on every commit interval, not only on task revocation or close. Under EOS the normal commit-interval path previously skipped maybeCheckpoint, the only path that calls stateMgr.commit() to flush each store's buffer. Without this, READ_COMMITTED IQ readers see no new data mid-run, and the uncommitted buffer grows unbounded between task lifecycle events.
The previous commit added the `enable.transactional.statestores` config but did not include tests. This adds coverage for: - `StreamsConfigTest`: default disabled, and explicit enable. - `ProcessorStateManagerTest`: no corruption when checkpoint is absent on a non-empty dir under EOS+transactional mode (since uncommitted data is never written to the base store), and a new `hasCorruptedStores()` assertion helper. - `StandbyTaskTest`: state dir is not wiped on dirty close with EOS+transactional unless stores are explicitly marked corrupted. - `StateManagerUtilTest`: `closeStateManager` with `transactionalStateStores=true` skips wipe when no corrupted stores, and wipes only when `hasCorruptedStores()` returns true. - `StreamTaskTest`: `postCommit` does not checkpoint under plain EOS, but does when transactional state stores are enabled. - `TaskManagerTest`: `markChangelogAsCorrupted` is verified to be called before `postCommit` in the corruption-handling path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
b07db0f to
d08ed6a
Compare
bbejeck
left a comment
There was a problem hiding this comment.
Thanks @nicktelford, PR LGTM I'l merge once the build completes
Introduces the `enable.transactional.statestores` config (default false). When enabled, uncommitted writes are held in an in-memory buffer per store and are not flushed to the underlying base store until the Kafka transaction commits, making staged writes invisible to IQ reads at the committed isolation level until the containing commit completes. The config is threaded through TopologyConfig, TaskConfig, task creators, and ProcessorStateManager. When transactional stores are active, the EOS state wipe on unclean shutdown is suppressed: since uncommitted data never reaches the base store, there is nothing to wipe; corruption is handled explicitly via markChangelogAsCorrupted. StreamTask.postCommit is extended to flush the pending write buffer on every commit interval, not only on task revocation or close. Under EOS the normal commit-interval path previously skipped maybeCheckpoint, the only path that calls stateMgr.commit() to flush each store's buffer. Without this, READ_COMMITTED IQ readers see no new data mid-run, and the uncommitted buffer grows unbounded between task lifecycle events. Reviewers: Bill Bejeck <bbejeck@apache.org>
Introduces the
enable.transactional.statestoresconfig (defaultfalse). When enabled, uncommitted writes are held in an in-memory buffer
per store and are not flushed to the underlying base store until the
Kafka transaction commits, making staged writes invisible to IQ reads at
the committed isolation level until the containing commit completes.
The config is threaded through TopologyConfig, TaskConfig, task
creators, and ProcessorStateManager. When transactional stores are
active, the EOS state wipe on unclean shutdown is suppressed: since
uncommitted data never reaches the base store, there is nothing to wipe;
corruption is handled explicitly via markChangelogAsCorrupted.
StreamTask.postCommit is extended to flush the pending write buffer on
every commit interval, not only on task revocation or close. Under EOS
the normal commit-interval path previously skipped maybeCheckpoint, the
only path that calls stateMgr.commit() to flush each store's buffer.
Without this, READ_COMMITTED IQ readers see no new data mid-run, and the
uncommitted buffer grows unbounded between task lifecycle events.
Reviewers: Bill Bejeck bbejeck@apache.org