Skip to content

Commit aceef6e

Browse files
RUBY-3379 CSOT for bulk writes (#2871)
1 parent bba73e9 commit aceef6e

File tree

25 files changed

+1233
-197
lines changed

25 files changed

+1233
-197
lines changed

lib/mongo.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
require 'mongo/semaphore'
4040
require 'mongo/distinguishing_semaphore'
4141
require 'mongo/condition_variable'
42+
require 'mongo/csot_timeout_holder'
4243
require 'mongo/options'
4344
require 'mongo/loggable'
4445
require 'mongo/cluster_time'

lib/mongo/auth/aws/credentials_retriever.rb

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ def initialize(user = nil, credentials_cache: CredentialsCache.instance)
6969
# Retrieves a valid set of credentials, if possible, or raises
7070
# Auth::InvalidConfiguration.
7171
#
72-
# @param [ Operation::Context | nil ] context Context of the operation
73-
# credentials are retrieved for.
72+
# @param [ CsotTimeoutHolder | nil ] timeout_holder CSOT timeout, if any.
7473
#
7574
# @return [ Auth::Aws::Credentials ] A valid set of credentials.
7675
#
@@ -80,14 +79,14 @@ def initialize(user = nil, credentials_cache: CredentialsCache.instance)
8079
# retrieved from any source.
8180
# @raise Error::TimeoutError if credentials cannot be retrieved within
8281
# the timeout defined on the operation context.
83-
def credentials(context = nil)
82+
def credentials(timeout_holder = nil)
8483
credentials = credentials_from_user(user)
8584
return credentials unless credentials.nil?
8685

8786
credentials = credentials_from_environment
8887
return credentials unless credentials.nil?
8988

90-
credentials = @credentials_cache.fetch { obtain_credentials_from_endpoints(context) }
89+
credentials = @credentials_cache.fetch { obtain_credentials_from_endpoints(timeout_holder) }
9190
return credentials unless credentials.nil?
9291

9392
raise Auth::Aws::CredentialsNotFound
@@ -132,8 +131,7 @@ def credentials_from_environment
132131

133132
# Returns credentials from the AWS metadata endpoints.
134133
#
135-
# @param [ Operation::Context | nil ] context Context of the operation
136-
# credentials are retrieved for.
134+
# @param [ CsotTimeoutHolder ] timeout_holder CSOT timeout.
137135
#
138136
# @return [ Auth::Aws::Credentials | nil ] A set of credentials, or nil
139137
# if retrieval failed or the obtained credentials are invalid.
@@ -142,48 +140,47 @@ def credentials_from_environment
142140
# of credentials.
143141
# @ raise Error::TimeoutError if credentials cannot be retrieved within
144142
# the timeout defined on the operation context.
145-
def obtain_credentials_from_endpoints(context = nil)
146-
if (credentials = web_identity_credentials(context)) && credentials_valid?(credentials, 'Web identity token')
143+
def obtain_credentials_from_endpoints(timeout_holder = nil)
144+
if (credentials = web_identity_credentials(timeout_holder)) && credentials_valid?(credentials, 'Web identity token')
147145
credentials
148-
elsif (credentials = ecs_metadata_credentials(context)) && credentials_valid?(credentials, 'ECS task metadata')
146+
elsif (credentials = ecs_metadata_credentials(timeout_holder)) && credentials_valid?(credentials, 'ECS task metadata')
149147
credentials
150-
elsif (credentials = ec2_metadata_credentials(context)) && credentials_valid?(credentials, 'EC2 instance metadata')
148+
elsif (credentials = ec2_metadata_credentials(timeout_holder)) && credentials_valid?(credentials, 'EC2 instance metadata')
151149
credentials
152150
end
153151
end
154152

155153
# Returns credentials from the EC2 metadata endpoint. The credentials
156154
# could be empty, partial or invalid.
157155
#
158-
# @param [ Operation::Context | nil ] context Context of the operation
159-
# credentials are retrieved for.
156+
# @param [ CsotTimeoutHolder ] timeout_holder CSOT timeout.
160157
#
161158
# @return [ Auth::Aws::Credentials | nil ] A set of credentials, or nil
162159
# if retrieval failed.
163160
# @ raise Error::TimeoutError if credentials cannot be retrieved within
164-
# the timeout defined on the operation context.
165-
def ec2_metadata_credentials(context = nil)
166-
context&.check_timeout!
161+
# the timeout.
162+
def ec2_metadata_credentials(timeout_holder = nil)
163+
timeout_holder&.check_timeout!
167164
http = Net::HTTP.new('169.254.169.254')
168165
req = Net::HTTP::Put.new('/latest/api/token',
169166
# The TTL is required in order to obtain the metadata token.
170167
{'x-aws-ec2-metadata-token-ttl-seconds' => '30'})
171-
resp = with_timeout(context) do
168+
resp = with_timeout(timeout_holder) do
172169
http.request(req)
173170
end
174171
if resp.code != '200'
175172
return nil
176173
end
177174
metadata_token = resp.body
178-
resp = with_timeout(context) do
175+
resp = with_timeout(timeout_holder) do
179176
http_get(http, '/latest/meta-data/iam/security-credentials', metadata_token)
180177
end
181178
if resp.code != '200'
182179
return nil
183180
end
184181
role_name = resp.body
185182
escaped_role_name = CGI.escape(role_name).gsub('+', '%20')
186-
resp = with_timeout(context) do
183+
resp = with_timeout(timeout_holder) do
187184
http_get(http, "/latest/meta-data/iam/security-credentials/#{escaped_role_name}", metadata_token)
188185
end
189186
if resp.code != '200'
@@ -208,15 +205,14 @@ def ec2_metadata_credentials(context = nil)
208205
# Returns credentials from the ECS metadata endpoint. The credentials
209206
# could be empty, partial or invalid.
210207
#
211-
# @param [ Operation::Context | nil ] context Context of the operation
212-
# credentials are retrieved for.
208+
# @param [ CsotTimeoutHolder | nil ] timeout_holder CSOT timeout.
213209
#
214210
# @return [ Auth::Aws::Credentials | nil ] A set of credentials, or nil
215211
# if retrieval failed.
216212
# @ raise Error::TimeoutError if credentials cannot be retrieved within
217213
# the timeout defined on the operation context.
218-
def ecs_metadata_credentials(context = nil)
219-
context&.check_timeout!
214+
def ecs_metadata_credentials(timeout_holder = nil)
215+
timeout_holder&.check_timeout!
220216
relative_uri = ENV['AWS_CONTAINER_CREDENTIALS_RELATIVE_URI']
221217
if relative_uri.nil? || relative_uri.empty?
222218
return nil
@@ -230,7 +226,7 @@ def ecs_metadata_credentials(context = nil)
230226
# a leading slash must be added by the driver, but this is not
231227
# in fact needed.
232228
req = Net::HTTP::Get.new(relative_uri)
233-
resp = with_timeout(context) do
229+
resp = with_timeout(timeout_holder) do
234230
http.request(req)
235231
end
236232
if resp.code != '200'
@@ -252,16 +248,15 @@ def ecs_metadata_credentials(context = nil)
252248
# inside EKS. See https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
253249
# for further details.
254250
#
255-
# @param [ Operation::Context | nil ] context Context of the operation
256-
# credentials are retrieved for.
251+
# @param [ CsotTimeoutHolder | nil ] timeout_holder CSOT timeout.
257252
#
258253
# @return [ Auth::Aws::Credentials | nil ] A set of credentials, or nil
259254
# if retrieval failed.
260-
def web_identity_credentials(context = nil)
255+
def web_identity_credentials(timeout_holder = nil)
261256
web_identity_token, role_arn, role_session_name = prepare_web_identity_inputs
262257
return nil if web_identity_token.nil?
263258
response = request_web_identity_credentials(
264-
web_identity_token, role_arn, role_session_name, context
259+
web_identity_token, role_arn, role_session_name, timeout_holder
265260
)
266261
return if response.nil?
267262
credentials_from_web_identity_response(response)
@@ -296,16 +291,15 @@ def prepare_web_identity_inputs
296291
# that the caller is assuming.
297292
# @param [ String ] role_session_name An identifier for the assumed
298293
# role session.
299-
# @param [ Operation::Context | nil ] context Context of the operation
300-
# credentials are retrieved for.
294+
# @param [ CsotTimeoutHolder | nil ] timeout_holder CSOT timeout.
301295
#
302296
# @return [ Net::HTTPResponse | nil ] AWS API response if successful,
303297
# otherwise nil.
304298
#
305299
# @ raise Error::TimeoutError if credentials cannot be retrieved within
306300
# the timeout defined on the operation context.
307-
def request_web_identity_credentials(token, role_arn, role_session_name, context)
308-
context&.check_timeout!
301+
def request_web_identity_credentials(token, role_arn, role_session_name, timeout_holder)
302+
timeout_holder&.check_timeout!
309303
uri = URI('https://sts.amazonaws.com/')
310304
params = {
311305
'Action' => 'AssumeRoleWithWebIdentity',
@@ -317,7 +311,7 @@ def request_web_identity_credentials(token, role_arn, role_session_name, context
317311
uri.query = ::URI.encode_www_form(params)
318312
req = Net::HTTP::Post.new(uri)
319313
req['Accept'] = 'application/json'
320-
resp = with_timeout(context) do
314+
resp = with_timeout(timeout_holder) do
321315
Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) do |https|
322316
https.request(req)
323317
end
@@ -396,13 +390,12 @@ def credentials_valid?(credentials, source)
396390
# We use +Timeout.timeout+ here because there is no other acceptable easy
397391
# way to time limit http requests.
398392
#
399-
# @param [ Operation::Context | nil ] context Context of the operation
393+
# @param [ CsotTimeoutHolder | nil ] timeout_holder CSOT timeout.
400394
#
401395
# @ raise Error::TimeoutError if deadline exceeded.
402-
def with_timeout(context)
403-
context&.check_timeout!
404-
timeout = context&.remaining_timeout_sec || METADATA_TIMEOUT
405-
exception_class = if context&.csot?
396+
def with_timeout(timeout_holder)
397+
timeout = timeout_holder&.remaining_timeout_sec! || METADATA_TIMEOUT
398+
exception_class = if timeout_holder&.csot?
406399
Error::TimeoutError
407400
else
408401
nil

lib/mongo/collection.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -441,20 +441,26 @@ def create(opts = {})
441441
# @option opts [ Hash ] :write_concern The write concern options.
442442
# @option opts [ Hash | nil ] :encrypted_fields Encrypted fields hash that
443443
# was provided to `create` collection helper.
444+
# @option opts [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
445+
# Must a positive integer. The default value is unset which means infinite.
444446
#
445447
# @return [ Result ] The result of the command.
446448
#
447449
# @since 2.0.0
448450
def drop(opts = {})
449-
client.send(:with_session, opts) do |session|
451+
client.with_session(opts) do |session|
450452
maybe_drop_emm_collections(opts[:encrypted_fields], client, session) do
451453
temp_write_concern = write_concern
452454
write_concern = if opts[:write_concern]
453455
WriteConcern.get(opts[:write_concern])
454456
else
455457
temp_write_concern
456458
end
457-
context = Operation::Context.new(client: client, session: session)
459+
context = Operation::Context.new(
460+
client: client,
461+
session: session,
462+
operation_timeouts: operation_timeouts(opts)
463+
)
458464
operation = Operation::Drop.new({
459465
selector: { :drop => name },
460466
db_name: database.name,

lib/mongo/crypt/auto_encrypter.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,26 +187,26 @@ def encrypt?
187187
# @param [ Hash ] command The command to be encrypted.
188188
#
189189
# @return [ BSON::Document ] The encrypted command.
190-
def encrypt(database_name, command, context)
190+
def encrypt(database_name, command, timeout_holder)
191191
AutoEncryptionContext.new(
192192
@crypt_handle,
193193
@encryption_io,
194194
database_name,
195195
command
196-
).run_state_machine(context)
196+
).run_state_machine(timeout_holder)
197197
end
198198

199199
# Decrypt a database command.
200200
#
201201
# @param [ Hash ] command The command with encrypted fields.
202202
#
203203
# @return [ BSON::Document ] The decrypted command.
204-
def decrypt(command, context)
204+
def decrypt(command, timeout_holder)
205205
AutoDecryptionContext.new(
206206
@crypt_handle,
207207
@encryption_io,
208208
command
209-
).run_state_machine(context)
209+
).run_state_machine(timeout_holder)
210210
end
211211

212212
# Close the resources created by the AutoEncrypter.

lib/mongo/crypt/context.rb

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ def state
6666
# Runs the mongocrypt_ctx_t state machine and handles
6767
# all I/O on behalf of
6868
#
69-
# @param [ Operation::Context ] context Context of the operation the state
70-
# machine is run for.
69+
# @param [ CsotTimeoutHolder ] timeout_holder CSOT timeouts for the
70+
# operation the state.
7171
#
7272
# @return [ BSON::Document ] A BSON document representing the outcome
7373
# of the state machine. Contents can differ depending on how the
@@ -78,10 +78,9 @@ def state
7878
#
7979
# This method is not currently unit tested. It is integration tested
8080
# in spec/integration/explicit_encryption_spec.rb
81-
def run_state_machine(context)
81+
def run_state_machine(timeout_holder)
8282
while true
83-
context.check_timeout!
84-
timeout_ms = context.remaining_timeout_ms
83+
timeout_ms = timeout_holder.remaining_timeout_ms!
8584
case state
8685
when :error
8786
Binding.check_ctx_status(self)
@@ -123,7 +122,7 @@ def run_state_machine(context)
123122
when :need_kms_credentials
124123
Binding.ctx_provide_kms_providers(
125124
self,
126-
retrieve_kms_credentials(context).to_document
125+
retrieve_kms_credentials(timeout_holder).to_document
127126
)
128127
else
129128
raise Error::CryptError.new(
@@ -152,16 +151,15 @@ def mongocrypt_feed(doc)
152151
# Retrieves KMS credentials for providers that are configured
153152
# for automatic credentials retrieval.
154153
#
155-
# @param [ Operation::Context ] context Context of the operation credentials
156-
# are retrieved for.
154+
# @param [ CsotTimeoutHolder ] timeout_holder CSOT timeout.
157155
#
158156
# @return [ Crypt::KMS::Credentials ] Credentials for the configured
159157
# KMS providers.
160-
def retrieve_kms_credentials(context)
158+
def retrieve_kms_credentials(timeout_holder)
161159
providers = {}
162160
if kms_providers.aws&.empty?
163161
begin
164-
aws_credentials = Mongo::Auth::Aws::CredentialsRetriever.new.credentials(context)
162+
aws_credentials = Mongo::Auth::Aws::CredentialsRetriever.new.credentials(timeout_holder)
165163
rescue Auth::Aws::CredentialsNotFound
166164
raise Error::CryptError.new(
167165
"Could not locate AWS credentials (checked environment variables, ECS and EC2 metadata)"
@@ -170,10 +168,10 @@ def retrieve_kms_credentials(context)
170168
providers[:aws] = aws_credentials.to_h
171169
end
172170
if kms_providers.gcp&.empty?
173-
providers[:gcp] = { access_token: gcp_access_token }
171+
providers[:gcp] = { access_token: gcp_access_token(timeout_holder) }
174172
end
175173
if kms_providers.azure&.empty?
176-
providers[:azure] = { access_token: azure_access_token }
174+
providers[:azure] = { access_token: azure_access_token(timeout_holder) }
177175
end
178176
KMS::Credentials.new(providers)
179177
end
@@ -183,8 +181,8 @@ def retrieve_kms_credentials(context)
183181
# @return [ String ] A GCP access token.
184182
#
185183
# @raise [ Error::CryptError ] If the GCP access token could not be
186-
def gcp_access_token
187-
KMS::GCP::CredentialsRetriever.fetch_access_token
184+
def gcp_access_token(timeout_holder)
185+
KMS::GCP::CredentialsRetriever.fetch_access_token(timeout_holder)
188186
rescue KMS::CredentialsNotFound => e
189187
raise Error::CryptError.new(
190188
"Could not locate GCP credentials: #{e.class}: #{e.message}"
@@ -197,9 +195,9 @@ def gcp_access_token
197195
#
198196
# @raise [ Error::CryptError ] If the Azure access token could not be
199197
# retrieved.
200-
def azure_access_token
198+
def azure_access_token(timeout_holder)
201199
if @cached_azure_token.nil? || @cached_azure_token.expired?
202-
@cached_azure_token = KMS::Azure::CredentialsRetriever.fetch_access_token
200+
@cached_azure_token = KMS::Azure::CredentialsRetriever.fetch_access_token(timeout_holder: timeout_holder)
203201
end
204202
@cached_azure_token.access_token
205203
rescue KMS::CredentialsNotFound => e

0 commit comments

Comments
 (0)