From f0ee30be733e676c62631249aeb865f0c967ba5a Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Mon, 10 Feb 2025 18:30:34 +0100
Subject: [PATCH] storage: schedule singleton sources only on one replica

---
 src/storage-controller/src/instance.rs | 270 +++++++++++++++++++++++--
 1 file changed, 250 insertions(+), 20 deletions(-)

diff --git a/src/storage-controller/src/instance.rs b/src/storage-controller/src/instance.rs
index e9a2cc1984205..8f2957c7fed63 100644
--- a/src/storage-controller/src/instance.rs
+++ b/src/storage-controller/src/instance.rs
@@ -15,6 +15,7 @@ use std::time::Duration;
 
 use anyhow::bail;
 use differential_dataflow::lattice::Lattice;
+use itertools::Itertools;
 use mz_build_info::BuildInfo;
 use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
 use mz_cluster_client::ReplicaId;
@@ -25,10 +26,12 @@ use mz_repr::GlobalId;
 use mz_service::client::{GenericClient, Partitioned};
 use mz_service::params::GrpcClientParameters;
 use mz_storage_client::client::{
-    Status, StatusUpdate, StorageClient, StorageCommand, StorageGrpcClient, StorageResponse,
+    RunIngestionCommand, Status, StatusUpdate, StorageClient, StorageCommand, StorageGrpcClient,
+    StorageResponse,
 };
 use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
-use timely::progress::Timestamp;
+use mz_storage_types::sources::SourceConnection;
+use timely::progress::{Antichain, Timestamp};
 use tokio::select;
 use tokio::sync::mpsc;
 use tracing::{debug, info, warn};
@@ -53,7 +56,9 @@ pub(crate) struct Instance<T> {
     /// While this is derivable from `history` on demand, keeping a denormalized
     /// list of running ingestions is quite a bit more convenient in the
     /// implementation of `StorageController::active_ingestions`.
-    active_ingestions: BTreeSet<GlobalId>,
+    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
+    /// A map from ingestion export ID to the ingestion that is producing it.
+    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// The command history, used to replay past commands when introducing new replicas or
     /// reconnecting to existing replicas.
     history: CommandHistory<T>,
@@ -70,6 +75,15 @@
     response_tx: mpsc::UnboundedSender<StorageResponse<T>>,
 }
 
+#[derive(Debug)]
+struct ActiveIngestion {
+    /// Whether the ingestion prefers running on a single replica.
+    prefers_single_replica: bool,
+
+    /// The set of replicas that this ingestion is currently running on.
+    active_replicas: BTreeSet<ReplicaId>,
+}
+
 impl<T> Instance<T>
 where
     T: Timestamp + Lattice,
@@ -87,7 +101,8 @@ where
 
         let mut instance = Self {
             replicas: Default::default(),
-            active_ingestions: BTreeSet::new(),
+            active_ingestions: Default::default(),
+            ingestion_exports: Default::default(),
             history,
             epoch,
             metrics,
@@ -125,16 +140,59 @@ where
 
         // Replay the commands at the new replica.
         for command in self.history.iter() {
-            replica.send(command.clone());
+            match command.clone() {
+                StorageCommand::RunIngestions(_) => {
+                    // Ingestions are handled by `schedule_ingestions` below.
+                }
+                StorageCommand::AllowCompaction(_) => {
+                    // Compactions are handled by `send_compactions` below.
+                }
+                command => {
+                    replica.send(command.clone());
+                }
+            }
         }
 
         self.replicas.insert(id, replica);
+
+        self.schedule_ingestions();
+
+        let compactions = self
+            .history
+            .iter()
+            .flat_map(|cmd| {
+                if let StorageCommand::AllowCompaction(cmds) = cmd {
+                    cmds.clone()
+                } else {
+                    Vec::new()
+                }
+            })
+            .collect();
+        self.send_compactions(&compactions);
     }
 
     /// Removes the identified replica from this storage instance.
     pub fn drop_replica(&mut self, id: ReplicaId) {
         let replica = self.replicas.remove(&id);
 
+        let mut needs_rescheduling = false;
+        for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
+            let was_running = ingestion.active_replicas.remove(&id);
+            if was_running {
+                tracing::debug!(
+                    %ingestion_id,
+                    replica_id = %id,
+                    "ingestion was running on dropped replica, updating scheduling decisions"
+                );
+                needs_rescheduling = true;
+            }
+        }
+
+        if needs_rescheduling {
+            self.schedule_ingestions();
+        }
+
         if replica.is_some() && self.replicas.is_empty() {
             self.update_paused_statuses();
         }
@@ -197,32 +255,204 @@ where
 
     /// Sends a command to this storage instance.
     pub fn send(&mut self, command: StorageCommand<T>) {
-        match &command {
+        // Record the command so that new replicas can be brought up to speed.
+        self.history.push(command.clone());
+
+        match command.clone() {
             StorageCommand::RunIngestions(ingestions) => {
-                for ingestion in ingestions {
-                    self.active_ingestions.insert(ingestion.id);
+                self.absorb_ingestions(ingestions);
+            }
+            StorageCommand::AllowCompaction(cmds) => {
+                self.absorb_compactions(cmds);
+            }
+            command => {
+                for replica in self.replicas.values_mut() {
+                    replica.send(command.clone());
                 }
             }
-            StorageCommand::AllowCompaction(policies) => {
-                for (id, frontier) in policies {
-                    if frontier.is_empty() {
-                        self.active_ingestions.remove(id);
+        }
+
+        if command.installs_objects() && self.replicas.is_empty() {
+            self.update_paused_statuses();
+        }
+    }
+
+    /// Updates internal state based on incoming ingestion commands.
+    ///
+    /// If a command updates an existing ingestion, it is sent out to the same
+    /// replicas that are already running that ingestion. Otherwise, we only
+    /// record the ingestion and leave scheduling up to
+    /// [`schedule_ingestions`](Self::schedule_ingestions).
+    fn absorb_ingestions(&mut self, ingestions: Vec<RunIngestionCommand>) {
+        for ingestion in ingestions {
+            let prefers_single_replica = ingestion
+                .description
+                .desc
+                .connection
+                .prefers_single_replica();
+
+            let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);
+
+            if let Some(ingestion_state) = existing_ingestion_state {
+                assert!(
+                    ingestion_state.prefers_single_replica == prefers_single_replica,
+                    "single-replica preference of an ingestion must not change"
+                );
+                tracing::debug!(
+                    ingestion_id = %ingestion.id,
+                    active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
+                    "updating ingestion"
+                );
+
+                for id in ingestion.description.source_exports.keys() {
+                    self.ingestion_exports.insert(id.clone(), ingestion.id);
+                }
+
+                // This is an update for an existing ingestion, so we send it
+                // to the replicas that are already running that ingestion. Our
+                // scheduling decisions don't change, so there is no need to
+                // update `active_ingestions`.
+                for active_replica_id in ingestion_state.active_replicas.iter() {
+                    let active_replica = self
+                        .replicas
+                        .get_mut(active_replica_id)
+                        .expect("missing replica");
+                    active_replica.send(StorageCommand::RunIngestions(vec![ingestion.clone()]));
+                }
+            } else {
+                // This is a new ingestion: record its state and defer the
+                // scheduling decision to `schedule_ingestions`.
+                let ingestion_state = ActiveIngestion {
+                    prefers_single_replica,
+                    active_replicas: BTreeSet::new(),
+                };
+                self.active_ingestions.insert(ingestion.id, ingestion_state);
+
+                for id in ingestion.description.source_exports.keys() {
+                    self.ingestion_exports.insert(id.clone(), ingestion.id);
+                }
+
+                // Maybe update scheduling decisions.
+                self.schedule_ingestions();
+            }
+        }
+    }
+
+    /// Schedules ingestions on replicas, if needed.
+    ///
+    /// A single-replica ingestion is scheduled on the most recently added
+    /// replica, if the ingestion is not already running somewhere. The
+    /// rationale is that the latest replica is the one that was created to
+    /// take over load from the other replicas.
+    ///
+    /// For multi-replica ingestions, we ensure that each active ingestion is
+    /// scheduled on all replicas.
+    fn schedule_ingestions(&mut self) {
+        // Ensure the command history is reduced before scheduling.
+        self.history.reduce();
+
+        for (ingestion_id, ingestion_state) in self.active_ingestions.iter_mut() {
+            // Look up the corresponding `RunIngestionCommand` in the command
+            // history.
+            let mut run_ingestion_cmd = None;
+            for command in self.history.iter() {
+                if let StorageCommand::RunIngestions(cmds) = command {
+                    for cmd in cmds {
+                        if cmd.id == *ingestion_id {
+                            run_ingestion_cmd = Some(cmd.clone());
+                        }
+                    }
+                }
+            }
+
+            let run_ingestion_cmd = match run_ingestion_cmd {
+                Some(cmd) => cmd,
+                None => {
+                    panic!(
+                        "missing ingestion command in history for ingestion id {:?}",
+                        ingestion_id
+                    );
+                }
+            };
+
+            if ingestion_state.prefers_single_replica {
+                // For a single-replica ingestion, only schedule it if it is
+                // not already running on some replica.
+                if ingestion_state.active_replicas.is_empty() {
+                    if let Some(last_replica_id) = self.replicas.keys().copied().last() {
+                        ingestion_state.active_replicas.insert(last_replica_id);
+                        if let Some(replica) = self.replicas.get_mut(&last_replica_id) {
+                            tracing::debug!(
+                                ingestion_id = %ingestion_id,
+                                replica_id = %last_replica_id,
+                                "scheduling single-replica ingestion"
+                            );
+                            replica.send(StorageCommand::RunIngestions(vec![
+                                run_ingestion_cmd.clone(),
+                            ]));
+                        }
+                    }
+                } else {
+                    tracing::debug!(
+                        %ingestion_id,
+                        active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
+                        "single-replica ingestion already running, not scheduling again",
+                    );
+                }
+            } else {
+                // For a multi-replica ingestion, ensure that all replicas have
+                // received the command.
+                let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
+                let unscheduled_replicas: Vec<_> = current_replica_ids
+                    .difference(&ingestion_state.active_replicas)
+                    .copied()
+                    .collect();
+                for replica_id in unscheduled_replicas {
+                    if let Some(replica) = self.replicas.get_mut(&replica_id) {
+                        tracing::debug!(
+                            %ingestion_id,
+                            %replica_id,
+                            "scheduling multi-replica ingestion"
+                        );
+                        replica.send(StorageCommand::RunIngestions(vec![
+                            run_ingestion_cmd.clone(),
+                        ]));
+                        ingestion_state.active_replicas.insert(replica_id);
+                    }
                 }
             }
-            _ => (),
         }
+    }
 
-        // Record the command so that new replicas can be brought up to speed.
-        self.history.push(command.clone());
+    /// Updates internal state based on incoming compaction commands. Also
+    /// sends out compaction commands to replicas.
+    fn absorb_compactions(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
+        tracing::debug!(?self.active_ingestions, ?cmds, "allow_compaction");
+
+        self.send_compactions(&cmds);
 
-        // Clone the command for each active replica.
-        for replica in self.replicas.values_mut() {
-            replica.send(command.clone());
+        for (id, frontier) in cmds.iter() {
+            if frontier.is_empty() {
+                self.active_ingestions.remove(id);
+                self.ingestion_exports.remove(id);
+            }
         }
+    }
 
-        if command.installs_objects() && self.replicas.is_empty() {
-            self.update_paused_statuses();
+    /// Sends the given compaction commands to the replicas that are running
+    /// the corresponding ingestions.
+    fn send_compactions(&mut self, cmds: &Vec<(GlobalId, Antichain<T>)>) {
+        for (id, frontier) in cmds.iter() {
+            let ingestion_id = self
+                .ingestion_exports
+                .get(id)
+                .expect("missing ingestion state for export");
+            let active_ingestion = self
+                .active_ingestions
+                .get(ingestion_id)
+                .expect("missing ingestion state");
+
+            for active_replica_id in active_ingestion.active_replicas.iter() {
+                let active_replica = self
+                    .replicas
+                    .get_mut(active_replica_id)
+                    .expect("missing replica");
+                active_replica.send(StorageCommand::AllowCompaction(vec![(
+                    id.clone(),
+                    frontier.clone(),
+                )]));
+            }
         }
     }
 }
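
Note on the scheduling policy (illustration only, not part of the patch): the decision implemented in `schedule_ingestions` boils down to a small function over the per-ingestion state. The sketch below is a simplified, standalone rendering of that policy; `ReplicaId` is stood in by an ordered integer type, and `target_replicas` is a hypothetical helper that does not exist in the codebase.

    use std::collections::BTreeSet;

    /// Stand-in for the controller's replica identifier; assumed ordered and `Copy`.
    type ReplicaId = u64;

    /// Mirrors the `ActiveIngestion` state introduced by the patch.
    struct ActiveIngestion {
        prefers_single_replica: bool,
        active_replicas: BTreeSet<ReplicaId>,
    }

    /// Hypothetical helper: the replicas an ingestion should additionally be
    /// scheduled on, given the replicas currently known to the instance
    /// (ordered by ID, like the controller's replica map).
    fn target_replicas(
        ingestion: &ActiveIngestion,
        replicas: &BTreeSet<ReplicaId>,
    ) -> Vec<ReplicaId> {
        if ingestion.prefers_single_replica {
            if ingestion.active_replicas.is_empty() {
                // Schedule on the "last" replica only (highest ID, i.e. the
                // most recently added one).
                replicas.iter().last().copied().into_iter().collect()
            } else {
                // Already running somewhere; nothing to do.
                Vec::new()
            }
        } else {
            // Schedule on every replica that is not already running it.
            replicas
                .difference(&ingestion.active_replicas)
                .copied()
                .collect()
        }
    }

In the patch itself this logic lives inline in `schedule_ingestions`, which additionally looks up the ingestion's `RunIngestionCommand` in the reduced command history and sends it to the chosen replicas.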