diff --git a/Cargo.lock b/Cargo.lock index ce4d89490d54c..17bacc32c4572 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3593,6 +3593,22 @@ dependencies = [ "serde", ] +[[package]] +name = "databend-common-meta-cache" +version = "0.1.0" +dependencies = [ + "anyhow", + "databend-common-base", + "databend-common-meta-client", + "databend-common-meta-types", + "futures", + "log", + "pretty_assertions", + "thiserror 1.0.65", + "tokio", + "tonic", +] + [[package]] name = "databend-common-meta-client" version = "0.1.0" @@ -4988,6 +5004,7 @@ dependencies = [ "databend-common-grpc", "databend-common-http", "databend-common-meta-api", + "databend-common-meta-cache", "databend-common-meta-client", "databend-common-meta-kvapi", "databend-common-meta-raft-store", diff --git a/Cargo.toml b/Cargo.toml index 431dda1d0272a..21b7c67a9becf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,7 @@ members = [ "src/meta/app", "src/meta/app-types", "src/meta/binaries", + "src/meta/cache", "src/meta/client", "src/meta/control", "src/meta/ee", @@ -141,6 +142,7 @@ databend-common-management = { path = "src/query/management" } databend-common-meta-api = { path = "src/meta/api" } databend-common-meta-app = { path = "src/meta/app" } databend-common-meta-app-types = { path = "src/meta/app-types" } +databend-common-meta-cache = { path = "src/meta/cache" } databend-common-meta-client = { path = "src/meta/client" } databend-common-meta-control = { path = "src/meta/control" } databend-common-meta-embedded = { path = "src/meta/embedded" } diff --git a/src/meta/cache/Cargo.toml b/src/meta/cache/Cargo.toml new file mode 100644 index 0000000000000..290f042916fac --- /dev/null +++ b/src/meta/cache/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "databend-common-meta-cache" +description = """ +A distributed cache implementation that: +- Maintains a local view of data stored in the meta-service +- Automatically synchronizes with the meta-service via watch API +- Provides safe concurrent access with two-level locking +- Handles connection failures with automatic recovery +- Ensures data consistency through sequence number tracking +""" +version = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +edition = { workspace = true } + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +doctest = false +test = true + +[dependencies] +databend-common-base = { workspace = true } +databend-common-meta-client = { workspace = true } +databend-common-meta-types = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +pretty_assertions = { workspace = true } + +[lints] +workspace = true diff --git a/src/meta/cache/README.md b/src/meta/cache/README.md new file mode 100644 index 0000000000000..880c74a899f75 --- /dev/null +++ b/src/meta/cache/README.md @@ -0,0 +1,94 @@ +# Databend Common Meta Cache + +A distributed cache implementation based on meta-service, providing reliable resource management and data synchronization across distributed systems. 
+
+## Features
+
+- **Automatic Synchronization**: Background watcher task keeps local cache in sync with meta-service
+- **Concurrency Control**: Two-level concurrency control mechanism for safe access
+- **Event-based Updates**: Real-time updates through meta-service watch API
+- **Safe Reconnection**: Automatic recovery from connection failures with state consistency
+
+## Key Components
+
+### Cache Structure
+
+```text
+<prefix>/foo
+<prefix>/..
+<prefix>/..
+```
+
+- `<prefix>`: User-defined string to identify a cache instance
+
+### Main Types
+
+- `Cache`: The main entry point for cache operations
+  - Provides safe access to cached data
+- `CacheData`: Internal data structure holding the cached values
+- `EventWatcher`: Background task that watches for changes in meta-service
+  - Handles synchronization with meta-service
+
+## Usage
+
+```rust
+let client = MetaGrpcClient::try_create(/*..*/);
+let cache = Cache::new(
+    client,
+    "your/cache/key/space/in/meta/service",
+    "your-app-name-for-logging",
+).await;
+
+// Access cached data
+cache.try_access(|c: &CacheData| {
+    println!("last-seq:{}", c.last_seq);
+    println!("all data: {:?}", c.data);
+}).await?;
+
+// Get a specific value
+let value = cache.try_get("key").await?;
+
+// List all entries under a prefix
+let entries = cache.try_list_dir("prefix").await?;
+```
+
+## Concurrency Control
+
+The cache employs a two-level concurrency control mechanism:
+
+1. **Internal Lock (Mutex)**: Protects concurrent access between user operations and the background cache updater. This lock is held briefly during each operation.
+
+2. **External Lock (Method Design)**: Public methods require `&mut self` even for read-only operations. This prevents concurrent access to the cache instance from multiple call sites. External synchronization should be implemented by the caller if needed (see the sketch in the appendix at the end of this README).
+
+This design intentionally separates concerns:
+- The internal lock handles short-term, fine-grained synchronization with the updater
+- The external lock requirement (`&mut self`) enables longer-duration access patterns without blocking the background updater unnecessarily
+
+Note that despite requiring `&mut self`, all operations are logically read-only with respect to the cache's public API.
+
+## Initialization Process
+
+When a `Cache` is created, it goes through the following steps:
+
+1. Creates a new instance with the specified prefix and context
+2. Spawns a background task to watch for key-value changes
+3. Establishes a watch stream to meta-service
+4. Fetches and processes initial data
+5. Waits for the cache to be fully initialized before returning
+6. Maintains continuous synchronization
+
+The initialization is complete only when the cache has received a full copy of the data from meta-service, ensuring users see a consistent view of the data.
+
+## Error Handling
+
+The cache implements robust error handling:
+
+- Connection failures are automatically retried in the background
+- Background watcher task automatically recovers from errors
+- Users are shielded from transient errors through the abstraction
+- The cache ensures data consistency by tracking sequence numbers
+
+## License
+
+This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
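+
+## Appendix: Sharing a `Cache` Between Tasks
+
+The "External Lock" level described above is left to the caller. The sketch below is illustrative only and is not part of the crate: it assumes the `Cache` is shared behind an `Arc<tokio::sync::Mutex<_>>`, and the hypothetical `shared_try_get` helper simply serializes callers before invoking the `&mut self` API.
+
+```rust
+use std::sync::Arc;
+
+use databend_common_meta_cache::errors::Unsupported;
+use databend_common_meta_cache::Cache;
+use databend_common_meta_types::SeqV;
+use tokio::sync::Mutex;
+
+/// Illustrative helper: serialize access to a shared `Cache` with an outer lock,
+/// since every public cache method takes `&mut self`.
+async fn shared_try_get(
+    cache: &Arc<Mutex<Cache>>,
+    key: &str,
+) -> Result<Option<SeqV>, Unsupported> {
+    // The outer mutex provides the "external lock" level; the cache's internal
+    // mutex still coordinates briefly with the background watcher task.
+    let mut cache = cache.lock().await;
+    cache.try_get(key).await
+}
+```
+
+Any equivalent external synchronization (for example, confining the `Cache` to a single task and communicating over channels) works just as well; the only requirement is that calls into one `Cache` instance do not overlap.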
diff --git a/src/meta/cache/src/cache.rs b/src/meta/cache/src/cache.rs
new file mode 100644
index 0000000000000..d13631a8636d0
--- /dev/null
+++ b/src/meta/cache/src/cache.rs
@@ -0,0 +1,234 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+use std::sync::atomic;
+use std::sync::Arc;
+
+use databend_common_base::runtime::spawn_named;
+use databend_common_meta_client::ClientHandle;
+use databend_common_meta_types::SeqV;
+use futures::FutureExt;
+use tokio::sync::oneshot;
+use tokio::sync::Mutex;
+use tokio::sync::MutexGuard;
+use tokio::task::JoinHandle;
+
+use crate::cache_data::CacheData;
+use crate::errors::Unsupported;
+use crate::event_watcher::EventWatcher;
+
+/// Cache implemented on top of the distributed meta-service.
+///
+/// This cache provides a local view of data stored in the meta-service, with automatic
+/// background updates when the underlying data changes.
+///
+/// ## Features
+///
+/// - **Automatic Synchronization**: Background watcher task keeps local cache in sync with meta-service
+/// - **Concurrency Control**: Two-level concurrency control mechanism for safe access
+/// - **Safe Reconnection**: Automatic recovery from connection failures with state consistency
+/// - **Consistent Initialization**: Ensures cache is fully initialized before use
+///
+/// ## Concurrency Control
+///
+/// The cache employs a two-level concurrency control mechanism:
+///
+/// 1. **Internal Lock (Mutex)**: Protects concurrent access between user operations and the
+///    background cache updater. This lock is held briefly during each operation.
+///
+/// 2. **External Lock (Method Design)**: Public methods require `&mut self` even for read-only
+///    operations. This prevents concurrent access to the cache instance from multiple call sites.
+///    External synchronization should be implemented by the caller if needed.
+///
+/// This design intentionally separates concerns:
+/// - The internal lock handles short-term, fine-grained synchronization with the updater
+/// - The external lock requirement (`&mut self`) enables longer-duration access patterns
+///   without blocking the background updater unnecessarily
+///
+/// Note that despite requiring `&mut self`, all operations are logically read-only
+/// with respect to the cache's public API.
+///
+/// ## Error Handling
+///
+/// - Background watcher task automatically recovers from errors by:
+///   - Resetting the cache state
+///   - Re-establishing the watch stream
+///   - Re-fetching all data to ensure consistency
+/// - Users are shielded from transient errors through the abstraction
+pub struct Cache {
+    /// The dir path to store the cache ids, without trailing slash.
+    ///
+    /// Such as `foo`, not `foo/`
+    prefix: String,
+
+    /// The metadata client to interact with the remote meta-service.
+    meta_client: Arc<ClientHandle>,
+
+    /// The background watcher task handle.
+    watcher_task_handle: Option<JoinHandle<()>>,
+
+    /// The sender to cancel the background watcher task.
+    ///
+    /// When this sender is dropped, the corresponding receiver becomes ready,
+    /// which signals the background task to terminate gracefully.
+    #[allow(dead_code)]
+    watcher_cancel_tx: oneshot::Sender<()>,
+
+    data: Arc<Mutex<Result<CacheData, Unsupported>>>,
+
+    /// A process-wide unique identifier for the cache. Used for debugging purposes.
+    uniq: u64,
+
+    ctx: String,
+}
+
+impl fmt::Display for Cache {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "Cache({})({}/)[uniq={}]",
+            self.ctx, self.prefix, self.uniq
+        )
+    }
+}
+
+impl Cache {
+    /// Create a new cache.
+    ///
+    /// The created cache starts watching key-value change events.
+    /// This method does not return until initialization has started.
+    /// It is then safe to access the data, because the initializer holds the data lock
+    /// until the cache is fully populated.
+    ///
+    /// # Parameters
+    ///
+    /// * `meta_client` - The metadata client to interact with the remote meta-service.
+    /// * `prefix` - The prefix of the cache keys, which is also the directory name under which the data is stored in meta-service.
+    /// * `ctx` - The context info of the cache, used for debugging purposes.
+    ///
+    /// This method spawns a background task to watch the meta-service key-value change events.
+    /// The task will be notified to quit when this instance is dropped.
+    pub async fn new(
+        meta_client: Arc<ClientHandle>,
+        prefix: impl ToString,
+        ctx: impl ToString,
+    ) -> Self {
+        let prefix = prefix.to_string();
+        let prefix = prefix.trim_end_matches('/').to_string();
+
+        let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
+
+        static UNIQ: atomic::AtomicU64 = atomic::AtomicU64::new(0);
+        let uniq = UNIQ.fetch_add(1, atomic::Ordering::SeqCst);
+
+        let mut cache = Cache {
+            prefix,
+            meta_client,
+            watcher_task_handle: None,
+            watcher_cancel_tx: cancel_tx,
+            data: Arc::new(Mutex::new(Err(Unsupported::new("Cache not initialized")))),
+            uniq,
+            ctx: ctx.to_string(),
+        };
+
+        cache.spawn_watcher_task(cancel_rx).await;
+
+        cache
+    }
+
+    /// Get a `SeqV` from the cache by key.
+    pub async fn try_get(&mut self, key: &str) -> Result<Option<SeqV>, Unsupported> {
+        self.try_access(|cache_data| cache_data.data.get(key).cloned())
+            .await
+    }
+
+    /// Get the last sequence number seen by the cache.
+    pub async fn try_last_seq(&mut self) -> Result<u64, Unsupported> {
+        self.try_access(|cache_data| cache_data.last_seq).await
+    }
+
+    /// List all entries under a directory prefix in the cache.
+    pub async fn try_list_dir(&mut self, prefix: &str) -> Result<Vec<(String, SeqV)>, Unsupported> {
+        let prefix = prefix.trim_end_matches('/');
+        let left = format!("{}/", prefix);
+        let right = format!("{}0", prefix);
+
+        self.try_access(|cache_data| {
+            cache_data
+                .data
+                .range(left..right)
+                .map(|(k, v)| (k.to_string(), v.clone()))
+                .collect()
+        })
+        .await
+    }
+
+    /// Get the internal cache data.
+    pub async fn cache_data(&mut self) -> MutexGuard<'_, Result<CacheData, Unsupported>> {
+        self.data.lock().await
+    }
+
+    /// Access the cache data in read-only mode.
+    pub async fn try_access<T>(
+        &mut self,
+        f: impl FnOnce(&CacheData) -> T,
+    ) -> Result<T, Unsupported> {
+        let guard = self.data.lock().await;
+
+        let g = guard.as_ref().map_err(|e| e.clone())?;
+
+        let t = f(g);
+        Ok(t)
+    }
+
+    /// Spawn a background task that watches the meta-service key-value change events and feeds them into the cache.
+    ///
+    /// It does not return until a full copy of the cache is received.
+ async fn spawn_watcher_task(&mut self, cancel_rx: oneshot::Receiver<()>) { + let (left, right) = self.key_range(); + + let ctx = format!("{}-watcher", self); + let watcher = EventWatcher { + left, + right, + meta_client: self.meta_client.clone(), + data: self.data.clone(), + ctx: ctx.to_string(), + }; + + // For receiving a signal when the cache has started to initialize and safe to use: + // i.e., if the user acquired the data lock, they can see a complete view of the data(fully initialized). + let (started_tx, started_rx) = oneshot::channel::<()>(); + + let task_name = ctx.to_string(); + let fu = watcher.main(Some(started_tx), cancel_rx.map(|_| ())); + + let handle = spawn_named(fu, task_name); + self.watcher_task_handle = Some(handle); + + // Wait for the sending end to be dropped, indicating that the cache has started to initialize. + started_rx.await.ok(); + } + + /// The left-close right-open range for the cached keys. + /// + /// Since `'0'` is the next char of `'/'`. + /// `[prefix + "/", prefix + "0")` is the range of the cache ids. + fn key_range(&self) -> (String, String) { + let left = self.prefix.clone() + "/"; + let right = self.prefix.clone() + "0"; + + (left, right) + } +} diff --git a/src/meta/cache/src/cache_data.rs b/src/meta/cache/src/cache_data.rs new file mode 100644 index 0000000000000..2340957989cec --- /dev/null +++ b/src/meta/cache/src/cache_data.rs @@ -0,0 +1,59 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use databend_common_meta_types::SeqV; +use log::debug; +use log::warn; + +/// The local data that reflects a range of key-values on remote meta-service. +#[derive(Debug, Clone, Default)] +pub struct CacheData { + /// The last sequence number ever seen from the meta-service. + pub last_seq: u64, + /// The key-value data stored in the cache. + pub data: BTreeMap, +} + +impl CacheData { + /// Process the watch response and update the local cache. + /// + /// Returns the new last_seq. + pub(crate) fn apply_update( + &mut self, + key: String, + prev: Option, + current: Option, + ) -> u64 { + debug!( + "meta-Cache process update(key: {}, prev: {:?}, current: {:?})", + key, prev, current + ); + match (prev, current) { + (_, Some(entry)) => { + self.last_seq = entry.seq; + self.data.insert(key, entry); + } + (Some(_entry), None) => { + self.data.remove(&key); + } + (None, None) => { + warn!("both prev and current are None when Cache processing watch response; Not possible, but ignoring"); + } + }; + + self.last_seq + } +} diff --git a/src/meta/cache/src/errors/connection_closed.rs b/src/meta/cache/src/errors/connection_closed.rs new file mode 100644 index 0000000000000..1aeaae5321dfb --- /dev/null +++ b/src/meta/cache/src/errors/connection_closed.rs @@ -0,0 +1,155 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::io; + +use tonic::Status; + +use crate::errors::either::Either; + +/// The connection to the meta-service has been closed. +/// +/// This error is used to represent various types of connection failures: +/// - Network errors (IO errors) +/// - Protocol errors (gRPC status errors) +/// - Stream closure +/// - Other connection-related issues +/// +/// The error includes: +/// - The reason for the connection closure +/// - A chain of contexts describing when the error occurred +/// +/// # Usage +/// +/// ```rust +/// let err = ConnectionClosed::new_str("connection reset") +/// .context("establishing watch stream") +/// .context("initializing cache"); +/// ``` +#[derive(thiserror::Error, Debug)] +pub struct ConnectionClosed { + /// The reason for the connection closure. + /// Can be either an IO error or a string description. + reason: Either, + + /// A chain of contexts describing when the error occurred. + /// Each context is added using the `context` method. + when: Vec, +} + +impl ConnectionClosed { + /// Create a new connection closed error. + /// + /// # Parameters + /// + /// * `reason` - The reason for the connection closure. + /// Can be either an IO error or a string description. + pub fn new(reason: impl Into>) -> Self { + ConnectionClosed { + reason: reason.into(), + when: vec![], + } + } + + /// Create a new connection closed error from an io::Error. + /// + /// # Parameters + /// + /// * `reason` - The IO error that caused the connection closure. + pub fn new_io_error(reason: impl Into) -> Self { + ConnectionClosed { + reason: Either::A(reason.into()), + when: vec![], + } + } + + /// Create a new connection closed error from a string. + /// + /// # Parameters + /// + /// * `reason` - A string description of why the connection was closed. + pub fn new_str(reason: impl ToString) -> Self { + ConnectionClosed { + reason: Either::B(reason.to_string()), + when: vec![], + } + } + + /// Append a context to the error. + /// + /// This method can be used to build a chain of contexts describing + /// when the error occurred. + /// + /// # Parameters + /// + /// * `context` - A string describing when the error occurred. + /// + /// # Returns + /// + /// The error with the new context appended. 
+ pub fn context(mut self, context: impl ToString) -> Self { + self.when.push(context.to_string()); + self + } +} + +impl fmt::Display for ConnectionClosed { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "distributed-cache connection closed: {}", self.reason)?; + + if self.when.is_empty() { + return Ok(()); + } + + write!(f, "; when: (")?; + + for (i, when) in self.when.iter().enumerate() { + if i > 0 { + write!(f, "; ")?; + } + write!(f, "{}", when)?; + } + + write!(f, ")") + } +} + +impl From for ConnectionClosed { + fn from(err: io::Error) -> Self { + ConnectionClosed::new_io_error(err) + } +} + +impl From for ConnectionClosed { + fn from(status: Status) -> Self { + ConnectionClosed::new_str(status.to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_context() { + let err = ConnectionClosed::new_str("test") + .context("context") + .context("context2"); + assert_eq!( + err.to_string(), + "distributed-cache connection closed: test; when: (context; context2)" + ); + } +} diff --git a/src/meta/cache/src/errors/either.rs b/src/meta/cache/src/errors/either.rs new file mode 100644 index 0000000000000..3a5fda378aa27 --- /dev/null +++ b/src/meta/cache/src/errors/either.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::io; + +/// Either is a type that can be either one of two types. +/// +/// This is used to represent errors that can be either an `io::Error` or a `String`. +/// It provides a convenient way to handle different types of error sources +/// in a unified way. +/// +/// # Examples +/// +/// ```rust +/// let err: Either = +/// Either::A(io::Error::new(io::ErrorKind::Other, "network error")); +/// let err: Either = Either::B("protocol error".to_string()); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Either { + /// The first possible type. + A(A), + /// The second possible type. + B(B), +} + +impl fmt::Display for Either { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Either::A(e) => write!(f, "{}", e), + Either::B(s) => write!(f, "{}", s), + } + } +} + +impl From for Either { + fn from(e: io::Error) -> Self { + Either::A(e) + } +} + +impl From for Either { + fn from(s: String) -> Self { + Either::B(s) + } +} + +impl From<&str> for Either { + fn from(s: &str) -> Self { + Either::B(s.to_string()) + } +} diff --git a/src/meta/cache/src/errors/mod.rs b/src/meta/cache/src/errors/mod.rs new file mode 100644 index 0000000000000..6c20841e79191 --- /dev/null +++ b/src/meta/cache/src/errors/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod connection_closed; +mod either; +mod unsupported; + +pub use connection_closed::ConnectionClosed; +pub use either::Either; +pub use unsupported::Unsupported; diff --git a/src/meta/cache/src/errors/unsupported.rs b/src/meta/cache/src/errors/unsupported.rs new file mode 100644 index 0000000000000..f974a95f3e4f6 --- /dev/null +++ b/src/meta/cache/src/errors/unsupported.rs @@ -0,0 +1,68 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; + +/// An error indicating that caching is not supported. +#[derive(thiserror::Error, Debug, Clone)] +pub struct Unsupported { + /// The reason for the unsupported operation. + /// + /// This error is raised when a feature or operation is not supported, + /// typically due to version incompatibility between meta client and meta server. + reason: String, + + /// A chain of contexts describing when the error occurred. + /// Each context is added using the `context` method. + when: Vec, +} + +impl fmt::Display for Unsupported { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Unsupported: {}", self.reason)?; + + if !self.when.is_empty() { + write!(f, "; when: ({})", self.when.join(", "))?; + } + Ok(()) + } +} + +impl Unsupported { + pub fn new(reason: impl fmt::Display) -> Self { + Self { + reason: reason.to_string(), + when: vec![], + } + } + + pub fn context(mut self, context: impl fmt::Display) -> Self { + self.when.push(context.to_string()); + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_unsupported() { + let error = Unsupported::new("test"); + assert_eq!(error.to_string(), "Unsupported: test"); + + let error = error.context("test").context("test2"); + assert_eq!(error.to_string(), "Unsupported: test; when: (test, test2)"); + } +} diff --git a/src/meta/cache/src/event_watcher.rs b/src/meta/cache/src/event_watcher.rs new file mode 100644 index 0000000000000..e953d424e9d39 --- /dev/null +++ b/src/meta/cache/src/event_watcher.rs @@ -0,0 +1,349 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; + +use databend_common_meta_client::ClientHandle; +use databend_common_meta_types::anyerror::func_name; +use databend_common_meta_types::protobuf::StreamItem; +use databend_common_meta_types::protobuf::WatchRequest; +use databend_common_meta_types::protobuf::WatchResponse; +use databend_common_meta_types::MetaClientError; +use databend_common_meta_types::SeqV; +use futures::FutureExt; +use futures::Stream; +use futures::TryStreamExt; +use log::debug; +use log::error; +use log::info; +use log::warn; +use tokio::sync::oneshot; +use tokio::sync::Mutex; +use tokio::sync::MutexGuard; +use tonic::Status; +use tonic::Streaming; + +use crate::cache_data::CacheData; +use crate::errors::ConnectionClosed; +use crate::errors::Unsupported; + +/// Watch cache events and update local copy. +pub(crate) struct EventWatcher { + /// The left-closed bound of the key range to watch. + pub(crate) left: String, + /// The right-open bound of the key range to watch. + pub(crate) right: String, + /// The metadata client to interact with the remote meta-service. + pub(crate) meta_client: Arc, + /// The shared cache data protected by a mutex. + pub(crate) data: Arc>>, + /// Contains descriptive information about the context of this watcher. + pub(crate) ctx: String, +} + +impl EventWatcher { + /// Subscribe to the key-value changes in the interested range and feed them into the local cache. + /// + /// This method continuously monitors the meta-service for changes within the specified key range + /// and updates the local cache accordingly. + /// + /// # Parameters + /// + /// - `started` - An optional oneshot channel sender that is consumed when the cache initialization + /// begins. This signals that it's safe for users to acquire the cache data lock. + /// - `cancel` - A future that, when completed, will terminate the subscription loop. + /// + /// # Error Handling + /// + /// - Connection failures are automatically retried with exponential backoff + /// - On error, the cache is reset and re-fetched to ensure consistency + /// - The watcher continues running until explicitly canceled + pub(crate) async fn main( + mut self, + mut started: Option>, + mut cancel: impl Future + Send + 'static, + ) { + let mut c = std::pin::pin!(cancel); + + loop { + // 1. Retry until a successful connection is established. + + let strm = { + // Hold the lock until the cache is fully initialized. + let mut d = self.data.lock().await; + + // The data lock is acquired and will be kept until the cache is fully initialized. + // At this point, we notify the caller that initialization has started by consuming + // the `started` sender. This signals to the receiving end that it's now safe to + // acquire the data lock, as we're about to populate the cache with initial data. + started.take(); + + let strm_res = self.retry_initialize_cache(&mut d).await; + + match strm_res { + Ok(strm) => { + info!("{}: cache initialized", self.ctx); + strm + } + Err(unsupported) => { + let sleep_time = Duration::from_secs(60 * 5); + + warn!( + "{}: watch stream not supported: {}; retry in {:?}", + self.ctx, unsupported, sleep_time + ); + + *d = Err(unsupported); + + tokio::time::sleep(sleep_time).await; + continue; + } + } + }; + + // 2. Watch for changes in the stream and apply them to the local cache. 
+ + let res = self.watch_kv_changes(strm, c.as_mut()).await; + match res { + Ok(_) => { + info!("{} watch loop exited normally(canceled by user)", self.ctx); + return; + } + Err(e) => { + error!("{} watcher loop exited with error: {}; reset cache and re-fetch all data to re-establish", self.ctx, e); + // continue + } + } + } + } + + /// Repeatedly attempts to initialize the cache until successful or until it's determined that + /// the databend-meta service doesn't support the required functionality. + /// + /// This method will: + /// 1. If a connection error occurs, wait with exponential backoff and retry + /// 2. If an unsupported operation error occurs, return that error + async fn retry_initialize_cache( + &self, + cache_data: &mut MutexGuard<'_, Result>, + ) -> Result, Unsupported> { + let mut sleep_duration = Duration::from_millis(50); + let max_sleep = Duration::from_secs(5); + + loop { + let res = self.initialize_cache(cache_data).await; + let conn_err = match res { + Ok(strm_res) => return strm_res, + Err(conn_err) => conn_err, + }; + + error!( + "{}: while establish cache, error: {}; retrying in {:?}", + self.ctx, conn_err, sleep_duration + ); + + tokio::time::sleep(sleep_duration).await; + sleep_duration = std::cmp::min(sleep_duration * 3 / 2, max_sleep); + } + } + + /// Reset the cache by creating a watch stream and processing the initial flush. + /// + /// This method ensures the cache is in a consistent state by: + /// 1. Clearing existing cache data + /// 2. Establishing a new watch stream + /// 3. Fetching the last sequence number + /// 4. Processing all events until the cache is fully synchronized + async fn initialize_cache( + &self, + cache_data: &mut MutexGuard<'_, Result>, + ) -> Result, Unsupported>, ConnectionClosed> { + // Everytime when establishing a cache, the old data must be cleared and receive a new one. + **cache_data = Ok(Default::default()); + + let strm_res = self.new_watch_stream().await?; + + let mut strm = match strm_res { + Ok(strm) => strm, + Err(unsupported) => { + return Ok(Err(unsupported)); + } + }; + + // strm is established before getting last_seq thus the stream does not miss any events before last_seq. + // Thus, we can always see an event with seq >= last_seq. + // + // In other side, if we list the prefix and get the last_seq before establishing the stream, we may miss some events. + // For example, after listing key values `a->SeqV(1, b"foo")`, + // then key `a` is deleted. + // Then the watch stream is established, no event will be received. + let last_seq = self.get_last_seq().await?; + + while let Some(watch_response) = strm.try_next().await? { + let Some((key, prev, current)) = Self::decode_watch_response(watch_response) else { + continue; + }; + + // Safe unwrap: before entering this method, the cache data is ensured to be Ok. + let d = cache_data.as_mut().unwrap(); + + let new_seq = d.apply_update(key, prev, current); + + if new_seq >= last_seq { + info!( + "{}: cache is ready, initial_flush finished upto seq={}", + self.ctx, last_seq + ); + break; + } + } + Ok(Ok(strm)) + } + + /// Create a new watch stream to watch the key-value change event in the interested range. 
+ pub(crate) async fn new_watch_stream( + &self, + ) -> Result, Unsupported>, ConnectionClosed> { + let watch = + WatchRequest::new(self.left.clone(), Some(self.right.clone())).with_initial_flush(true); + + let res = self.meta_client.request(watch).await; + + let client_err = match res { + Ok(strm) => { + debug!("{}: watch stream established", self.ctx); + return Ok(Ok(strm)); + } + Err(client_err) => client_err, + }; + + warn!( + "{}: error when establishing watch stream: {}", + self.ctx, client_err + ); + + match client_err { + MetaClientError::HandshakeError(hs_err) => { + let unsupported = Unsupported::new(hs_err) + .context(func_name!()) + .context(&self.ctx); + Ok(Err(unsupported)) + } + + MetaClientError::NetworkError(net_err) => { + let conn_err = ConnectionClosed::new_str(net_err.to_string()) + .context("send watch request") + .context(&self.ctx); + + Err(conn_err) + } + } + } + + /// The main loop of the cache engine. + /// + /// This function watches for key-value changes in the metadata store and processes them. + /// Changes are applied to the local in-memory cache atomically. + /// + /// # Arguments + /// + /// * `strm` - The watch stream from the meta-service + /// * `cancel` - A future that, when ready, signals this loop to terminate + /// + /// # Returns + /// + /// Returns `Ok(())` if terminated normally (i.e., the `cancel` future is ready), or + /// `Err(ConnectionClosed)` if the metadata connection was closed unexpectedly. + pub(crate) async fn watch_kv_changes( + &mut self, + mut strm: impl Stream> + Send + Unpin + 'static, + mut cancel: impl Future + Send, + ) -> Result<(), ConnectionClosed> { + let mut c = std::pin::pin!(cancel); + + loop { + let watch_result = futures::select! { + _ = c.as_mut().fuse() => { + info!("cache loop canceled by user"); + return Ok(()); + } + + watch_result = strm.try_next().fuse() => { + watch_result + } + }; + + let Some(watch_response) = watch_result? else { + error!("{} watch-stream closed", self.ctx); + return Err(ConnectionClosed::new_str("watch-stream closed").context(&self.ctx)); + }; + + let Some((key, prev, current)) = Self::decode_watch_response(watch_response) else { + continue; + }; + + let mut cache_data = self.data.lock().await; + + // Safe unwrap: before entering this method, the cache data is ensured to be Ok. + let d = cache_data.as_mut().unwrap(); + + let new_seq = d.apply_update(key.clone(), prev.clone(), current.clone()); + + debug!( + "{}: process update(key: {}, prev: {:?}, current: {:?}), new_seq={:?}", + self.ctx, key, prev, current, new_seq + ); + } + } + + /// List all the key under the range and find out the greatest seq. + async fn get_last_seq(&self) -> Result { + let mut strm = self.meta_client.list(&self.left).await.map_err(|e| { + ConnectionClosed::new_str(e.to_string()) + .context("list-to-get-last-seq") + .context(&self.ctx) + })?; + + let mut last_seq = 0; + + while let Some(item) = strm.try_next().await? 
{ + let StreamItem { key, value } = item; + debug!( + "{}: list_kv to get last_seq: key: {}, value: {:?}", + self.ctx, key, value + ); + let seq = value.map(|v| v.seq).unwrap_or(0); + if seq > last_seq { + last_seq = seq; + } + } + + debug!("{}: get last_seq from list_kv: {}", self.ctx, last_seq); + + Ok(last_seq) + } + + fn decode_watch_response( + watch_response: WatchResponse, + ) -> Option<(String, Option, Option)> { + let Some((key, prev, current)) = watch_response.unpack() else { + warn!("unexpected WatchResponse with empty event",); + return None; + }; + + Some((key, prev, current)) + } +} diff --git a/src/meta/cache/src/lib.rs b/src/meta/cache/src/lib.rs new file mode 100644 index 0000000000000..49a3d180bfab3 --- /dev/null +++ b/src/meta/cache/src/lib.rs @@ -0,0 +1,102 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(clippy::uninlined_format_args)] + +//! A distributed cache implementation that maintains a local view of data stored in a meta-service. +//! +//! Features: +//! - Automatic synchronization with meta-service +//! - Safe concurrent access +//! - Event-based updates +//! - Safe reconnection +//! - Consistent initialization +//! - Sequence-based consistency +//! +//! # Example +//! +//! ```rust +//! use databend_meta_cache::Cache; +//! use databend_meta_client::MetaClient; +//! +//! let meta_client = MetaClient::new("127.0.0.1:9191"); +//! let cache = Cache::new("my_prefix", meta_client).await?; +//! +//! // Get a value +//! let value = cache.try_get("key").await?; +//! +//! // List directory contents +//! let entries = cache.try_list_dir("dir").await?; +//! ``` +//! +//! # Cache Key Structure +//! +//! ```text +//! /foo +//! /.. +//! /.. +//! ``` +//! +//! - `` is a user-defined string to identify a cache instance. +//! +//! # Initialization Process +//! +//! When a [`Cache`] is created, it: +//! 1. Creates a new instance with the specified prefix +//! 2. Spawns a background task to watch for changes +//! 3. Establishes a watch stream with initial flush +//! 4. Fetches and processes initial data +//! 5. Maintains continuous synchronization +//! +//! # Cache Update Process +//! +//! The cache update process works as follows: +//! 1. Watcher monitors the watch stream for changes +//! 2. On receiving an event, it applies the update atomically +//! 3. Updates are applied based on event type (insert/update/delete) +//! 4. Sequence numbers are tracked for consistency +//! +//! # Error Handling +//! +//! The cache handles connection errors automatically with exponential backoff. +//! If the connection is lost, it will attempt to reconnect with increasing delays. +//! +//! ```text +//! | cache +----> spawn()-. (1) +//! | + o-. | +//! | | | | +//! | | | v watch Stream +//! | | `------->o KV-Change-Watcher (task) <---------------------------. +//! | | cancel | | +//! | | | | +//! | | In memory BTree | | +//! | +-+ /foo <--+ | | +//! | | /... <--+-+ | +//! | | /... <--+ | | +//! | | | | +//! 
| + last_seq: u64 <----' Meta-Service | +//! | /foo --+ | +//! | .-> /... --+--' +//! | | /... --' +//! | | +//! | ... -------------' +//! | Update by other threads +//! ``` + +mod cache; +mod cache_data; +pub mod errors; +mod event_watcher; + +pub use cache::Cache; diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 656b7d2ef528d..9d4e4f0c03c9d 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -61,6 +61,7 @@ tonic-reflection = { workspace = true } watcher = { workspace = true } [dev-dependencies] +databend-common-meta-cache = { workspace = true } databend-common-meta-semaphore = { workspace = true } env_logger = { workspace = true } maplit = { workspace = true } diff --git a/src/meta/service/tests/it/grpc/mod.rs b/src/meta/service/tests/it/grpc/mod.rs index 2998987074132..b819d0374b8b4 100644 --- a/src/meta/service/tests/it/grpc/mod.rs +++ b/src/meta/service/tests/it/grpc/mod.rs @@ -27,3 +27,4 @@ pub mod metasrv_grpc_tls; pub mod metasrv_grpc_transaction; pub mod metasrv_grpc_watch; pub mod t51_metasrv_grpc_semaphore; +pub mod t52_metasrv_grpc_cache; diff --git a/src/meta/service/tests/it/grpc/t52_metasrv_grpc_cache.rs b/src/meta/service/tests/it/grpc/t52_metasrv_grpc_cache.rs new file mode 100644 index 0000000000000..c107ef4ef592d --- /dev/null +++ b/src/meta/service/tests/it/grpc/t52_metasrv_grpc_cache.rs @@ -0,0 +1,154 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Test special cases of grpc API: transaction(). 
+ +use std::time::Duration; + +use databend_common_base::base::Stoppable; +use databend_common_meta_cache::Cache; +use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_types::SeqV; +use databend_common_meta_types::UpsertKV; +use log::debug; +use test_harness::test; +use tokio::time::sleep; + +use crate::testing::meta_service_test_harness; +use crate::tests::service::make_grpc_client; + +#[test(harness = meta_service_test_harness)] +#[fastrace::trace] +async fn test_cache_basic() -> anyhow::Result<()> { + let tcs = crate::tests::start_metasrv_cluster(&[0, 1, 2]).await?; + + let addresses = tcs + .iter() + .map(|tc| tc.config.grpc_api_address.clone()) + .collect::>(); + + let a0 = || addresses[0].clone(); + let a1 = || addresses[1].clone(); + let a2 = || addresses[2].clone(); + + let cli = make_grpc_client(vec![a1(), a2(), a0()])?; + + let client = || cli.clone(); + + cli.upsert_kv(UpsertKV::update("po/a", b"x")).await?; + + cli.upsert_kv(UpsertKV::update("pp/a", b"a")).await?; + cli.upsert_kv(UpsertKV::update("pp/c", b"c")).await?; + + cli.upsert_kv(UpsertKV::update("pq/d", b"d")).await?; + + let mut c = Cache::new(client(), "pp", "test").await; + + // Check initial state + + let got = c.try_list_dir("pp").await?; + assert_eq!( + vec![ + (s("pp/a"), SeqV::new(2, b("a"))), + (s("pp/c"), SeqV::new(3, b("c"))), + ], + got + ); + + let last_seq = c.try_last_seq().await?; + assert_eq!(last_seq, 3); + + // Receive update + + cli.upsert_kv(UpsertKV::update("pp/b", b"b")).await?; + cli.upsert_kv(UpsertKV::update("pp/c", b"c2")).await?; + cli.upsert_kv(UpsertKV::delete("pp/a")).await?; + + sleep(Duration::from_millis(500)).await; + + let got = c.try_get("pp/b").await?; + assert_eq!(got, Some(SeqV::new(5, b("b")))); + + let got = c.try_get("pp/c").await?; + assert_eq!(got, Some(SeqV::new(6, b("c2")))); + + let got = c.try_get("pp/a").await?; + assert_eq!(got, None); + + Ok(()) +} + +/// Test cache survive leader down and switch +#[test(harness = meta_service_test_harness)] +#[fastrace::trace] +async fn test_cache_when_leader_down() -> anyhow::Result<()> { + let mut tcs = crate::tests::start_metasrv_cluster(&[0, 1, 2]).await?; + + debug!("foofoo"); + + let addresses = tcs + .iter() + .map(|tc| tc.config.grpc_api_address.clone()) + .collect::>(); + + let a0 = || addresses[0].clone(); + let a1 = || addresses[1].clone(); + let a2 = || addresses[2].clone(); + + // a0() will be shut down + let cli = make_grpc_client(vec![a1(), a2(), a0()])?; + + let client = || cli.clone(); + + cli.upsert_kv(UpsertKV::update("pp/a", b"a")).await?; + cli.upsert_kv(UpsertKV::update("pp/c", b"c")).await?; + + let mut c = Cache::new(client(), "pp", "test").await; + + cli.upsert_kv(UpsertKV::update("pp/after_cache_create", b"x")) + .await?; + + let got = c.try_get("pp/c").await?; + assert_eq!(got, Some(SeqV::new(2, b("c")))); + + // Stop the first node, which is the leader + let mut stopped = tcs.remove(0); + { stopped }.grpc_srv.take().unwrap().stop(None).await?; + + sleep(Duration::from_secs(3)).await; + + // Receive update + + cli.upsert_kv(UpsertKV::update("pp/after_stop_n0", b"3")) + .await?; + cli.upsert_kv(UpsertKV::update("pp/c", b"c2")).await?; + + sleep(Duration::from_secs(3)).await; + + let got = c.try_get("pp/after_stop_n0").await?; + assert_eq!(got, Some(SeqV::new(4, b("3")))); + + let got = c.try_get("pp/c").await?; + assert_eq!(got, Some(SeqV::new(5, b("c2")))); + + Ok(()) +} + +fn s(x: impl ToString) -> String { + x.to_string() +} + +fn b(x: impl ToString) -> Vec { + x.to_string().into_bytes() 
+}