Skip to content

Commit 74b5f60

Browse files
authored
feat: Allow reuse http client in rest catalog (#1221)
## Which issue does this PR close? Creating a reqwest client is not without cost; it involves DNS caching, HTTP connection pooling, and TLS certificate loading. The previous design also made it difficult for users to configure the HTTP client, such as setting custom timeout values or using a globally shared DNS resolver. This PR addressed this by allowing users to create the client and pass to rest catalog as config. ## What changes are included in this PR? Allow users to pass a `reqwest::Client` as config. ## Are these changes tested? Unit tests Signed-off-by: Xuanwo <[email protected]>
1 parent bc92f27 commit 74b5f60

File tree

2 files changed

+22
-11
lines changed

2 files changed

+22
-11
lines changed

crates/catalog/rest/src/catalog.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use itertools::Itertools;
3131
use reqwest::header::{
3232
HeaderMap, HeaderName, HeaderValue, {self},
3333
};
34-
use reqwest::{Method, StatusCode, Url};
34+
use reqwest::{Client, Method, StatusCode, Url};
3535
use tokio::sync::OnceCell;
3636
use typed_builder::TypedBuilder;
3737

@@ -58,6 +58,9 @@ pub struct RestCatalogConfig {
5858

5959
#[builder(default)]
6060
props: HashMap<String, String>,
61+
62+
#[builder(default)]
63+
client: Option<Client>,
6164
}
6265

6366
impl RestCatalogConfig {
@@ -106,6 +109,11 @@ impl RestCatalogConfig {
106109
])
107110
}
108111

112+
/// Get the client from the config.
113+
pub(crate) fn client(&self) -> Option<Client> {
114+
self.client.clone()
115+
}
116+
109117
/// Get the token from the config.
110118
///
111119
/// The client can use this token to send requests.

crates/catalog/rest/src/client.rs

+13-10
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ impl HttpClient {
5959
pub fn new(cfg: &RestCatalogConfig) -> Result<Self> {
6060
let extra_headers = cfg.extra_headers()?;
6161
Ok(HttpClient {
62-
client: Client::builder()
63-
.default_headers(extra_headers.clone())
64-
.build()?,
62+
client: cfg.client().unwrap_or_default(),
6563
token: Mutex::new(cfg.token()),
6664
token_endpoint: cfg.get_token_endpoint(),
6765
credential: cfg.credential(),
@@ -80,9 +78,7 @@ impl HttpClient {
8078
.transpose()?
8179
.unwrap_or(self.extra_headers);
8280
Ok(HttpClient {
83-
client: Client::builder()
84-
.default_headers(extra_headers.clone())
85-
.build()?,
81+
client: cfg.client().unwrap_or(self.client),
8682
token: Mutex::new(
8783
cfg.token()
8884
.or_else(|| self.token.into_inner().ok().flatten()),
@@ -162,12 +158,11 @@ impl HttpClient {
162158
);
163159

164160
let auth_req = self
165-
.client
166161
.request(Method::POST, &self.token_endpoint)
167162
.form(&params)
168163
.build()?;
169164
let auth_url = auth_req.url().clone();
170-
let auth_resp = self.client.execute(auth_req).await?;
165+
let auth_resp = self.execute(auth_req).await?;
171166

172167
let auth_res: TokenResponse = if auth_resp.status() == StatusCode::OK {
173168
let text = auth_resp
@@ -220,14 +215,22 @@ impl HttpClient {
220215

221216
#[inline]
222217
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
223-
self.client.request(method, url)
218+
self.client
219+
.request(method, url)
220+
.headers(self.extra_headers.clone())
221+
}
222+
223+
/// Executes the given `Request` and returns a `Response`.
224+
pub async fn execute(&self, mut request: Request) -> Result<Response> {
225+
request.headers_mut().extend(self.extra_headers.clone());
226+
Ok(self.client.execute(request).await?)
224227
}
225228

226229
// Queries the Iceberg REST catalog after authentication with the given `Request` and
227230
// returns a `Response`.
228231
pub async fn query_catalog(&self, mut request: Request) -> Result<Response> {
229232
self.authenticate(&mut request).await?;
230-
Ok(self.client.execute(request).await?)
233+
self.execute(request).await
231234
}
232235
}
233236

0 commit comments

Comments
 (0)