Skip to content

Commit a961483

Browse files
authored
feat(iceberg-catalog-rest): expose invalidate_token, regenerate_token APIs (#1465)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Partially implements #437 ## What changes are included in this PR? <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> Per discussion here: #437 (comment) This branch exposes two public methods on the rest catalog for managing tokens: `invalidate_token` and `regenerate_token`. - `invalidate_token`: takes the token lock and sets the token to `None`, forcing re-auth on the next request - `regenerate_token`: calls the oauth endpoint to generate a new token. if successful, takes the token lock and writes the new token. ## Are these changes tested? <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes -- I've added new unit tests to cover both methods.
1 parent ed12ac4 commit a961483

File tree

2 files changed

+214
-44
lines changed

2 files changed

+214
-44
lines changed

crates/catalog/rest/src/catalog.rs

Lines changed: 158 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,22 @@ impl RestCatalog {
318318

319319
Ok(file_io)
320320
}
321+
322+
/// Invalidate the current token without generating a new one. On the next request, the client
323+
/// will attempt to generate a new token.
324+
pub async fn invalidate_token(&self) -> Result<()> {
325+
self.context().await?.client.invalidate_token().await
326+
}
327+
328+
/// Invalidate the current token and set a new one. Generates a new token before invalidating
329+
/// the current token, meaning the old token will be used until this function acquires the lock
330+
/// and overwrites the token.
331+
///
332+
/// If credential is invalid, or the request fails, this method will return an error and leave
333+
/// the current token unchanged.
334+
pub async fn regenerate_token(&self) -> Result<()> {
335+
self.context().await?.client.regenerate_token().await
336+
}
321337
}
322338

323339
/// All requests and expected responses are derived from the REST catalog API spec:
@@ -860,21 +876,27 @@ mod tests {
860876
}
861877

862878
async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
863-
create_oauth_mock_with_path(server, "/v1/oauth/tokens").await
879+
create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
864880
}
865881

866-
async fn create_oauth_mock_with_path(server: &mut ServerGuard, path: &str) -> Mock {
867-
server
868-
.mock("POST", path)
869-
.with_status(200)
870-
.with_body(
871-
r#"{
872-
"access_token": "ey000000000000",
882+
async fn create_oauth_mock_with_path(
883+
server: &mut ServerGuard,
884+
path: &str,
885+
token: &str,
886+
status: usize,
887+
) -> Mock {
888+
let body = format!(
889+
r#"{{
890+
"access_token": "{token}",
873891
"token_type": "Bearer",
874892
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
875893
"expires_in": 86400
876-
}"#,
877-
)
894+
}}"#
895+
);
896+
server
897+
.mock("POST", path)
898+
.with_status(status)
899+
.with_body(body)
878900
.expect(1)
879901
.create_async()
880902
.await
@@ -949,6 +971,129 @@ mod tests {
949971
assert_eq!(token, Some("ey000000000000".to_string()));
950972
}
951973

