@@ -4,33 +4,33 @@ use std::{
4
4
path:: { Path , PathBuf } ,
5
5
} ;
6
6
7
- use bson:: { Document , RawDocument , RawDocumentBuf } ;
7
+ use bson:: { rawdoc , Document , RawDocument , RawDocumentBuf } ;
8
8
use futures_util:: { stream, TryStreamExt } ;
9
- use mongocrypt:: ctx:: { Ctx , State } ;
9
+ use mongocrypt:: ctx:: { Ctx , KmsProvider , State } ;
10
10
use rayon:: ThreadPool ;
11
11
use tokio:: {
12
12
io:: { AsyncReadExt , AsyncWriteExt } ,
13
13
sync:: { oneshot, Mutex } ,
14
14
} ;
15
15
16
16
use crate :: {
17
- client:: { options:: ServerAddress , WeakClient } ,
17
+ client:: { auth :: Credential , options:: ServerAddress , WeakClient } ,
18
18
coll:: options:: FindOptions ,
19
19
error:: { Error , Result } ,
20
20
operation:: { RawOutput , RunCommand } ,
21
21
options:: ReadConcern ,
22
- runtime:: { AsyncStream , Process , TlsConfig } ,
22
+ runtime:: { AsyncStream , HttpClient , Process , TlsConfig } ,
23
23
Client ,
24
24
Namespace ,
25
25
} ;
26
26
27
- use super :: options:: KmsProvidersTlsOptions ;
27
+ use super :: options:: KmsProviders ;
28
28
29
29
#[ derive( Debug ) ]
30
30
pub ( crate ) struct CryptExecutor {
31
31
key_vault_client : WeakClient ,
32
32
key_vault_namespace : Namespace ,
33
- tls_options : Option < KmsProvidersTlsOptions > ,
33
+ kms_providers : KmsProviders ,
34
34
crypto_threads : ThreadPool ,
35
35
mongocryptd : Option < Mongocryptd > ,
36
36
mongocryptd_client : Option < Client > ,
@@ -41,7 +41,7 @@ impl CryptExecutor {
41
41
pub ( crate ) fn new_explicit (
42
42
key_vault_client : WeakClient ,
43
43
key_vault_namespace : Namespace ,
44
- tls_options : Option < KmsProvidersTlsOptions > ,
44
+ kms_providers : KmsProviders ,
45
45
) -> Result < Self > {
46
46
// TODO RUST-1492: Replace num_cpus with std::thread::available_parallelism.
47
47
let crypto_threads = rayon:: ThreadPoolBuilder :: new ( )
@@ -51,7 +51,7 @@ impl CryptExecutor {
51
51
Ok ( Self {
52
52
key_vault_client,
53
53
key_vault_namespace,
54
- tls_options ,
54
+ kms_providers ,
55
55
crypto_threads,
56
56
mongocryptd : None ,
57
57
mongocryptd_client : None ,
@@ -62,7 +62,7 @@ impl CryptExecutor {
62
62
pub ( crate ) async fn new_implicit (
63
63
key_vault_client : WeakClient ,
64
64
key_vault_namespace : Namespace ,
65
- tls_options : Option < KmsProvidersTlsOptions > ,
65
+ kms_providers : KmsProviders ,
66
66
mongocryptd_opts : Option < MongocryptdOptions > ,
67
67
mongocryptd_client : Option < Client > ,
68
68
metadata_client : Option < WeakClient > ,
@@ -71,7 +71,7 @@ impl CryptExecutor {
71
71
Some ( opts) => Some ( Mongocryptd :: new ( opts) . await ?) ,
72
72
None => None ,
73
73
} ;
74
- let mut exec = Self :: new_explicit ( key_vault_client, key_vault_namespace, tls_options ) ?;
74
+ let mut exec = Self :: new_explicit ( key_vault_client, key_vault_namespace, kms_providers ) ?;
75
75
exec. mongocryptd = mongocryptd;
76
76
exec. mongocryptd_client = mongocryptd_client;
77
77
exec. metadata_client = metadata_client;
@@ -185,8 +185,8 @@ impl CryptExecutor {
185
185
let addr = ServerAddress :: parse ( endpoint) ?;
186
186
let provider = kms_ctx. kms_provider ( ) ?;
187
187
let tls_options = self
188
- . tls_options
189
- . as_ref ( )
188
+ . kms_providers
189
+ . tls_options ( )
190
190
. and_then ( |tls| tls. get ( & provider) )
191
191
. cloned ( )
192
192
. unwrap_or_default ( ) ;
@@ -208,8 +208,38 @@ impl CryptExecutor {
208
208
. await ?;
209
209
}
210
210
State :: NeedKmsCredentials => {
211
- // TODO(RUST-1314, RUST-1417): support fetching KMS credentials.
212
- return Err ( Error :: internal ( "KMS credentials are not yet supported" ) ) ;
211
+ let ctx = result_mut ( & mut ctx) ?;
212
+ let mut out = rawdoc ! { } ;
213
+ if self
214
+ . kms_providers
215
+ . credentials ( )
216
+ . get ( & KmsProvider :: Aws )
217
+ . map_or ( false , |d| d. is_empty ( ) )
218
+ {
219
+ #[ cfg( feature = "aws-auth" ) ]
220
+ {
221
+ let aws_creds = crate :: client:: auth:: aws:: AwsCredential :: get (
222
+ & Credential :: default ( ) ,
223
+ & HttpClient :: default ( ) ,
224
+ )
225
+ . await ?;
226
+ let mut creds = rawdoc ! {
227
+ "accessKeyId" : aws_creds. access_key( ) ,
228
+ "secretAccessKey" : aws_creds. secret_key( ) ,
229
+ } ;
230
+ if let Some ( token) = aws_creds. session_token ( ) {
231
+ creds. append ( "sessionToken" , token) ;
232
+ }
233
+ out. append ( "aws" , creds) ;
234
+ }
235
+ #[ cfg( not( feature = "aws-auth" ) ) ]
236
+ {
237
+ return Err ( Error :: invalid_argument (
238
+ "On-demand AWS KMS credentials require the `aws-auth` feature." ,
239
+ ) ) ;
240
+ }
241
+ }
242
+ ctx. provide_kms_providers ( & out) ?;
213
243
}
214
244
State :: Ready => {
215
245
let ( tx, rx) = oneshot:: channel ( ) ;
0 commit comments