Skip to content

Commit

Permalink
ore: pend in wait_in_assert_finished instead
Browse files Browse the repository at this point in the history
  • Loading branch information
guswynn committed Nov 30, 2023
1 parent 218f638 commit bc63dd1
Show file tree
Hide file tree
Showing 29 changed files with 162 additions and 168 deletions.
2 changes: 1 addition & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5281,7 +5281,7 @@ mod tests {
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_smoketest_all_builtins() {
fn inner(catalog: Catalog) -> Vec<tokio::task::JoinHandle<()>> {
fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
let catalog = Arc::new(catalog);
let conn_catalog = catalog.for_system_session();

Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use mz_ore::collections::CollectionExt;
use mz_ore::id_gen::IdAllocator;
use mz_ore::now::{to_datetime, EpochMillis, NowFn};
use mz_ore::result::ResultExt;
use mz_ore::task::{AbortOnDropHandle, JoinHandleExt};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::thread::JoinOnDropHandle;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::{GlobalId, Row, ScalarType};
Expand Down
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use differential_dataflow::lattice::Lattice;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
use mz_ore::retry::Retry;
use mz_ore::task::{AbortOnDropHandle, JoinHandleExt};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::{GenericClient, Partitioned};
use mz_service::params::GrpcClientParameters;
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use mz_compute_types::plan::Plan;
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
use mz_ore::task::{AbortHandleExt, AbortOnDropAbortHandle};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::read::ReadHandle;
Expand Down Expand Up @@ -734,7 +734,7 @@ impl PendingPeek {
});
PendingPeek::Persist(PersistPeek {
peek,
_abort_handle: task_handle.abort_handle().abort_on_drop(),
_abort_handle: task_handle.abort_on_drop(),
result: result_rx,
span: tracing::Span::current(),
})
Expand Down Expand Up @@ -763,7 +763,7 @@ pub struct PersistPeek {
pub(crate) peek: Peek,
/// A background task that's responsible for producing the peek results.
/// If we're no longer interested in the results, we abort the task.
_abort_handle: AbortOnDropAbortHandle,
_abort_handle: AbortOnDropHandle<()>,
/// The result of the background task, eventually.
result: oneshot::Receiver<(PeekResponse, Duration)>,
/// The `tracing::Span` tracking this peek's operation
Expand Down
2 changes: 1 addition & 1 deletion src/controller/src/clusters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use mz_orchestrator::{
ServiceEvent, ServicePort,
};
use mz_ore::halt;
use mz_ore::task::{AbortOnDropHandle, JoinHandleExt};
use mz_ore::task::AbortOnDropHandle;
use mz_repr::adt::numeric::Numeric;
use mz_repr::GlobalId;
use once_cell::sync::Lazy;
Expand Down
16 changes: 8 additions & 8 deletions src/environmentd/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ impl<'s, T, H> ConnectBuilder<'s, T, H> {
}
}

/// Configures this [`ConnectBuilder`] to return the [`tokio::task::JoinHandle`] that is
/// Configures this [`ConnectBuilder`] to return the [`mz_ore::task::JoinHandle`] that is
/// polling the underlying postgres connection, associated with the returned client.
pub fn with_handle(self) -> ConnectBuilder<'s, T, WithHandle> {
ConnectBuilder {
Expand All @@ -750,37 +750,37 @@ impl<'s, T, H> ConnectBuilder<'s, T, H> {
}
}

/// This trait enables us to either include or omit the [`tokio::task::JoinHandle`] in the result
/// This trait enables us to either include or omit the [`mz_ore::task::JoinHandle`] in the result
/// of a client connection.
pub trait IncludeHandle: Send {
type Output;
fn transform_result(
client: tokio_postgres::Client,
handle: tokio::task::JoinHandle<()>,
handle: mz_ore::task::JoinHandle<()>,
) -> Self::Output;
}

/// Type parameter that denotes we __will not__ return the [`tokio::task::JoinHandle`] in the
/// Type parameter that denotes we __will not__ return the [`mz_ore::task::JoinHandle`] in the
/// result of a [`ConnectBuilder`].
pub struct NoHandle;
impl IncludeHandle for NoHandle {
type Output = tokio_postgres::Client;
fn transform_result(
client: tokio_postgres::Client,
_handle: tokio::task::JoinHandle<()>,
_handle: mz_ore::task::JoinHandle<()>,
) -> Self::Output {
client
}
}

/// Type parameter that denotes we __will__ return the [`tokio::task::JoinHandle`] in the result of
/// Type parameter that denotes we __will__ return the [`mz_ore::task::JoinHandle`] in the result of
/// a [`ConnectBuilder`].
pub struct WithHandle;
impl IncludeHandle for WithHandle {
type Output = (tokio_postgres::Client, tokio::task::JoinHandle<()>);
type Output = (tokio_postgres::Client, mz_ore::task::JoinHandle<()>);
fn transform_result(
client: tokio_postgres::Client,
handle: tokio::task::JoinHandle<()>,
handle: mz_ore::task::JoinHandle<()>,
) -> Self::Output {
(client, handle)
}
Expand Down
13 changes: 5 additions & 8 deletions src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,11 +1124,9 @@ async fn test_subscribe_shutdown() {
.unwrap();

// Un-gracefully abort the connection.
conn_task.abort();

// Need to await `conn_task` to actually deliver the `abort`. We don't
// We need to await `conn_task` to actually deliver the `abort`. We don't
// care about the result though (it's probably `JoinError` with `is_cancelled` being true).
let _ = conn_task.await;
conn_task.abort_and_wait().await;

// Dropping the server will initiate a graceful shutdown. We previously had
// a bug where the server would fail to notice that the client running
Expand Down Expand Up @@ -1561,10 +1559,9 @@ async fn test_github_12546() {
// Aborting the connection should cause its pending queries to be cancelled,
// allowing the compute instances to stop crashing while trying to execute
// them.
conn_task.abort();

// Need to await `conn_task` to actually deliver the `abort`.
let _ = conn_task.await;
//
// We need to await `conn_task` to actually deliver the `abort`.
conn_task.abort_and_wait().await;

// Make a new connection to verify the compute instance can now start.
let client = server.connect().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/frontegg-mock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ use jsonwebtoken::EncodingKey;
use mz_frontegg_auth::{ApiTokenArgs, ApiTokenResponse, Claims, RefreshToken, REFRESH_SUFFIX};
use mz_ore::now::NowFn;
use mz_ore::retry::Retry;
use mz_ore::task::JoinHandle;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use uuid::Uuid;

pub struct FronteggMockServer {
Expand Down
2 changes: 1 addition & 1 deletion src/orchestrator-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ use mz_ore::cast::{CastFrom, ReinterpretCast, TryCastFrom};
use mz_ore::error::ErrorExt;
use mz_ore::netio::UnixSocketAddr;
use mz_ore::result::ResultExt;
use mz_ore::task::{self, AbortOnDropHandle, JoinHandleExt};
use mz_ore::task::{self, AbortOnDropHandle};
use mz_pid_file::PidFile;
use scopeguard::defer;
use serde::Serialize;
Expand Down
Loading

0 comments on commit bc63dd1

Please sign in to comment.