Skip to content

Commit f284677

Browse files
authored
refactor(log-store): extract common read write methods to log store state (#20432)
1 parent 367636b commit f284677

File tree

6 files changed

+423
-343
lines changed

6 files changed

+423
-343
lines changed

src/storage/src/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ pub trait StateStore: StateStoreRead + StateStoreReadLog + StaticSendSync + Clon
394394
/// written by itself. Each local state store is not `Clone`, and is owned by a streaming state
395395
/// table.
396396
pub trait LocalStateStore: StaticSendSync {
397-
type FlushedSnapshotReader: StateStoreRead + Clone;
397+
type FlushedSnapshotReader: StateStoreRead;
398398
type Iter<'a>: StateStoreIter + 'a;
399399
type RevIter<'a>: StateStoreIter + 'a;
400400

src/stream/src/common/log_store_impl/kv_log_store/mod.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::executor::monitor::StreamingMetrics;
3535
pub(crate) mod buffer;
3636
pub mod reader;
3737
pub(crate) mod serde;
38+
pub(crate) mod state;
3839
#[cfg(test)]
3940
pub mod test_utils;
4041
mod writer;
@@ -303,6 +304,7 @@ mod v1 {
303304

304305
pub(crate) use v2::KV_LOG_STORE_V2_INFO;
305306

307+
use crate::common::log_store_impl::kv_log_store::state::new_log_store_state;
306308
use crate::task::ActorId;
307309

308310
/// A new version of log store schema. Compared to v1, the v2 added a new vnode column to the log store pk,
@@ -410,25 +412,17 @@ impl<S: StateStore> LogStoreFactory for KvLogStoreFactory<S> {
410412

411413
let (tx, rx) = new_log_store_buffer(self.max_row_count, self.metrics.clone());
412414

415+
let (read_state, write_state) = new_log_store_state(table_id, local_state_store, serde);
416+
413417
let reader = KvLogStoreReader::new(
414-
table_id,
415-
local_state_store.new_flushed_snapshot_reader(),
416-
serde.clone(),
418+
read_state,
417419
rx,
418420
self.metrics.clone(),
419421
pause_rx,
420422
self.identity.clone(),
421423
);
422424

423-
let writer = KvLogStoreWriter::new(
424-
table_id,
425-
local_state_store,
426-
serde,
427-
tx,
428-
self.metrics,
429-
pause_tx,
430-
self.identity,
431-
);
425+
let writer = KvLogStoreWriter::new(write_state, tx, self.metrics, pause_tx, self.identity);
432426

433427
(reader, writer)
434428
}

0 commit comments

Comments
 (0)