Skip to content

Commit 1e14bc8

Browse files
committed
Merge remote-tracking branch 'upstream/main' into feature_cluster
2 parents 963c0f7 + 6f5d941 commit 1e14bc8

File tree

41 files changed

+757
-156
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+757
-156
lines changed

src/meta/raft-store/src/applier.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ where SM: StateMachineApi + 'static
7373
pub(crate) cmd_ctx: CmdContext,
7474

7575
/// The changes have been made by the applying one log entry
76-
changes: Vec<Change<Vec<u8>, String>>,
76+
/// `(key, prev, result)`.
77+
changes: Vec<(String, Option<SeqV>, Option<SeqV>)>,
7778
}
7879

7980
impl<'a, SM> Applier<'a, SM>
@@ -586,8 +587,7 @@ where SM: StateMachineApi + 'static
586587
return;
587588
}
588589

589-
self.changes
590-
.push(Change::new(prev, result).with_id(key.to_string()))
590+
self.changes.push((key.to_string(), prev, result))
591591
}
592592

593593
/// Retrieve the proposing time from a raft-log.

src/meta/raft-store/src/state_machine_api.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::fmt::Debug;
1616

1717
use databend_common_meta_types::protobuf::WatchResponse;
1818
use databend_common_meta_types::sys_data::SysData;
19-
use databend_common_meta_types::Change;
2019
use databend_common_meta_types::SeqV;
2120
use tokio::sync::mpsc;
2221
use tonic::Status;
@@ -27,7 +26,7 @@ use crate::state_machine::ExpireKey;
2726

2827
/// Send a key-value change event to subscribers.
2928
pub trait SMEventSender: Debug + Sync + Send {
30-
fn send(&self, change: Change<Vec<u8>, String>);
29+
fn send(&self, change: (String, Option<SeqV>, Option<SeqV>));
3130

3231
/// Inform to send all items in `strm` to `tx`.
3332
///
@@ -39,21 +38,51 @@ pub trait SMEventSender: Debug + Sync + Send {
3938
);
4039
}
4140

42-
/// The API a state machine implements
41+
/// The API a state machine implements.
42+
///
43+
/// The state machine is responsible for managing the application's persistent state,
44+
/// including application kv data and expired key data.
4345
pub trait StateMachineApi: Send + Sync {
4446
type Map: MapApi<String> + MapApi<ExpireKey> + 'static;
4547

48+
/// Returns the current expire key cursor position.
49+
///
50+
/// The expiry key cursor marks a boundary in the key space:
51+
/// - All keys before this cursor (exclusive) have already been processed and deleted
52+
/// - This cursor position is used to track progress when incrementally cleaning up expired keys
4653
fn get_expire_cursor(&self) -> ExpireKey;
4754

55+
/// Updates the expiry key cursor position.
56+
///
57+
/// This method is called after a batch of expired keys have been processed and deleted.
58+
/// The new cursor position indicates that all keys before it (exclusive) have been
59+
/// successfully cleaned up.
4860
fn set_expire_cursor(&mut self, cursor: ExpireKey);
4961

50-
/// Return a reference to the map that stores app data.
62+
/// Returns a reference to the map that stores application data.
63+
///
64+
/// This method provides read-only access to the underlying key-value store
65+
/// that contains the application's persistent state, including application kv data and expired key data.
5166
fn map_ref(&self) -> &Self::Map;
5267

53-
/// Return a mutable reference to the map that stores app data.
68+
/// Returns a mutable reference to the map that stores application data.
69+
///
70+
/// This method provides read-write access to the underlying key-value store
71+
/// that contains the application's persistent state, including application kv data and expired key data.
72+
/// Changes made through this reference will be persisted according to the state machine's replication
73+
/// protocol.
5474
fn map_mut(&mut self) -> &mut Self::Map;
5575

76+
/// Returns a mutable reference to the system data.
77+
///
78+
/// This method provides read-write access to the system data, which includes
79+
/// metadata about the state machine and its configuration.
5680
fn sys_data_mut(&mut self) -> &mut SysData;
5781

82+
/// Returns an optional reference to the event sender.
83+
///
84+
/// This method returns an event sender that can be used to send state change events to subscribers.
85+
///
86+
/// The implementation could just return `None` if the state machine does not support subscribing.
5887
fn event_sender(&self) -> Option<&dyn SMEventSender>;
5988
}

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ use crate::version::from_digit_ver;
7878
use crate::version::to_digit_ver;
7979
use crate::version::METASRV_SEMVER;
8080
use crate::version::MIN_METACLI_SEMVER;
81-
use crate::watcher::WatchStream;
81+
use crate::watcher::watch_stream::WatchStream;
8282

