diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs
index 79d81d423..7815b9d25 100644
--- a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs
@@ -37,7 +37,7 @@ impl PeriodicTailChecker {
             "Started a background periodic tail checker for this loglet",
         );
         // Optimization. Don't run the check if the tail/seal has been updated recently.
-        // Unfortunately this requires a litte bit more setup in the TailOffsetWatch so we don't do
+        // Unfortunately this requires a little bit more setup in the TailOffsetWatch so we don't do
        // it.
         loop {
             let Some(loglet) = loglet.upgrade() else {
diff --git a/crates/worker/src/metric_definitions.rs b/crates/worker/src/metric_definitions.rs
index be6a44832..766efcbbd 100644
--- a/crates/worker/src/metric_definitions.rs
+++ b/crates/worker/src/metric_definitions.rs
@@ -12,6 +12,8 @@
 /// the metrics' sink.
 use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};
 
+pub const PARTITION_LABEL: &str = "partition";
+
 pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.seconds";
 pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total";
 pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_created.total";
@@ -23,18 +25,27 @@ pub const PARTITION_TIME_SINCE_LAST_STATUS_UPDATE: &str =
     "restate.partition.time_since_last_status_update";
 pub const PARTITION_TIME_SINCE_LAST_RECORD: &str = "restate.partition.time_since_last_record";
 pub const PARTITION_LAST_APPLIED_LOG_LSN: &str = "restate.partition.last_applied_lsn";
+pub const PARTITION_LAST_APPLIED_LSN_LAG: &str = "restate.partition.applied_lsn_lag";
 pub const PARTITION_LAST_PERSISTED_LOG_LSN: &str = "restate.partition.last_persisted_lsn";
 pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_leader";
 pub const PARTITION_IS_ACTIVE: &str = "restate.partition.is_active";
-pub const PP_APPLY_COMMAND_DURATION: &str = "restate.partition.apply_command_duration.seconds";
-pub const PP_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size";
+pub const PARTITION_APPLY_COMMAND_DURATION: &str =
+    "restate.partition.apply_command_duration.seconds";
+pub const PARTITION_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size";
 pub const PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION: &str =
     "restate.partition.handle_action_batch_duration.seconds";
 pub const PARTITION_HANDLE_INVOKER_EFFECT_COMMAND: &str =
     "restate.partition.handle_invoker_effect.seconds";
+pub const PARTITION_RPC_QUEUE_UTILIZATION_PERCENT: &str =
+    "restate.partition.rpc_queue.utilization.percent";
+pub const PARTITION_RPC_QUEUE_OUTSTANDING_REQUESTS: &str =
+    "restate.partition.rpc_queue.outstanding_requests";
+pub const PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS: &str =
+    "restate.partition.record_committed_to_read_latency.seconds";
 
-pub const PARTITION_LABEL: &str = "partition";
+// Used to calculate read rates.
+pub const PARTITION_RECORD_READ_COUNT: &str = "restate.partition.record_read_count";
 
 pub(crate) fn describe_metrics() {
     describe_histogram!(
@@ -58,12 +69,12 @@ pub(crate) fn describe_metrics() {
         "Storage transactions committed by applying partition state machine commands"
     );
     describe_histogram!(
-        PP_APPLY_COMMAND_DURATION,
+        PARTITION_APPLY_COMMAND_DURATION,
         Unit::Seconds,
"Time spent processing a single bifrost message" ); describe_histogram!( - PP_APPLY_COMMAND_BATCH_SIZE, + PARTITION_APPLY_COMMAND_BATCH_SIZE, Unit::Count, "Size of the applied command batch" ); @@ -83,6 +94,12 @@ pub(crate) fn describe_metrics() { "Time spent handling an invoker effect command" ); + describe_histogram!( + PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, + Unit::Seconds, + "Duration between the record commit time to read time" + ); + describe_gauge!( NUM_ACTIVE_PARTITIONS, Unit::Count, @@ -113,6 +130,12 @@ pub(crate) fn describe_metrics() { "Raw value of the last applied log LSN" ); + describe_gauge!( + PARTITION_LAST_APPLIED_LSN_LAG, + Unit::Count, + "Number of records between last applied lsn and the log tail" + ); + describe_gauge!( PARTITION_LAST_PERSISTED_LOG_LSN, Unit::Count, @@ -124,4 +147,22 @@ pub(crate) fn describe_metrics() { Unit::Seconds, "Number of seconds since the last record was applied" ); + + describe_gauge!( + PARTITION_RPC_QUEUE_UTILIZATION_PERCENT, + Unit::Percent, + "Partition processor requests queue utilization, 0 to 100%" + ); + + describe_gauge!( + PARTITION_RPC_QUEUE_OUTSTANDING_REQUESTS, + Unit::Count, + "Number of outstanding requests in the partition processor request queue" + ); + + describe_counter!( + PARTITION_RECORD_READ_COUNT, + Unit::Count, + "Number of read records from bifrost", + ); } diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index b2e8f216b..61215a58d 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -46,6 +46,8 @@ use tracing::{debug, trace}; const BATCH_READY_UP_TO: usize = 10; +type RpcReciprocal = Reciprocal>; + pub struct LeaderState { partition_id: PartitionId, pub leader_epoch: LeaderEpoch, @@ -61,10 +63,7 @@ pub struct LeaderState { pub timer_service: Pin>, self_proposer: SelfProposer, - pub awaiting_rpc_actions: HashMap< - PartitionProcessorRpcRequestId, - Reciprocal>, - >, + awaiting_rpc_actions: HashMap, awaiting_rpc_self_propose: FuturesUnordered, invoker_stream: InvokerStream, @@ -279,17 +278,16 @@ impl LeaderState { cmd: Command, ) { match self.awaiting_rpc_actions.entry(request_id) { - Entry::Occupied(o) => { + Entry::Occupied(mut o) => { // In this case, someone already proposed this command, // let's just replace the reciprocal and fail the old one to avoid keeping it dangling - let old_reciprocal = o.remove(); + let old_reciprocal = o.insert(reciprocal); trace!(%request_id, "Replacing rpc with newer request"); respond_to_rpc( old_reciprocal.prepare(Err(PartitionProcessorRpcError::Internal( "retried".to_string(), ))), ); - self.awaiting_rpc_actions.insert(request_id, reciprocal); } Entry::Vacant(v) => { // In this case, no one proposed this command yet, let's try to propose it @@ -507,7 +505,7 @@ impl Future for SelfAppendFuture { let append_result = ready!(self.commit_token.poll_unpin(cx)); if append_result.is_err() { - self.get_mut().fail_with_internal(); + self.fail_with_internal(); return Poll::Ready(()); } self.succeed_with_appended(); diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 5ee55f0eb..6cbe886d7 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -17,7 +17,7 @@ use anyhow::Context; use assert2::let_assert; use enumset::EnumSet; use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _}; -use metrics::histogram; +use metrics::{counter, gauge, histogram}; 
 use tokio::sync::{mpsc, watch};
 use tokio::time::MissedTickBehavior;
 use tracing::{debug, error, info, instrument, trace, warn, Span};
@@ -65,8 +65,10 @@ use restate_wal_protocol::control::AnnounceLeader;
 use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
 
 use crate::metric_definitions::{
-    PARTITION_LABEL, PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, PP_APPLY_COMMAND_BATCH_SIZE,
-    PP_APPLY_COMMAND_DURATION,
+    PARTITION_APPLY_COMMAND_BATCH_SIZE, PARTITION_APPLY_COMMAND_DURATION, PARTITION_LABEL,
+    PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION,
+    PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, PARTITION_RECORD_READ_COUNT,
+    PARTITION_RPC_QUEUE_OUTSTANDING_REQUESTS, PARTITION_RPC_QUEUE_UTILIZATION_PERCENT,
 };
 use crate::partition::invoker_storage_reader::InvokerStorageReader;
 use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata};
@@ -367,8 +369,20 @@ where
             );
         }
 
+        // Telemetry setup
+        let partition_id_str = self.partition_id.to_string();
+        let apply_command_latency = histogram!(PARTITION_APPLY_COMMAND_DURATION, PARTITION_LABEL => partition_id_str.clone());
+        let record_actions_latency = histogram!(PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION);
+        let record_write_to_read_latency = histogram!(PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, PARTITION_LABEL => partition_id_str.clone());
+        let command_batch_size = histogram!(PARTITION_APPLY_COMMAND_BATCH_SIZE, PARTITION_LABEL => partition_id_str.clone());
+        let command_read_count =
+            counter!(PARTITION_RECORD_READ_COUNT, PARTITION_LABEL => partition_id_str.clone());
+        let rpc_queue_utilization = gauge!(PARTITION_RPC_QUEUE_UTILIZATION_PERCENT, PARTITION_LABEL => partition_id_str.clone());
+        let rpc_queue_len =
+            gauge!(PARTITION_RPC_QUEUE_OUTSTANDING_REQUESTS, PARTITION_LABEL => partition_id_str);
         // Start reading after the last applied lsn
         let key_query = KeyFilter::Within(self.partition_key_range.clone());
+
         let mut record_stream = self
             .bifrost
             .create_reader(
@@ -382,6 +396,9 @@ where
                 trace!(?entry, "Read entry");
                 let lsn = entry.sequence_number();
                 if entry.is_data_record() {
+                    entry.as_record().inspect(|record| {
+                        record_write_to_read_latency.record(record.created_at().elapsed());
+                    });
                     entry
                         .try_decode_arc::<Envelope>()
                         .map(|envelope| Ok((lsn, envelope?)))
@@ -411,20 +428,18 @@ where
             tokio::time::interval(Duration::from_millis(500 + rand::random::<u64>() % 524));
         status_update_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
 
-        let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string()));
-        // Telemetry setup
-        let apply_command_latency =
-            histogram!(PP_APPLY_COMMAND_DURATION, PARTITION_LABEL => partition_id_str);
-        let record_actions_latency = histogram!(PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION);
-        let command_batch_size =
-            histogram!(PP_APPLY_COMMAND_BATCH_SIZE, PARTITION_LABEL => partition_id_str);
-
         let mut action_collector = ActionCollector::default();
         let mut command_buffer = Vec::with_capacity(self.max_command_batch_size);
 
         info!("Partition {} started", self.partition_id);
         loop {
+            let utilization =
+                (self.rpc_rx.len() as f64 * 100.0) / self.rpc_rx.max_capacity() as f64;
+
+            rpc_queue_utilization.set(utilization);
+            rpc_queue_len.set(self.rpc_rx.len() as f64);
+
            tokio::select! {
                Some(command) = self.control_rx.recv() => {
                    if let Err(err) = self.on_command(command).await {
@@ -444,6 +459,7 @@ where
                     // check that reading has succeeded
                     operation?;
 
+                    command_read_count.increment(command_buffer.len() as u64);
                     command_batch_size.record(command_buffer.len() as f64);
 
                     let mut transaction = partition_store.transaction();
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 334499acb..e553decc2 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -13,6 +13,7 @@ mod persisted_lsn_watchdog;
 mod processor_state;
 mod spawn_processor_task;
 
+use futures::FutureExt;
 use restate_types::identifiers::SnapshotId;
 use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap};
@@ -51,7 +52,7 @@ use restate_types::epoch::EpochMetadata;
 use restate_types::health::HealthStatus;
 use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
 use restate_types::live::Live;
-use restate_types::logs::{Lsn, SequenceNumber};
+use restate_types::logs::{LogId, Lsn, SequenceNumber};
 use restate_types::metadata_store::keys::partition_processor_epoch_key;
 use restate_types::net::metadata::MetadataKind;
 use restate_types::net::partition_processor::{
@@ -64,7 +65,6 @@ use restate_types::partition_table::PartitionTable;
 use restate_types::protobuf::common::WorkerStatus;
 use restate_types::{GenerationalNodeId, Version};
 
-use crate::metric_definitions::NUM_ACTIVE_PARTITIONS;
 use crate::metric_definitions::PARTITION_IS_ACTIVE;
 use crate::metric_definitions::PARTITION_IS_EFFECTIVE_LEADER;
 use crate::metric_definitions::PARTITION_LABEL;
@@ -72,6 +72,7 @@ use crate::metric_definitions::PARTITION_LAST_APPLIED_LOG_LSN;
 use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN;
 use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD;
 use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE;
+use crate::metric_definitions::{NUM_ACTIVE_PARTITIONS, PARTITION_LAST_APPLIED_LSN_LAG};
 use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository};
 use crate::partition::ProcessorError;
 use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler;
@@ -97,6 +98,7 @@ pub struct PartitionProcessorManager {
     persisted_lsns_rx: Option<watch::Receiver<BTreeMap<PartitionId, Lsn>>>,
     archived_lsns: HashMap<PartitionId, Lsn>,
+    target_tail_lsns: HashMap<PartitionId, Lsn>,
     invokers_status_reader: MultiplexedInvokerStatusReader,
     pending_control_processors: Option<PendingControlProcessors>,
@@ -197,6 +199,7 @@
             tx,
             persisted_lsns_rx: None,
             archived_lsns: HashMap::default(),
+            target_tail_lsns: HashMap::default(),
             invokers_status_reader: MultiplexedInvokerStatusReader::default(),
             pending_control_processors: None,
             asynchronous_operations: JoinSet::default(),
@@ -241,6 +244,9 @@
         let mut latest_snapshot_check_interval = tokio::time::interval(Duration::from_secs(5));
         latest_snapshot_check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
 
+        let mut update_target_tail_lsns = tokio::time::interval(Duration::from_secs(1));
+        update_target_tail_lsns.set_missed_tick_behavior(MissedTickBehavior::Delay);
+
         self.health_status.update(WorkerStatus::Ready);
         loop {
            tokio::select! {
@@ -250,6 +256,9 @@
                 _ = latest_snapshot_check_interval.tick() => {
                     self.trigger_periodic_partition_snapshots();
                 }
+                _ = update_target_tail_lsns.tick() => {
+                    self.update_target_tail_lsns();
+                }
                 Some(control_processors) = self.incoming_update_processors.next() => {
                     self.pending_control_processors = Some(PendingControlProcessors::new(control_processors.peer(), control_processors.into_body()));
                     self.on_control_processors();
@@ -513,6 +522,22 @@
                     debug!("Partition processor is no longer running. Ignoring new leader epoch result.");
                 }
             }
+            EventKind::NewTargetTail { tail } => {
+                let Some(tail_lsn) = tail else {
+                    return;
+                };
+
+                match self.target_tail_lsns.entry(partition_id) {
+                    Entry::Occupied(mut o) => {
+                        if *o.get() < tail_lsn {
+                            o.insert(tail_lsn);
+                        }
+                    }
+                    Entry::Vacant(v) => {
+                        v.insert(tail_lsn);
+                    }
+                }
+            }
         }
     }
@@ -533,6 +558,41 @@
         );
     }
 
+    /// A lightweight tail watcher that leverages the loglet watch tail implementation
+    /// to retrieve the most recently observed tail for the writable segment.
+    /// This ensures that the tail remains close to the actual value,
+    /// regardless of which segment is currently being processed by the partition processor.
+    fn update_target_tail_lsns(&mut self) {
+        for partition_id in self.processor_states.keys().cloned() {
+            let bifrost = self.bifrost.clone();
+
+            self.asynchronous_operations.spawn(
+                async move {
+                    let tail = if let Ok(loglet) = bifrost
+                        .admin()
+                        .writeable_loglet(LogId::from(partition_id))
+                        .await
+                    {
+                        loglet
+                            .watch_tail()
+                            .next()
+                            .now_or_never()
+                            .flatten()
+                            .map(|tail| tail.offset())
+                    } else {
+                        None
+                    };
+
+                    AsynchronousEvent {
+                        partition_id,
+                        inner: EventKind::NewTargetTail { tail },
+                    }
+                }
+                .in_current_tc(),
+            );
+        }
+    }
+
     fn obtain_new_leader_epoch(
         partition_id: PartitionId,
         leader_epoch_token: LeaderEpochToken,
@@ -557,59 +617,67 @@
         self.processor_states
             .iter()
             .filter_map(|(partition_id, processor_state)| {
-                let status = processor_state.partition_processor_status();
+                let mut status = processor_state.partition_processor_status()?;
 
-                if let Some(mut status) = status {
-                    gauge!(PARTITION_TIME_SINCE_LAST_STATUS_UPDATE,
+                gauge!(PARTITION_TIME_SINCE_LAST_STATUS_UPDATE,
                 PARTITION_LABEL => partition_id.to_string())
-                    .set(status.updated_at.elapsed());
+                .set(status.updated_at.elapsed());
 
-                    gauge!(PARTITION_IS_EFFECTIVE_LEADER,
+                gauge!(PARTITION_IS_EFFECTIVE_LEADER,
                 PARTITION_LABEL => partition_id.to_string())
-                    .set(if status.is_effective_leader() {
-                        1.0
-                    } else {
-                        0.0
-                    });
+                .set(if status.is_effective_leader() {
+                    1.0
+                } else {
+                    0.0
+                });
 
-                    gauge!(PARTITION_IS_ACTIVE,
+                gauge!(PARTITION_IS_ACTIVE,
                 PARTITION_LABEL => partition_id.to_string())
-                    .set(if status.replay_status == ReplayStatus::Active {
-                        1.0
-                    } else {
-                        0.0
-                    });
+                .set(if status.replay_status == ReplayStatus::Active {
+                    1.0
+                } else {
+                    0.0
+                });
 
-                    if let Some(last_applied_log_lsn) = status.last_applied_log_lsn {
-                        gauge!(PARTITION_LAST_APPLIED_LOG_LSN,
+                if let Some(last_applied_log_lsn) = status.last_applied_log_lsn {
+                    gauge!(PARTITION_LAST_APPLIED_LOG_LSN,
                 PARTITION_LABEL => partition_id.to_string())
-                        .set(last_applied_log_lsn.as_u64() as f64);
-                    }
+                    .set(last_applied_log_lsn.as_u64() as f64);
+                }
 
-                    if let Some(last_persisted_log_lsn) = status.last_persisted_log_lsn {
-                        gauge!(PARTITION_LAST_PERSISTED_LOG_LSN,
+                if let Some(last_persisted_log_lsn) = status.last_persisted_log_lsn {
+                    gauge!(PARTITION_LAST_PERSISTED_LOG_LSN,
                 PARTITION_LABEL => partition_id.to_string())
-                        .set(last_persisted_log_lsn.as_u64() as f64);
-                    }
+                    .set(last_persisted_log_lsn.as_u64() as f64);
+                }
 
-                    if let Some(last_record_applied_at) = status.last_record_applied_at {
-                        gauge!(PARTITION_TIME_SINCE_LAST_RECORD,
+                if let Some(last_record_applied_at) = status.last_record_applied_at {
+                    gauge!(PARTITION_TIME_SINCE_LAST_RECORD,
                 PARTITION_LABEL => partition_id.to_string())
-                        .set(last_record_applied_at.elapsed());
-                    }
+                    .set(last_record_applied_at.elapsed());
+                }
 
-                    // it is a bit unfortunate that we share PartitionProcessorStatus between the
-                    // PP and the PPManager :-(. Maybe at some point we want to split the struct for it.
-                    status.last_persisted_log_lsn = persisted_lsns
-                        .as_ref()
-                        .and_then(|lsns| lsns.get(partition_id).cloned());
+                // it is a bit unfortunate that we share PartitionProcessorStatus between the
+                // PP and the PPManager :-(. Maybe at some point we want to split the struct for it.
+                status.last_persisted_log_lsn = persisted_lsns
+                    .as_ref()
+                    .and_then(|lsns| lsns.get(partition_id).cloned());
 
-                    status.last_archived_log_lsn = self.archived_lsns.get(partition_id).cloned();
+                status.last_archived_log_lsn = self.archived_lsns.get(partition_id).cloned();
 
-                    Some((*partition_id, status))
-                } else {
-                    None
+                let target_tail_lsn = self.target_tail_lsns.get(partition_id).cloned();
+                if target_tail_lsn > status.target_tail_lsn {
+                    status.target_tail_lsn = target_tail_lsn;
+                }
+
+                if let Some((target_lsn, applied_lsn)) =
+                    status.target_tail_lsn.zip(status.last_applied_log_lsn)
+                {
+                    gauge!(PARTITION_LAST_APPLIED_LSN_LAG, PARTITION_LABEL => partition_id.to_string())
+                        .set(target_lsn.prev().as_u64().saturating_sub(applied_lsn.as_u64()) as f64);
                 }
+
+                Some((*partition_id, status))
             })
             .collect()
     }
@@ -973,6 +1041,9 @@ enum EventKind {
         leader_epoch_token: LeaderEpochToken,
         result: anyhow::Result<LeaderEpoch>,
     },
+    NewTargetTail {
+        tail: Option<Lsn>,
+    },
 }
 
 struct PendingControlProcessors {
diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs
index 010acd3a5..83948fc47 100644
--- a/tools/restatectl/src/commands/partition/list.rs
+++ b/tools/restatectl/src/commands/partition/list.rs
@@ -151,6 +151,7 @@ pub async fn list_partitions(
         "PERSISTED-LSN",
         "SKIPPED-RECORDS",
         "ARCHIVED-LSN",
+        "LSN-LAG",
         "LAST-UPDATE",
     ]);
@@ -286,6 +287,17 @@ pub async fn list_partitions(
                         .map(|x| x.to_string())
                         .unwrap_or("-".to_owned()),
                 ),
+                Cell::new(
+                    processor
+                        .status
+                        .target_tail_lsn
+                        .zip(processor.status.last_applied_log_lsn)
+                        .map(|(tail, applied)| {
+                            // (tail - 1) - applied_lsn = tail - (applied_lsn + 1)
+                            tail.value.saturating_sub(applied.value + 1).to_string()
+                        })
+                        .unwrap_or("-".to_owned()),
+                ),
                 render_as_duration(processor.status.updated_at, Tense::Past),
             ]);
         });