From b86ac0624456fc5a76c37d08ee2cd746c40675f8 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 6 Jan 2025 13:51:47 +0200 Subject: [PATCH 01/12] Return effective trim point LSN on successful trim request --- crates/admin/protobuf/cluster_ctrl_svc.proto | 7 ++- .../cluster_controller/grpc_svc_handler.rs | 28 +++++---- .../admin/src/cluster_controller/service.rs | 6 +- .../src/cluster_controller/service/state.rs | 7 ++- crates/bifrost/src/bifrost.rs | 33 ++++++++--- crates/bifrost/src/bifrost_admin.rs | 2 +- crates/bifrost/src/loglet.rs | 10 ++-- crates/bifrost/src/loglet_wrapper.rs | 13 ++-- .../bifrost/src/providers/local_loglet/mod.rs | 11 ++-- crates/bifrost/src/providers/memory_loglet.rs | 17 +++--- .../src/providers/replicated_loglet/loglet.rs | 59 ++++++++++++------- .../providers/replicated_loglet/tasks/trim.rs | 20 +++---- crates/types/src/logs/mod.rs | 2 +- tools/restatectl/src/commands/log/trim_log.rs | 15 ++++- 14 files changed, 152 insertions(+), 78 deletions(-) diff --git a/crates/admin/protobuf/cluster_ctrl_svc.proto b/crates/admin/protobuf/cluster_ctrl_svc.proto index a4342bd28b..052a6b0cbf 100644 --- a/crates/admin/protobuf/cluster_ctrl_svc.proto +++ b/crates/admin/protobuf/cluster_ctrl_svc.proto @@ -24,7 +24,7 @@ service ClusterCtrlSvc { rpc ListNodes(ListNodesRequest) returns (ListNodesResponse); - rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty); + rpc TrimLog(TrimLogRequest) returns (TrimLogResponse); rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest) returns (CreatePartitionSnapshotResponse); @@ -94,6 +94,11 @@ message TrimLogRequest { uint64 trim_point = 2; } +message TrimLogResponse { + uint32 log_id = 1; + optional uint64 trim_point = 2; +} + message CreatePartitionSnapshotRequest { uint32 partition_id = 1; } message CreatePartitionSnapshotResponse { string snapshot_id = 1; } diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index f9c8202393..de71169054 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -16,7 +16,7 @@ use restate_types::protobuf::cluster::ClusterConfiguration; use tonic::{async_trait, Request, Response, Status}; use tracing::info; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError}; +use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; use restate_core::{Metadata, MetadataWriter}; use restate_metadata_store::MetadataStoreClient; use restate_types::identifiers::PartitionId; @@ -33,7 +33,7 @@ use crate::cluster_controller::protobuf::{ CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, FindTailRequest, FindTailResponse, ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse, SealAndExtendChainRequest, SealAndExtendChainResponse, SealedSegment, TailState, - TrimLogRequest, + TrimLogRequest, TrimLogResponse, }; use super::protobuf::{ @@ -167,20 +167,28 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { } /// Internal operations API to trigger the log truncation - async fn trim_log(&self, request: Request) -> Result, Status> { + async fn trim_log( + &self, + request: Request, + ) -> Result, Status> { let request = request.into_inner(); let log_id = LogId::from(request.log_id); let trim_point = Lsn::from(request.trim_point); - if let Err(err) = self + match self .controller_handle .trim_log(log_id, trim_point) .await .map_err(|_| Status::aborted("Node is shutting down"))? { - info!("Failed trimming the log: {err}"); - return Err(Status::internal(err.to_string())); + Err(err) => { + info!("Failed trimming the log: {err}"); + Err(Status::internal(err.to_string())) + } + Ok(trim_point) => Ok(Response::new(TrimLogResponse { + log_id: request.log_id, + trim_point: trim_point.map(Lsn::as_u64), + })), } - Ok(Response::new(())) } /// Handles ad-hoc snapshot requests, as sent by `restatectl snapshots create`. This is @@ -271,13 +279,13 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { .writeable_loglet(log_id) .await .map_err(|err| match err { - BiforstError::UnknownLogId(_) => Status::invalid_argument("Unknown log-id"), + BifrostError::UnknownLogId(_) => Status::invalid_argument("Unknown log-id"), err => Status::internal(err.to_string()), })?; let tail_state = tokio::time::timeout(Duration::from_secs(2), writable_loglet.find_tail()) .await - .map_err(|_elapsed| Status::deadline_exceeded("Timedout finding tail"))? + .map_err(|_elapsed| Status::deadline_exceeded("Timeout finding tail"))? .map_err(|err| Status::internal(err.to_string()))?; let response = FindTailResponse { @@ -297,7 +305,7 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { async fn get_cluster_configuration( &self, - _request: tonic::Request, + _request: Request, ) -> Result, Status> { let logs = Metadata::with_current(|m| m.logs_ref()); let partition_table = Metadata::with_current(|m| m.partition_table_ref()); diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601fc..fdf15d6ee7 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -174,7 +174,7 @@ enum ClusterControllerCommand { TrimLog { log_id: LogId, trim_point: Lsn, - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>>, }, CreateSnapshot { partition_id: PartitionId, @@ -213,7 +213,7 @@ impl ClusterControllerHandle { &self, log_id: LogId, trim_point: Lsn, - ) -> Result, ShutdownError> { + ) -> Result, anyhow::Error>, ShutdownError> { let (response_tx, response_rx) = oneshot::channel(); let _ = self @@ -587,7 +587,7 @@ impl Service { info!( ?log_id, trim_point_inclusive = ?trim_point, - "Manual trim log command received"); + "Trim log command received"); let result = bifrost_admin.trim(log_id, trim_point).await; let _ = response_tx.send(result.map_err(Into::into)); } diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b55488393..b1cdf0abc1 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -348,7 +348,12 @@ where debug!( "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'" ); - bifrost_admin.trim(log_id, min_persisted_lsn).await? + let trim_point = bifrost_admin.trim(log_id, min_persisted_lsn).await?; + if let Some(trim_point) = trim_point { + debug!("Trimmed log '{log_id}' to '{trim_point}'",); + } else { + info!("Attempted to trim log '{log_id}' to '{min_persisted_lsn}' but got none response"); + } } } else { warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log."); diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 06b86ff761..73d0613b58 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -163,7 +163,7 @@ impl Bifrost { } /// Read the next record from the LSN provided. The `from` indicates the LSN where we will - /// start reading from. This means that the record returned will have a LSN that is equal + /// start reading from. This means that the record returned will have an LSN that is equal /// or greater than `from`. If no records are committed yet at this LSN, this read operation /// will immediately return `None`. /// @@ -405,13 +405,16 @@ impl BifrostInner { Ok(trim_point.unwrap_or(Lsn::INVALID)) } - pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<(), Error> { + /// Trim the log to the specified LSN trim point (inclusive). Returns the new trim point LSN if + /// the log was actually trimmed by this call, or `None` otherwise. + pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result, Error> { let log_metadata = Metadata::with_current(|m| m.logs_ref()); let log_chain = log_metadata .chain(&log_id) .ok_or(Error::UnknownLogId(log_id))?; + let mut max_trim_point = Lsn::INVALID; for segment in log_chain.iter() { let loglet = self.get_loglet(log_id, segment).await?; @@ -419,10 +422,16 @@ impl BifrostInner { break; } - loglet.trim(trim_point).await?; + if let Some(effective_trim_point) = loglet.trim(trim_point).await? { + max_trim_point = Lsn::max(max_trim_point, effective_trim_point); + } } // todo: Update logs configuration to remove sealed and empty loglets - Ok(()) + Ok(if max_trim_point == Lsn::INVALID { + None + } else { + Some(max_trim_point) + }) } #[inline] @@ -681,7 +690,14 @@ mod tests { appender.append("").await?; } - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + assert_eq!(bifrost_admin.trim(LOG_ID, Lsn::INVALID).await?, None); // no-op + assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); + + assert_eq!( + bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?, + Some(Lsn::from(5)) + ); + assert_eq!(bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?, None); // no-op let tail = bifrost.find_tail(LOG_ID).await?; assert_eq!(tail.offset(), Lsn::from(11)); @@ -703,7 +719,10 @@ mod tests { } // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::MAX).await?; + assert_eq!( + bifrost_admin.trim(LOG_ID, Lsn::MAX).await?, + Some(Lsn::from(10)) + ); assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); let new_trim_point = bifrost.get_trim_point(LOG_ID).await?; @@ -957,7 +976,7 @@ mod tests { let lsn = appender.append_batch(payloads).await?; println!("Appended batch {lsn}"); } - append_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + append_counter.fetch_add(1, Ordering::Relaxed); tokio::time::sleep(Duration::from_millis(1)).await; } println!("Appender terminated"); diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index bb3a6f50ff..26d2d8a7d9 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -71,7 +71,7 @@ impl<'a> BifrostAdmin<'a> { /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to /// trim all records of the log. #[instrument(level = "debug", skip(self), err)] - pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<()> { + pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result> { self.bifrost.inner.fail_if_shutting_down()?; self.bifrost.inner.trim(log_id, trim_point).await } diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet.rs index 4ac1e1fddc..c84eabdf18 100644 --- a/crates/bifrost/src/loglet.rs +++ b/crates/bifrost/src/loglet.rs @@ -127,14 +127,16 @@ pub trait Loglet: Send + Sync + std::fmt::Debug { async fn get_trim_point(&self) -> Result, OperationError>; /// Trim the loglet prefix up to and including the `trim_point`. - /// If trim_point equal or higher than the loglet tail, the loglet trims its data until the tail. + /// If trim_point equal or higher than the loglet tail, the loglet trims its data up to the tail. /// - /// It's acceptable to pass `trim_point` beyond the tail of the loglet (Offset::MAX is legal). - /// The behaviour in this case is equivalent to trim(find_tail() - 1). + /// It's acceptable to pass `trim_point` beyond the tail of the loglet (i.e. `Offset::MAX` is legal). + /// The behaviour in this case is equivalent to `trim(find_tail() - 1)`. /// /// Passing `Offset::INVALID` is a no-op. (success) /// Passing `Offset::OLDEST` trims the first record in the loglet (if exists). - async fn trim(&self, trim_point: LogletOffset) -> Result<(), OperationError>; + /// + /// Returns the new trim point offset if a trim was performed by this call, or `None` otherwise. + async fn trim(&self, trim_point: LogletOffset) -> Result, OperationError>; /// Seal the loglet. This operation is idempotent. /// diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index e7255e0406..dd7091f9e7 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -190,16 +190,21 @@ impl LogletWrapper { Ok(offset.map(|o| self.base_lsn.offset_by(o))) } - // trim_point is inclusive. - pub async fn trim(&self, trim_point: Lsn) -> Result<(), OperationError> { + /// `trim_point`: inclusive LSN up to which to trim the loglet; if the LSN is beyond the + /// loglet's end, it will be trimmed in full. + /// + /// Returns the effective trim point of the loglet after a trim was performed, `None` otherwise. + pub async fn trim(&self, trim_point: Lsn) -> Result, OperationError> { // trimming to INVALID is no-op if trim_point == Lsn::INVALID { - return Ok(()); + return Ok(None); } // saturate to the loglet max possible offset. let trim_point = trim_point.min(Lsn::new(LogletOffset::MAX.into())); let trim_point = trim_point.into_offset(self.base_lsn); - self.loglet.trim(trim_point).await + + let trim_offset = self.loglet.trim(trim_point).await?; + Ok(trim_offset.map(|o| self.base_lsn.offset_by(o))) } pub async fn seal(&self) -> Result<(), OperationError> { diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index e9ab52f938..86a804d35a 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -216,9 +216,10 @@ impl Loglet for LocalLoglet { } } - /// Trim the log to the minimum of new_trim_point and last_committed_offset - /// new_trim_point is inclusive (will be trimmed) - async fn trim(&self, new_trim_point: LogletOffset) -> Result<(), OperationError> { + async fn trim( + &self, + new_trim_point: LogletOffset, + ) -> Result, OperationError> { let effective_trim_point = new_trim_point.min(LogletOffset::new( self.last_committed_offset.load(Ordering::Relaxed), )); @@ -233,7 +234,7 @@ impl Loglet for LocalLoglet { if current_trim_point >= effective_trim_point { // nothing to do since we have already trimmed beyond new_trim_point - return Ok(()); + return Ok(None); } counter!(BIFROST_LOCAL_TRIM).increment(1); @@ -255,7 +256,7 @@ impl Loglet for LocalLoglet { "Loglet trim operation enqueued" ); - Ok(()) + Ok(Some(effective_trim_point)) } async fn seal(&self) -> Result<(), OperationError> { diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index a1bb9ae0ff..fa18516b72 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -370,24 +370,27 @@ impl Loglet for MemoryLoglet { } } - async fn trim(&self, new_trim_point: LogletOffset) -> Result<(), OperationError> { + async fn trim( + &self, + requested_trim_point: LogletOffset, + ) -> Result, OperationError> { let mut log = self.log.lock().unwrap(); - let actual_trim_point = new_trim_point.min(LogletOffset::new( + let requested_trim_point = requested_trim_point.min(LogletOffset::new( self.last_committed_offset.load(Ordering::Relaxed), )); let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed)); - if current_trim_point >= actual_trim_point { - return Ok(()); + if current_trim_point >= requested_trim_point { + return Ok(Some(current_trim_point)); } - let trim_point_index = self.saturating_offset_to_index(actual_trim_point); + let trim_point_index = self.saturating_offset_to_index(requested_trim_point); self.trim_point_offset - .store(*actual_trim_point, Ordering::Relaxed); + .store(*requested_trim_point, Ordering::Relaxed); log.drain(0..=trim_point_index); - Ok(()) + Ok(Some(requested_trim_point)) } async fn seal(&self) -> Result<(), OperationError> { diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 8e176db18c..b096ca68f2 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -46,7 +46,7 @@ pub(super) struct ReplicatedLoglet { #[debug(skip)] networking: Networking, #[debug(skip)] - logservers_rpc: LogServersRpc, + log_servers_rpc: LogServersRpc, #[debug(skip)] sequencers_rpc: SequencersRpc, #[debug(skip)] @@ -66,7 +66,7 @@ impl ReplicatedLoglet { segment_index: SegmentIndex, my_params: ReplicatedLogletParams, networking: Networking, - logservers_rpc: LogServersRpc, + log_servers_rpc: LogServersRpc, sequencers_rpc: SequencersRpc, record_cache: RecordCache, ) -> Self { @@ -85,7 +85,7 @@ impl ReplicatedLoglet { my_params.clone(), selector_strategy, networking.clone(), - logservers_rpc.store.clone(), + log_servers_rpc.store.clone(), record_cache.clone(), known_global_tail.clone(), ), @@ -107,7 +107,7 @@ impl ReplicatedLoglet { segment_index, my_params, networking, - logservers_rpc, + log_servers_rpc, sequencers_rpc, record_cache, known_global_tail, @@ -156,12 +156,12 @@ impl Loglet for ReplicatedLoglet { let known_global_tail = self.known_global_tail.clone(); let my_params = self.my_params.clone(); let networking = self.networking.clone(); - let logservers_rpc = self.logservers_rpc.clone(); + let log_servers_rpc = self.log_servers_rpc.clone(); let (rx_stream, reader_task) = ReadStreamTask::start( my_params, networking, - logservers_rpc, + log_servers_rpc, filter, from, to, @@ -183,7 +183,7 @@ impl Loglet for ReplicatedLoglet { } #[instrument( - level="trace", + level = "trace", skip_all, fields( otel.name = "replicated_loglet: enqueue_batch", @@ -219,7 +219,7 @@ impl Loglet for ReplicatedLoglet { // happening. let result = CheckSealTask::run( &self.my_params, - &self.logservers_rpc.get_loglet_info, + &self.log_servers_rpc.get_loglet_info, &self.known_global_tail, &self.networking, ) @@ -237,7 +237,7 @@ impl Loglet for ReplicatedLoglet { self.segment_index, self.my_params.clone(), self.networking.clone(), - self.logservers_rpc.clone(), + self.log_servers_rpc.clone(), self.sequencers_rpc.clone(), self.known_global_tail.clone(), self.record_cache.clone(), @@ -261,7 +261,7 @@ impl Loglet for ReplicatedLoglet { } #[instrument( - level="error", + level = "error", skip_all, fields( loglet_id = %self.my_params.loglet_id, @@ -271,7 +271,7 @@ impl Loglet for ReplicatedLoglet { async fn get_trim_point(&self) -> Result, OperationError> { GetTrimPointTask::new( &self.my_params, - self.logservers_rpc.clone(), + self.log_servers_rpc.clone(), self.known_global_tail.clone(), ) .run(self.networking.clone()) @@ -280,38 +280,53 @@ impl Loglet for ReplicatedLoglet { } #[instrument( - level="error", + level = "error", skip_all, fields( loglet_id = %self.my_params.loglet_id, - new_trim_point, + trim_point, otel.name = "replicated_loglet: trim", ) )] - /// Trim the log to the min(trim_point, last_committed_offset) - /// trim_point is inclusive (will be trimmed) - async fn trim(&self, trim_point: LogletOffset) -> Result<(), OperationError> { - let trim_point = trim_point.min(self.known_global_tail.latest_offset().prev_unchecked()); + async fn trim( + &self, + requested_trim_point: LogletOffset, + ) -> Result, OperationError> { + let known_global_tail = self.known_global_tail.latest_offset().prev_unchecked(); + let trim_point = if requested_trim_point > known_global_tail { + debug!( + loglet_id = %self.my_params.loglet_id, + requested_trim_point = %requested_trim_point, + known_global_tail = %self.known_global_tail, + "Trim point clamped to the last known global tail" + ); + known_global_tail + } else { + requested_trim_point + }; - TrimTask::new( + let new_trim_point = TrimTask::new( &self.my_params, - self.logservers_rpc.clone(), + self.log_servers_rpc.clone(), self.known_global_tail.clone(), ) .run(trim_point, self.networking.clone()) .await?; + info!( loglet_id=%self.my_params.loglet_id, ?trim_point, - "Loglet has been trimmed successfully" + ?new_trim_point, + "Loglet trim task completed successfully" ); - Ok(()) + + Ok(new_trim_point) } async fn seal(&self) -> Result<(), OperationError> { let _ = SealTask::new( self.my_params.clone(), - self.logservers_rpc.seal.clone(), + self.log_servers_rpc.seal.clone(), self.known_global_tail.clone(), ) .run(self.networking.clone()) diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs index 979ac3381b..3a6c26a367 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/trim.rs @@ -33,7 +33,7 @@ struct TrimError; /// Sends a trim request to as many log-servers in the nodeset. /// /// We broadcast the trim to all nodes that we can, but only wait for write-quorum -/// responses before acknowleding the trim. +/// responses before acknowledging the trim. /// /// The trim operation is idempotent. It's safe to trim a loglet if it's already partially or fully /// trimmed and if it's sealed. @@ -47,22 +47,22 @@ struct TrimError; /// are trimmed to the requested (possibly clamped) trim point or higher. /// /// Calls to `get_trim_point()` that happen after this task should return the clamped trim point -/// or higher (best effort). +/// or higher (best-effort). pub struct TrimTask<'a> { my_params: &'a ReplicatedLogletParams, - logservers_rpc: LogServersRpc, + log_servers_rpc: LogServersRpc, known_global_tail: TailOffsetWatch, } impl<'a> TrimTask<'a> { pub fn new( my_params: &'a ReplicatedLogletParams, - logservers_rpc: LogServersRpc, + log_servers_rpc: LogServersRpc, known_global_tail: TailOffsetWatch, ) -> Self { Self { my_params, - logservers_rpc, + log_servers_rpc, known_global_tail, } } @@ -71,7 +71,7 @@ impl<'a> TrimTask<'a> { self, trim_point: LogletOffset, networking: Networking, - ) -> Result<(), OperationError> { + ) -> Result, OperationError> { // Use the entire nodeset except for StorageState::Disabled. let effective_nodeset = EffectiveNodeSet::new( &self.my_params.nodeset, @@ -87,7 +87,7 @@ impl<'a> TrimTask<'a> { known_global_tail = %self.known_global_tail, "Will not send trim messages to log-servers since the effective trim_point requested is 0" ); - return Ok(()); + return Ok(None); } let mut nodeset_checker = NodeSetChecker::<'_, bool>::new( @@ -116,7 +116,7 @@ impl<'a> TrimTask<'a> { inflight_requests.spawn({ let networking = networking.clone(); - let trim_rpc_router = self.logservers_rpc.trim.clone(); + let trim_rpc_router = self.log_servers_rpc.trim.clone(); let known_global_tail = self.known_global_tail.clone(); async move { @@ -155,13 +155,13 @@ impl<'a> TrimTask<'a> { // Let's keep the rest of the trim requests running in the background. debug!( loglet_id = %self.my_params.loglet_id, - trim_point = %trim_point, + trim_point = %trim_point, known_global_tail = %self.known_global_tail, "Loglet has been trimmed" ); // continue to run the rest of trim requests in the background inflight_requests.detach_all(); - return Ok(()); + return Ok(Some(trim_point)); } } diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index 170e84cfa5..731a0e34f4 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -119,7 +119,7 @@ impl From for crate::protobuf::common::Lsn { impl SequenceNumber for Lsn { /// The maximum possible sequence number, this is useful when creating a read stream - /// with an open ended tail. + /// with an open-ended tail. const MAX: Self = Lsn(u64::MAX); /// 0 is not a valid sequence number. This sequence number represents invalid position /// in the log, or that the log has been that has been trimmed. diff --git a/tools/restatectl/src/commands/log/trim_log.rs b/tools/restatectl/src/commands/log/trim_log.rs index 510edd9416..f23b81280e 100644 --- a/tools/restatectl/src/commands/log/trim_log.rs +++ b/tools/restatectl/src/commands/log/trim_log.rs @@ -48,13 +48,24 @@ async fn trim_log(connection: &ConnectionInfo, opts: &TrimLogOpts) -> anyhow::Re log_id: opts.log_id, trim_point: opts.trim_point, }; - client + let response = client .trim_log(trim_request) .await .with_context(|| "failed to submit trim request")? .into_inner(); - c_println!("Submitted"); + match response.trim_point { + Some(trim_point) => { + c_println!( + "Log id {} successfully trimmed to LSN {}", + opts.log_id, + trim_point + ); + } + None => { + c_println!("Log trim request was a no-op"); + } + } Ok(()) } From 128d72f1e45f3a102182b23f04c6edce19619f42 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 30 Dec 2024 14:25:00 +0200 Subject: [PATCH 02/12] Make mock-service-endpoint a workspace library --- Cargo.lock | 1 - Cargo.toml | 1 + tools/mock-service-endpoint/Cargo.toml | 4 +- tools/mock-service-endpoint/src/handler.rs | 393 +++++++++++++++++++ tools/mock-service-endpoint/src/lib.rs | 12 + tools/mock-service-endpoint/src/listener.rs | 57 +++ tools/mock-service-endpoint/src/main.rs | 414 +------------------- 7 files changed, 469 insertions(+), 413 deletions(-) create mode 100644 tools/mock-service-endpoint/src/handler.rs create mode 100644 tools/mock-service-endpoint/src/lib.rs create mode 100644 tools/mock-service-endpoint/src/listener.rs diff --git a/Cargo.lock b/Cargo.lock index 436138462b..f2e0ed8b2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4377,7 +4377,6 @@ dependencies = [ "serde_json", "thiserror 2.0.6", "tokio", - "tokio-stream", "tracing", "tracing-subscriber", ] diff --git a/Cargo.toml b/Cargo.toml index 12f59736b9..be427bb60f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ description = "Restate makes distributed applications easy!" [workspace.dependencies] # Own crates codederror = { path = "crates/codederror" } +mock-service-endpoint = { path = "tools/mock-service-endpoint" } restate-admin = { path = "crates/admin" } restate-admin-rest-model = { path = "crates/admin-rest-model" } restate-base64-util = { path = "crates/base64-util" } diff --git a/tools/mock-service-endpoint/Cargo.toml b/tools/mock-service-endpoint/Cargo.toml index b4bfd1cda2..2aae8a6ca8 100644 --- a/tools/mock-service-endpoint/Cargo.toml +++ b/tools/mock-service-endpoint/Cargo.toml @@ -22,7 +22,9 @@ prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -tokio-stream = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } thiserror = { workspace = true } + +[lib] +path = "src/lib.rs" \ No newline at end of file diff --git a/tools/mock-service-endpoint/src/handler.rs b/tools/mock-service-endpoint/src/handler.rs new file mode 100644 index 0000000000..e579442fab --- /dev/null +++ b/tools/mock-service-endpoint/src/handler.rs @@ -0,0 +1,393 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::convert::Infallible; +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use assert2::let_assert; +use async_stream::{stream, try_stream}; +use bytes::Bytes; +use futures::{pin_mut, Stream, StreamExt}; +use http_body_util::{BodyStream, Either, Empty, StreamBody}; +use hyper::body::{Frame, Incoming}; +use hyper::{Request, Response}; +use prost::Message; +use tracing::{debug, error}; + +use restate_service_protocol::codec::ProtobufRawEntryCodec; +use restate_service_protocol::message::{Decoder, Encoder, EncodingError, ProtocolMessage}; +use restate_types::errors::codes; +use restate_types::journal::raw::{EntryHeader, PlainRawEntry, RawEntryCodecError}; +use restate_types::journal::{Entry, EntryType, InputEntry}; +use restate_types::service_protocol::start_message::StateEntry; +use restate_types::service_protocol::{ + self, get_state_entry_message, output_entry_message, ServiceProtocolVersion, StartMessage, +}; + +#[derive(Debug, thiserror::Error)] +enum FrameError { + #[error(transparent)] + EncodingError(EncodingError), + #[error(transparent)] + Hyper(hyper::Error), + #[error("Stream ended before finished replay")] + UnexpectedEOF, + #[error("Journal does not contain expected messages")] + InvalidJournal, + #[error(transparent)] + RawEntryCodecError(#[from] RawEntryCodecError), + #[error(transparent)] + Serde(#[from] serde_json::Error), +} + +pub async fn serve( + req: Request, +) -> Result< + Response< + Either, StreamBody, Infallible>>>>, + >, + Infallible, +> { + let (req_head, req_body) = req.into_parts(); + let mut split = req_head.uri.path().rsplit('/'); + let handler_name = if let Some(handler_name) = split.next() { + handler_name + } else { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + }; + if let Some("Counter") = split.next() { + } else { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + }; + if let Some("invoke") = split.next() { + } else { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + }; + + let req_body = BodyStream::new(req_body); + let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); + let encoder = Encoder::new(ServiceProtocolVersion::V1); + + let incoming = stream! { + for await frame in req_body { + match frame { + Ok(frame) => { + if let Ok(data) = frame.into_data() { + decoder.push(data); + loop { + match decoder.consume_next() { + Ok(Some((_header, message))) => yield Ok(message), + Ok(None) => { + break + }, + Err(err) => yield Err(FrameError::EncodingError(err)), + } + } + } + }, + Err(err) => yield Err(FrameError::Hyper(err)), + }; + } + }; + + let handler: Handler = match handler_name.parse() { + Ok(handler) => handler, + Err(_err) => { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + } + }; + + let outgoing = handler.handle(incoming).map(move |message| match message { + Ok(message) => Ok(Frame::data(encoder.encode(message))), + Err(err) => { + error!("Error handling stream: {err:?}"); + Ok(Frame::data(encoder.encode(error(err)))) + } + }); + + Ok(Response::builder() + .status(200) + .header("content-type", "application/vnd.restate.invocation.v1") + .body(Either::Right(StreamBody::new(outgoing))) + .unwrap()) +} + +pub enum Handler { + Get, + Add, +} + +#[derive(Debug, thiserror::Error)] +#[error("Invalid handler")] +pub struct InvalidHandler; + +impl FromStr for Handler { + type Err = InvalidHandler; + + fn from_str(s: &str) -> Result { + match s { + "get" => Ok(Self::Get), + "add" => Ok(Self::Add), + _ => Err(InvalidHandler), + } + } +} + +impl Display for Handler { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Get => write!(f, "get"), + Self::Add => write!(f, "add"), + } + } +} + +impl Handler { + fn handle( + self, + incoming: impl Stream>, + ) -> impl Stream> { + try_stream! { + pin_mut!(incoming); + match (incoming.next().await, incoming.next().await) { + (Some(Ok(ProtocolMessage::Start(start_message))), Some(Ok(ProtocolMessage::UnparsedEntry(input)))) if input.ty() == EntryType::Input => { + let input = input.deserialize_entry_ref::()?; + let_assert!( + Entry::Input(input) = input + ); + + let replay_count = start_message.known_entries as usize - 1; + let mut replayed = Vec::with_capacity(replay_count); + for _ in 0..replay_count { + let message = incoming.next().await.ok_or(FrameError::UnexpectedEOF)??; + replayed.push(message); + } + + debug!("Handling request to {self} with {} known entries", start_message.known_entries); + + match self { + Handler::Get => { + for await message in Self::handle_get(start_message, input, replayed, incoming) { + yield message? + } + }, + Handler::Add => { + for await message in Self::handle_add(start_message, input, replayed, incoming) { + yield message? + } + }, + }; + }, + _ => {Err(FrameError::InvalidJournal)?; return}, + }; + } + } + + fn handle_get( + start_message: StartMessage, + _input: InputEntry, + replayed: Vec, + _incoming: impl Stream>, + ) -> impl Stream> { + try_stream! { + let counter = read_counter(&start_message.state_map); + match replayed.len() { + 0 => { + yield get_state(counter.clone()); + yield output(counter.unwrap_or("0".into())); + yield end(); + }, + 1 => { + yield output(counter.unwrap_or("0".into())); + yield end(); + } + 2=> { + yield end(); + } + _ => {Err(FrameError::InvalidJournal)?; return}, + } + } + } + + fn handle_add( + start_message: StartMessage, + input: InputEntry, + replayed: Vec, + _incoming: impl Stream>, + ) -> impl Stream> { + try_stream! { + let counter = read_counter(&start_message.state_map); + match replayed.len() { + 0 => { + yield get_state(counter.clone()); + + let next_value = match counter { + Some(ref counter) => { + let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; + let current: i32 = serde_json::from_slice(counter.as_ref())?; + + serde_json::to_vec(&(to_add + current))?.into() + } + None => input.value, + }; + + yield set_state(next_value.clone()); + yield output(next_value); + yield end(); + }, + 1 => { + let next_value = match counter { + Some(ref counter) => { + let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; + let current: i32 = serde_json::from_slice(counter.as_ref())?; + + serde_json::to_vec(&(to_add + current))?.into() + } + None => input.value, + }; + + yield set_state(next_value.clone()); + yield output(next_value); + yield end(); + } + 2 => { + let set_value = match &replayed[1] { + ProtocolMessage::UnparsedEntry(set) if set.ty() == EntryType::SetState => { + let set = set.deserialize_entry_ref::()?; + let_assert!( + Entry::SetState(set) = set + ); + set.value.clone() + }, + _ => {Err(FrameError::InvalidJournal)?; return}, + }; + yield output(set_value); + yield end(); + } + 3 => { + yield end(); + } + _ => {Err(FrameError::InvalidJournal)?; return}, + } + } + } +} + +fn read_counter(state_map: &[StateEntry]) -> Option { + let entry = state_map + .iter() + .find(|entry| entry.key.as_ref() == b"counter")?; + Some(entry.value.clone()) +} + +fn get_state(counter: Option) -> ProtocolMessage { + debug!( + "Yielding GetStateEntryMessage with value {}", + LossyDisplay(counter.as_deref()) + ); + + ProtocolMessage::UnparsedEntry(PlainRawEntry::new( + EntryHeader::GetState { is_completed: true }, + service_protocol::GetStateEntryMessage { + name: String::new(), + key: "counter".into(), + result: Some(match counter { + Some(ref counter) => get_state_entry_message::Result::Value(counter.clone()), + None => get_state_entry_message::Result::Empty(service_protocol::Empty {}), + }), + } + .encode_to_vec() + .into(), + )) +} + +fn set_state(value: Bytes) -> ProtocolMessage { + debug!( + "Yielding SetStateEntryMessage with value {}", + LossyDisplay(Some(&value)) + ); + + ProtocolMessage::UnparsedEntry(PlainRawEntry::new( + EntryHeader::SetState, + service_protocol::SetStateEntryMessage { + name: String::new(), + key: "counter".into(), + value: value.clone(), + } + .encode_to_vec() + .into(), + )) +} + +fn output(value: Bytes) -> ProtocolMessage { + debug!( + "Yielding OutputEntryMessage with result {}", + LossyDisplay(Some(&value)) + ); + + ProtocolMessage::UnparsedEntry(PlainRawEntry::new( + EntryHeader::Output, + service_protocol::OutputEntryMessage { + name: String::new(), + result: Some(output_entry_message::Result::Value(value)), + } + .encode_to_vec() + .into(), + )) +} + +fn end() -> ProtocolMessage { + debug!("Yielding EndMessage"); + + ProtocolMessage::End(service_protocol::EndMessage {}) +} + +fn error(err: FrameError) -> ProtocolMessage { + let code = match err { + FrameError::EncodingError(_) => codes::PROTOCOL_VIOLATION, + FrameError::Hyper(_) => codes::INTERNAL, + FrameError::UnexpectedEOF => codes::PROTOCOL_VIOLATION, + FrameError::InvalidJournal => codes::JOURNAL_MISMATCH, + FrameError::RawEntryCodecError(_) => codes::PROTOCOL_VIOLATION, + FrameError::Serde(_) => codes::INTERNAL, + }; + ProtocolMessage::Error(service_protocol::ErrorMessage { + code: code.into(), + description: err.to_string(), + message: String::new(), + related_entry_index: None, + related_entry_name: None, + related_entry_type: None, + next_retry_delay: None, + }) +} + +struct LossyDisplay<'a>(Option<&'a [u8]>); +impl<'a> Display for LossyDisplay<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self.0 { + Some(bytes) => write!(f, "{}", String::from_utf8_lossy(bytes)), + None => write!(f, ""), + } + } +} diff --git a/tools/mock-service-endpoint/src/lib.rs b/tools/mock-service-endpoint/src/lib.rs new file mode 100644 index 0000000000..6e95bd1cc5 --- /dev/null +++ b/tools/mock-service-endpoint/src/lib.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod handler; +pub mod listener; diff --git a/tools/mock-service-endpoint/src/listener.rs b/tools/mock-service-endpoint/src/listener.rs new file mode 100644 index 0000000000..9a5e662f59 --- /dev/null +++ b/tools/mock-service-endpoint/src/listener.rs @@ -0,0 +1,57 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::convert::Infallible; +use std::error::Error; +use std::net::SocketAddr; + +use bytes::Bytes; +use http_body_util::{Either, Full}; +use hyper::server::conn::http2; +use hyper::service::service_fn; +use hyper::Response; +use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; +use tokio::net::TcpListener; +use tracing::error; + +use crate::handler::serve; + +pub async fn run_listener(address: SocketAddr) -> Result<(), Box> { + let listener = TcpListener::bind(address).await?; + + loop { + tokio::select! { + incoming = listener.accept() => { + let (tcp, _) = incoming?; + let io = TokioIo::new(tcp); + tokio::task::spawn(async move { + if let Err(err) = http2::Builder::new(TokioExecutor::new()) + .timer(TokioTimer::new()) + .serve_connection(io, service_fn(|req| async { + if req.uri().path() == "/discover" { + return Ok(Response::builder() + .header("content-type", "application/vnd.restate.endpointmanifest.v1+json") + .body(Either::Left(Full::new(Bytes::from( + r#"{"protocolMode":"BIDI_STREAM","minProtocolVersion":1,"maxProtocolVersion":1,"services":[{"name":"Counter","ty":"VIRTUAL_OBJECT","handlers":[{"name":"add","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"},{"name":"get","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"}]}]}"# + )))).unwrap()); + } + + let (head, body) = serve(req).await?.into_parts(); + Result::<_, Infallible>::Ok(Response::from_parts(head, Either::Right(body))) + })) + .await + { + error!("Error serving connection: {:?}", err); + } + }); + } + } + } +} diff --git a/tools/mock-service-endpoint/src/main.rs b/tools/mock-service-endpoint/src/main.rs index 8891881c0d..88ba6332bd 100644 --- a/tools/mock-service-endpoint/src/main.rs +++ b/tools/mock-service-endpoint/src/main.rs @@ -8,395 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::convert::Infallible; -use std::fmt::{Display, Formatter}; use std::net::SocketAddr; -use std::str::FromStr; -use assert2::let_assert; -use async_stream::{stream, try_stream}; -use bytes::Bytes; -use futures::{pin_mut, Stream, StreamExt}; -use http_body_util::{BodyStream, Either, Empty, Full, StreamBody}; -use hyper::body::{Frame, Incoming}; -use hyper::server::conn::http2; -use hyper::service::service_fn; -use hyper::{Request, Response}; -use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; -use prost::Message; -use tokio::net::TcpListener; -use tracing::{debug, error, info}; +use tracing::info; use tracing_subscriber::filter::LevelFilter; -use restate_service_protocol::codec::ProtobufRawEntryCodec; -use restate_service_protocol::message::{Decoder, Encoder, EncodingError, ProtocolMessage}; -use restate_types::errors::codes; -use restate_types::journal::raw::{EntryHeader, PlainRawEntry, RawEntryCodecError}; -use restate_types::journal::{Entry, EntryType, InputEntry}; -use restate_types::service_protocol::start_message::StateEntry; -use restate_types::service_protocol::{ - self, get_state_entry_message, output_entry_message, ServiceProtocolVersion, StartMessage, -}; - -#[derive(Debug, thiserror::Error)] -enum FrameError { - #[error(transparent)] - EncodingError(EncodingError), - #[error(transparent)] - Hyper(hyper::Error), - #[error("Stream ended before finished replay")] - UnexpectedEOF, - #[error("Journal does not contain expected messages")] - InvalidJournal, - #[error(transparent)] - RawEntryCodecError(#[from] RawEntryCodecError), - #[error(transparent)] - Serde(#[from] serde_json::Error), -} - -async fn serve( - req: Request, -) -> Result< - Response< - Either, StreamBody, Infallible>>>>, - >, - Infallible, -> { - let (req_head, req_body) = req.into_parts(); - let mut split = req_head.uri.path().rsplit('/'); - let handler_name = if let Some(handler_name) = split.next() { - handler_name - } else { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - }; - if let Some("Counter") = split.next() { - } else { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - }; - if let Some("invoke") = split.next() { - } else { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - }; - - let req_body = BodyStream::new(req_body); - let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); - let encoder = Encoder::new(ServiceProtocolVersion::V1); - - let incoming = stream! { - for await frame in req_body { - match frame { - Ok(frame) => { - if let Ok(data) = frame.into_data() { - decoder.push(data); - loop { - match decoder.consume_next() { - Ok(Some((_header, message))) => yield Ok(message), - Ok(None) => { - break - }, - Err(err) => yield Err(FrameError::EncodingError(err)), - } - } - } - }, - Err(err) => yield Err(FrameError::Hyper(err)), - }; - } - }; - - let handler: Handler = match handler_name.parse() { - Ok(handler) => handler, - Err(_err) => { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - } - }; - - let outgoing = handler.handle(incoming).map(move |message| match message { - Ok(message) => Ok(Frame::data(encoder.encode(message))), - Err(err) => { - error!("Error handling stream: {err:?}"); - Ok(Frame::data(encoder.encode(error(err)))) - } - }); - - Ok(Response::builder() - .status(200) - .header("content-type", "application/vnd.restate.invocation.v1") - .body(Either::Right(StreamBody::new(outgoing))) - .unwrap()) -} - -enum Handler { - Get, - Add, -} - -#[derive(Debug, thiserror::Error)] -#[error("Invalid handler")] -struct InvalidHandler; - -impl FromStr for Handler { - type Err = InvalidHandler; - - fn from_str(s: &str) -> Result { - match s { - "get" => Ok(Self::Get), - "add" => Ok(Self::Add), - _ => Err(InvalidHandler), - } - } -} - -impl Display for Handler { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Get => write!(f, "get"), - Self::Add => write!(f, "add"), - } - } -} - -impl Handler { - fn handle( - self, - incoming: impl Stream>, - ) -> impl Stream> { - try_stream! { - pin_mut!(incoming); - match (incoming.next().await, incoming.next().await) { - (Some(Ok(ProtocolMessage::Start(start_message))), Some(Ok(ProtocolMessage::UnparsedEntry(input)))) if input.ty() == EntryType::Input => { - let input = input.deserialize_entry_ref::()?; - let_assert!( - Entry::Input(input) = input - ); - - let replay_count = start_message.known_entries as usize - 1; - let mut replayed = Vec::with_capacity(replay_count); - for _ in 0..replay_count { - let message = incoming.next().await.ok_or(FrameError::UnexpectedEOF)??; - replayed.push(message); - } - - debug!("Handling request to {self} with {} known entries", start_message.known_entries); - - match self { - Handler::Get => { - for await message in Self::handle_get(start_message, input, replayed, incoming) { - yield message? - } - }, - Handler::Add => { - for await message in Self::handle_add(start_message, input, replayed, incoming) { - yield message? - } - }, - }; - }, - _ => {Err(FrameError::InvalidJournal)?; return}, - }; - } - } - - fn handle_get( - start_message: StartMessage, - _input: InputEntry, - replayed: Vec, - _incoming: impl Stream>, - ) -> impl Stream> { - try_stream! { - let counter = read_counter(&start_message.state_map); - match replayed.len() { - 0 => { - yield get_state(counter.clone()); - yield output(counter.unwrap_or("0".into())); - yield end(); - }, - 1 => { - yield output(counter.unwrap_or("0".into())); - yield end(); - } - 2=> { - yield end(); - } - _ => {Err(FrameError::InvalidJournal)?; return}, - } - } - } - - fn handle_add( - start_message: StartMessage, - input: InputEntry, - replayed: Vec, - _incoming: impl Stream>, - ) -> impl Stream> { - try_stream! { - let counter = read_counter(&start_message.state_map); - match replayed.len() { - 0 => { - yield get_state(counter.clone()); - - let next_value = match counter { - Some(ref counter) => { - let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; - let current: i32 = serde_json::from_slice(counter.as_ref())?; - - serde_json::to_vec(&(to_add + current))?.into() - } - None => input.value, - }; - - yield set_state(next_value.clone()); - yield output(next_value); - yield end(); - }, - 1 => { - let next_value = match counter { - Some(ref counter) => { - let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; - let current: i32 = serde_json::from_slice(counter.as_ref())?; - - serde_json::to_vec(&(to_add + current))?.into() - } - None => input.value, - }; - - yield set_state(next_value.clone()); - yield output(next_value); - yield end(); - } - 2 => { - let set_value = match &replayed[1] { - ProtocolMessage::UnparsedEntry(set) if set.ty() == EntryType::SetState => { - let set = set.deserialize_entry_ref::()?; - let_assert!( - Entry::SetState(set) = set - ); - set.value.clone() - }, - _ => {Err(FrameError::InvalidJournal)?; return}, - }; - yield output(set_value); - yield end(); - } - 3 => { - yield end(); - } - _ => {Err(FrameError::InvalidJournal)?; return}, - } - } - } -} - -fn read_counter(state_map: &[StateEntry]) -> Option { - let entry = state_map - .iter() - .find(|entry| entry.key.as_ref() == b"counter")?; - Some(entry.value.clone()) -} - -fn get_state(counter: Option) -> ProtocolMessage { - debug!( - "Yielding GetStateEntryMessage with value {}", - LossyDisplay(counter.as_deref()) - ); - - ProtocolMessage::UnparsedEntry(PlainRawEntry::new( - EntryHeader::GetState { is_completed: true }, - service_protocol::GetStateEntryMessage { - name: String::new(), - key: "counter".into(), - result: Some(match counter { - Some(ref counter) => get_state_entry_message::Result::Value(counter.clone()), - None => get_state_entry_message::Result::Empty(service_protocol::Empty {}), - }), - } - .encode_to_vec() - .into(), - )) -} - -fn set_state(value: Bytes) -> ProtocolMessage { - debug!( - "Yielding SetStateEntryMessage with value {}", - LossyDisplay(Some(&value)) - ); - - ProtocolMessage::UnparsedEntry(PlainRawEntry::new( - EntryHeader::SetState, - service_protocol::SetStateEntryMessage { - name: String::new(), - key: "counter".into(), - value: value.clone(), - } - .encode_to_vec() - .into(), - )) -} - -fn output(value: Bytes) -> ProtocolMessage { - debug!( - "Yielding OutputEntryMessage with result {}", - LossyDisplay(Some(&value)) - ); - - ProtocolMessage::UnparsedEntry(PlainRawEntry::new( - EntryHeader::Output, - service_protocol::OutputEntryMessage { - name: String::new(), - result: Some(output_entry_message::Result::Value(value)), - } - .encode_to_vec() - .into(), - )) -} - -fn end() -> ProtocolMessage { - debug!("Yielding EndMessage"); - - ProtocolMessage::End(service_protocol::EndMessage {}) -} - -fn error(err: FrameError) -> ProtocolMessage { - let code = match err { - FrameError::EncodingError(_) => codes::PROTOCOL_VIOLATION, - FrameError::Hyper(_) => codes::INTERNAL, - FrameError::UnexpectedEOF => codes::PROTOCOL_VIOLATION, - FrameError::InvalidJournal => codes::JOURNAL_MISMATCH, - FrameError::RawEntryCodecError(_) => codes::PROTOCOL_VIOLATION, - FrameError::Serde(_) => codes::INTERNAL, - }; - ProtocolMessage::Error(service_protocol::ErrorMessage { - code: code.into(), - description: err.to_string(), - message: String::new(), - related_entry_index: None, - related_entry_name: None, - related_entry_type: None, - next_retry_delay: None, - }) -} - -struct LossyDisplay<'a>(Option<&'a [u8]>); -impl<'a> Display for LossyDisplay<'a> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self.0 { - Some(bytes) => write!(f, "{}", String::from_utf8_lossy(bytes)), - None => write!(f, ""), - } - } -} +use mock_service_endpoint::listener::run_listener; #[tokio::main] pub async fn main() -> Result<(), Box> { @@ -413,31 +30,6 @@ pub async fn main() -> Result<(), Box> { let addr: SocketAddr = ([127, 0, 0, 1], 9080).into(); - let listener = TcpListener::bind(addr).await?; info!("Listening on http://{}", addr); - loop { - let (tcp, _) = listener.accept().await?; - let io = TokioIo::new(tcp); - - tokio::task::spawn(async move { - if let Err(err) = http2::Builder::new(TokioExecutor::new()) - .timer(TokioTimer::new()) - .serve_connection(io, service_fn(|req| async { - if req.uri().path() == "/discover" { - return Ok(Response::builder() - .header("content-type", "application/vnd.restate.endpointmanifest.v1+json") - .body(Either::Left(Full::new(Bytes::from( - r#"{"protocolMode":"BIDI_STREAM","minProtocolVersion":1,"maxProtocolVersion":1,"services":[{"name":"Counter","ty":"VIRTUAL_OBJECT","handlers":[{"name":"add","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"},{"name":"get","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"}]}]}"# - )))).unwrap()); - } - - let (head, body) = serve(req).await?.into_parts(); - Result::<_, Infallible>::Ok(Response::from_parts(head, Either::Right(body))) - })) - .await - { - println!("Error serving connection: {:?}", err); - } - }); - } + run_listener(addr).await } From 5fb7e9090fde65a9f8b4050cda80dd135cffc2c4 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Tue, 31 Dec 2024 17:44:26 +0200 Subject: [PATCH 03/12] Add trim gap handling end-to-end test --- Cargo.lock | 3 + server/Cargo.toml | 3 + server/tests/trim_gap_handling.rs | 288 ++++++++++++++++++++++++++++++ 3 files changed, 294 insertions(+) create mode 100644 server/tests/trim_gap_handling.rs diff --git a/Cargo.lock b/Cargo.lock index f2e0ed8b2a..3dafbc87e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6735,8 +6735,10 @@ dependencies = [ "googletest", "humantime", "hyper-util", + "mock-service-endpoint", "pin-project", "regex", + "reqwest", "restate-admin", "restate-bifrost", "restate-core", @@ -6753,6 +6755,7 @@ dependencies = [ "rust-rocksdb", "schemars", "serde", + "serde_json", "serde_with", "tempfile", "test-log", diff --git a/server/Cargo.toml b/server/Cargo.toml index e8485a7c19..8244fedbcc 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -72,6 +72,7 @@ restate-core = { workspace = true, features = ["test-util"] } restate-local-cluster-runner = { workspace = true } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } +mock-service-endpoint = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } @@ -83,6 +84,8 @@ test-log = { workspace = true } tonic = { workspace = true, features = ["transport", "prost"] } tower = { workspace = true } tracing-subscriber = { workspace = true } +reqwest = { workspace = true } +serde_json = { workspace = true } url = { workspace = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs new file mode 100644 index 0000000000..4685cfb126 --- /dev/null +++ b/server/tests/trim_gap_handling.rs @@ -0,0 +1,288 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::net::SocketAddr; +use std::time::Duration; + +use enumset::enum_set; +use futures_util::StreamExt; +use googletest::fail; +use hyper_util::rt::TokioIo; +use tempfile::TempDir; +use test_log::test; +use tokio::io; +use tokio::net::UnixStream; +use tonic::codec::CompressionEncoding; +use tonic::transport::{Channel, Endpoint, Uri}; +use tower::service_fn; +use tracing::{error, info}; +use url::Url; + +use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; +use restate_admin::cluster_controller::protobuf::{ + ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest, +}; +use restate_local_cluster_runner::{ + cluster::Cluster, + node::{BinarySource, Node}, +}; +use restate_types::config::{LogFormat, MetadataStoreClient}; +use restate_types::logs::metadata::ProviderKind::Replicated; +use restate_types::net::AdvertisedAddress; +use restate_types::protobuf::cluster::node_state::State; +use restate_types::retries::RetryPolicy; +use restate_types::{config::Configuration, nodes_config::Role}; + +mod common; + +#[test(restate_core::test)] +async fn fast_forward_over_trim_gap() -> googletest::Result<()> { + let mut base_config = Configuration::default(); + base_config.common.bootstrap_num_partitions = 1.try_into()?; + base_config.bifrost.default_provider = Replicated; + base_config.common.log_filter = "restate=debug,warn".to_owned(); + base_config.common.log_format = LogFormat::Compact; + + let snapshots_dir = TempDir::new()?; + base_config.worker.snapshots.destination = Some( + Url::from_file_path(snapshots_dir.path()) + .unwrap() + .to_string(), + ); + + let nodes = Node::new_test_nodes_with_metadata( + base_config.clone(), + BinarySource::CargoTest, + enum_set!(Role::Worker | Role::LogServer), + 2, + ); + let admin_node = &nodes[0]; + + let worker_1 = &nodes[1]; + let worker_2 = &nodes[2]; + + let mut worker_1_ready = worker_1.lines("PartitionProcessor starting event loop".parse()?); + let mut worker_2_ready = worker_2.lines("PartitionProcessor starting event loop".parse()?); + + let mut cluster = Cluster::builder() + .temp_base_dir() + .nodes(nodes.clone()) + .build() + .start() + .await?; + + cluster.wait_healthy(Duration::from_secs(10)).await?; + tokio::time::timeout(Duration::from_secs(10), worker_1_ready.next()).await?; + tokio::time::timeout(Duration::from_secs(10), worker_2_ready.next()).await?; + + let mut client = + ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?) + .accept_compressed(CompressionEncoding::Gzip); + + any_partition_active(&mut client, Duration::from_secs(5)).await?; + + let addr: SocketAddr = "127.0.0.1:9080".parse()?; + tokio::spawn(async move { + info!("Starting mock service on http://{}", addr); + if let Err(e) = mock_service_endpoint::listener::run_listener(addr).await { + error!("Error running listener: {:?}", e); + } + }); + + let http_client = reqwest::Client::new(); + let registration_response = http_client + .post(format!( + "http://{}/deployments", + admin_node.config().admin.bind_address + )) + .header("content-type", "application/json") + .json(&serde_json::json!({ "uri": "http://localhost:9080" })) + .send() + .await?; + info!("Registration response: {:?}", registration_response); + assert!(registration_response.status().is_success()); + + let ingress_url = format!( + "http://{}/Counter/0/get", + worker_1.config().ingress.bind_address + ); + + // It takes a little bit for the service to become available for invocations + let mut retry = RetryPolicy::fixed_delay(Duration::from_millis(500), Some(10)).into_iter(); + loop { + let invoke_response = http_client.post(ingress_url.clone()).send().await?; + info!("Invoke response: {:?}", invoke_response); + if invoke_response.status().is_success() { + break; + } + if let Some(delay) = retry.next() { + tokio::time::sleep(delay).await; + } else { + fail!("Failed to invoke worker")?; + } + } + + let snapshot_response = client + .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 }) + .await? + .into_inner(); + + // todo(pavel): we don't have a confirmed trimmed LSN, and it takes a bit of time for the log tail info to propagate + client + .trim_log(TrimLogRequest { + log_id: 0, + trim_point: 3, + }) + .await?; + + tokio::time::sleep(Duration::from_secs(1)).await; + client + .trim_log(TrimLogRequest { + log_id: 0, + trim_point: 3, + }) + .await?; + + tokio::time::sleep(Duration::from_secs(1)).await; + client + .trim_log(TrimLogRequest { + log_id: 0, + trim_point: 3, + }) + .await?; + + // todo(pavel): if create snapshot returned an LSN, we could trim the log to that specific LSN instead of guessing + let mut no_snapshot_config = base_config.clone(); + no_snapshot_config.worker.snapshots.destination = None; + let node_3_name = "node-3"; + let mut node_3 = Node::new_test_node( + node_3_name, + no_snapshot_config, + BinarySource::CargoTest, + enum_set!(Role::HttpIngress | Role::Worker), + ); + *node_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { + address: cluster.nodes[0].node_address().clone(), + }; + + let mut trim_gap_encountered = + node_3.lines("Partition processor stopped due to a log trim gap, and no snapshot repository is configured".parse()?); + cluster.push_node(node_3).await?; + assert!( + tokio::time::timeout(Duration::from_secs(20), trim_gap_encountered.next()) + .await + .is_ok() + ); + + let worker_3 = &mut cluster.nodes[3]; + assert_eq!(worker_3.config().node_name(), node_3_name); + worker_3.graceful_shutdown(Duration::from_secs(2)).await?; + + // Restart node 3 with snapshot repository configured + let mut worker_3 = Node::new_test_node( + node_3_name, + base_config.clone(), + BinarySource::CargoTest, + enum_set!(Role::HttpIngress | Role::Worker), + ); + *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { + address: cluster.nodes[0].node_address().clone(), + }; + + let worker_3_ingress_url = format!( + "http://{}/Counter/0/get", + worker_3.config().ingress.bind_address + ); + + let mut worker_3_ready = worker_3.lines( + format!( + "Importing partition store snapshot.*{}", + snapshot_response.snapshot_id + ) + .parse()?, + ); + + worker_3 + .start_clustered(cluster.base_dir(), cluster.cluster_name()) + .await?; + + assert!( + tokio::time::timeout(Duration::from_secs(20), worker_3_ready.next()) + .await + .is_ok() + ); + + let invoke_response = http_client + .post(worker_3_ingress_url.clone()) + .send() + .await?; + assert!(invoke_response.status().is_success()); + + Ok(()) +} + +async fn any_partition_active( + client: &mut ClusterCtrlSvcClient, + timeout: Duration, +) -> googletest::Result<()> { + let deadline = tokio::time::Instant::now() + timeout; + loop { + let cluster_state = client + .get_cluster_state(ClusterStateRequest {}) + .await? + .into_inner() + .cluster_state + .unwrap(); + + if cluster_state.nodes.values().any(|n| { + n.state.as_ref().is_some_and(|s| match s { + State::Alive(s) => s + .partitions + .values() + .any(|p| p.effective_mode.cmp(&1).is_eq()), + _ => false, + }) + }) { + break; // partition is ready; we can request snapshot + } + if tokio::time::Instant::now() > deadline { + fail!( + "Partition processor did not become ready within {:?}", + timeout + )?; + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + Ok(()) +} + +async fn grpc_connect(address: AdvertisedAddress) -> Result { + match address { + AdvertisedAddress::Uds(uds_path) => { + // dummy endpoint required to specify an uds connector, it is not used anywhere + Endpoint::try_from("http://127.0.0.1") + .expect("/ should be a valid Uri") + .connect_with_connector(service_fn(move |_: Uri| { + let uds_path = uds_path.clone(); + async move { + Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) + } + })).await + } + AdvertisedAddress::Http(uri) => { + Channel::builder(uri) + .connect_timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(2)) + .http2_adaptive_window(true) + .connect() + .await + } + } +} From 862151a1b185805d573ca0b72800ae6ff5e4f9ff Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Thu, 2 Jan 2025 12:46:08 +0200 Subject: [PATCH 04/12] Add a Drop implementation for Cluster to prevent leaking nodes --- crates/local-cluster-runner/src/node/mod.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index a147666af4..b86faee90a 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -749,6 +749,25 @@ impl StartedNode { } } +impl Drop for StartedNode { + fn drop(&mut self) { + if let StartedNodeStatus::Running { pid, .. } = self.status { + match nix::sys::signal::kill( + nix::unistd::Pid::from_raw(pid.try_into().unwrap()), + nix::sys::signal::SIGKILL, + ) { + Ok(()) | Err(nix::errno::Errno::ESRCH) => {} + err => error!( + "Failed to send SIGKILL to running node {} (pid {}): {:?}", + self.config.node_name(), + pid, + err, + ), + } + } + } +} + #[derive(Debug, Clone, Copy)] pub enum HealthCheck { Admin, From 47535cc0281c5142ed0367bf019ee4e8eb883a5b Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Thu, 2 Jan 2025 15:40:09 +0200 Subject: [PATCH 05/12] Assert log convergence after new follower joins --- server/tests/trim_gap_handling.rs | 152 ++++++++++++++++++++++-------- 1 file changed, 111 insertions(+), 41 deletions(-) diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index 4685cfb126..4fea2a14d0 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -16,12 +16,12 @@ use futures_util::StreamExt; use googletest::fail; use hyper_util::rt::TokioIo; use tempfile::TempDir; -use test_log::test; use tokio::io; use tokio::net::UnixStream; use tonic::codec::CompressionEncoding; use tonic::transport::{Channel, Endpoint, Uri}; use tower::service_fn; +use tracing::level_filters::LevelFilter; use tracing::{error, info}; use url::Url; @@ -42,8 +42,17 @@ use restate_types::{config::Configuration, nodes_config::Role}; mod common; -#[test(restate_core::test)] +#[tokio::test] async fn fast_forward_over_trim_gap() -> googletest::Result<()> { + tracing_subscriber::fmt() + .event_format(tracing_subscriber::fmt::format().compact()) + .with_env_filter( + tracing_subscriber::EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .init(); + let mut base_config = Configuration::default(); base_config.common.bootstrap_num_partitions = 1.try_into()?; base_config.bifrost.default_provider = Replicated; @@ -106,7 +115,6 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { .json(&serde_json::json!({ "uri": "http://localhost:9080" })) .send() .await?; - info!("Registration response: {:?}", registration_response); assert!(registration_response.status().is_success()); let ingress_url = format!( @@ -134,31 +142,9 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { .await? .into_inner(); - // todo(pavel): we don't have a confirmed trimmed LSN, and it takes a bit of time for the log tail info to propagate - client - .trim_log(TrimLogRequest { - log_id: 0, - trim_point: 3, - }) - .await?; - - tokio::time::sleep(Duration::from_secs(1)).await; - client - .trim_log(TrimLogRequest { - log_id: 0, - trim_point: 3, - }) - .await?; - - tokio::time::sleep(Duration::from_secs(1)).await; - client - .trim_log(TrimLogRequest { - log_id: 0, - trim_point: 3, - }) - .await?; - // todo(pavel): if create snapshot returned an LSN, we could trim the log to that specific LSN instead of guessing + trim_log(&mut client, 3).await?; + let mut no_snapshot_config = base_config.clone(); no_snapshot_config.worker.snapshots.destination = None; let node_3_name = "node-3"; @@ -196,12 +182,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { address: cluster.nodes[0].node_address().clone(), }; - let worker_3_ingress_url = format!( - "http://{}/Counter/0/get", - worker_3.config().ingress.bind_address - ); - - let mut worker_3_ready = worker_3.lines( + let mut worker_3_imported_snapshot = worker_3.lines( format!( "Importing partition store snapshot.*{}", snapshot_response.snapshot_id @@ -209,21 +190,30 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { .parse()?, ); - worker_3 + let ingress_url = format!( + "http://{}/Counter/0/get", + worker_3.config().ingress.bind_address + ); + + let _started_node = worker_3 .start_clustered(cluster.base_dir(), cluster.cluster_name()) .await?; - assert!( - tokio::time::timeout(Duration::from_secs(20), worker_3_ready.next()) + tokio::time::timeout(Duration::from_secs(20), worker_3_imported_snapshot.next()) .await .is_ok() ); - let invoke_response = http_client - .post(worker_3_ingress_url.clone()) + // todo(pavel): promote node 3 to be the leader for partition 0 and invoke the service again + // for now, we just ensure that the new node is applying newly appended log records + assert!(http_client + .post(ingress_url) .send() - .await?; - assert!(invoke_response.status().is_success()); + .await? + .status() + .is_success()); + + applied_lsn_converged(&mut client, Duration::from_secs(5), 3, 0).await?; Ok(()) } @@ -250,7 +240,7 @@ async fn any_partition_active( _ => false, }) }) { - break; // partition is ready; we can request snapshot + break; } if tokio::time::Instant::now() > deadline { fail!( @@ -263,6 +253,86 @@ async fn any_partition_active( Ok(()) } +async fn applied_lsn_converged( + client: &mut ClusterCtrlSvcClient, + timeout: Duration, + expected_processors: usize, + partition_id: u32, +) -> googletest::Result<()> { + assert!(expected_processors > 0); + let deadline = tokio::time::Instant::now() + timeout; + loop { + let cluster_state = client + .get_cluster_state(ClusterStateRequest {}) + .await? + .into_inner() + .cluster_state + .unwrap(); + + let applied_lsn: Vec<_> = cluster_state + .nodes + .values() + .filter_map(|n| { + n.state + .as_ref() + .map(|s| match s { + State::Alive(s) => s + .partitions + .get(&partition_id) + .map(|p| p.last_applied_log_lsn) + .unwrap_or_default() + .map(|lsn| (partition_id, lsn.value)), + _ => None, + }) + .unwrap_or_default() + }) + .collect(); + + if applied_lsn.len() == expected_processors + && applied_lsn.iter().all(|(_, lsn)| *lsn == applied_lsn[0].1) + { + info!("All partition processors converged: {:?}", applied_lsn); + break; + } + + if tokio::time::Instant::now() > deadline { + fail!( + "Partition processors did not converge on the same applied LSN within {:?}", + timeout + )?; + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + Ok(()) +} + +async fn trim_log( + client: &mut ClusterCtrlSvcClient, + trim_point: u64, +) -> googletest::Result<()> { + // todo(pavel): remove this hack which ensures we actually trim the log + + // Since we don't have a confirmed trimmed LSN in the response, and it takes a bit of time for + // the log tail info to propagate to the admin node, we must wait long enough to cover the + // heartbeat interval before we retry. The first attempt is usually a no-op as the admin does + // not know the effective global tail. + client + .trim_log(TrimLogRequest { + log_id: 0, + trim_point, + }) + .await?; + + tokio::time::sleep(Duration::from_secs(2)).await; + client + .trim_log(TrimLogRequest { + log_id: 0, + trim_point, + }) + .await?; + Ok(()) +} + async fn grpc_connect(address: AdvertisedAddress) -> Result { match address { AdvertisedAddress::Uds(uds_path) => { From 0a0e6285ad7e26685e8d96c61d981ed2030470c4 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Thu, 2 Jan 2025 17:54:45 +0200 Subject: [PATCH 06/12] Don't pass ownership of the cluster to the inner future to avoid it getting dropped --- crates/local-cluster-runner/src/node/mod.rs | 7 ++++++- server/tests/common/replicated_loglet.rs | 5 ++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index b86faee90a..d67c27f5b8 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -220,7 +220,7 @@ impl Node { nodes } - /// Start this Node, providing the base_dir and the cluster_name of the cluster its + /// Start this Node, providing the base_dir and the cluster_name of the cluster it's /// expected to attach to. All relative file paths addresses specified in the node config /// (eg, nodename/node.sock) will be absolutized against the base path, and the base dir /// and cluster name present in config will be overwritten. @@ -752,6 +752,11 @@ impl StartedNode { impl Drop for StartedNode { fn drop(&mut self) { if let StartedNodeStatus::Running { pid, .. } = self.status { + warn!( + "Node {} (pid {}) dropped without explicit shutdown", + self.config.node_name(), + pid, + ); match nix::sys::signal::kill( nix::unistd::Pid::from_raw(pid.try_into().unwrap()), nix::sys::signal::SIGKILL, diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 6287074541..301ee3e82e 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -89,7 +89,6 @@ pub struct TestEnv { pub loglet: Arc, pub metadata_writer: MetadataWriter, pub metadata_store_client: MetadataStoreClient, - pub cluster: StartedCluster, } impl TestEnv { @@ -128,7 +127,7 @@ where RocksDbManager::init(Configuration::mapped_updateable(|c| &c.common)); - let cluster = Cluster::builder() + let mut cluster = Cluster::builder() .base_dir(base_dir.as_path().to_owned()) .nodes(nodes) .build() @@ -175,12 +174,12 @@ where future(TestEnv { bifrost, loglet, - cluster, metadata_writer, metadata_store_client, }) .await?; + cluster.graceful_shutdown(Duration::from_secs(1)).await?; TaskCenter::shutdown_node("test completed", 0).await; RocksDbManager::get().shutdown().await; Ok(()) From ed308f207aaa600d633e0af14ea38e963b15fea6 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Thu, 2 Jan 2025 17:55:01 +0200 Subject: [PATCH 07/12] Robustness improvements --- server/tests/trim_gap_handling.rs | 112 +++++++++++++++--------------- 1 file changed, 57 insertions(+), 55 deletions(-) diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index 4fea2a14d0..22601c04b8 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -59,6 +59,8 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { base_config.common.log_filter = "restate=debug,warn".to_owned(); base_config.common.log_format = LogFormat::Compact; + let no_snapshot_repository_config = base_config.clone(); + let snapshots_dir = TempDir::new()?; base_config.worker.snapshots.destination = Some( Url::from_file_path(snapshots_dir.path()) @@ -126,7 +128,6 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { let mut retry = RetryPolicy::fixed_delay(Duration::from_millis(500), Some(10)).into_iter(); loop { let invoke_response = http_client.post(ingress_url.clone()).send().await?; - info!("Invoke response: {:?}", invoke_response); if invoke_response.status().is_success() { break; } @@ -145,35 +146,33 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { // todo(pavel): if create snapshot returned an LSN, we could trim the log to that specific LSN instead of guessing trim_log(&mut client, 3).await?; - let mut no_snapshot_config = base_config.clone(); - no_snapshot_config.worker.snapshots.destination = None; - let node_3_name = "node-3"; - let mut node_3 = Node::new_test_node( - node_3_name, - no_snapshot_config, + let mut worker_3 = Node::new_test_node( + "node-3", + no_snapshot_repository_config, BinarySource::CargoTest, enum_set!(Role::HttpIngress | Role::Worker), ); - *node_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { + *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { address: cluster.nodes[0].node_address().clone(), }; let mut trim_gap_encountered = - node_3.lines("Partition processor stopped due to a log trim gap, and no snapshot repository is configured".parse()?); - cluster.push_node(node_3).await?; + worker_3.lines("Partition processor stopped due to a log trim gap, and no snapshot repository is configured".parse()?); + + info!("Waiting for partition processor to encounter log trim gap"); + let mut worker_3 = worker_3 + .start_clustered(cluster.base_dir(), cluster.cluster_name()) + .await?; assert!( tokio::time::timeout(Duration::from_secs(20), trim_gap_encountered.next()) .await .is_ok() ); + worker_3.graceful_shutdown(Duration::from_secs(1)).await?; - let worker_3 = &mut cluster.nodes[3]; - assert_eq!(worker_3.config().node_name(), node_3_name); - worker_3.graceful_shutdown(Duration::from_secs(2)).await?; - - // Restart node 3 with snapshot repository configured + info!("Re-starting additional node with snapshot repository configured"); let mut worker_3 = Node::new_test_node( - node_3_name, + "node-3", base_config.clone(), BinarySource::CargoTest, enum_set!(Role::HttpIngress | Role::Worker), @@ -182,6 +181,10 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { address: cluster.nodes[0].node_address().clone(), }; + let ingress_url = format!( + "http://{}/Counter/0/get", + worker_3.config().ingress.bind_address + ); let mut worker_3_imported_snapshot = worker_3.lines( format!( "Importing partition store snapshot.*{}", @@ -189,13 +192,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { ) .parse()?, ); - - let ingress_url = format!( - "http://{}/Counter/0/get", - worker_3.config().ingress.bind_address - ); - - let _started_node = worker_3 + let mut worker_3 = worker_3 .start_clustered(cluster.base_dir(), cluster.cluster_name()) .await?; assert!( @@ -205,16 +202,17 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { ); // todo(pavel): promote node 3 to be the leader for partition 0 and invoke the service again - // for now, we just ensure that the new node is applying newly appended log records + // right now, all we are asserting is that the new node is applying newly appended log records assert!(http_client .post(ingress_url) .send() .await? .status() .is_success()); - applied_lsn_converged(&mut client, Duration::from_secs(5), 3, 0).await?; + worker_3.graceful_shutdown(Duration::from_secs(1)).await?; + cluster.graceful_shutdown(Duration::from_secs(1)).await?; Ok(()) } @@ -253,6 +251,33 @@ async fn any_partition_active( Ok(()) } +async fn trim_log( + client: &mut ClusterCtrlSvcClient, + trim_point: u64, +) -> googletest::Result<()> { + // todo(pavel): this is flimsy, ensure we actually trim the log to a particular LSN + + // Since we don't have a confirmed trimmed LSN in the response, and it takes a bit of time for + // the log tail info to propagate to the admin node, we must wait long enough to cover the + // heartbeat interval before we retry. The first attempt is usually a no-op as the admin does + // not know the effective global tail. + let mut i = 0; + loop { + client + .trim_log(TrimLogRequest { + log_id: 0, + trim_point, + }) + .await?; + if i >= 2 { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + i += 1; + } + Ok(()) +} + async fn applied_lsn_converged( client: &mut ClusterCtrlSvcClient, timeout: Duration, @@ -260,6 +285,10 @@ async fn applied_lsn_converged( partition_id: u32, ) -> googletest::Result<()> { assert!(expected_processors > 0); + info!( + "Waiting for {} partition processors to converge on the same applied LSN", + expected_processors + ); let deadline = tokio::time::Instant::now() + timeout; loop { let cluster_state = client @@ -291,14 +320,14 @@ async fn applied_lsn_converged( if applied_lsn.len() == expected_processors && applied_lsn.iter().all(|(_, lsn)| *lsn == applied_lsn[0].1) { - info!("All partition processors converged: {:?}", applied_lsn); break; } if tokio::time::Instant::now() > deadline { fail!( - "Partition processors did not converge on the same applied LSN within {:?}", - timeout + "Partition processors did not converge on the same applied LSN within {:?}: {:?}", + timeout, + applied_lsn )?; } tokio::time::sleep(Duration::from_millis(250)).await; @@ -306,33 +335,6 @@ async fn applied_lsn_converged( Ok(()) } -async fn trim_log( - client: &mut ClusterCtrlSvcClient, - trim_point: u64, -) -> googletest::Result<()> { - // todo(pavel): remove this hack which ensures we actually trim the log - - // Since we don't have a confirmed trimmed LSN in the response, and it takes a bit of time for - // the log tail info to propagate to the admin node, we must wait long enough to cover the - // heartbeat interval before we retry. The first attempt is usually a no-op as the admin does - // not know the effective global tail. - client - .trim_log(TrimLogRequest { - log_id: 0, - trim_point, - }) - .await?; - - tokio::time::sleep(Duration::from_secs(2)).await; - client - .trim_log(TrimLogRequest { - log_id: 0, - trim_point, - }) - .await?; - Ok(()) -} - async fn grpc_connect(address: AdvertisedAddress) -> Result { match address { AdvertisedAddress::Uds(uds_path) => { From ed625310ea4ae58abfb450c13ed23c6365f106e8 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Thu, 2 Jan 2025 17:55:14 +0200 Subject: [PATCH 08/12] Remove original snapshots test which is a subset of the trim gap test --- server/tests/snapshots.rs | 169 -------------------------------------- 1 file changed, 169 deletions(-) delete mode 100644 server/tests/snapshots.rs diff --git a/server/tests/snapshots.rs b/server/tests/snapshots.rs deleted file mode 100644 index 5f0fdc5846..0000000000 --- a/server/tests/snapshots.rs +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::time::Duration; - -use enumset::enum_set; -use futures_util::StreamExt; -use googletest::fail; -use hyper_util::rt::TokioIo; -use regex::Regex; -use tempfile::TempDir; -use test_log::test; -use tokio::io; -use tokio::net::UnixStream; -use tonic::codec::CompressionEncoding; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; -use url::Url; - -use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; -use restate_admin::cluster_controller::protobuf::{ - ClusterStateRequest, CreatePartitionSnapshotRequest, -}; -use restate_local_cluster_runner::{ - cluster::Cluster, - node::{BinarySource, Node}, -}; -use restate_types::config::{LogFormat, MetadataStoreClient}; -use restate_types::net::AdvertisedAddress; -use restate_types::protobuf::cluster::node_state::State; -use restate_types::{config::Configuration, nodes_config::Role}; - -mod common; - -#[test(restate_core::test)] -async fn create_and_restore_snapshot() -> googletest::Result<()> { - let mut base_config = Configuration::default(); - base_config.common.bootstrap_num_partitions = 1.try_into()?; - base_config.common.log_filter = "restate=debug,warn".to_owned(); - base_config.common.log_format = LogFormat::Compact; - - let snapshots_dir = TempDir::new()?; - base_config.worker.snapshots.destination = Some( - Url::from_file_path(snapshots_dir.path()) - .unwrap() - .to_string(), - ); - - let nodes = Node::new_test_nodes_with_metadata( - base_config.clone(), - BinarySource::CargoTest, - enum_set!(Role::Worker), - 1, - ); - - let mut partition_ready = nodes[1].lines(Regex::new("Won the leadership campaign")?); - - let cluster = Cluster::builder() - .temp_base_dir() - .nodes(nodes.clone()) - .build() - .start() - .await?; - - cluster.wait_healthy(Duration::from_secs(30)).await?; - assert!(partition_ready.next().await.is_some()); - - let mut client = - ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?) - .accept_compressed(CompressionEncoding::Gzip); - - any_partition_active(&mut client, Duration::from_secs(5)).await?; - - let snapshot_response = client - .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 }) - .await? - .into_inner(); - - let mut node_2 = Node::new_test_node( - "node-2", - base_config, - BinarySource::CargoTest, - enum_set!(Role::Worker), - ); - *node_2.metadata_store_client_mut() = MetadataStoreClient::Embedded { - address: cluster.nodes[0].node_address().clone(), - }; - - let mut snapshot_restored = node_2.lines( - format!( - "Importing partition store snapshot.*{}", - snapshot_response.snapshot_id - ) - .parse()?, - ); - - node_2 - .start_clustered(cluster.base_dir(), cluster.cluster_name()) - .await?; - - assert!(snapshot_restored.next().await.is_some()); - Ok(()) -} - -async fn any_partition_active( - client: &mut ClusterCtrlSvcClient, - timeout: Duration, -) -> googletest::Result<()> { - let deadline = tokio::time::Instant::now() + timeout; - loop { - let cluster_state = client - .get_cluster_state(ClusterStateRequest {}) - .await? - .into_inner() - .cluster_state - .unwrap(); - - if cluster_state.nodes.values().any(|n| { - n.state.as_ref().is_some_and(|s| match s { - State::Alive(s) => s - .partitions - .values() - .any(|p| p.effective_mode.cmp(&1).is_eq()), - _ => false, - }) - }) { - break; // partition is ready; we can request snapshot - } - if tokio::time::Instant::now() > deadline { - fail!( - "Partition processor did not become ready within {:?}", - timeout - )?; - } - tokio::time::sleep(Duration::from_millis(250)).await; - } - Ok(()) -} - -async fn grpc_connect(address: AdvertisedAddress) -> Result { - match address { - AdvertisedAddress::Uds(uds_path) => { - // dummy endpoint required to specify an uds connector, it is not used anywhere - Endpoint::try_from("http://127.0.0.1") - .expect("/ should be a valid Uri") - .connect_with_connector(service_fn(move |_: Uri| { - let uds_path = uds_path.clone(); - async move { - Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) - } - })).await - } - AdvertisedAddress::Http(uri) => { - Channel::builder(uri) - .connect_timeout(Duration::from_secs(2)) - .timeout(Duration::from_secs(2)) - .http2_adaptive_window(true) - .connect() - .await - } - } -} From 9b1479ef41a5fd044d4e55945bab4dfb61456549 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 6 Jan 2025 15:09:30 +0200 Subject: [PATCH 09/12] Deterministically trim the log --- server/tests/trim_gap_handling.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index 22601c04b8..b460e38495 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -144,7 +144,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { .into_inner(); // todo(pavel): if create snapshot returned an LSN, we could trim the log to that specific LSN instead of guessing - trim_log(&mut client, 3).await?; + trim_log(&mut client, 3, Duration::from_secs(3)).await?; let mut worker_3 = Node::new_test_node( "node-3", @@ -254,26 +254,26 @@ async fn any_partition_active( async fn trim_log( client: &mut ClusterCtrlSvcClient, trim_point: u64, + timeout: Duration, ) -> googletest::Result<()> { - // todo(pavel): this is flimsy, ensure we actually trim the log to a particular LSN - - // Since we don't have a confirmed trimmed LSN in the response, and it takes a bit of time for - // the log tail info to propagate to the admin node, we must wait long enough to cover the - // heartbeat interval before we retry. The first attempt is usually a no-op as the admin does - // not know the effective global tail. - let mut i = 0; + let deadline = tokio::time::Instant::now() + timeout; loop { - client + let response = client .trim_log(TrimLogRequest { log_id: 0, trim_point, }) - .await?; - if i >= 2 { + .await? + .into_inner(); + + if response.trim_point.is_some_and(|tp| tp == trim_point) { break; } - tokio::time::sleep(Duration::from_secs(1)).await; - i += 1; + + if tokio::time::Instant::now() > deadline { + fail!("Failed to trim log to LSN {} within {:?}", trim_point, timeout)?; + } + tokio::time::sleep(Duration::from_millis(250)).await; } Ok(()) } From 446bd563ea8ee6886ac181b4afda7bfca2bfbe1d Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 6 Jan 2025 15:30:19 +0200 Subject: [PATCH 10/12] PR feedback --- server/tests/trim_gap_handling.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index b460e38495..f669311019 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -21,7 +21,6 @@ use tokio::net::UnixStream; use tonic::codec::CompressionEncoding; use tonic::transport::{Channel, Endpoint, Uri}; use tower::service_fn; -use tracing::level_filters::LevelFilter; use tracing::{error, info}; use url::Url; @@ -37,22 +36,14 @@ use restate_types::config::{LogFormat, MetadataStoreClient}; use restate_types::logs::metadata::ProviderKind::Replicated; use restate_types::net::AdvertisedAddress; use restate_types::protobuf::cluster::node_state::State; +use restate_types::protobuf::cluster::RunMode; use restate_types::retries::RetryPolicy; use restate_types::{config::Configuration, nodes_config::Role}; mod common; -#[tokio::test] +#[test_log::test(tokio::test)] async fn fast_forward_over_trim_gap() -> googletest::Result<()> { - tracing_subscriber::fmt() - .event_format(tracing_subscriber::fmt::format().compact()) - .with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(), - ) - .init(); - let mut base_config = Configuration::default(); base_config.common.bootstrap_num_partitions = 1.try_into()?; base_config.bifrost.default_provider = Replicated; @@ -231,10 +222,9 @@ async fn any_partition_active( if cluster_state.nodes.values().any(|n| { n.state.as_ref().is_some_and(|s| match s { - State::Alive(s) => s - .partitions - .values() - .any(|p| p.effective_mode.cmp(&1).is_eq()), + State::Alive(s) => s.partitions.values().any(|p| { + RunMode::try_from(p.effective_mode).is_ok_and(|m| m == RunMode::Leader) + }), _ => false, }) }) { @@ -271,7 +261,11 @@ async fn trim_log( } if tokio::time::Instant::now() > deadline { - fail!("Failed to trim log to LSN {} within {:?}", trim_point, timeout)?; + fail!( + "Failed to trim log to LSN {} within {:?}", + trim_point, + timeout + )?; } tokio::time::sleep(Duration::from_millis(250)).await; } From 60dbc4ed9a870be6d740f2923d864f7f4f85a454 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 6 Jan 2025 16:21:03 +0200 Subject: [PATCH 11/12] Use shared create_tonic_channel_from_advertised_address utility --- Cargo.lock | 2 +- crates/cli-util/Cargo.toml | 3 +- crates/cli-util/src/context.rs | 2 +- crates/cli-util/src/opts.rs | 24 ++++++- server/tests/trim_gap_handling.rs | 66 +++++++++++-------- .../src/commands/cluster/config/get.rs | 10 +-- .../src/commands/cluster/config/set.rs | 9 +-- .../src/commands/log/describe_log.rs | 9 +-- .../restatectl/src/commands/log/find_tail.rs | 10 +-- .../restatectl/src/commands/log/list_logs.rs | 10 +-- .../src/commands/log/reconfigure.rs | 9 +-- tools/restatectl/src/commands/log/trim_log.rs | 9 +-- .../src/commands/node/list_nodes.rs | 14 +--- .../restatectl/src/commands/partition/list.rs | 10 +-- .../src/commands/replicated_loglet/digest.rs | 15 +---- .../src/commands/replicated_loglet/info.rs | 10 +-- .../src/commands/snapshot/create_snapshot.rs | 10 +-- tools/restatectl/src/util.rs | 32 ++------- 18 files changed, 84 insertions(+), 170 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3dafbc87e0..0e8567bfe7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6195,7 +6195,7 @@ dependencies = [ "crossterm 0.27.0", "dialoguer", "dotenvy", - "parking_lot", + "restate-core", "tokio", "tracing", "tracing-log", diff --git a/crates/cli-util/Cargo.toml b/crates/cli-util/Cargo.toml index f41db687a3..5af2e28d6e 100644 --- a/crates/cli-util/Cargo.toml +++ b/crates/cli-util/Cargo.toml @@ -23,9 +23,10 @@ comfy-table = { workspace = true } crossterm = { workspace = true } dialoguer = { workspace = true } dotenvy = { version = "0.15" } -parking_lot = { workspace = true } tokio = { workspace = true, features = ["time"] } tracing = { workspace = true } tracing-log = { version = "0.2.0" } tracing-subscriber = { workspace = true } unicode-width = { version = "0.1.11" } + +restate-core = { workspace = true } \ No newline at end of file diff --git a/crates/cli-util/src/context.rs b/crates/cli-util/src/context.rs index a5b7255e82..385b31907d 100644 --- a/crates/cli-util/src/context.rs +++ b/crates/cli-util/src/context.rs @@ -26,7 +26,7 @@ static GLOBAL_CLI_CONTEXT: OnceLock> = OnceLock::new(); pub struct CliContext { confirm_mode: ConfirmMode, ui: UiOpts, - network: NetworkOpts, + pub network: NetworkOpts, colors_enabled: bool, loaded_dotenv: Option, } diff --git a/crates/cli-util/src/opts.rs b/crates/cli-util/src/opts.rs index 5bd4999331..ce3265b564 100644 --- a/crates/cli-util/src/opts.rs +++ b/crates/cli-util/src/opts.rs @@ -8,10 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::time::Duration; + use clap::{Args, ValueEnum}; use clap_verbosity_flag::LogLevel; use cling::Collect; +use restate_core::network::net_util::CommonClientConnectionOptions; + const DEFAULT_CONNECT_TIMEOUT: u64 = 5_000; const DEFAULT_REQUEST_TIMEOUT: u64 = 13_000; @@ -63,7 +67,7 @@ pub(crate) struct ConfirmMode { } #[derive(Args, Clone, Default)] -pub(crate) struct NetworkOpts { +pub struct NetworkOpts { /// Connection timeout for network calls, in milliseconds. #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT, global = true)] pub connect_timeout: u64, @@ -72,6 +76,24 @@ pub(crate) struct NetworkOpts { pub request_timeout: u64, } +impl CommonClientConnectionOptions for NetworkOpts { + fn connect_timeout(&self) -> Duration { + Duration::from_millis(self.connect_timeout) + } + + fn keep_alive_interval(&self) -> Duration { + Duration::from_secs(60) + } + + fn keep_alive_timeout(&self) -> Duration { + Duration::from_millis(self.request_timeout) + } + + fn http2_adaptive_window(&self) -> bool { + true + } +} + #[derive(Args, Collect, Clone, Default)] pub struct CommonOpts { #[clap(flatten)] diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index f669311019..c9711574d8 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -14,13 +14,9 @@ use std::time::Duration; use enumset::enum_set; use futures_util::StreamExt; use googletest::fail; -use hyper_util::rt::TokioIo; use tempfile::TempDir; -use tokio::io; -use tokio::net::UnixStream; use tonic::codec::CompressionEncoding; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; +use tonic::transport::Channel; use tracing::{error, info}; use url::Url; @@ -28,13 +24,15 @@ use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::Cluste use restate_admin::cluster_controller::protobuf::{ ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest, }; +use restate_core::network::net_util::{ + create_tonic_channel_from_advertised_address, CommonClientConnectionOptions, +}; use restate_local_cluster_runner::{ cluster::Cluster, node::{BinarySource, Node}, }; use restate_types::config::{LogFormat, MetadataStoreClient}; use restate_types::logs::metadata::ProviderKind::Replicated; -use restate_types::net::AdvertisedAddress; use restate_types::protobuf::cluster::node_state::State; use restate_types::protobuf::cluster::RunMode; use restate_types::retries::RetryPolicy; @@ -84,9 +82,11 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { tokio::time::timeout(Duration::from_secs(10), worker_1_ready.next()).await?; tokio::time::timeout(Duration::from_secs(10), worker_2_ready.next()).await?; - let mut client = - ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?) - .accept_compressed(CompressionEncoding::Gzip); + let mut client = ClusterCtrlSvcClient::new(create_tonic_channel_from_advertised_address( + cluster.nodes[0].node_address().clone(), + &TestNetworkOptions::default(), + )) + .accept_compressed(CompressionEncoding::Gzip); any_partition_active(&mut client, Duration::from_secs(5)).await?; @@ -329,26 +329,34 @@ async fn applied_lsn_converged( Ok(()) } -async fn grpc_connect(address: AdvertisedAddress) -> Result { - match address { - AdvertisedAddress::Uds(uds_path) => { - // dummy endpoint required to specify an uds connector, it is not used anywhere - Endpoint::try_from("http://127.0.0.1") - .expect("/ should be a valid Uri") - .connect_with_connector(service_fn(move |_: Uri| { - let uds_path = uds_path.clone(); - async move { - Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) - } - })).await - } - AdvertisedAddress::Http(uri) => { - Channel::builder(uri) - .connect_timeout(Duration::from_secs(2)) - .timeout(Duration::from_secs(2)) - .http2_adaptive_window(true) - .connect() - .await +struct TestNetworkOptions { + connect_timeout: u64, + request_timeout: u64, +} + +impl Default for TestNetworkOptions { + fn default() -> Self { + Self { + connect_timeout: 1000, + request_timeout: 5000, } } } + +impl CommonClientConnectionOptions for TestNetworkOptions { + fn connect_timeout(&self) -> Duration { + Duration::from_millis(self.connect_timeout) + } + + fn keep_alive_interval(&self) -> Duration { + Duration::from_secs(60) + } + + fn keep_alive_timeout(&self) -> Duration { + Duration::from_millis(self.request_timeout) + } + + fn http2_adaptive_window(&self) -> bool { + true + } +} diff --git a/tools/restatectl/src/commands/cluster/config/get.rs b/tools/restatectl/src/commands/cluster/config/get.rs index 807f3968ad..fcd6ebada1 100644 --- a/tools/restatectl/src/commands/cluster/config/get.rs +++ b/tools/restatectl/src/commands/cluster/config/get.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::Context; use clap::Parser; use cling::{Collect, Run}; use tonic::{codec::CompressionEncoding, Code}; @@ -27,14 +26,7 @@ use crate::{ pub struct ConfigGetOpts {} async fn config_get(connection: &ConnectionInfo, _get_opts: &ConfigGetOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs index 5c23c01f28..51e5ccd1f5 100644 --- a/tools/restatectl/src/commands/cluster/config/set.rs +++ b/tools/restatectl/src/commands/cluster/config/set.rs @@ -58,14 +58,7 @@ pub struct ConfigSetOpts { } async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/log/describe_log.rs b/tools/restatectl/src/commands/log/describe_log.rs index 7a90944dfb..ac02e99f02 100644 --- a/tools/restatectl/src/commands/log/describe_log.rs +++ b/tools/restatectl/src/commands/log/describe_log.rs @@ -71,14 +71,7 @@ async fn describe_logs( connection: &ConnectionInfo, opts: &DescribeLogIdOpts, ) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/log/find_tail.rs b/tools/restatectl/src/commands/log/find_tail.rs index 1393a9677c..e08d02ce61 100644 --- a/tools/restatectl/src/commands/log/find_tail.rs +++ b/tools/restatectl/src/commands/log/find_tail.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::Context; use cling::prelude::*; use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::ui::console::StyledTable; @@ -32,14 +31,7 @@ pub struct FindTailOpts { } async fn find_tail(connection: &ConnectionInfo, opts: &FindTailOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/log/list_logs.rs b/tools/restatectl/src/commands/log/list_logs.rs index a92668959a..48ca6c81e4 100644 --- a/tools/restatectl/src/commands/log/list_logs.rs +++ b/tools/restatectl/src/commands/log/list_logs.rs @@ -10,7 +10,6 @@ use std::collections::BTreeMap; -use anyhow::Context; use cling::prelude::*; use tonic::codec::CompressionEncoding; @@ -34,14 +33,7 @@ use crate::util::grpc_connect; pub struct ListLogsOpts {} pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/log/reconfigure.rs b/tools/restatectl/src/commands/log/reconfigure.rs index b67545a132..d8356bb2c5 100644 --- a/tools/restatectl/src/commands/log/reconfigure.rs +++ b/tools/restatectl/src/commands/log/reconfigure.rs @@ -66,14 +66,7 @@ pub struct ReconfigureOpts { } async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/log/trim_log.rs b/tools/restatectl/src/commands/log/trim_log.rs index f23b81280e..09995cca01 100644 --- a/tools/restatectl/src/commands/log/trim_log.rs +++ b/tools/restatectl/src/commands/log/trim_log.rs @@ -33,14 +33,7 @@ pub struct TrimLogOpts { } async fn trim_log(connection: &ConnectionInfo, opts: &TrimLogOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/node/list_nodes.rs b/tools/restatectl/src/commands/node/list_nodes.rs index fbcc1b5a4c..f3478d4d0b 100644 --- a/tools/restatectl/src/commands/node/list_nodes.rs +++ b/tools/restatectl/src/commands/node/list_nodes.rs @@ -11,7 +11,6 @@ use std::collections::{BTreeMap, HashMap}; use std::time::Duration; -use anyhow::Context; use chrono::TimeDelta; use cling::prelude::*; use itertools::Itertools; @@ -47,14 +46,7 @@ pub struct ListNodesOpts { } pub async fn list_nodes(connection: &ConnectionInfo, opts: &ListNodesOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); @@ -154,9 +146,9 @@ async fn fetch_extra_info( for (node_id, node_config) in nodes_configuration.iter() { let address = node_config.address.clone(); let get_ident = async move { - let node_channel = grpc_connect(address).await?; + let channel = grpc_connect(address); let mut node_ctl_svc_client = - NodeCtlSvcClient::new(node_channel).accept_compressed(CompressionEncoding::Gzip); + NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); Ok(node_ctl_svc_client .get_ident(()) diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs index 5fc3adb5eb..ca2268d084 100644 --- a/tools/restatectl/src/commands/partition/list.rs +++ b/tools/restatectl/src/commands/partition/list.rs @@ -11,7 +11,6 @@ use std::cmp::PartialOrd; use std::collections::{BTreeMap, HashMap}; -use anyhow::Context; use cling::prelude::*; use itertools::Itertools; use tonic::codec::CompressionEncoding; @@ -72,14 +71,7 @@ pub async fn list_partitions( connection: &ConnectionInfo, opts: &ListPartitionsOpts, ) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 805f2151e7..5ef80f8b42 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -53,14 +53,7 @@ pub struct DigestOpts { } async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to node at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); let req = GetMetadataRequest { kind: MetadataKind::Logs.into(), @@ -105,11 +98,7 @@ async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::R continue; } - let Ok(channel) = grpc_connect(node.address.clone()).await else { - warn!("Failed to connect to node {} at {}", node_id, node.address); - continue; - }; - + let channel = grpc_connect(node.address.clone()); let mut client = LogServerSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index f5b9d56e70..e802b9aaa0 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::Context; use cling::prelude::*; use tonic::codec::CompressionEncoding; @@ -35,14 +34,7 @@ pub struct InfoOpts { } async fn get_info(connection: &ConnectionInfo, opts: &InfoOpts) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to node at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); let req = GetMetadataRequest { diff --git a/tools/restatectl/src/commands/snapshot/create_snapshot.rs b/tools/restatectl/src/commands/snapshot/create_snapshot.rs index 767b682f8c..ba011997d8 100644 --- a/tools/restatectl/src/commands/snapshot/create_snapshot.rs +++ b/tools/restatectl/src/commands/snapshot/create_snapshot.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::Context; use cling::prelude::*; use tonic::codec::CompressionEncoding; @@ -32,14 +31,7 @@ async fn create_snapshot( connection: &ConnectionInfo, opts: &CreateSnapshotOpts, ) -> anyhow::Result<()> { - let channel = grpc_connect(connection.cluster_controller.clone()) - .await - .with_context(|| { - format!( - "cannot connect to cluster controller at {}", - connection.cluster_controller - ) - })?; + let channel = grpc_connect(connection.cluster_controller.clone()); let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); diff --git a/tools/restatectl/src/util.rs b/tools/restatectl/src/util.rs index b981a54a80..07f3d07d9b 100644 --- a/tools/restatectl/src/util.rs +++ b/tools/restatectl/src/util.rs @@ -8,35 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use hyper_util::rt::TokioIo; +use tonic::transport::Channel; + use restate_cli_util::CliContext; +use restate_core::network::net_util::create_tonic_channel_from_advertised_address; use restate_types::net::AdvertisedAddress; -use tokio::io; -use tokio::net::UnixStream; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; -pub async fn grpc_connect(address: AdvertisedAddress) -> Result { +pub fn grpc_connect(address: AdvertisedAddress) -> Channel { let ctx = CliContext::get(); - match address { - AdvertisedAddress::Uds(uds_path) => { - // dummy endpoint required to specify an uds connector, it is not used anywhere - Endpoint::try_from("http://127.0.0.1") - .expect("/ should be a valid Uri") - .connect_with_connector(service_fn(move |_: Uri| { - let uds_path = uds_path.clone(); - async move { - Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) - } - })).await - } - AdvertisedAddress::Http(uri) => { - Channel::builder(uri) - .connect_timeout(ctx.connect_timeout()) - .timeout(ctx.request_timeout()) - .http2_adaptive_window(true) - .connect() - .await - } - } + create_tonic_channel_from_advertised_address(address, &ctx.network) } From 8fb6ab9735bb2261e15076d97b5a814a217a558d Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Tue, 7 Jan 2025 12:42:27 +0200 Subject: [PATCH 12/12] Force a partition leadership change to new node-3 after snapshot restore --- Cargo.lock | 1 + server/Cargo.toml | 1 + server/tests/trim_gap_handling.rs | 218 +++++++++++++++++++++++------- 3 files changed, 173 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e8567bfe7..7a6fa0a4c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6745,6 +6745,7 @@ dependencies = [ "restate-errors", "restate-fs-util", "restate-local-cluster-runner", + "restate-metadata-store", "restate-node", "restate-rocksdb", "restate-test-util", diff --git a/server/Cargo.toml b/server/Cargo.toml index 8244fedbcc..7869d72b1e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -72,6 +72,7 @@ restate-core = { workspace = true, features = ["test-util"] } restate-local-cluster-runner = { workspace = true } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } +restate-metadata-store = { workspace = true } mock-service-endpoint = { workspace = true } anyhow = { workspace = true } diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index c9711574d8..03ac32c90d 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::BTreeMap; use std::net::SocketAddr; use std::time::Duration; @@ -22,21 +23,30 @@ use url::Url; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; use restate_admin::cluster_controller::protobuf::{ - ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest, + ChainExtension, ClusterStateRequest, CreatePartitionSnapshotRequest, ListLogsRequest, + ListNodesRequest, SealAndExtendChainRequest, TrimLogRequest, }; use restate_core::network::net_util::{ create_tonic_channel_from_advertised_address, CommonClientConnectionOptions, }; +use restate_local_cluster_runner::cluster::StartedCluster; use restate_local_cluster_runner::{ cluster::Cluster, node::{BinarySource, Node}, }; +use restate_types::cluster_controller::SchedulingPlan; use restate_types::config::{LogFormat, MetadataStoreClient}; -use restate_types::logs::metadata::ProviderKind::Replicated; +use restate_types::identifiers::PartitionId; +use restate_types::logs::metadata::{Logs, ProviderKind}; +use restate_types::logs::{LogId, LogletId}; +use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY; +use restate_types::nodes_config::NodesConfiguration; use restate_types::protobuf::cluster::node_state::State; use restate_types::protobuf::cluster::RunMode; +use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_types::retries::RetryPolicy; -use restate_types::{config::Configuration, nodes_config::Role}; +use restate_types::storage::StorageCodec; +use restate_types::{config::Configuration, nodes_config::Role, PlainNodeId}; mod common; @@ -44,7 +54,7 @@ mod common; async fn fast_forward_over_trim_gap() -> googletest::Result<()> { let mut base_config = Configuration::default(); base_config.common.bootstrap_num_partitions = 1.try_into()?; - base_config.bifrost.default_provider = Replicated; + base_config.bifrost.default_provider = ProviderKind::Replicated; base_config.common.log_filter = "restate=debug,warn".to_owned(); base_config.common.log_format = LogFormat::Compact; @@ -90,6 +100,10 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { any_partition_active(&mut client, Duration::from_secs(5)).await?; + let metadata_config = MetadataStoreClient::Embedded { + address: cluster.nodes[0].node_address().clone(), + }; + let addr: SocketAddr = "127.0.0.1:9080".parse()?; tokio::spawn(async move { info!("Starting mock service on http://{}", addr); @@ -143,9 +157,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { BinarySource::CargoTest, enum_set!(Role::HttpIngress | Role::Worker), ); - *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { - address: cluster.nodes[0].node_address().clone(), - }; + *worker_3.metadata_store_client_mut() = metadata_config.clone(); let mut trim_gap_encountered = worker_3.lines("Partition processor stopped due to a log trim gap, and no snapshot repository is configured".parse()?); @@ -168,14 +180,8 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { BinarySource::CargoTest, enum_set!(Role::HttpIngress | Role::Worker), ); - *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { - address: cluster.nodes[0].node_address().clone(), - }; + *worker_3.metadata_store_client_mut() = metadata_config.clone(); - let ingress_url = format!( - "http://{}/Counter/0/get", - worker_3.config().ingress.bind_address - ); let mut worker_3_imported_snapshot = worker_3.lines( format!( "Importing partition store snapshot.*{}", @@ -192,15 +198,44 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { .is_ok() ); - // todo(pavel): promote node 3 to be the leader for partition 0 and invoke the service again - // right now, all we are asserting is that the new node is applying newly appended log records + // Make the new node the sequencer for the partition log - this makes it the + // preferred candidate to become the next partition processor leader. + let extension = + Some(replicated_loglet_extension(&mut client, LogId::new(0), PlainNodeId::new(3)).await?); + let reconfigure_response = client + .seal_and_extend_chain(SealAndExtendChainRequest { + log_id: 0, + min_version: None, + extension, + }) + .await? + .into_inner(); + assert!(reconfigure_response.sealed_segment.is_some()); + + force_promote_partition_leader( + &cluster, + &mut client, + PartitionId::from(0), + PlainNodeId::new(3), + Duration::from_secs(10), + ) + .await?; + + // Verify that node 3 can process the incoming request while being the partition leader: + let ingress_url = format!( + "http://{}/Counter/0/get", + worker_3.config().ingress.bind_address + ); assert!(http_client .post(ingress_url) .send() .await? .status() .is_success()); - applied_lsn_converged(&mut client, Duration::from_secs(5), 3, 0).await?; + assert_eq!( + effective_partition_leader(&mut client, PartitionId::from(0)).await?, + Some(PlainNodeId::from(3)) + ); worker_3.graceful_shutdown(Duration::from_secs(1)).await?; cluster.graceful_shutdown(Duration::from_secs(1)).await?; @@ -272,19 +307,46 @@ async fn trim_log( Ok(()) } -async fn applied_lsn_converged( +async fn force_promote_partition_leader( + cluster: &StartedCluster, client: &mut ClusterCtrlSvcClient, + partition_id: PartitionId, + leader: PlainNodeId, timeout: Duration, - expected_processors: usize, - partition_id: u32, ) -> googletest::Result<()> { - assert!(expected_processors > 0); - info!( - "Waiting for {} partition processors to converge on the same applied LSN", - expected_processors - ); + let metadata_client = cluster.nodes[0] + .metadata_client() + .await + .expect("can create metadata client"); + let deadline = tokio::time::Instant::now() + timeout; loop { + let plan = metadata_client + .get::(SCHEDULING_PLAN_KEY.clone()) + .await?; + if plan.is_none_or(|p| { + p.get(&partition_id) + .is_none_or(|p| p.leader != Some(leader)) + }) { + metadata_client + .read_modify_write( + SCHEDULING_PLAN_KEY.clone(), + |scheduling_plan: Option| { + let mut plan_builder = scheduling_plan.unwrap().into_builder(); + plan_builder.modify_partition(&partition_id, |partition| { + if partition.leader == Some(leader) { + return false; + } + partition.leader = Some(leader); + true + }); + anyhow::Ok::(plan_builder.build()) + }, + ) + .await?; + } + tokio::time::sleep(Duration::from_millis(500)).await; + let cluster_state = client .get_cluster_state(ClusterStateRequest {}) .await? @@ -292,36 +354,29 @@ async fn applied_lsn_converged( .cluster_state .unwrap(); - let applied_lsn: Vec<_> = cluster_state + if cluster_state .nodes - .values() - .filter_map(|n| { - n.state - .as_ref() - .map(|s| match s { - State::Alive(s) => s - .partitions - .get(&partition_id) - .map(|p| p.last_applied_log_lsn) - .unwrap_or_default() - .map(|lsn| (partition_id, lsn.value)), - _ => None, - }) - .unwrap_or_default() + .get(&leader.into()) + .is_some_and(|ns| match ns.state.as_ref() { + Some(State::Alive(n)) => n.partitions.get(&partition_id.into()).is_some_and(|p| { + RunMode::try_from(p.effective_mode).is_ok_and(|m| m == RunMode::Leader) + }), + _ => false, }) - .collect(); - - if applied_lsn.len() == expected_processors - && applied_lsn.iter().all(|(_, lsn)| *lsn == applied_lsn[0].1) { + info!( + "Node {:#} became leader for partition {:#}", + leader, partition_id + ); break; } if tokio::time::Instant::now() > deadline { fail!( - "Partition processors did not converge on the same applied LSN within {:?}: {:?}", - timeout, - applied_lsn + "Node {:#} did not become leader for partition {:#} within {:?}", + leader, + partition_id, + timeout )?; } tokio::time::sleep(Duration::from_millis(250)).await; @@ -329,6 +384,75 @@ async fn applied_lsn_converged( Ok(()) } +async fn effective_partition_leader( + client: &mut ClusterCtrlSvcClient, + partition_id: PartitionId, +) -> googletest::Result> { + let cluster_state = client + .get_cluster_state(ClusterStateRequest {}) + .await? + .into_inner() + .cluster_state + .unwrap(); + + Ok(cluster_state + .nodes + .iter() + .find(|(_, ns)| match &ns.state { + Some(State::Alive(n)) => n + .partitions + .get(&partition_id.into()) + .map(|p| RunMode::try_from(p.effective_mode).is_ok_and(|m| m == RunMode::Leader)) + .unwrap_or(false), + _ => false, + }) + .map(|(id, _)| PlainNodeId::from(*id))) +} + +async fn replicated_loglet_extension( + client: &mut ClusterCtrlSvcClient, + log_id: LogId, + new_sequencer: PlainNodeId, +) -> googletest::Result { + let mut logs_response = client.list_logs(ListLogsRequest {}).await?.into_inner(); + + let logs = StorageCodec::decode::(&mut logs_response.logs)?; + let chain = logs.chain(&log_id).expect("log exists"); + + let tail_index = chain.tail_index(); + let loglet_id = LogletId::new(log_id, tail_index.next()); + let tail_segment = chain.tail(); + + let last_params = + ReplicatedLogletParams::deserialize_from(tail_segment.config.params.as_bytes())?; + + // --- get nodes --- + let mut nodes_response = client.list_nodes(ListNodesRequest {}).await?.into_inner(); + let nodes_configuration = + StorageCodec::decode::(&mut nodes_response.nodes_configuration)?; + let nodes = nodes_configuration.iter().collect::>(); + + // --- construct new replicated log params --- + let mut nodeset = last_params.nodeset.clone(); + nodeset.insert(new_sequencer); // just in case + + let params = ReplicatedLogletParams { + loglet_id, + nodeset, + replication: last_params.replication.clone(), + sequencer: nodes + .get(&new_sequencer) + .expect("proposed sequencer node exists") + .current_generation, + }; + + Ok(ChainExtension { + provider: ProviderKind::Replicated.to_string(), + segment_index: None, + params: params.serialize()?, + }) +} + struct TestNetworkOptions { connect_timeout: u64, request_timeout: u64,