
Commit 1ed7ece

Add timeout to metastore GRPC client (#5716)

* Add timeout to metastore GRPC client
* Make the test content explicit

1 parent f10654a

File tree: 5 files changed (+88, -16 lines)

* quickwit/quickwit-codegen/example/src/error.rs
* quickwit/quickwit-codegen/example/src/lib.rs
* quickwit/quickwit-indexing/src/actors/indexing_service.rs
* quickwit/quickwit-proto/src/metastore/mod.rs
* quickwit/quickwit-serve/src/lib.rs

quickwit/quickwit-codegen/example/src/error.rs (+7)

@@ -15,6 +15,7 @@
 use std::fmt;

 use quickwit_actors::AskError;
+use quickwit_common::tower::TimeoutExceeded;
 use quickwit_proto::error::GrpcServiceError;
 pub use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
 use quickwit_proto::{ServiceError, ServiceErrorCode};
@@ -72,3 +73,9 @@ where E: fmt::Debug
         HelloError::Internal(format!("{error:?}"))
     }
 }
+
+impl From<TimeoutExceeded> for HelloError {
+    fn from(_: TimeoutExceeded) -> Self {
+        HelloError::Timeout("client".to_string())
+    }
+}
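This `From` impl is the standard Rust idiom for funneling a middleware's marker error into a service's own error enum: once the conversion exists, the `?` operator applies it implicitly wherever the timeout can surface. The same pattern is applied to `MetastoreError` further down. A minimal, self-contained sketch of the idiom, with toy stand-ins (`TimeoutExceeded`, `SvcError`) rather than the Quickwit types:

```rust
// Toy stand-ins for the real types: not Quickwit code.
#[derive(Debug)]
struct TimeoutExceeded;

#[derive(Debug)]
enum SvcError {
    Timeout(String),
}

impl From<TimeoutExceeded> for SvcError {
    fn from(_: TimeoutExceeded) -> Self {
        // "client" marks the timeout as client-side, mirroring the diff above.
        SvcError::Timeout("client".to_string())
    }
}

fn timed_out_call() -> Result<(), TimeoutExceeded> {
    Err(TimeoutExceeded)
}

fn handler() -> Result<(), SvcError> {
    // `?` invokes the From impl, so no explicit map_err is needed.
    timed_out_call()?;
    Ok(())
}

fn main() {
    assert!(matches!(handler(), Err(SvcError::Timeout(_))));
}
```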

quickwit/quickwit-codegen/example/src/lib.rs (+58, -1)

@@ -162,7 +162,7 @@ mod tests {

     use bytesize::ByteSize;
     use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Universe};
-    use quickwit_common::tower::{BalanceChannel, Change};
+    use quickwit_common::tower::{BalanceChannel, Change, TimeoutLayer};
     use tokio::sync::mpsc::error::TrySendError;
     use tokio_stream::StreamExt;
     use tonic::transport::{Endpoint, Server};
@@ -767,4 +767,61 @@ mod tests {
             .unwrap_err();
         assert!(matches!(error, HelloError::Timeout(_)));
     }
+
+    #[tokio::test]
+    async fn test_balanced_channel_timeout_with_server_crash() {
+        let addr_str = "127.0.0.1:11112";
+        let addr: SocketAddr = addr_str.parse().unwrap();
+        // We want to abruptly stop a server without even sending the connection
+        // RST packet. Simply dropping the tonic Server is not enough, so we
+        // spawn a thread and freeze it with thread::park().
+        std::thread::spawn(move || {
+            let server_fut = async {
+                let hello = HelloImpl {
+                    // delay the response so that the server freezes in the middle of the request
+                    delay: Duration::from_millis(1000),
+                };
+                let grpc_server_adapter = HelloGrpcServerAdapter::new(hello);
+                let grpc_server = HelloGrpcServer::new(grpc_server_adapter);
+                tokio::select! {
+                    // wait just enough to let the client perform its request
+                    _ = tokio::time::sleep(Duration::from_millis(100)) => {}
+                    _ = Server::builder().add_service(grpc_server).serve(addr) => {}
+                };
+                std::thread::park();
+                println!("Thread unparked, unexpected");
+            };
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap()
+                .block_on(server_fut);
+        });
+
+        // create a client that will try to connect to the server
+        let (balance_channel, balance_channel_tx) = BalanceChannel::new();
+        let channel = Endpoint::from_str(&format!("http://{addr_str}"))
+            .unwrap()
+            .connect_lazy();
+        balance_channel_tx
+            .send(Change::Insert(addr, channel))
+            .unwrap();
+
+        let grpc_client = HelloClient::tower()
+            // this test hangs forever if we comment out the TimeoutLayer, which
+            // shows that a request without explicit timeout might hang forever
+            .stack_layer(TimeoutLayer::new(Duration::from_secs(3)))
+            .build_from_balance_channel(balance_channel, ByteSize::mib(1));
+
+        let response_fut = async move {
+            grpc_client
+                .hello(HelloRequest {
+                    name: "World".to_string(),
+                })
+                .await
+        };
+        response_fut
+            .await
+            .expect_err("should have timed out at the client level");
+    }
 }
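The test simulates a server that stops responding mid-request without ever closing the connection, demonstrating that a gRPC call with no client-side timeout can hang forever. The same failure mode can be reproduced against tower's stock `TimeoutLayer`; the sketch below uses only the public `tower` and `tokio` crates (not Quickwit's `TimeoutLayer`, which plays the same role but surfaces the dedicated `TimeoutExceeded` error):

```rust
use std::convert::Infallible;
use std::time::Duration;

use tower::timeout::TimeoutLayer;
use tower::{service_fn, Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() {
    // A "frozen server": the response future never completes, like the
    // parked server thread in the test above.
    let frozen = service_fn(|_req: ()| async {
        std::future::pending::<Result<&'static str, Infallible>>().await
    });
    // Bound every call to 100ms. Without this layer, `call` below would
    // await forever.
    let mut svc = ServiceBuilder::new()
        .layer(TimeoutLayer::new(Duration::from_millis(100)))
        .service(frozen);
    let result = svc.ready().await.unwrap().call(()).await;
    // The hang is converted into an error (tower's `Elapsed`).
    assert!(result.is_err());
}
```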

quickwit/quickwit-indexing/src/actors/indexing_service.rs (+4)

@@ -749,6 +749,10 @@ impl IndexingService {

     /// Shuts down the pipelines with supplied ids and performs necessary cleanup.
     async fn shutdown_pipelines(&mut self, pipelines_to_shutdown: &[PipelineUid]) {
+        info!(
+            pipeline_uids=?pipelines_to_shutdown,
+            "shutdown indexing pipelines"
+        );
         let should_gc_ingest_api_queues = pipelines_to_shutdown
             .iter()
             .flat_map(|pipeline_uid| self.indexing_pipelines.get(pipeline_uid))
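As an aside, the added log uses tracing's structured-field syntax: `?expr` records a field using its `Debug` implementation. A small self-contained sketch of that syntax, assuming the `tracing` and `tracing-subscriber` crates:

```rust
use tracing::info;

fn main() {
    // Emit events to stdout so the recorded field is visible.
    tracing_subscriber::fmt().init();
    let pipeline_uids = vec![101u64, 102, 103];
    // `?pipeline_uids` captures the Vec via Debug, matching the
    // `pipeline_uids=?pipelines_to_shutdown` field in the diff above.
    info!(pipeline_uids = ?pipeline_uids, "shutdown indexing pipelines");
}
```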

quickwit/quickwit-proto/src/metastore/mod.rs (+7, -1)

@@ -16,7 +16,7 @@ use std::fmt;

 use quickwit_common::rate_limited_error;
 use quickwit_common::retry::Retryable;
-use quickwit_common::tower::MakeLoadShedError;
+use quickwit_common::tower::{MakeLoadShedError, TimeoutExceeded};
 use serde::{Deserialize, Serialize};

 use crate::types::{IndexId, IndexUid, QueueId, SourceId, SplitId};
@@ -187,6 +187,12 @@ impl From<sqlx::Error> for MetastoreError {
     }
 }

+impl From<TimeoutExceeded> for MetastoreError {
+    fn from(_: TimeoutExceeded) -> Self {
+        MetastoreError::Timeout("client".to_string())
+    }
+}
+
 impl ServiceError for MetastoreError {
     fn error_code(&self) -> ServiceErrorCode {
         match self {

quickwit/quickwit-serve/src/lib.rs (+12, -14)

@@ -169,8 +169,9 @@ static METASTORE_GRPC_CLIENT_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
 static METASTORE_GRPC_SERVER_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
     Lazy::new(|| GrpcMetricsLayer::new("metastore", "server"));

-static GRPC_TIMEOUT_LAYER: Lazy<TimeoutLayer> =
-    Lazy::new(|| TimeoutLayer::new(Duration::from_secs(30)));
+static GRPC_INGESTER_SERVICE_TIMEOUT: Duration = Duration::from_secs(30);
+static GRPC_INDEXING_SERVICE_TIMEOUT: Duration = Duration::from_secs(30);
+static GRPC_METASTORE_SERVICE_TIMEOUT: Duration = Duration::from_secs(10);

 struct QuickwitServices {
     pub node_config: Arc<NodeConfig>,
@@ -459,16 +460,13 @@ pub async fn serve_quickwit(
     {
         bail!("could not find any metastore node in the cluster");
     }
-    // These layers applies to all the RPCs of the metastore.
-    let shared_layers = ServiceBuilder::new()
-        .layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard())))
-        .layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone())
-        .layer(tower::limit::GlobalConcurrencyLimitLayer::new(
+    MetastoreServiceClient::tower()
+        .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT))
+        .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard())))
+        .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone())
+        .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new(
             get_metastore_client_max_concurrency(),
         ))
-        .into_inner();
-    MetastoreServiceClient::tower()
-        .stack_layer(shared_layers)
         .build_from_balance_channel(balance_channel, grpc_config.max_message_size)
 };

@@ -514,11 +512,11 @@ pub async fn serve_quickwit(
         None
     };

-    // Setup indexer pool.
+    // Setup the indexer pool to track cluster changes.
     setup_indexer_pool(
         &node_config,
         cluster.change_stream(),
-        indexer_pool.clone(),
+        indexer_pool,
         indexing_service_opt.clone(),
     );

@@ -943,7 +941,7 @@ async fn setup_ingest_v2(
     } else {
         let ingester_service = IngesterServiceClient::tower()
             .stack_layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone())
-            .stack_layer(GRPC_TIMEOUT_LAYER.clone())
+            .stack_layer(TimeoutLayer::new(GRPC_INGESTER_SERVICE_TIMEOUT))
             .build_from_channel(
                 node.grpc_advertise_addr(),
                 node.channel(),
@@ -1145,7 +1143,7 @@ fn setup_indexer_pool(
     } else {
         let client = IndexingServiceClient::tower()
             .stack_layer(INDEXING_GRPC_CLIENT_METRICS_LAYER.clone())
-            .stack_layer(GRPC_TIMEOUT_LAYER.clone())
+            .stack_layer(TimeoutLayer::new(GRPC_INDEXING_SERVICE_TIMEOUT))
             .build_from_channel(
                 node.grpc_advertise_addr(),
                 node.channel(),
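Two details worth noting in the new metastore stack: the timeout (10 seconds) is deliberately shorter than the 30-second ingester and indexing timeouts, and the `TimeoutLayer` is stacked before the `RetryLayer`. Assuming `stack_layer` follows tower's `ServiceBuilder` convention, where the first layer added is the outermost, the timeout would bound the total time across all retry attempts rather than each attempt individually. A self-contained sketch of that ordering convention using plain `tower` (the `map_request` layers stand in for the real middleware):

```rust
use tower::{service_fn, Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() {
    let handler = service_fn(|req: String| async move {
        Ok::<_, std::convert::Infallible>(format!("{req} > handler"))
    });
    let mut stack = ServiceBuilder::new()
        // Added first => outermost: sees the request first.
        .map_request(|req: String| format!("{req} > outer"))
        // Added second => inner: sees the request after the outer layer.
        .map_request(|req: String| format!("{req} > inner"))
        .service(handler);
    let response = stack
        .ready()
        .await
        .unwrap()
        .call("req".to_string())
        .await
        .unwrap();
    // The request flowed outer -> inner -> handler.
    assert_eq!(response, "req > outer > inner > handler");
}
```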
