diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto
index 462bcb580..16618d9be 100644
--- a/crates/core/protobuf/node_ctl_svc.proto
+++ b/crates/core/protobuf/node_ctl_svc.proto
@@ -27,6 +27,9 @@ service NodeCtlSvc {
 
   // Returns the cluster health from the point of view of this node.
   rpc ClusterHealth(google.protobuf.Empty) returns (ClusterHealthResponse);
+
+  // Try to remove the given node from the cluster.
+  rpc RemoveNode(RemoveNodeRequest) returns (google.protobuf.Empty);
 }
 
 message ProvisionClusterRequest {
@@ -92,3 +95,7 @@ message ClusterHealthResponse {
 message EmbeddedMetadataClusterHealth {
   repeated restate.common.NodeId members = 1;
 }
+
+message RemoveNodeRequest {
+  uint32 node_id = 1;
+}
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index abf30a33e..652280de3 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -342,13 +342,15 @@ impl Node {
         TaskCenter::spawn(TaskKind::RpcServer, "node-rpc-server", {
             let common_options = config.common.clone();
             let connection_manager = self.networking.connection_manager().clone();
-            let metadata_store_client = self.metadata_store_client.clone();
+            let bifrost = self.bifrost.handle();
+            let metadata_writer = metadata_writer.clone();
             async move {
                 NetworkServer::run(
                     connection_manager,
                     self.server_builder,
                     common_options,
-                    metadata_store_client,
+                    metadata_writer,
+                    bifrost,
                 )
                 .await?;
                 Ok(())
diff --git a/crates/node/src/network_server/disable_node_checker.rs b/crates/node/src/network_server/disable_node_checker.rs
new file mode 100644
index 000000000..509f1d7cf
--- /dev/null
+++ b/crates/node/src/network_server/disable_node_checker.rs
@@ -0,0 +1,210 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use futures::future;
+use restate_bifrost::Bifrost;
+use restate_core::{Metadata, SyncError, TargetVersion};
+use restate_types::PlainNodeId;
+use restate_types::logs::metadata::{Logs, ProviderKind};
+use restate_types::logs::{LogId, Lsn, SequenceNumber};
+use restate_types::net::metadata::MetadataKind;
+use restate_types::nodes_config::{NodesConfiguration, Role, StorageState};
+use restate_types::replicated_loglet::ReplicatedLogletParams;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tracing::info;
+
+#[derive(Debug, thiserror::Error)]
+pub enum DisableNodeError {
+    #[error("unknown node {0}")]
+    UnknownNode(PlainNodeId),
+    #[error("log server is part of the open tail segment of log {0}")]
+    TailSegment(LogId),
+    #[error("log server is in state {0} but StorageState::ReadOnly is required for disabling it")]
+    NotReadOnly(StorageState),
+    #[error(
+        "log server is not safe to disable because it is still part of log {log_id} up to lsn {required_trim_point}, current trim point {actual_trim_point}"
+    )]
+    NotSafeToDisable {
+        log_id: LogId,
+        required_trim_point: Lsn,
+        actual_trim_point: Lsn,
+    },
+    #[error("failed finding the trim point for log {log_id}: {err}")]
+    FindTrimPoint {
+        log_id: LogId,
+        err: restate_bifrost::Error,
+    },
+    #[error("metadata server is member of cluster")]
+    MetadataMember,
+}
+
+pub struct DisableNodeChecker {
+    bifrost: Bifrost,
+    metadata: Metadata,
+    nodes_configuration: Arc<NodesConfiguration>,
+    logs: Arc<Logs>,
+}
+
+impl DisableNodeChecker {
+    pub async fn create(bifrost: Bifrost) -> Result<Self, SyncError> {
+        let metadata = Metadata::current();
+
+        let nodes_configuration = metadata.nodes_config_snapshot();
+
+        // It's important that we never operate on an outdated Logs wrt the used NodesConfiguration.
+        // Otherwise, we risk that we don't see all segments this node is part of.
+        metadata
+            .sync(MetadataKind::Logs, TargetVersion::Latest)
+            .await?;
+        let logs = metadata.logs_snapshot();
+
+        Ok(DisableNodeChecker {
+            bifrost,
+            nodes_configuration,
+            logs,
+            metadata,
+        })
+    }
+
+    pub async fn refresh_metadata(
+        &mut self,
+        sync_nodes_configuration: bool,
+    ) -> Result<(), SyncError> {
+        if sync_nodes_configuration {
+            self.metadata
+                .sync(MetadataKind::NodesConfiguration, TargetVersion::Latest)
+                .await?;
+        }
+
+        self.nodes_configuration = self.metadata.nodes_config_snapshot();
+
+        // It's important that we never operate on an outdated Logs wrt the used NodesConfiguration.
+        // Otherwise, we risk that we don't see all segments this node is part of.
+        self.metadata
+            .sync(MetadataKind::Logs, TargetVersion::Latest)
+            .await?;
+        self.logs = self.metadata.logs_snapshot();
+
+        Ok(())
+    }
+
+    pub fn nodes_configuration(&self) -> &NodesConfiguration {
+        &self.nodes_configuration
+    }
+
+    pub async fn safe_to_disable_node(&self, node_id: PlainNodeId) -> Result<(), DisableNodeError> {
+        // for a node to be safe to disable it needs to be known so that we don't accidentally
+        // disable a newly joining node
+        let node_config = self
+            .nodes_configuration
+            .find_node_by_id(node_id)
+            .map_err(|_err| DisableNodeError::UnknownNode(node_id))?;
+
+        if node_config.roles.contains(Role::LogServer) {
+            self.safe_to_disable_log_server(node_id, node_config.log_server_config.storage_state)
+                .await?
+        }
+
+        if node_config.roles.contains(Role::MetadataServer) {
+            // only safe to disable node if it does not run a metadata server
+            return Err(DisableNodeError::MetadataMember);
+        }
+
+        Ok(())
+    }
+
+    /// Checks whether it is safe to disable the given log server identified by the node_id.
+    /// It is safe to disable the log server if it can no longer be added to new nodesets
+    /// (== not being a candidate) and all segments this log server is part of (member of the
+    /// nodeset) are before the trim point.
+    async fn safe_to_disable_log_server(
+        &self,
+        node_id: PlainNodeId,
+        storage_state: StorageState,
+    ) -> Result<(), DisableNodeError> {
+        match storage_state {
+            storage_state @ (StorageState::Provisioning
+            | StorageState::ReadWrite
+            | StorageState::DataLoss) => {
+                return Err(DisableNodeError::NotReadOnly(storage_state));
+            }
+            // it's safe to disable a disabled node
+            StorageState::Disabled => return Ok(()),
+            // we need to check whether this node is no longer part of any segments that haven't
+            // been fully trimmed.
+            StorageState::ReadOnly => {}
+        }
+
+        let mut required_trim_points: HashMap<_, _> = HashMap::default();
+
+        // find for every log the latest segment in which the node is part of the nodeset
+        for (log_id, chain) in self.logs.iter() {
+            let mut next_segment_lsn: Option<Lsn> = None;
+            for segment in chain.iter().rev() {
+                let is_member = match segment.config.kind {
+                    // we assume that the given node runs the local and memory loglet
+                    ProviderKind::Local => true,
+                    // #[cfg(any(test, feature = "memory-loglet"))]
+                    ProviderKind::InMemory => true,
+                    ProviderKind::Replicated => {
+                        let replicated_loglet = ReplicatedLogletParams::deserialize_from(
+                            segment.config.params.as_bytes(),
+                        )
+                        .expect("to be deserializable");
+                        replicated_loglet.nodeset.contains(node_id)
+                    }
+                };
+
+                if is_member {
+                    let Some(tail_lsn) = next_segment_lsn else {
+                        // If we are still part of the tail segment, then trimming won't help.
+                        // The node can only be disabled if it is in a closed segment that can be
+                        // fully trimmed.
+                        return Err(DisableNodeError::TailSegment(*log_id));
+                    };
+
+                    // tail_lsn is the first lsn that will be written after this segment
+                    required_trim_points.insert(*log_id, tail_lsn.prev());
+                    break;
+                }
+
+                next_segment_lsn = Some(segment.base_lsn);
+            }
+        }
+
+        // find for every log in which the node is a member of a nodeset the actual trim point and
+        // compare whether the segment ends before or after the trim point.
+        future::try_join_all(required_trim_points.into_iter().map(
+            |(log_id, required_trim_point)| async move {
+                let actual_trim_point = self
+                    .bifrost
+                    .clone()
+                    .get_trim_point(log_id)
+                    .await
+                    .map_err(|err| DisableNodeError::FindTrimPoint { log_id, err })?;
+                // The actual trim point must be larger than or equal to the required trim point
+                // for the node to be no longer needed for reads.
+                if actual_trim_point < required_trim_point {
+                    Err(DisableNodeError::NotSafeToDisable {
+                        log_id,
+                        required_trim_point,
+                        actual_trim_point,
+                    })
+                } else {
+                    Ok(())
+                }
+            },
+        ))
+        .await?;
+
+        Ok(())
+    }
+}
diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs
index 5dd016a0f..c84cd1dae 100644
--- a/crates/node/src/network_server/grpc_svc_handler.rs
+++ b/crates/node/src/network_server/grpc_svc_handler.rs
@@ -8,34 +8,40 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::cmp::max_by_key;
-use std::num::NonZeroU16;
-
+use crate::network_server::disable_node_checker::DisableNodeChecker;
 use crate::{ClusterConfiguration, provision_cluster_metadata};
 use anyhow::Context;
 use bytes::BytesMut;
 use enumset::EnumSet;
 use futures::stream::BoxStream;
-use restate_core::metadata_store::MetadataStoreClient;
+use restate_bifrost::Bifrost;
+use restate_core::metadata_store::{Precondition, WriteError};
 use restate_core::network::net_util::create_tonic_channel;
 use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc;
 use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect};
 use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
 use restate_core::protobuf::node_ctl_svc::{
     ClusterHealthResponse, EmbeddedMetadataClusterHealth, GetMetadataRequest, GetMetadataResponse,
-    IdentResponse, ProvisionClusterRequest, ProvisionClusterResponse,
+    IdentResponse, ProvisionClusterRequest, ProvisionClusterResponse, RemoveNodeRequest,
 };
 use restate_core::task_center::TaskCenterMonitoring;
-use restate_core::{Metadata, MetadataKind, TargetVersion, TaskCenter, task_center};
+use restate_core::{
+    Metadata, MetadataKind, MetadataWriter, TargetVersion, TaskCenter, task_center,
+};
 use restate_metadata_server::grpc::metadata_server_svc_client::MetadataServerSvcClient;
-use restate_types::Version;
 use restate_types::config::Configuration;
+use restate_types::errors::MaybeRetryableError;
 use restate_types::logs::metadata::{NodeSetSize, ProviderConfiguration};
+use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
 use restate_types::nodes_config::Role;
 use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration;
 use restate_types::protobuf::node::Message;
 use restate_types::replication::ReplicationProperty;
 use restate_types::storage::StorageCodec;
+use restate_types::{PlainNodeId, Version};
+use std::cmp::max_by_key;
+use std::num::NonZeroU16;
+use std::sync::Arc;
 use tokio_stream::StreamExt;
 use tonic::{Request, Response, Status, Streaming};
 use tracing::debug;
@@ -44,7 +50,8 @@ pub struct NodeCtlSvcHandler {
     task_center: task_center::Handle,
     cluster_name: String,
     roles: EnumSet<Role>,
-    metadata_store_client: MetadataStoreClient,
+    metadata_writer: MetadataWriter,
+    bifrost: Bifrost,
 }
 
 impl NodeCtlSvcHandler {
@@ -52,13 +59,15 @@ impl NodeCtlSvcHandler {
         task_center: task_center::Handle,
         cluster_name: String,
         roles: EnumSet<Role>,
-        metadata_store_client: MetadataStoreClient,
+        metadata_writer: MetadataWriter,
+        bifrost: Bifrost,
     ) -> Self {
         Self {
             task_center,
             cluster_name,
             roles,
-            metadata_store_client,
+            metadata_writer,
+            bifrost,
         }
     }
 
@@ -204,7 +213,7 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
         }
 
         let newly_provisioned = provision_cluster_metadata(
-            &self.metadata_store_client,
+            self.metadata_writer.metadata_store_client(),
             &config.common,
             &cluster_configuration,
         )
@@ -282,6 +291,73 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
 
         Ok(Response::new(cluster_state_response))
     }
+
+    async fn remove_node(
+        &self,
+        request: Request<RemoveNodeRequest>,
+    ) -> Result<Response<()>, Status> {
+        let request = request.into_inner();
+        let node_id = PlainNodeId::from(request.node_id);
+
+        let mut disable_node_checker = DisableNodeChecker::create(self.bifrost.clone())
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;
+
+        let mut retry_iter = Configuration::pinned()
+            .common
+            .network_error_retry_policy
+            .clone()
+            .into_iter();
+
+        loop {
+            disable_node_checker
+                .safe_to_disable_node(node_id)
+                .await
+                .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+            let nodes_configuration = disable_node_checker.nodes_configuration();
+            let mut updated_nodes_configuration = nodes_configuration.clone();
+            updated_nodes_configuration.remove_node_unchecked(node_id);
+            updated_nodes_configuration.increment_version();
+
+            match self
+                .metadata_writer
+                .metadata_store_client()
+                .put(
+                    NODES_CONFIG_KEY.clone(),
+                    &updated_nodes_configuration,
+                    Precondition::MatchesVersion(nodes_configuration.version()),
+                )
+                .await
+            {
+                Ok(_) => {
+                    self.metadata_writer
+                        .update(Arc::new(updated_nodes_configuration))
+                        .await
+                        .map_err(|err| Status::internal(err.to_string()))?;
+                    break;
+                }
+                Err(err) => {
+                    if retry_iter.remaining_attempts() == 0 {
+                        return Err(Status::deadline_exceeded(err.to_string()));
+                    }
+
+                    if matches!(err, WriteError::FailedPrecondition(_)) {
+                        // concurrent update; try again after refreshing the metadata
+                        disable_node_checker
+                            .refresh_metadata(true)
+                            .await
+                            .map_err(|err| Status::internal(err.to_string()))?;
+                    } else if !err.retryable() {
+                        return Err(Status::internal(err.to_string()));
+                    }
+                }
+            }
+
+            tokio::time::sleep(retry_iter.next().expect("must have a remaining attempt")).await;
+        }
+
+        Ok(Response::new(()))
+    }
 }
 
 pub struct CoreNodeSvcHandler {
diff --git a/crates/node/src/network_server/mod.rs b/crates/node/src/network_server/mod.rs
index cdb0a3177..cf118cd65 100644
--- a/crates/node/src/network_server/mod.rs
+++ b/crates/node/src/network_server/mod.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+pub mod disable_node_checker;
 mod grpc_svc_handler;
 mod metrics;
 mod pprof;
diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs
index 53d656979..5b720d888 100644
--- a/crates/node/src/network_server/service.rs
+++ b/crates/node/src/network_server/service.rs
@@ -11,18 +11,17 @@
 use std::pin::pin;
 
 use axum::routing::{MethodFilter, get, on};
-use tokio::time::MissedTickBehavior;
-use tonic::codec::CompressionEncoding;
-use tracing::{debug, trace};
-
-use restate_core::metadata_store::MetadataStoreClient;
+use restate_bifrost::Bifrost;
 use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
 use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
 use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect};
 use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
-use restate_core::{TaskCenter, TaskKind, cancellation_watcher};
+use restate_core::{MetadataWriter, TaskCenter, TaskKind, cancellation_watcher};
 use restate_types::config::CommonOptions;
 use restate_types::protobuf::common::NodeStatus;
+use tokio::time::MissedTickBehavior;
+use tonic::codec::CompressionEncoding;
+use tracing::{debug, trace};
 
 use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler};
 use super::pprof;
@@ -36,7 +35,8 @@ impl NetworkServer {
         connection_manager: ConnectionManager<T>,
         mut server_builder: NetworkServerBuilder,
         options: CommonOptions,
-        metadata_store_client: MetadataStoreClient,
+        metadata_writer: MetadataWriter,
+        bifrost: Bifrost,
     ) -> Result<(), anyhow::Error> {
         // Configure Metric Exporter
         let mut state_builder = NodeCtrlHandlerStateBuilder::default();
@@ -103,7 +103,8 @@ impl NetworkServer {
                     TaskCenter::current(),
                     options.cluster_name().to_owned(),
                     options.roles,
-                    metadata_store_client,
+                    metadata_writer,
+                    bifrost,
                 ))
                 .accept_compressed(CompressionEncoding::Gzip)
                 .send_compressed(CompressionEncoding::Gzip),
diff --git a/tools/restatectl/src/commands/node/mod.rs b/tools/restatectl/src/commands/node/mod.rs
index 55b950296..5c25e2790 100644
--- a/tools/restatectl/src/commands/node/mod.rs
+++ b/tools/restatectl/src/commands/node/mod.rs
@@ -9,6 +9,7 @@
 // by the Apache License, Version 2.0.
 
 pub mod list_nodes;
+mod remove_node;
 
 use cling::prelude::*;
 
@@ -16,4 +17,6 @@ use cling::prelude::*;
 pub enum Nodes {
     /// Print a summary of the active nodes registered in a cluster
     List(list_nodes::ListNodesOpts),
+    /// Remove the given node from the cluster
+    Remove(remove_node::RemoveNodeOpts),
 }
diff --git a/tools/restatectl/src/commands/node/remove_node.rs b/tools/restatectl/src/commands/node/remove_node.rs
new file mode 100644
index 000000000..532b70e3f
--- /dev/null
+++ b/tools/restatectl/src/commands/node/remove_node.rs
@@ -0,0 +1,44 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use crate::connection::ConnectionInfo;
+use clap::Parser;
+use cling::{Collect, Run};
+use restate_cli_util::c_println;
+use restate_core::protobuf::node_ctl_svc::RemoveNodeRequest;
+use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
+use restate_types::PlainNodeId;
+
+#[derive(Run, Parser, Collect, Clone, Debug)]
+#[clap(alias = "rm")]
+#[cling(run = "remove_node")]
+pub struct RemoveNodeOpts {
+    /// The node to remove from the cluster
+    #[arg(long, short)]
+    node_id: PlainNodeId,
+}
+
+pub async fn remove_node(connection: &ConnectionInfo, opts: &RemoveNodeOpts) -> anyhow::Result<()> {
+    connection
+        .try_each(None, |channel| async {
+            NodeCtlSvcClient::new(channel)
+                .remove_node(RemoveNodeRequest {
+                    node_id: u32::from(opts.node_id),
+                })
+                .await
+        })
+        .await?;
+
+    c_println!(
+        "Successfully removed node {} from the cluster",
+        opts.node_id
+    );
+    Ok(())
+}
diff --git a/tools/restatectl/src/connection.rs b/tools/restatectl/src/connection.rs
index 74b01ebed..4b94ba15b 100644
--- a/tools/restatectl/src/connection.rs
+++ b/tools/restatectl/src/connection.rs
@@ -10,7 +10,7 @@
 
 use std::collections::{HashMap, HashSet};
 use std::sync::RwLock;
-use std::{cmp::Ordering, fmt::Display, future::Future, sync::Arc};
+use std::{cmp::Ordering, fmt::Display, sync::Arc};
 
 use cling::{Collect, prelude::Parser};
 use itertools::{Either, Itertools, Position};
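
Usage sketch for the new subcommand (an assumption: the `Nodes` enum is mounted as the `nodes` subcommand of `restatectl`, as the existing `list_nodes` command suggests; the flags follow from the `RemoveNodeOpts` derive above, and `rm` is available via the clap alias):

    # remove node 3 from the cluster (hypothetical node id); the server rejects
    # the request with a failed-precondition error if the node is still a
    # metadata-cluster member or a log server whose segments are not yet trimmed
    restatectl nodes remove --node-id 3

    # short form using the alias and the derived short flag
    restatectl nodes rm -n 3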