Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remove node command and DisableNodeChecker #2806

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/protobuf/node_ctl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ message ClusterHealthResponse {

message EmbeddedMetadataClusterHealth {
repeated restate.common.NodeId members = 1;
}
}
5 changes: 2 additions & 3 deletions crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::cmp::max_by_key;
use std::num::NonZeroU16;

use crate::{ClusterConfiguration, provision_cluster_metadata};
use anyhow::Context;
use bytes::BytesMut;
Expand All @@ -36,6 +33,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfig
use restate_types::protobuf::node::Message;
use restate_types::replication::ReplicationProperty;
use restate_types::storage::StorageCodec;
use std::cmp::max_by_key;
use std::num::NonZeroU16;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use tracing::debug;
Expand Down
16 changes: 7 additions & 9 deletions crates/node/src/network_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

use std::pin::pin;

use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler};
use super::pprof;
use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics};
use crate::network_server::state::NodeCtrlHandlerStateBuilder;
use axum::routing::{MethodFilter, get, on};
use tokio::time::MissedTickBehavior;
use tonic::codec::CompressionEncoding;
use tracing::{debug, trace};

use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
Expand All @@ -23,11 +23,9 @@ use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
use restate_core::{TaskCenter, TaskKind, cancellation_watcher};
use restate_types::config::CommonOptions;
use restate_types::protobuf::common::NodeStatus;

use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler};
use super::pprof;
use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics};
use crate::network_server::state::NodeCtrlHandlerStateBuilder;
use tokio::time::MissedTickBehavior;
use tonic::codec::CompressionEncoding;
use tracing::{debug, trace};

pub struct NetworkServer {}

Expand Down
121 changes: 121 additions & 0 deletions tools/restatectl/src/commands/node/disable_node_checker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::PlainNodeId;
use restate_types::logs::LogId;
use restate_types::logs::metadata::{Logs, ProviderKind};
use restate_types::nodes_config::{MetadataServerState, NodesConfiguration, Role, StorageState};
use restate_types::replicated_loglet::ReplicatedLogletParams;

#[derive(Debug, thiserror::Error)]
pub enum DisableNodeError {
#[error("unknown node {0}")]
UnknownNode(PlainNodeId),
#[error("log server is part of a node set of log {0}")]
NodeSetMember(LogId),
#[error("log server cannot be disabled because it is in read-write state")]
ReadWrite,
#[error("The current default loglet provider does not support disabling nodes")]
DefaultLogletProvider,
#[error("metadata server is member of cluster")]
MetadataMember,
}

pub struct DisableNodeChecker {
nodes_configuration: NodesConfiguration,
logs: Logs,
}

impl DisableNodeChecker {
pub fn new(nodes_configuration: NodesConfiguration, logs: Logs) -> Self {
DisableNodeChecker {
nodes_configuration,
logs,
}
}

pub fn nodes_configuration(&self) -> &NodesConfiguration {
&self.nodes_configuration
}

pub fn safe_to_disable_node(&self, node_id: PlainNodeId) -> Result<(), DisableNodeError> {
// for a node to be safe to be disabled it needs to be known so that we don't accidentally
// disable a newly joining node
let node_config = self
.nodes_configuration
.find_node_by_id(node_id)
.map_err(|_err| DisableNodeError::UnknownNode(node_id))?;

self.safe_to_disable_log_server(node_id, node_config.log_server_config.storage_state)?;

// only safe to disable node if it does not run a metadata server or is not a member
// todo atm we must consider the role because the default metadata server state is MetadataServerState::Member.
// We need to introduce a provisioning state or make the metadata server state optional in the NodeConfig.
if node_config.roles.contains(Role::MetadataServer)
&& node_config.metadata_server_config.metadata_server_state
== MetadataServerState::Member
{
return Err(DisableNodeError::MetadataMember);
}

Ok(())
}

/// Checks whether it is safe to disable the given log server identified by the node_id. It is
/// safe to disable the log server if it can no longer be added to new node sets (== not being
/// a candidate) and it is no longer part of any known node sets.
fn safe_to_disable_log_server(
&self,
node_id: PlainNodeId,
storage_state: StorageState,
) -> Result<(), DisableNodeError> {
match storage_state {
StorageState::ReadWrite => {
return Err(DisableNodeError::ReadWrite);
}
// it's safe to disable a disabled node
StorageState::Disabled => return Ok(()),
// we need to check whether this node is no longer part of any known node sets
StorageState::ReadOnly | StorageState::DataLoss | StorageState::Provisioning => {}
}

// if the default provider kind is local or in-memory than it is not safe to disable the
// given node because it is included in the implicit node set
if matches!(
self.logs.configuration().default_provider.kind(),
ProviderKind::Local | ProviderKind::InMemory
) {
return Err(DisableNodeError::DefaultLogletProvider);
}

// check for every log that the given node is not contained in any node set
for (log_id, chain) in self.logs.iter() {
for segment in chain.iter().rev() {
if match segment.config.kind {
// we assume that the given node runs the local and memory loglet and, therefore,
// is part of the implicit node set
ProviderKind::Local => true,
ProviderKind::InMemory => true,
ProviderKind::Replicated => {
let replicated_loglet = ReplicatedLogletParams::deserialize_from(
segment.config.params.as_bytes(),
)
.expect("to be deserializable");
replicated_loglet.nodeset.contains(node_id)
}
} {
return Err(DisableNodeError::NodeSetMember(*log_id));
}
}
}

Ok(())
}
}
6 changes: 6 additions & 0 deletions tools/restatectl/src/commands/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod disable_node_checker;
pub mod list_nodes;
mod remove_nodes;

