Skip to content

Commit bbcbdf7

Browse files
Adding keep alive setting. (#5769)
By default, the behavior is unchanged. Co-authored-by: fulmicoton <[email protected]>
1 parent 61d878c commit bbcbdf7

File tree

13 files changed

+268
-71
lines changed

13 files changed

+268
-71
lines changed

quickwit/quickwit-cli/src/tool.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use colored::{ColoredString, Colorize};
2626
use humantime::format_duration;
2727
use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, Universe};
2828
use quickwit_cluster::{
29-
ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig, make_client_tls_config,
29+
ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig, make_client_grpc_config,
3030
};
3131
use quickwit_common::pubsub::EventBroker;
3232
use quickwit_common::runtimes::RuntimesConfig;
@@ -939,6 +939,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
939939
indexing_cpu_capacity: CpuCapacity::zero(),
940940
indexing_tasks: Vec::new(),
941941
};
942+
let client_grpc_config = make_client_grpc_config(&config.grpc_config)?;
942943
let cluster = Cluster::join(
943944
config.cluster_id.clone(),
944945
self_node,
@@ -947,12 +948,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
947948
config.gossip_interval,
948949
FailureDetectorConfig::default(),
949950
&ChannelTransport::default(),
950-
config
951-
.grpc_config
952-
.tls
953-
.as_ref()
954-
.map(make_client_tls_config)
955-
.transpose()?,
951+
client_grpc_config,
956952
)
957953
.await?;
958954

quickwit/quickwit-cluster/src/change.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ use chitchat::{ChitchatId, NodeState};
2121
use futures::Stream;
2222
use pin_project::pin_project;
2323
use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator};
24-
use quickwit_common::tower::{make_channel, warmup_channel};
24+
use quickwit_common::tower::{ClientGrpcConfig, make_channel, warmup_channel};
2525
use quickwit_proto::types::NodeId;
2626
use tokio::sync::mpsc;
2727
use tokio_stream::wrappers::UnboundedReceiverStream;
28-
use tonic::transport::{Channel, ClientTlsConfig};
28+
use tonic::transport::Channel;
2929
use tracing::{info, warn};
3030

