From 5ec33a5b8ec9466def207dad12336d58b25208c1 Mon Sep 17 00:00:00 2001 From: Chris Tolliday Date: Wed, 19 Feb 2025 20:41:20 -0800 Subject: [PATCH] scribe_client: Increase thrift timeout for invocation recorder upload Summary: Most instances of 'command_report_finalizing_errors' being logged to buck_results are thrift timeouts (`apache::thrift::transport::TTransportException: Timed Out`) when uploading the invocation record. It's possible that congestion from other events (actions etc.) is contributing, and it may be possible to prioritize these messages somehow, but for a start let's just increase the thrift timeouts used when uploading the invocation record, from 1s to 2s. Reviewed By: iguridi Differential Revision: D69882960 fbshipit-source-id: ded65b486470f7ad9059ccd61667c2c95e1324b4 --- .../src/subscribers/build_graph_stats.rs | 1 + app/buck2_client_ctx/src/subscribers/recorder.rs | 1 + app/buck2_events/src/sink/remote.rs | 1 + app/buck2_server/src/daemon/state.rs | 1 + shed/scribe_client/src/producer.rs | 15 ++++++++++----- 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/app/buck2_client_ctx/src/subscribers/build_graph_stats.rs b/app/buck2_client_ctx/src/subscribers/build_graph_stats.rs index 6f19b80ae45e..a6a65e241f3c 100644 --- a/app/buck2_client_ctx/src/subscribers/build_graph_stats.rs +++ b/app/buck2_client_ctx/src/subscribers/build_graph_stats.rs @@ -84,6 +84,7 @@ impl BuildGraphStats { retry_backoff: Duration::from_millis(100), retry_attempts: 2, message_batch_size: None, + thrift_timeout: Duration::from_secs(1), }, ) { tracing::info!("Sending events to Scribe: {:?}", &events); diff --git a/app/buck2_client_ctx/src/subscribers/recorder.rs b/app/buck2_client_ctx/src/subscribers/recorder.rs index 4bb85f560b67..4804c34034de 100644 --- a/app/buck2_client_ctx/src/subscribers/recorder.rs +++ b/app/buck2_client_ctx/src/subscribers/recorder.rs @@ -1758,6 +1758,7 @@ impl EventSubscriber for InvocationRecorder { retry_backoff: Duration::from_millis(500), 
retry_attempts: 5, message_batch_size: None, + thrift_timeout: Duration::from_secs(2), }, )? { tracing::info!("Recording invocation to Scribe: {:?}", &event); diff --git a/app/buck2_events/src/sink/remote.rs b/app/buck2_events/src/sink/remote.rs index afbb86ad3c69..119247cb9cb9 100644 --- a/app/buck2_events/src/sink/remote.rs +++ b/app/buck2_events/src/sink/remote.rs @@ -372,6 +372,7 @@ mod fbcode { pub retry_backoff: Duration, pub retry_attempts: usize, pub message_batch_size: Option, + pub thrift_timeout: Duration, } } diff --git a/app/buck2_server/src/daemon/state.rs b/app/buck2_server/src/daemon/state.rs index 12b1f9f9ffeb..9b32e8f304ae 100644 --- a/app/buck2_server/src/daemon/state.rs +++ b/app/buck2_server/src/daemon/state.rs @@ -318,6 +318,7 @@ impl DaemonState { retry_backoff, retry_attempts, message_batch_size, + thrift_timeout: Duration::from_secs(1), }, ) .buck_error_context("failed to init scribe sink")?; diff --git a/shed/scribe_client/src/producer.rs b/shed/scribe_client/src/producer.rs index 1f96c7d69611..c1a2c7e28c50 100644 --- a/shed/scribe_client/src/producer.rs +++ b/shed/scribe_client/src/producer.rs @@ -259,6 +259,7 @@ pub(crate) struct ScribeProducer { retry_attempts: usize, message_batch_size: Option, last_cutoff: Mutex>, + thrift_timeout: Duration, } pub struct ScribeConfig { @@ -266,6 +267,7 @@ pub struct ScribeConfig { pub retry_backoff: Duration, pub retry_attempts: usize, pub message_batch_size: Option, + pub thrift_timeout: Duration, } impl Default for ScribeConfig { @@ -275,13 +277,14 @@ impl Default for ScribeConfig { retry_backoff: Duration::from_millis(500), retry_attempts: 5, message_batch_size: None, + thrift_timeout: Duration::from_secs(1), } } } impl ScribeProducer { pub(crate) fn new(fb: FacebookInit, config: ScribeConfig) -> anyhow::Result { - let client = connect(fb)?; + let client = connect(fb, config.thrift_timeout)?; let queue = ArrayQueue::new(config.buffer_size); Ok(ScribeProducer { fb, @@ -292,6 +295,7 @@ impl 
ScribeProducer { retry_attempts: config.retry_attempts, message_batch_size: config.message_batch_size, last_cutoff: Mutex::new(None), + thrift_timeout: config.thrift_timeout, }) } @@ -358,7 +362,7 @@ impl ScribeProducer { &self, client_: &mut tokio::sync::MutexGuard<'_, ProducerServiceClient>, ) -> anyhow::Result<()> { - let new_client = connect(self.fb)?; + let new_client = connect(self.fb, self.thrift_timeout)?; **client_ = new_client; Ok(()) } @@ -564,15 +568,15 @@ impl ScribeProducer { } /// Connect to Scribe producer service (local Scribe daemon) via Thrift interface. -fn connect(fb: FacebookInit) -> anyhow::Result { +fn connect(fb: FacebookInit, timeout: Duration) -> anyhow::Result { let addr = SocketAddr::new( IpAddr::V6(Ipv6Addr::LOCALHOST), DEFAULT_PRODUCER_SERVICE_PORT as u16, ); build_ProducerService_client( ThriftChannelBuilder::from_sock_addr(fb, addr)? - .with_conn_timeout(1000) - .with_recv_timeout(1000) + .with_conn_timeout(timeout.as_millis().try_into()?) + .with_recv_timeout(timeout.as_millis().try_into()?) .with_channel_pool(false) .with_transport_type(TransportType::Header) // By default, ThriftChannelBuilder will initiate a TLS handshake with the target server. This works fine @@ -777,6 +781,7 @@ mod tests { retry_attempts: 5, message_batch_size: None, last_cutoff: Mutex::new(None), + thrift_timeout: Duration::from_millis(0), } }