use cling::prelude::*;

#[derive(Run, Subcommand, Clone)]
pub enum Nodes {
/// Print a summary of the active nodes registered in a cluster
List(list_nodes::ListNodesOpts),
/// Removes the given node/s from the cluster. You should only use this command if you are
/// certain that the specified nodes are no longer part of any node sets, not members of the
/// metadata cluster nor required to run partition processors.
Remove(remove_nodes::RemoveNodesOpts),
}
101 changes: 101 additions & 0 deletions tools/restatectl/src/commands/node/remove_nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::commands::node::disable_node_checker::DisableNodeChecker;
use crate::connection::ConnectionInfo;
use anyhow::Context;
use clap::Parser;
use cling::{Collect, Run};
use itertools::Itertools;
use restate_cli_util::c_println;
use restate_core::metadata_store::{MetadataStoreClient, Precondition};
use restate_metadata_server::create_client;
use restate_types::PlainNodeId;
use restate_types::config::{MetadataClientKind, MetadataClientOptions};
use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
use restate_types::nodes_config::{NodesConfiguration, Role};

#[derive(Run, Parser, Collect, Clone, Debug)]
#[clap(alias = "rm")]
#[cling(run = "remove_nodes")]
pub struct RemoveNodesOpts {
/// The node/s to remove from the cluster. Specify multiple nodes as a comma-separated list or
/// specify the option multiple times.
#[arg(long, required = true, visible_alias = "node", value_delimiter = ',')]
nodes: Vec<PlainNodeId>,
}

pub async fn remove_nodes(
connection: &ConnectionInfo,
opts: &RemoveNodesOpts,
) -> anyhow::Result<()> {
let nodes_configuration = connection.get_nodes_configuration().await?;
let logs = connection.get_logs().await?;

let disable_node_checker = DisableNodeChecker::new(nodes_configuration, logs);

for node_id in &opts.nodes {
disable_node_checker
.safe_to_disable_node(*node_id)
.context("It is not safe to disable node {node_id}")?
}

let nodes_configuration = disable_node_checker.nodes_configuration();
let mut updated_nodes_configuration = nodes_configuration.clone();

for node_id in &opts.nodes {
updated_nodes_configuration.remove_node_unchecked(*node_id);
}

updated_nodes_configuration.increment_version();

let metadata_client = create_metadata_client(nodes_configuration).await?;

metadata_client
.put(
NODES_CONFIG_KEY.clone(),
&updated_nodes_configuration,
Precondition::MatchesVersion(nodes_configuration.version()),
)
.await?;

c_println!(
"Successfully removed nodes [{}]",
opts.nodes.iter().join(",")
);

Ok(())
}

async fn create_metadata_client(
nodes_configuration: &NodesConfiguration,
) -> anyhow::Result<MetadataStoreClient> {
// find metadata nodes
let addresses: Vec<_> = nodes_configuration
.iter_role(Role::MetadataServer)
.map(|(_, config)| config.address.clone())
.collect();
if addresses.is_empty() {
return Err(anyhow::anyhow!(
"No nodes are configured to run metadata-server role, this command only \
supports replicated metadata deployment"
));
}

// todo make this work with other metadata client kinds as well; maybe proxy through Restate server
let metadata_store_client_options = MetadataClientOptions {
kind: MetadataClientKind::Replicated { addresses },
..Default::default()
};

create_client(metadata_store_client_options)
.await
.map_err(|e| anyhow::anyhow!("Failed to create metadata store client: {}", e))
}
2 changes: 1 addition & 1 deletion tools/restatectl/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::collections::{HashMap, HashSet};
use std::sync::RwLock;
use std::{cmp::Ordering, fmt::Display, future::Future, sync::Arc};
use std::{cmp::Ordering, fmt::Display, sync::Arc};

use cling::{Collect, prelude::Parser};
use itertools::{Either, Itertools, Position};
Expand Down
Loading