Skip to content

Commit 45345e4

Browse files
fix(catalog/rest): Using async lock in token to avoid blocking runtime (#1223)
## Which issue does this PR close? We used to think it was acceptable to use a blocking lock in the token since we were not crossing an await boundary. However, our users reported that this can cause the runtime to hang completely if multiple catalog instances try to acquire the token concurrently. ## What changes are included in this PR? This PR fixed it by using an async lock instead. ## Are these changes tested? Unit tests. Signed-off-by: Xuanwo <[email protected]> Co-authored-by: Renjie Liu <[email protected]>
1 parent ca22eaa commit 45345e4

File tree

1 file changed

+5
-8
lines changed

1 file changed

+5
-8
lines changed

crates/catalog/rest/src/client.rs

+5-8
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
use std::collections::HashMap;
1919
use std::fmt::{Debug, Formatter};
20-
use std::sync::Mutex;
2120

2221
use http::StatusCode;
2322
use iceberg::{Error, ErrorKind, Result};
2423
use reqwest::header::HeaderMap;
2524
use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response};
2625
use serde::de::DeserializeOwned;
26+
use tokio::sync::Mutex;
2727

2828
use crate::types::{ErrorResponse, TokenResponse};
2929
use crate::RestCatalogConfig;
@@ -79,10 +79,7 @@ impl HttpClient {
7979
.unwrap_or(self.extra_headers);
8080
Ok(HttpClient {
8181
client: cfg.client().unwrap_or(self.client),
82-
token: Mutex::new(
83-
cfg.token()
84-
.or_else(|| self.token.into_inner().ok().flatten()),
85-
),
82+
token: Mutex::new(cfg.token().or_else(|| self.token.into_inner())),
8683
token_endpoint: (!cfg.get_token_endpoint().is_empty())
8784
.then(|| cfg.get_token_endpoint())
8885
.unwrap_or(self.token_endpoint),
@@ -102,7 +99,7 @@ impl HttpClient {
10299
.build()
103100
.unwrap();
104101
self.authenticate(&mut req).await.ok();
105-
self.token.lock().unwrap().clone()
102+
self.token.lock().await.clone()
106103
}
107104

108105
/// Authenticate the request by filling token.
@@ -116,7 +113,7 @@ impl HttpClient {
116113
/// Support refreshing token while needed.
117114
async fn authenticate(&self, req: &mut Request) -> Result<()> {
118115
// Clone the token from lock without holding the lock for entire function.
119-
let token = { self.token.lock().expect("lock poison").clone() };
116+
let token = self.token.lock().await.clone();
120117

121118
if self.credential.is_none() && token.is_none() {
122119
return Ok(());
@@ -197,7 +194,7 @@ impl HttpClient {
197194
}?;
198195
let token = auth_res.access_token;
199196
// Update token.
200-
*self.token.lock().expect("lock poison") = Some(token.clone());
197+
*self.token.lock().await = Some(token.clone());
201198
// Insert token in request.
202199
req.headers_mut().insert(
203200
http::header::AUTHORIZATION,

0 commit comments

Comments
 (0)