Skip to content

Commit

Permalink
refactor(log-store): extract common read write methods to log store s…
Browse files Browse the repository at this point in the history
…tate
  • Loading branch information
wenym1 committed Feb 9, 2025
1 parent 7c7e0ad commit d5fbe68
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 343 deletions.
2 changes: 1 addition & 1 deletion src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ pub trait StateStore: StateStoreRead + StateStoreReadLog + StaticSendSync + Clon
/// written by itself. Each local state store is not `Clone`, and is owned by a streaming state
/// table.
pub trait LocalStateStore: StaticSendSync {
type FlushedSnapshotReader: StateStoreRead + Clone;
type FlushedSnapshotReader: StateStoreRead;
type Iter<'a>: StateStoreIter + 'a;
type RevIter<'a>: StateStoreIter + 'a;

Expand Down
18 changes: 6 additions & 12 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::executor::monitor::StreamingMetrics;
pub(crate) mod buffer;
pub mod reader;
pub(crate) mod serde;
pub(crate) mod state;
#[cfg(test)]
pub mod test_utils;
mod writer;
Expand Down Expand Up @@ -303,6 +304,7 @@ mod v1 {

pub(crate) use v2::KV_LOG_STORE_V2_INFO;

use crate::common::log_store_impl::kv_log_store::state::new_log_store_state;
use crate::task::ActorId;

/// A new version of log store schema. Compared to v1, the v2 added a new vnode column to the log store pk,
Expand Down Expand Up @@ -410,25 +412,17 @@ impl<S: StateStore> LogStoreFactory for KvLogStoreFactory<S> {

let (tx, rx) = new_log_store_buffer(self.max_row_count, self.metrics.clone());

let (read_state, write_state) = new_log_store_state(table_id, local_state_store, serde);

let reader = KvLogStoreReader::new(
table_id,
local_state_store.new_flushed_snapshot_reader(),
serde.clone(),
read_state,
rx,
self.metrics.clone(),
pause_rx,
self.identity.clone(),
);

let writer = KvLogStoreWriter::new(
table_id,
local_state_store,
serde,
tx,
self.metrics,
pause_tx,
self.identity,
);
let writer = KvLogStoreWriter::new(write_state, tx, self.metrics, pause_tx, self.identity);

(reader, writer)
}
Expand Down
Loading

0 comments on commit d5fbe68

Please sign in to comment.