Skip to content

Commit bb372ed

Browse files
authored
adapter: don't block the Coordinator on cluster status events (#31070)
While investigating some spikes in p99 message duration in the Coordinator, I noticed that we currently block on a builtin table write when receiving a status update for a cluster. This PR refactors the code so we wait for the builtin write to complete in a `tokio::task`, and then notify sessions. ### Motivation Improve P99 latencies for Coordinator messages ### Checklist - [x] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [x] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [x] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [x] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](https://github.com/MaterializeInc/cloud/pull/5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [x] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent 0d14e5d commit bb372ed

File tree

2 files changed

+37
-9
lines changed

2 files changed

+37
-9
lines changed

src/adapter/src/coord.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3477,12 +3477,33 @@ impl Coordinator {
34773477
}
34783478

34793479
/// Publishes a notice message to all sessions.
3480+
///
3481+
/// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3482+
/// so we keep it around.
3483+
#[allow(dead_code)]
34803484
pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
34813485
for meta in self.active_conns.values() {
34823486
let _ = meta.notice_tx.send(notice.clone());
34833487
}
34843488
}
34853489

3490+
/// Returns a closure that will publish a notice to all sessions that were active at the time
3491+
/// this method was called.
3492+
pub(crate) fn broadcast_notice_tx(
3493+
&self,
3494+
) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3495+
let senders: Vec<_> = self
3496+
.active_conns
3497+
.values()
3498+
.map(|meta| meta.notice_tx.clone())
3499+
.collect();
3500+
Box::new(move |notice| {
3501+
for tx in senders {
3502+
let _ = tx.send(notice.clone());
3503+
}
3504+
})
3505+
}
3506+
34863507
pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
34873508
&self.active_conns
34883509
}

src/adapter/src/coord/message_handler.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -756,13 +756,8 @@ impl Coordinator {
756756
);
757757

758758
let builtin_table_updates = vec![builtin_table_retraction, builtin_table_addition];
759-
760-
self.builtin_table_update()
761-
.execute(builtin_table_updates)
762-
.await
763-
.0
764-
.instrument(info_span!("coord::message_cluster_event::table_updates"))
765-
.await;
759+
// Returns a Future that completes when the Builtin Table write is completed.
760+
let builtin_table_completion = self.builtin_table_update().defer(builtin_table_updates);
766761

767762
let cluster = self.catalog().get_cluster(event.cluster_id);
768763
let replica = cluster.replica(event.replica_id).expect("Replica exists");
@@ -771,12 +766,24 @@ impl Coordinator {
771766
.get_cluster_replica_status(event.cluster_id, event.replica_id);
772767

773768
if old_replica_status != new_replica_status {
774-
self.broadcast_notice(AdapterNotice::ClusterReplicaStatusChanged {
769+
let notifier = self.broadcast_notice_tx();
770+
let notice = AdapterNotice::ClusterReplicaStatusChanged {
775771
cluster: cluster.name.clone(),
776772
replica: replica.name.clone(),
777773
status: new_replica_status,
778774
time: event.time,
779-
});
775+
};
776+
// In a separate task, so we don't block the Coordinator, wait for the builtin
777+
// table update to complete, and then notify active sessions.
778+
mz_ore::task::spawn(
779+
|| format!("cluster_event-{}-{}", event.cluster_id, event.replica_id),
780+
async move {
781+
// Wait for the builtin table updates to complete.
782+
builtin_table_completion.await;
783+
// Notify all sessions that were active at the time the cluster status changed.
784+
(notifier)(notice)
785+
},
786+
);
780787
}
781788
}
782789
}

0 commit comments

Comments
 (0)