Add remove node command and DisableNodeChecker
This commit introduces the remove node command, which uses the DisableNodeChecker
to verify that it is safe to disable/remove the given node from the cluster.
tillrohrmann committed Feb 27, 2025
1 parent d149d33 commit c68a4d5
Showing 9 changed files with 366 additions and 22 deletions.
7 changes: 7 additions & 0 deletions crates/core/protobuf/node_ctl_svc.proto
@@ -27,6 +27,9 @@ service NodeCtlSvc {

// Returns the cluster health from the point of view of this node.
rpc ClusterHealth(google.protobuf.Empty) returns (ClusterHealthResponse);

// Try to remove the given node from the cluster.
rpc RemoveNode(RemoveNodeRequest) returns (google.protobuf.Empty);
}

message ProvisionClusterRequest {
@@ -92,3 +95,7 @@ message ClusterHealthResponse {
message EmbeddedMetadataClusterHealth {
repeated restate.common.NodeId members = 1;
}

message RemoveNodeRequest {
uint32 node_id = 1;
}
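
The new RPC can be driven by any gRPC client generated from this proto. Below is a minimal client-side sketch; the module path, endpoint address, and the tonic-generated client name are assumptions, only the RemoveNode method and RemoveNodeRequest message come from this commit.

use node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; // assumed module path for the generated stubs
use node_ctl_svc::RemoveNodeRequest;

async fn remove_node(endpoint: &'static str, node_id: u32) -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the node-ctl service of a reachable node (address is an assumption).
    let mut client = NodeCtlSvcClient::connect(endpoint).await?;
    // Ask the cluster to remove the node; the server side runs the DisableNodeChecker
    // and returns an error if the node is not safe to disable.
    client.remove_node(RemoveNodeRequest { node_id }).await?;
    Ok(())
}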
6 changes: 4 additions & 2 deletions crates/node/src/lib.rs
@@ -342,13 +342,15 @@ impl Node {
TaskCenter::spawn(TaskKind::RpcServer, "node-rpc-server", {
let common_options = config.common.clone();
let connection_manager = self.networking.connection_manager().clone();
let metadata_store_client = self.metadata_store_client.clone();
let bifrost = self.bifrost.handle();
let metadata_writer = metadata_writer.clone();
async move {
NetworkServer::run(
connection_manager,
self.server_builder,
common_options,
metadata_store_client,
metadata_writer,
bifrost,
)
.await?;
Ok(())
210 changes: 210 additions & 0 deletions crates/node/src/network_server/disable_node_checker.rs
@@ -0,0 +1,210 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::future;
use restate_bifrost::Bifrost;
use restate_core::{Metadata, SyncError, TargetVersion};
use restate_types::PlainNodeId;
use restate_types::logs::metadata::{Logs, ProviderKind};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::metadata::MetadataKind;
use restate_types::nodes_config::{NodesConfiguration, Role, StorageState};
use restate_types::replicated_loglet::ReplicatedLogletParams;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::info;

Check failure on line 22 (GitHub Actions / Build and test): unused import: `tracing::info`

#[derive(Debug, thiserror::Error)]
pub enum DisableNodeError {
#[error("unknown node {0}")]
UnknownNode(PlainNodeId),
#[error("log server is part of the open tail segment of log {0}")]
TailSegment(LogId),
#[error("log server is in state {0} but StorageState::ReadOnly is required for disabling it")]
NotReadOnly(StorageState),
#[error(
"log server is not safe to disable because it is still part of log {log_id} up to lsn {required_trim_point}, current trim point {actual_trim_point}"
)]
NotSafeToDisable {
log_id: LogId,
required_trim_point: Lsn,
actual_trim_point: Lsn,
},
#[error("failed finding the trim point for log {log_id}: {err}")]
FindTrimPoint {
log_id: LogId,
err: restate_bifrost::Error,
},
#[error("metadata server is member of cluster")]
MetadataMember,
}

pub struct DisableNodeChecker {
bifrost: Bifrost,
metadata: Metadata,
nodes_configuration: Arc<NodesConfiguration>,
logs: Arc<Logs>,
}

impl DisableNodeChecker {
pub async fn create(bifrost: Bifrost) -> Result<Self, SyncError> {
let metadata = Metadata::current();

let nodes_configuration = metadata.nodes_config_snapshot();

// It's important that we never operate on an outdated Logs wrt the used NodesConfiguration.
// Otherwise, we risk that we don't see all segments this node is part of.
metadata
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await?;
let logs = metadata.logs_snapshot();

Ok(DisableNodeChecker {
bifrost,
nodes_configuration,
logs,
metadata,
})
}

pub async fn refresh_metadata(
&mut self,
sync_nodes_configuration: bool,
) -> Result<(), SyncError> {
if sync_nodes_configuration {
self.metadata
.sync(MetadataKind::NodesConfiguration, TargetVersion::Latest)
.await?;
}

self.nodes_configuration = self.metadata.nodes_config_snapshot();

// It's important that we never operate on an outdated Logs wrt the used NodesConfiguration.
// Otherwise, we risk that we don't see all segments this node is part of.
self.metadata
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await?;
self.logs = self.metadata.logs_snapshot();

Ok(())
}

pub fn nodes_configuration(&self) -> &NodesConfiguration {
&self.nodes_configuration
}

pub async fn safe_to_disable_node(&self, node_id: PlainNodeId) -> Result<(), DisableNodeError> {
// for a node to be safe to disable it needs to be known, so that we don't accidentally
// disable a newly joining node
let node_config = self
.nodes_configuration
.find_node_by_id(node_id)
.map_err(|_err| DisableNodeError::UnknownNode(node_id))?;

if node_config.roles.contains(Role::LogServer) {
self.safe_to_disable_log_server(node_id, node_config.log_server_config.storage_state)
.await?
}

if node_config.roles.contains(Role::MetadataServer) {
// it is only safe to disable a node if it does not run a metadata server
return Err(DisableNodeError::MetadataMember);
}

Ok(())
}

/// Checks whether it is safe to disable the given log server identified by the node_id. It is
/// safe to disable the log server if it can no longer be added to new nodesets (i.e. it is not
/// a candidate) and all segments in which this log server is a member of the nodeset lie before
/// the trim point.
async fn safe_to_disable_log_server(
&self,
node_id: PlainNodeId,
storage_state: StorageState,
) -> Result<(), DisableNodeError> {
match storage_state {
storage_state @ (StorageState::Provisioning
| StorageState::ReadWrite
| StorageState::DataLoss) => {
return Err(DisableNodeError::NotReadOnly(storage_state));
}
// it's safe to disable a disabled node
StorageState::Disabled => return Ok(()),
// we need to check whether this node is still part of any segments that haven't
// been fully trimmed.
StorageState::ReadOnly => {}
}

let mut required_trim_points: HashMap<_, _> = HashMap::default();

// find for every log the latest segment in which the node is part of the nodeset
for (log_id, chain) in self.logs.iter() {
let mut next_segment_lsn: Option<Lsn> = None;
for segment in chain.iter().rev() {
let is_member = match segment.config.kind {
// we assume that the given node runs the local and memory loglet
ProviderKind::Local => true,
// #[cfg(any(test, feature = "memory-loglet"))]
ProviderKind::InMemory => true,
ProviderKind::Replicated => {
let replicated_loglet = ReplicatedLogletParams::deserialize_from(
segment.config.params.as_bytes(),
)
.expect("to be deserializable");
replicated_loglet.nodeset.contains(node_id)
}
};

if is_member {
let Some(tail_lsn) = next_segment_lsn else {
// If we are still part of the tail segment, then trimming won't help.
// The node can only be disabled if it is in a closed segment that can be
// fully trimmed.
return Err(DisableNodeError::TailSegment(*log_id));
};

// tail_lsn is the first lsn that will be written after this segment
required_trim_points.insert(*log_id, tail_lsn.prev());
break;
}

next_segment_lsn = Some(segment.base_lsn);
}
}

// find, for every log in which the node is a member of a nodeset, the actual trim point and
// compare whether the segment ends before or after the trim point.
future::try_join_all(required_trim_points.into_iter().map(
|(log_id, required_trim_point)| async move {
let actual_trim_point = self
.bifrost
.clone()
.get_trim_point(log_id)
.await
.map_err(|err| DisableNodeError::FindTrimPoint { log_id, err })?;
// The actual trim point must be greater than or equal to the required trim point for
// the node to no longer be needed for reads.
if actual_trim_point < required_trim_point {
Err(DisableNodeError::NotSafeToDisable {
log_id,
required_trim_point,
actual_trim_point,
})
} else {
Ok(())
}
},
))
.await?;

Ok(())
}
}
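
A minimal sketch of how the new RemoveNode handler might drive this checker is shown below; the handler name, error handling, and the final removal step are assumptions, only the DisableNodeChecker API above comes from this commit.

use restate_bifrost::Bifrost;
use restate_types::PlainNodeId;

// Hypothetical handler body for the RemoveNode RPC.
async fn handle_remove_node(bifrost: Bifrost, node_id: PlainNodeId) -> anyhow::Result<()> {
    // Build the checker; this snapshots the nodes configuration and syncs Logs metadata.
    let mut checker = DisableNodeChecker::create(bifrost).await?;
    // Re-sync the nodes configuration and logs so the check runs against fresh metadata.
    checker.refresh_metadata(true).await?;
    // Fails with a DisableNodeError if the node still runs a metadata server, sits in an
    // open tail segment, or is needed for log segments that haven't been trimmed yet.
    checker.safe_to_disable_node(node_id).await?;
    // ... at this point the caller would mark the node as removed in the nodes configuration ...
    Ok(())
}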