8383
pub struct MetaServiceImpl {
8484
token: GrpcToken,
@@ -396,7 +396,7 @@ impl MetaService for MetaServiceImpl {
396396
let mn = &self.meta_node;
397397

398398
let sender = mn.add_watcher(watch, tx.clone()).await?;
399-
let stream = WatchStream::new(rx, sender, mn.subscriber_handle.clone());
399+
let stream = WatchStream::new(rx, sender, mn.dispatcher_handle.clone());
400400

401401
if flush {
402402
let sm = mn.raft_store.state_machine.clone();

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,9 @@ use crate::request_handling::Forwarder;
9494
use crate::request_handling::Handler;
9595
use crate::store::RaftStore;
9696
use crate::version::METASRV_COMMIT_VERSION;
97-
use crate::watcher::EventSubscriber;
98-
use crate::watcher::StreamSender;
99-
use crate::watcher::SubscriberHandle;
97+
use crate::watcher::dispatch::Dispatcher;
98+
use crate::watcher::dispatch::DispatcherHandle;
99+
use crate::watcher::watch_stream::WatchStreamSender;
100100

101101
pub type LogStore = RaftStore;
102102
pub type SMStore = RaftStore;
@@ -107,7 +107,7 @@ pub type MetaRaft = Raft<TypeConfig>;
107107
/// MetaNode is the container of metadata related components and threads, such as storage, the raft node and a raft-state monitor.
108108
pub struct MetaNode {
109109
pub raft_store: RaftStore,
110-
pub subscriber_handle: SubscriberHandle,
110+
pub dispatcher_handle: DispatcherHandle,
111111
pub raft: MetaRaft,
112112
pub running_tx: watch::Sender<()>,
113113
pub running_rx: watch::Receiver<()>,
@@ -159,15 +159,15 @@ impl MetaNodeBuilder {
159159

160160
let (tx, rx) = watch::channel::<()>(());
161161

162-
let handle = EventSubscriber::spawn();
162+
let handle = Dispatcher::spawn();
163163

164164
sto.get_state_machine()
165165
.await
166166
.set_event_sender(Box::new(handle.clone()));
167167

168168
let meta_node = Arc::new(MetaNode {
169169
raft_store: sto.clone(),
170-
subscriber_handle: handle,
170+
dispatcher_handle: handle,
171171
raft: raft.clone(),
172172
running_tx: tx,
173173
running_rx: rx,
@@ -1148,12 +1148,12 @@ impl MetaNode {
11481148
&self,
11491149
request: WatchRequest,
11501150
tx: mpsc::Sender<Result<WatchResponse, Status>>,
1151-
) -> Result<Arc<StreamSender>, Status> {
1151+
) -> Result<Arc<WatchStreamSender>, Status> {
11521152
let stream_sender = self
1153-
.subscriber_handle
1154-
.request_blocking(|d: &mut EventSubscriber| d.add_watcher(request, tx))
1153+
.dispatcher_handle
1154+
.request_blocking(|d: &mut Dispatcher| d.add_watcher(request, tx))
11551155
.await
1156-
.map_err(|_e| Status::internal("EventSubscriber closed"))?
1156+
.map_err(|_e| Status::internal("watch-event-Dispatcher closed"))?
11571157
.map_err(Status::invalid_argument)?;
11581158

11591159
Ok(stream_sender)

src/meta/service/src/watcher/command.rs renamed to src/meta/service/src/watcher/dispatch/command.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,40 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use databend_common_meta_types::Change;
15+
use databend_common_meta_types::SeqV;
1616
use futures::future::BoxFuture;
1717

18-
use crate::watcher::subscriber::EventSubscriber;
18+
use crate::watcher::dispatch::Dispatcher;
1919

20-
/// An event sent to EventDispatcher.
20+
/// A command sent to [`Dispatcher`].
2121
#[allow(clippy::type_complexity)]
22-
pub(crate) enum Command {
23-
/// Submit a kv change event to dispatcher
24-
KVChange(Change<Vec<u8>, String>),
22+
pub enum Command {
23+
/// Submit a key-value update event to dispatcher.
24+
Update(Update),
2525

26-
/// Send a fn to [`EventSubscriber`] to run it.
26+
/// Send a fn to [`Dispatcher`] to run it.
2727
///
2828
/// The function will be called with a mutable reference to the dispatcher.
2929
Request {
30-
req: Box<dyn FnOnce(&mut EventSubscriber) + Send + 'static>,
30+
req: Box<dyn FnOnce(&mut Dispatcher) + Send + 'static>,
3131
},
3232

33-
/// Send a fn to [`EventSubscriber`] to run it asynchronously.
33+
/// Send a fn to [`Dispatcher`] to run it asynchronously.
3434
RequestAsync {
35-
req: Box<dyn FnOnce(&mut EventSubscriber) -> BoxFuture<'static, ()> + Send + 'static>,
35+
req: Box<dyn FnOnce(&mut Dispatcher) -> BoxFuture<'static, ()> + Send + 'static>,
3636
},
3737
}
38+
39+
/// An update event for a key.
40+
#[derive(Debug, Clone, PartialEq, Eq)]
41+
pub struct Update {
42+
pub key: String,
43+
pub before: Option<SeqV>,
44+
pub after: Option<SeqV>,
45+
}
46+
47+
impl Update {
48+
pub fn new(key: String, before: Option<SeqV>, after: Option<SeqV>) -> Self {
49+
Self { key, before, after }
50+
}
51+
}

0 commit comments

Comments
 (0)