974+
#[tokio::test]
975+
async fn test_invalidate_token() {
976+
let mut server = Server::new_async().await;
977+
let oauth_mock = create_oauth_mock(&mut server).await;
978+
let config_mock = create_config_mock(&mut server).await;
979+
980+
let mut props = HashMap::new();
981+
props.insert("credential".to_string(), "client1:secret1".to_string());
982+
983+
let catalog = RestCatalog::new(
984+
RestCatalogConfig::builder()
985+
.uri(server.url())
986+
.props(props)
987+
.build(),
988+
);
989+
990+
let token = catalog.context().await.unwrap().client.token().await;
991+
oauth_mock.assert_async().await;
992+
config_mock.assert_async().await;
993+
assert_eq!(token, Some("ey000000000000".to_string()));
994+
995+
let oauth_mock =
996+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
997+
.await;
998+
catalog.invalidate_token().await.unwrap();
999+
let token = catalog.context().await.unwrap().client.token().await;
1000+
oauth_mock.assert_async().await;
1001+
assert_eq!(token, Some("ey000000000001".to_string()));
1002+
}
1003+
1004+
#[tokio::test]
1005+
async fn test_invalidate_token_failing_request() {
1006+
let mut server = Server::new_async().await;
1007+
let oauth_mock = create_oauth_mock(&mut server).await;
1008+
let config_mock = create_config_mock(&mut server).await;
1009+
1010+
let mut props = HashMap::new();
1011+
props.insert("credential".to_string(), "client1:secret1".to_string());
1012+
1013+
let catalog = RestCatalog::new(
1014+
RestCatalogConfig::builder()
1015+
.uri(server.url())
1016+
.props(props)
1017+
.build(),
1018+
);
1019+
1020+
let token = catalog.context().await.unwrap().client.token().await;
1021+
oauth_mock.assert_async().await;
1022+
config_mock.assert_async().await;
1023+
assert_eq!(token, Some("ey000000000000".to_string()));
1024+
1025+
let oauth_mock =
1026+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1027+
.await;
1028+
catalog.invalidate_token().await.unwrap();
1029+
let token = catalog.context().await.unwrap().client.token().await;
1030+
oauth_mock.assert_async().await;
1031+
assert_eq!(token, None);
1032+
}
1033+
1034+
#[tokio::test]
1035+
async fn test_regenerate_token() {
1036+
let mut server = Server::new_async().await;
1037+
let oauth_mock = create_oauth_mock(&mut server).await;
1038+
let config_mock = create_config_mock(&mut server).await;
1039+
1040+
let mut props = HashMap::new();
1041+
props.insert("credential".to_string(), "client1:secret1".to_string());
1042+
1043+
let catalog = RestCatalog::new(
1044+
RestCatalogConfig::builder()
1045+
.uri(server.url())
1046+
.props(props)
1047+
.build(),
1048+
);
1049+
1050+
let token = catalog.context().await.unwrap().client.token().await;
1051+
oauth_mock.assert_async().await;
1052+
config_mock.assert_async().await;
1053+
assert_eq!(token, Some("ey000000000000".to_string()));
1054+
1055+
let oauth_mock =
1056+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1057+
.await;
1058+
catalog.regenerate_token().await.unwrap();
1059+
oauth_mock.assert_async().await;
1060+
let token = catalog.context().await.unwrap().client.token().await;
1061+
assert_eq!(token, Some("ey000000000001".to_string()));
1062+
}
1063+
1064+
#[tokio::test]
1065+
async fn test_regenerate_token_failing_request() {
1066+
let mut server = Server::new_async().await;
1067+
let oauth_mock = create_oauth_mock(&mut server).await;
1068+
let config_mock = create_config_mock(&mut server).await;
1069+
1070+
let mut props = HashMap::new();
1071+
props.insert("credential".to_string(), "client1:secret1".to_string());
1072+
1073+
let catalog = RestCatalog::new(
1074+
RestCatalogConfig::builder()
1075+
.uri(server.url())
1076+
.props(props)
1077+
.build(),
1078+
);
1079+
1080+
let token = catalog.context().await.unwrap().client.token().await;
1081+
oauth_mock.assert_async().await;
1082+
config_mock.assert_async().await;
1083+
assert_eq!(token, Some("ey000000000000".to_string()));
1084+
1085+
let oauth_mock =
1086+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1087+
.await;
1088+
let invalidate_result = catalog.regenerate_token().await;
1089+
assert!(invalidate_result.is_err());
1090+
oauth_mock.assert_async().await;
1091+
let token = catalog.context().await.unwrap().client.token().await;
1092+
1093+
// original token is left intact
1094+
assert_eq!(token, Some("ey000000000000".to_string()));
1095+
}
1096+
9521097
#[tokio::test]
9531098
async fn test_http_headers() {
9541099
let server = Server::new_async().await;
@@ -1026,7 +1171,9 @@ mod tests {
10261171

10271172
let mut auth_server = Server::new_async().await;
10281173
let auth_server_path = "/some/path";
1029-
let oauth_mock = create_oauth_mock_with_path(&mut auth_server, auth_server_path).await;
1174+
let oauth_mock =
1175+
create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1176+
.await;
10301177

10311178
let mut props = HashMap::new();
10321179
props.insert("credential".to_string(), "client1:secret1".to_string());

crates/catalog/rest/src/client.rs

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -106,38 +106,7 @@ impl HttpClient {
106106
self.token.lock().await.clone()
107107
}
108108

109-
/// Authenticate the request by filling token.
110-
///
111-
/// - If neither token nor credential is provided, this method will do nothing.
112-
/// - If only credential is provided, this method will try to fetch token from the server.
113-
/// - If token is provided, this method will use the token directly.
114-
///
115-
/// # TODO
116-
///
117-
/// Support refreshing token while needed.
118-
async fn authenticate(&self, req: &mut Request) -> Result<()> {
119-
// Clone the token from lock without holding the lock for entire function.
120-
let token = self.token.lock().await.clone();
121-
122-
if self.credential.is_none() && token.is_none() {
123-
return Ok(());
124-
}
125-
126-
// Use token if provided.
127-
if let Some(token) = &token {
128-
req.headers_mut().insert(
129-
http::header::AUTHORIZATION,
130-
format!("Bearer {token}").parse().map_err(|e| {
131-
Error::new(
132-
ErrorKind::DataInvalid,
133-
"Invalid token received from catalog server!",
134-
)
135-
.with_source(e)
136-
})?,
137-
);
138-
return Ok(());
139-
}
140-
109+
async fn exchange_credential_for_token(&self) -> Result<String> {
141110
// Credential must exist here.
142111
let (client_id, client_secret) = self.credential.as_ref().ok_or_else(|| {
143112
Error::new(
@@ -202,7 +171,61 @@ impl HttpClient {
202171
})?;
203172
Err(Error::from(e))
204173
}?;
205-
let token = auth_res.access_token;
174+
Ok(auth_res.access_token)
175+
}
176+
177+
/// Invalidate the current token without generating a new one. On the next request, the client
178+
/// will attempt to generate a new token.
179+
pub(crate) async fn invalidate_token(&self) -> Result<()> {
180+
*self.token.lock().await = None;
181+
Ok(())
182+
}
183+
184+
/// Invalidate the current token and set a new one. Generates a new token before invalidating
185+
/// the current token, meaning the old token will be used until this function acquires the lock
186+
/// and overwrites the token.
187+
///
188+
/// If credential is invalid, or the request fails, this method will return an error and leave
189+
/// the current token unchanged.
190+
pub(crate) async fn regenerate_token(&self) -> Result<()> {
191+
let new_token = self.exchange_credential_for_token().await?;
192+
*self.token.lock().await = Some(new_token.clone());
193+
Ok(())
194+
}
195+
196+
/// Authenticate the request by filling token.
197+
///
198+
/// - If neither token nor credential is provided, this method will do nothing.
199+
/// - If only credential is provided, this method will try to fetch token from the server.
200+
/// - If token is provided, this method will use the token directly.
201+
///
202+
/// # TODO
203+
///
204+
/// Support refreshing token while needed.
205+
async fn authenticate(&self, req: &mut Request) -> Result<()> {
206+
// Clone the token from lock without holding the lock for entire function.
207+
let token = self.token.lock().await.clone();
208+
209+
if self.credential.is_none() && token.is_none() {
210+
return Ok(());
211+
}
212+
213+
// Use token if provided.
214+
if let Some(token) = &token {
215+
req.headers_mut().insert(
216+
http::header::AUTHORIZATION,
217+
format!("Bearer {token}").parse().map_err(|e| {
218+
Error::new(
219+
ErrorKind::DataInvalid,
220+
"Invalid token received from catalog server!",
221+
)
222+
.with_source(e)
223+
})?,
224+
);
225+
return Ok(());
226+
}
227+
228+
let token = self.exchange_credential_for_token().await?;
206229
// Update token.
207230
*self.token.lock().await = Some(token.clone());
208231
// Insert token in request.

0 commit comments

Comments
 (0)