Skip to content

Commit

Permalink
scribe_client: Increase thrift timeout for invocation recorder upload
Browse files Browse the repository at this point in the history
Summary:
Most instances of 'command_report_finalizing_errors' being logged to buck_results are thrift timeouts (`apache::thrift::transport::TTransportException: Timed Out`) when uploading the invocation record.

It's possible congestion from other events (actions etc.) are contributing and it may be possible to prioritize these messages somehow, but for a start let's just increase the thrift timeouts used when uploading the invocation record, from 1s to 2s.

Reviewed By: iguridi

Differential Revision: D69882960

fbshipit-source-id: ded65b486470f7ad9059ccd61667c2c95e1324b4
  • Loading branch information
christolliday authored and facebook-github-bot committed Feb 20, 2025
1 parent b8c5f11 commit 5ec33a5
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 5 deletions.
1 change: 1 addition & 0 deletions app/buck2_client_ctx/src/subscribers/build_graph_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl BuildGraphStats {
retry_backoff: Duration::from_millis(100),
retry_attempts: 2,
message_batch_size: None,
thrift_timeout: Duration::from_secs(1),
},
) {
tracing::info!("Sending events to Scribe: {:?}", &events);
Expand Down
1 change: 1 addition & 0 deletions app/buck2_client_ctx/src/subscribers/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1758,6 +1758,7 @@ impl EventSubscriber for InvocationRecorder {
retry_backoff: Duration::from_millis(500),
retry_attempts: 5,
message_batch_size: None,
thrift_timeout: Duration::from_secs(2),
},
)? {
tracing::info!("Recording invocation to Scribe: {:?}", &event);
Expand Down
1 change: 1 addition & 0 deletions app/buck2_events/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ mod fbcode {
pub retry_backoff: Duration,
pub retry_attempts: usize,
pub message_batch_size: Option<usize>,
pub thrift_timeout: Duration,
}
}

Expand Down
1 change: 1 addition & 0 deletions app/buck2_server/src/daemon/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl DaemonState {
retry_backoff,
retry_attempts,
message_batch_size,
thrift_timeout: Duration::from_secs(1),
},
)
.buck_error_context("failed to init scribe sink")?;
Expand Down
15 changes: 10 additions & 5 deletions shed/scribe_client/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,15 @@ pub(crate) struct ScribeProducer {
retry_attempts: usize,
message_batch_size: Option<usize>,
last_cutoff: Mutex<Option<usize>>,
thrift_timeout: Duration,
}

pub struct ScribeConfig {
pub buffer_size: usize,
pub retry_backoff: Duration,
pub retry_attempts: usize,
pub message_batch_size: Option<usize>,
pub thrift_timeout: Duration,
}

impl Default for ScribeConfig {
Expand All @@ -275,13 +277,14 @@ impl Default for ScribeConfig {
retry_backoff: Duration::from_millis(500),
retry_attempts: 5,
message_batch_size: None,
thrift_timeout: Duration::from_secs(1),
}
}
}

impl ScribeProducer {
pub(crate) fn new(fb: FacebookInit, config: ScribeConfig) -> anyhow::Result<ScribeProducer> {
let client = connect(fb)?;
let client = connect(fb, config.thrift_timeout)?;
let queue = ArrayQueue::new(config.buffer_size);
Ok(ScribeProducer {
fb,
Expand All @@ -292,6 +295,7 @@ impl ScribeProducer {
retry_attempts: config.retry_attempts,
message_batch_size: config.message_batch_size,
last_cutoff: Mutex::new(None),
thrift_timeout: config.thrift_timeout,
})
}

Expand Down Expand Up @@ -358,7 +362,7 @@ impl ScribeProducer {
&self,
client_: &mut tokio::sync::MutexGuard<'_, ProducerServiceClient>,
) -> anyhow::Result<()> {
let new_client = connect(self.fb)?;
let new_client = connect(self.fb, self.thrift_timeout)?;
**client_ = new_client;
Ok(())
}
Expand Down Expand Up @@ -564,15 +568,15 @@ impl ScribeProducer {
}

/// Connect to Scribe producer service (local Scribe daemon) via Thrift interface.
fn connect(fb: FacebookInit) -> anyhow::Result<ProducerServiceClient> {
fn connect(fb: FacebookInit, timeout: Duration) -> anyhow::Result<ProducerServiceClient> {
let addr = SocketAddr::new(
IpAddr::V6(Ipv6Addr::LOCALHOST),
DEFAULT_PRODUCER_SERVICE_PORT as u16,
);
build_ProducerService_client(
ThriftChannelBuilder::from_sock_addr(fb, addr)?
.with_conn_timeout(1000)
.with_recv_timeout(1000)
.with_conn_timeout(timeout.as_millis().try_into()?)
.with_recv_timeout(timeout.as_millis().try_into()?)
.with_channel_pool(false)
.with_transport_type(TransportType::Header)
// By default, ThriftChannelBuilder will initiate a TLS handshake with the target server. This works fine
Expand Down Expand Up @@ -777,6 +781,7 @@ mod tests {
retry_attempts: 5,
message_batch_size: None,
last_cutoff: Mutex::new(None),
thrift_timeout: Duration::from_millis(0),
}
}

Expand Down

0 comments on commit 5ec33a5

Please sign in to comment.