3131
use crate::ClusterNode;
@@ -74,7 +74,7 @@ pub(crate) async fn compute_cluster_change_events(
7474
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
7575
previous_node_states: &BTreeMap<ChitchatId, NodeState>,
7676
new_node_states: &BTreeMap<ChitchatId, NodeState>,
77-
tls_config: Option<&ClientTlsConfig>,
77+
client_grpc_config: &ClientGrpcConfig,
7878
) -> Vec<ClusterChange> {
7979
let mut cluster_events = Vec::new();
8080

@@ -91,7 +91,7 @@ pub(crate) async fn compute_cluster_change_events(
9191
chitchat_id,
9292
node_state,
9393
previous_nodes,
94-
tls_config,
94+
client_grpc_config.clone(),
9595
)
9696
.await;
9797

@@ -136,7 +136,7 @@ async fn compute_cluster_change_events_on_added(
136136
new_chitchat_id: &ChitchatId,
137137
new_node_state: &NodeState,
138138
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
139-
tls_config: Option<&ClientTlsConfig>,
139+
client_grpc_config: ClientGrpcConfig,
140140
) -> Vec<ClusterChange> {
141141
let is_self_node = self_chitchat_id == new_chitchat_id;
142142
let new_node_id: NodeId = new_chitchat_id.node_id.clone().into();
@@ -169,7 +169,7 @@ async fn compute_cluster_change_events_on_added(
169169
new_chitchat_id,
170170
new_node_state,
171171
is_self_node,
172-
tls_config,
172+
&client_grpc_config,
173173
)
174174
.await
175175
else {
@@ -304,11 +304,11 @@ async fn try_new_node(
304304
chitchat_id: &ChitchatId,
305305
node_state: &NodeState,
306306
is_self_node: bool,
307-
tls_config: Option<&ClientTlsConfig>,
307+
grpc_config: &ClientGrpcConfig,
308308
) -> Option<ClusterNode> {
309309
match node_state.grpc_advertise_addr() {
310310
Ok(socket_addr) => {
311-
let channel = make_channel(socket_addr, tls_config.cloned()).await;
311+
let channel = make_channel(socket_addr, grpc_config.clone()).await;
312312
try_new_node_with_channel(cluster_id, chitchat_id, node_state, channel, is_self_node)
313313
}
314314
Err(error) => {
@@ -448,7 +448,7 @@ pub(crate) mod tests {
448448
&new_chitchat_id,
449449
&new_node_state,
450450
&mut previous_nodes,
451-
None,
451+
Default::default(),
452452
)
453453
.await;
454454
assert!(events.is_empty());
@@ -471,7 +471,7 @@ pub(crate) mod tests {
471471
&new_chitchat_id,
472472
&new_node_state,
473473
&mut previous_nodes,
474-
None,
474+
Default::default(),
475475
)
476476
.await;
477477
assert!(events.is_empty());
@@ -500,7 +500,7 @@ pub(crate) mod tests {
500500
&new_chitchat_id,
501501
&new_node_state,
502502
&mut previous_nodes,
503-
None,
503+
Default::default(),
504504
)
505505
.await;
506506

@@ -523,7 +523,7 @@ pub(crate) mod tests {
523523
&rejoined_chitchat_id,
524524
&new_node_state,
525525
&mut previous_nodes,
526-
None,
526+
Default::default(),
527527
)
528528
.await;
529529
assert_eq!(events.len(), 2);
@@ -552,7 +552,7 @@ pub(crate) mod tests {
552552
&new_chitchat_id,
553553
&new_node_state,
554554
&mut previous_nodes,
555-
None,
555+
Default::default(),
556556
)
557557
.await;
558558
assert!(events.is_empty());
@@ -577,7 +577,7 @@ pub(crate) mod tests {
577577
&new_chitchat_id,
578578
&new_node_state,
579579
&mut previous_nodes,
580-
None,
580+
Default::default(),
581581
)
582582
.await;
583583
assert_eq!(events.len(), 1);
@@ -908,7 +908,7 @@ pub(crate) mod tests {
908908
&mut previous_nodes,
909909
&previous_node_states,
910910
&new_node_states,
911-
None,
911+
&Default::default(),
912912
)
913913
.await;
914914
assert!(events.is_empty());
@@ -938,7 +938,7 @@ pub(crate) mod tests {
938938
&mut previous_nodes,
939939
&previous_node_states,
940940
&new_node_states,
941-
None,
941+
&Default::default(),
942942
)
943943
.await;
944944
assert!(events.is_empty());
@@ -956,7 +956,7 @@ pub(crate) mod tests {
956956
&mut previous_nodes,
957957
&previous_node_states,
958958
&new_node_states,
959-
None,
959+
&Default::default(),
960960
)
961961
.await;
962962
assert_eq!(events.len(), 1);
@@ -971,7 +971,7 @@ pub(crate) mod tests {
971971
&mut previous_nodes,
972972
&new_node_states,
973973
&new_node_states,
974-
None,
974+
&Default::default(),
975975
)
976976
.await;
977977
assert_eq!(events.len(), 0);
@@ -1004,7 +1004,7 @@ pub(crate) mod tests {
10041004
&mut previous_nodes,
10051005
&previous_node_states,
10061006
&new_node_states,
1007-
None,
1007+
&Default::default(),
10081008
)
10091009
.await;
10101010
assert_eq!(events.len(), 1);
@@ -1024,7 +1024,7 @@ pub(crate) mod tests {
10241024
&mut previous_nodes,
10251025
&previous_node_states,
10261026
&new_node_states,
1027-
None,
1027+
&Default::default(),
10281028
)
10291029
.await;
10301030
assert_eq!(events.len(), 1);

quickwit/quickwit-cluster/src/cluster.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ use chitchat::{
2626
FailureDetectorConfig, KeyChangeEvent, ListenerHandle, NodeState, spawn_chitchat,
2727
};
2828
use itertools::Itertools;
29+
use quickwit_common::tower::ClientGrpcConfig;
2930
use quickwit_proto::indexing::{IndexingPipelineId, IndexingTask, PipelineMetrics};
3031
use quickwit_proto::types::{NodeId, NodeIdRef, PipelineUid, ShardId};
3132
use serde::{Deserialize, Serialize};
3233
use tokio::sync::{Mutex, RwLock, mpsc, watch};
3334
use tokio::time::timeout;
3435
use tokio_stream::StreamExt;
3536
use tokio_stream::wrappers::WatchStream;
36-
use tonic::transport::ClientTlsConfig;
3737
use tracing::{info, warn};
3838

3939
use crate::change::{ClusterChange, ClusterChangeStreamFactory, compute_cluster_change_events};
@@ -62,9 +62,10 @@ pub struct Cluster {
6262
self_chitchat_id: ChitchatId,
6363
/// Socket address (UDP) the node listens on for receiving gossip messages.
6464
pub gossip_listen_addr: SocketAddr,
65-
// TODO this should become an ArcSwap<ClienTlsConfig> or something so that
66-
// some task can watch for new certificates and update this (hot reloading)
67-
tls_config: Option<ClientTlsConfig>,
65+
// TODO this object contains a tls config. We might want to change it to a
66+
// ArcSwap<ClientGrpcConfig> or something so that some task can watch for new certificates
67+
// and update this (hot reloading)
68+
client_grpc_config: ClientGrpcConfig,
6869
gossip_interval: Duration,
6970
inner: Arc<RwLock<InnerCluster>>,
7071
}
@@ -115,7 +116,7 @@ impl Cluster {
115116
gossip_interval: Duration,
116117
failure_detector_config: FailureDetectorConfig,
117118
transport: &dyn Transport,
118-
tls_config: Option<ClientTlsConfig>,
119+
client_grpc_config: ClientGrpcConfig,
119120
) -> anyhow::Result<Self> {
120121
info!(
121122
cluster_id=%cluster_id,
@@ -186,7 +187,7 @@ impl Cluster {
186187
weak_chitchat,
187188
live_nodes_rx,
188189
catchup_callback_rx.clone(),
189-
tls_config.clone(),
190+
client_grpc_config.clone(),
190191
)
191192
.await;
192193

@@ -204,7 +205,7 @@ impl Cluster {
204205
gossip_listen_addr,
205206
gossip_interval,
206207
inner: Arc::new(RwLock::new(inner)),
207-
tls_config,
208+
client_grpc_config,
208209
};
209210
spawn_change_stream_task(cluster.clone()).await;
210211
Ok(cluster)
@@ -560,7 +561,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask>
560561
async fn spawn_change_stream_task(cluster: Cluster) {
561562
let cluster_guard = cluster.inner.read().await;
562563
let cluster_id = cluster_guard.cluster_id.clone();
563-
let tls_config = cluster.tls_config.clone();
564+
let client_grpc_config = cluster.client_grpc_config.clone();
564565
let self_chitchat_id = cluster_guard.self_chitchat_id.clone();
565566
let chitchat = cluster_guard.chitchat_handle.chitchat();
566567
let weak_cluster = Arc::downgrade(&cluster.inner);
@@ -584,7 +585,7 @@ async fn spawn_change_stream_task(cluster: Cluster) {
584585
previous_live_nodes,
585586
&previous_live_node_states,
586587
&new_live_node_states,
587-
tls_config.as_ref(),
588+
&client_grpc_config,
588589
)
589590
.await;
590591
if !events.is_empty() {
@@ -701,7 +702,7 @@ pub async fn create_cluster_for_test_with_id(
701702
Duration::from_millis(25),
702703
failure_detector_config,
703704
transport,
704-
None,
705+
Default::default(),
705706
)
706707
.await?;
707708
cluster.set_self_node_readiness(self_node_readiness).await;

quickwit/quickwit-cluster/src/grpc_gossip.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use std::time::{Duration, Instant};
2121
use chitchat::{Chitchat, ChitchatId, NodeState, VersionedValue};
2222
use futures::Future;
2323
use quickwit_common::pretty::PrettyDisplay;
24+
use quickwit_common::tower::ClientGrpcConfig;
2425
use quickwit_proto::cluster::{ClusterService, ClusterServiceClient, FetchClusterStateRequest};
2526
use rand::seq::IteratorRandom;
2627
use tokio::sync::{Mutex, watch};
2728
use tokio_stream::StreamExt;
2829
use tokio_stream::wrappers::WatchStream;
29-
use tonic::transport::ClientTlsConfig;
3030
use tracing::{info, warn};
3131

3232
use crate::grpc_service::cluster_grpc_client;
@@ -42,7 +42,7 @@ pub(crate) async fn spawn_catchup_callback_task(
4242
weak_chitchat: Weak<Mutex<Chitchat>>,
4343
live_nodes_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>,
4444
mut catchup_callback_rx: watch::Receiver<()>,
45-
tls_config: Option<ClientTlsConfig>,
45+
client_grpc_config: ClientGrpcConfig,
4646
) {
4747
let catchup_callback_future = async move {
4848
let mut interval = tokio::time::interval(Duration::from_secs(60));
@@ -57,8 +57,7 @@ pub(crate) async fn spawn_catchup_callback_task(
5757
&self_chitchat_id,
5858
chitchat,
5959
live_nodes_rx.clone(),
60-
cluster_grpc_client,
61-
tls_config.clone(),
60+
|socket_addr| cluster_grpc_client(socket_addr, client_grpc_config.clone()),
6261
)
6362
.await;
6463

@@ -72,15 +71,14 @@ pub(crate) async fn spawn_catchup_callback_task(
7271
tokio::spawn(catchup_callback_future);
7372
}
7473

75-
async fn perform_grpc_gossip_rounds<Factory, Fut>(
74+
async fn perform_grpc_gossip_rounds<ClusterServiceClientFactory, Fut>(
7675
cluster_id: String,
7776
self_chitchat_id: &ChitchatId,
7877
chitchat: Arc<Mutex<Chitchat>>,
7978
live_nodes_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>,
80-
grpc_client_factory: Factory,
81-
tls_client_config: Option<ClientTlsConfig>,
79+
grpc_client_factory: ClusterServiceClientFactory,
8280
) where
83-
Factory: Fn(SocketAddr, Option<ClientTlsConfig>) -> Fut,
81+
ClusterServiceClientFactory: Fn(SocketAddr) -> Fut,
8482
Fut: Future<Output = ClusterServiceClient>,
8583
{
8684
wait_for_gossip_candidates(
@@ -101,8 +99,7 @@ async fn perform_grpc_gossip_rounds<Factory, Fut>(
10199
info!("pulling cluster state from node(s): {node_ids:?}");
102100

103101
for (node_id, grpc_advertise_addr) in zip(node_ids, grpc_advertise_addrs) {
104-
let cluster_client =
105-
grpc_client_factory(grpc_advertise_addr, tls_client_config.clone()).await;
102+
let cluster_client = grpc_client_factory(grpc_advertise_addr).await;
106103

107104
let request = FetchClusterStateRequest {
108105
cluster_id: cluster_id.clone(),
@@ -272,7 +269,7 @@ mod tests {
272269
let self_chitchat_id = cluster.self_chitchat_id();
273270
let chitchat = cluster.chitchat().await;
274271

275-
let grpc_client_factory = |_: SocketAddr, _: Option<_>| {
272+
let grpc_client_factory = |_: SocketAddr| {
276273
Box::pin(async {
277274
let mut mock_cluster_service = MockClusterService::new();
278275
mock_cluster_service
@@ -336,7 +333,6 @@ mod tests {
336333
chitchat.clone(),
337334
live_nodes_rx,
338335
grpc_client_factory,
339-
None,
340336
)
341337
.await;
342338

quickwit/quickwit-cluster/src/grpc_service.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ use std::net::SocketAddr;
1717
use bytesize::ByteSize;
1818
use itertools::Itertools;
1919
use once_cell::sync::Lazy;
20-
use quickwit_common::tower::{GrpcMetricsLayer, make_channel};
20+
use quickwit_common::tower::{ClientGrpcConfig, GrpcMetricsLayer, make_channel};
2121
use quickwit_proto::cluster::cluster_service_grpc_server::ClusterServiceGrpcServer;
2222
use quickwit_proto::cluster::{
2323
ChitchatId as ProtoChitchatId, ClusterError, ClusterResult, ClusterService,
2424
ClusterServiceClient, ClusterServiceGrpcServerAdapter, FetchClusterStateRequest,
2525
FetchClusterStateResponse, NodeState as ProtoNodeState, VersionedKeyValue,
2626
};
2727
use tonic::async_trait;
28-
use tonic::transport::ClientTlsConfig;
2928

3029
use crate::Cluster;
3130

@@ -38,9 +37,9 @@ static CLUSTER_GRPC_SERVER_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
3837

3938
pub(crate) async fn cluster_grpc_client(
4039
socket_addr: SocketAddr,
41-
tls_config: Option<ClientTlsConfig>,
40+
client_grpc_config: ClientGrpcConfig,
4241
) -> ClusterServiceClient {
43-
let channel = make_channel(socket_addr, tls_config).await;
42+
let channel = make_channel(socket_addr, client_grpc_config).await;
4443

4544
ClusterServiceClient::tower()
4645
.stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone())

0 commit comments

Comments
 (0)