storage: schedule singleton sources only on one replica
aljoscha committed Feb 12, 2025
1 parent d5258c1 commit f0ee30b
Showing 1 changed file with 250 additions and 20 deletions.
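The policy this commit introduces is easiest to see in isolation. The following is a simplified, hypothetical sketch (not part of the diff below) of the decision that `schedule_ingestions` makes: an ingestion whose connection prefers a single replica is placed only on the last replica in ID order (assumed here to be the most recently added one, as the doc comment in the diff suggests), while every other ingestion is placed on all replicas. The `ReplicaId` alias and the free-standing `target_replicas` function are illustrative stand-ins; the real code keeps this state in `Instance` and `ActiveIngestion` and sends `RunIngestionCommand`s to the chosen replicas.

use std::collections::BTreeSet;

// Illustrative stand-in for `mz_cluster_client::ReplicaId`; the real type is
// richer, but all the policy needs is an ordering in which "last" means "newest".
type ReplicaId = u64;

/// Mirror of the scheduling decision in `schedule_ingestions`: single-replica
/// ingestions run on the last (assumed newest) replica only, everything else
/// runs on all replicas.
fn target_replicas(
    prefers_single_replica: bool,
    replicas: &BTreeSet<ReplicaId>,
) -> BTreeSet<ReplicaId> {
    if prefers_single_replica {
        // `BTreeSet` iterates in ascending order, so `last()` yields the
        // highest replica ID, if any replica exists at all.
        replicas.iter().last().copied().into_iter().collect()
    } else {
        replicas.clone()
    }
}

fn main() {
    let replicas: BTreeSet<ReplicaId> = [1, 2, 3].into_iter().collect();

    // A singleton source is scheduled on the newest replica only.
    assert_eq!(target_replicas(true, &replicas), BTreeSet::from([3]));

    // An ingestion without that preference is scheduled everywhere.
    assert_eq!(target_replicas(false, &replicas), replicas);
}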
src/storage-controller/src/instance.rs (250 additions, 20 deletions)
@@ -15,6 +15,7 @@ use std::time::Duration;

use anyhow::bail;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
use mz_cluster_client::ReplicaId;
@@ -25,10 +26,12 @@ use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
use mz_storage_client::client::{
Status, StatusUpdate, StorageClient, StorageCommand, StorageGrpcClient, StorageResponse,
RunIngestionCommand, Status, StatusUpdate, StorageClient, StorageCommand, StorageGrpcClient,
StorageResponse,
};
use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
use timely::progress::Timestamp;
use mz_storage_types::sources::SourceConnection;
use timely::progress::{Antichain, Timestamp};
use tokio::select;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
@@ -53,7 +56,9 @@ pub(crate) struct Instance<T> {
/// While this is derivable from `history` on demand, keeping a denormalized
/// list of running ingestions is quite a bit more convenient in the
/// implementation of `StorageController::active_ingestions`.
active_ingestions: BTreeSet<GlobalId>,
active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
/// A map from ingestion export ID to the ingestion that is producing it.
ingestion_exports: BTreeMap<GlobalId, GlobalId>,
/// The command history, used to replay past commands when introducing new replicas or
/// reconnecting to existing replicas.
history: CommandHistory<T>,
@@ -70,6 +75,15 @@ pub(crate) struct Instance<T> {
response_tx: mpsc::UnboundedSender<StorageResponse<T>>,
}

#[derive(Debug)]
struct ActiveIngestion {
/// Whether the ingestion prefers running on a single replica.
prefers_single_replica: bool,

/// The set of replicas that this ingestion is currently running on.
active_replicas: BTreeSet<ReplicaId>,
}

impl<T> Instance<T>
where
T: Timestamp + Lattice,
@@ -87,7 +101,8 @@ where

let mut instance = Self {
replicas: Default::default(),
active_ingestions: BTreeSet::new(),
active_ingestions: Default::default(),
ingestion_exports: Default::default(),
history,
epoch,
metrics,
@@ -125,16 +140,59 @@

// Replay the commands at the new replica.
for command in self.history.iter() {
replica.send(command.clone());
// replica.send(command.clone());
match command.clone() {
StorageCommand::RunIngestions(_) => {
// Ingestions are handled by schedule_ingestions below.
}
StorageCommand::AllowCompaction(_) => {
// Compactions are handled by send_compactions below.
}
command => {
replica.send(command.clone());
}
}
}

self.replicas.insert(id, replica);

self.schedule_ingestions();

let compactions = self
.history
.iter()
.flat_map(|cmd| {
if let StorageCommand::AllowCompaction(cmds) = cmd {
cmds.clone()
} else {
Vec::new()
}
})
.collect();
self.send_compactions(&compactions);
}

/// Removes the identified replica from this storage instance.
pub fn drop_replica(&mut self, id: ReplicaId) {
let replica = self.replicas.remove(&id);

let mut needs_rescheduling = false;
for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
let was_running = ingestion.active_replicas.remove(&id);
if was_running {
tracing::debug!(
%ingestion_id,
replica_id = %id,
"ingestion was running on dropped replica, updating scheduling decisions"
);
needs_rescheduling = true;
}
}

if needs_rescheduling {
self.schedule_ingestions();
}

if replica.is_some() && self.replicas.is_empty() {
self.update_paused_statuses();
}
@@ -197,32 +255,204 @@ where

/// Sends a command to this storage instance.
pub fn send(&mut self, command: StorageCommand<T>) {
match &command {
// Record the command so that new replicas can be brought up to speed.
self.history.push(command.clone());

match command.clone() {
StorageCommand::RunIngestions(ingestions) => {
for ingestion in ingestions {
self.active_ingestions.insert(ingestion.id);
self.absorb_ingestions(ingestions);
}
StorageCommand::AllowCompaction(cmds) => {
self.absorb_compactions(cmds);
}
command => {
for replica in self.replicas.values_mut() {
replica.send(command.clone());
}
}
StorageCommand::AllowCompaction(policies) => {
for (id, frontier) in policies {
if frontier.is_empty() {
self.active_ingestions.remove(id);
}

if command.installs_objects() && self.replicas.is_empty() {
self.update_paused_statuses();
}
}

/// Updates internal state based on incoming ingestion commands.
///
/// If a command updates an existing ingestion, it is sent out to the same
/// replicas that are already running that ingestion. Otherwise, we only
/// record the ingestion and leave scheduling up to
/// [`schedule_ingestions`](Self::schedule_ingestions).
fn absorb_ingestions(&mut self, ingestions: Vec<RunIngestionCommand>) {
for ingestion in ingestions {
let prefers_single_replica = ingestion
.description
.desc
.connection
.prefers_single_replica();

let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);

if let Some(ingestion_state) = existing_ingestion_state {
assert!(
ingestion_state.prefers_single_replica == prefers_single_replica,
"single-replica preference changed"
);
tracing::debug!(
ingestion_id = %ingestion.id,
active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
"updating ingestion"
);
for id in ingestion.description.source_exports.keys() {
self.ingestion_exports.insert(id.clone(), ingestion.id);
}

// It's an update for an existing ingestion, so we send it out to the
// same replicas that are already running that ingestion. We don't need
// to change anything about our scheduling decisions, so there is no
// need to update active_ingestions.
for active_replica_id in ingestion_state.active_replicas.iter() {
let active_replica = self
.replicas
.get_mut(active_replica_id)
.expect("missing replica");
active_replica.send(StorageCommand::RunIngestions(vec![ingestion.clone()]));
}
} else {
// We create a new ingestion state for this ingestion and then defer to schedule_ingestions.
let ingestion_state = ActiveIngestion {
prefers_single_replica,
active_replicas: BTreeSet::new(),
};
self.active_ingestions.insert(ingestion.id, ingestion_state);

for id in ingestion.description.source_exports.keys() {
self.ingestion_exports.insert(id.clone(), ingestion.id);
}

// Maybe update scheduling decisions.
self.schedule_ingestions();
}
}
}

/// Schedule ingestions on replicas, if needed.
///
/// Single-replica ingestions are scheduled on the last replica, if the
/// ingestion is not already running on some replica; the rationale is that
/// the latest replica is the one that was created to take over load from
/// other replicas.
///
/// For multi-replica ingestions, we ensure that each active ingestion is
/// scheduled on all replicas.
fn schedule_ingestions(&mut self) {
// Ensure the command history is reduced before scheduling.
self.history.reduce();

for (ingestion_id, ingestion_state) in self.active_ingestions.iter_mut() {
// Look up the corresponding RunIngestionCommand in the command history.
let mut run_ingestion_cmd = None;
for command in self.history.iter() {
if let StorageCommand::RunIngestions(cmds) = command {
for cmd in cmds {
if cmd.id == *ingestion_id {
run_ingestion_cmd = Some(cmd.clone());
}
}
}
}

let run_ingestion_cmd = match run_ingestion_cmd {
Some(cmd) => cmd,
None => {
panic!(
"missing ingestion command in history for ingestion id {:?}",
ingestion_id
);
}
};

if ingestion_state.prefers_single_replica {
// For single-replica ingestion, schedule only if it's not already running.
if ingestion_state.active_replicas.is_empty() {
if let Some(last_replica_id) = self.replicas.keys().cloned().last() {
ingestion_state.active_replicas.insert(last_replica_id);
if let Some(replica) = self.replicas.get_mut(&last_replica_id) {
tracing::debug!(
ingestion_id = %ingestion_id,
replica_id = %last_replica_id,
"scheduling single-replica ingestion");
replica.send(StorageCommand::RunIngestions(vec![
run_ingestion_cmd.clone()
]));
}
}
} else {
tracing::debug!(
%ingestion_id,
active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
"single-replica ingestion already running, not scheduling again",
);
}
} else {
// For multi-replica ingestion, ensure all replicas have received the command.
let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
let unscheduled_replicas: Vec<_> = current_replica_ids
.difference(&ingestion_state.active_replicas)
.copied()
.collect();
for replica_id in unscheduled_replicas {
if let Some(replica) = self.replicas.get_mut(&replica_id) {
tracing::debug!(
%ingestion_id,
%replica_id,
"scheduling multi-replica ingestion"
);
replica.send(StorageCommand::RunIngestions(vec![
run_ingestion_cmd.clone()
]));
ingestion_state.active_replicas.insert(replica_id);
}
}
}
_ => (),
}
}

// Record the command so that new replicas can be brought up to speed.
self.history.push(command.clone());
/// Updates internal state based on incoming compaction commands. Also sends
/// out compaction commands to replicas.
fn absorb_compactions(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
tracing::debug!(?self.active_ingestions, ?cmds, "allow_compaction");

self.send_compactions(&cmds);

// Clone the command for each active replica.
for replica in self.replicas.values_mut() {
replica.send(command.clone());
for (id, frontier) in cmds.iter() {
if frontier.is_empty() {
self.active_ingestions.remove(id);
self.ingestion_exports.remove(id);
}
}
}

if command.installs_objects() && self.replicas.is_empty() {
self.update_paused_statuses();
fn send_compactions(&mut self, cmds: &Vec<(GlobalId, Antichain<T>)>) {
for (id, frontier) in cmds.iter() {
let ingestion_id = self
.ingestion_exports
.get(id)
.expect("missing ingestion state for export");
let active_ingestion = self
.active_ingestions
.get(ingestion_id)
.expect("missing ingestion state");

for active_replica_id in active_ingestion.active_replicas.iter() {
let active_replica = self
.replicas
.get_mut(active_replica_id)
.expect("missing replica");
active_replica.send(StorageCommand::AllowCompaction(vec![(
id.clone(),
frontier.clone(),
)]));
}
}
}
}
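To round off, here is an equally hypothetical miniature of the replica lifecycle that the new `drop_replica` handling enables: when the replica running a single-replica ingestion is dropped, the ingestion's `active_replicas` entry is cleared and rescheduling places it on the newest remaining replica; a newly added replica, by contrast, does not take over an ingestion that is already running. The `Scheduler`, `IngestionId`, and method names below are invented for this sketch and model only the single-replica policy; the real implementation also replays the command history and routes compaction frontiers only to the active replicas.

use std::collections::{BTreeMap, BTreeSet};

type ReplicaId = u64;
type IngestionId = u32;

/// Hypothetical miniature of the controller-side bookkeeping: which replicas
/// each single-replica ingestion is currently scheduled on.
#[derive(Default)]
struct Scheduler {
    replicas: BTreeSet<ReplicaId>,
    active: BTreeMap<IngestionId, BTreeSet<ReplicaId>>,
}

impl Scheduler {
    fn add_replica(&mut self, id: ReplicaId) {
        self.replicas.insert(id);
        self.schedule();
    }

    fn drop_replica(&mut self, id: ReplicaId) {
        self.replicas.remove(&id);
        // Forget scheduling decisions that pointed at the dropped replica,
        // mirroring `drop_replica` in the commit, then reschedule.
        for replicas in self.active.values_mut() {
            replicas.remove(&id);
        }
        self.schedule();
    }

    fn add_ingestion(&mut self, id: IngestionId) {
        self.active.entry(id).or_default();
        self.schedule();
    }

    /// Single-replica policy only: if an ingestion is running nowhere,
    /// place it on the last (newest) replica, if one exists.
    fn schedule(&mut self) {
        for replicas in self.active.values_mut() {
            if replicas.is_empty() {
                if let Some(last) = self.replicas.iter().last().copied() {
                    replicas.insert(last);
                }
            }
        }
    }
}

fn main() {
    let mut s = Scheduler::default();
    s.add_replica(1);
    s.add_ingestion(7);
    assert_eq!(s.active[&7], BTreeSet::from([1]));

    // A newer replica does not steal an already-running singleton ingestion...
    s.add_replica(2);
    assert_eq!(s.active[&7], BTreeSet::from([1]));

    // ...but if its replica goes away, the ingestion fails over to the newest
    // replica that is left.
    s.drop_replica(1);
    assert_eq!(s.active[&7], BTreeSet::from([2]));
}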