Skip to content

Commit 50c75de

Browse files
committed
post-rebase cleanup
1 parent ccb111c commit 50c75de

File tree

4 files changed

+147
-120
lines changed

4 files changed

+147
-120
lines changed

src/client/executor.rs

+143-120
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use crate::{
2929
wire::{next_request_id, Message},
3030
PinnedConnectionHandle,
3131
},
32-
Command,
3332
ConnectionPool,
3433
RawCommandResponse,
3534
StreamDescription,
@@ -310,7 +309,6 @@ impl Client {
310309
let mut retry: Option<ExecutionRetry> = None;
311310
let mut implicit_session: Option<ClientSession> = None;
312311
loop {
313-
314312
if retry.is_some() {
315313
op.update_for_retry();
316314
}
@@ -395,7 +393,14 @@ impl Client {
395393
};
396394

397395
let details = match self
398-
.execute_command_on_connection(cmd, op, &mut conn, &mut session, retryability)
396+
.execute_operation_on_connection(
397+
op,
398+
&mut conn,
399+
&mut session,
400+
txn_number,
401+
retryability,
402+
effective_criteria,
403+
)
399404
.await
400405
{
401406
Ok(output) => ExecutionDetails {
@@ -468,127 +473,21 @@ impl Client {
468473
session: &mut Option<&mut ClientSession>,
469474
txn_number: Option<i64>,
470475
retryability: Retryability,
476+
effective_criteria: SelectionCriteria,
471477
) -> Result<T::O> {
472478
loop {
473-
let stream_description = connection.stream_description()?;
474-
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
475-
let mut cmd = op.build(stream_description)?;
476-
self.inner.topology.update_command_with_read_pref(
477-
connection.address(),
478-
&mut cmd,
479-
op.selection_criteria(),
480-
);
481-
482-
match session {
483-
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
484-
cmd.set_session(session);
485-
if let Some(txn_number) = txn_number {
486-
cmd.set_txn_number(txn_number);
487-
}
488-
if session
489-
.options()
490-
.and_then(|opts| opts.snapshot)
491-
.unwrap_or(false)
492-
{
493-
if connection
494-
.stream_description()?
495-
.max_wire_version
496-
.unwrap_or(0)
497-
< 13
498-
{
499-
let labels: Option<Vec<_>> = None;
500-
return Err(Error::new(
501-
ErrorKind::IncompatibleServer {
502-
message: "Snapshot reads require MongoDB 5.0 or later".into(),
503-
},
504-
labels,
505-
));
506-
}
507-
cmd.set_snapshot_read_concern(session);
508-
}
509-
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
510-
// Causal consistency defaults to true, unless snapshot is true.
511-
else if session.causal_consistency()
512-
&& matches!(
513-
session.transaction.state,
514-
TransactionState::None | TransactionState::Starting
515-
)
516-
&& op.supports_read_concern(stream_description)
517-
{
518-
cmd.set_after_cluster_time(session);
519-
}
520-
521-
match session.transaction.state {
522-
TransactionState::Starting => {
523-
cmd.set_start_transaction();
524-
cmd.set_autocommit();
525-
if session.causal_consistency() {
526-
cmd.set_after_cluster_time(session);
527-
}
528-
529-
if let Some(ref options) = session.transaction.options {
530-
if let Some(ref read_concern) = options.read_concern {
531-
cmd.set_read_concern_level(read_concern.level.clone());
532-
}
533-
}
534-
if self.is_load_balanced() {
535-
session.pin_connection(connection.pin()?);
536-
} else if is_sharded {
537-
session.pin_mongos(connection.address().clone());
538-
}
539-
session.transaction.state = TransactionState::InProgress;
540-
}
541-
TransactionState::InProgress => cmd.set_autocommit(),
542-
TransactionState::Committed { .. } | TransactionState::Aborted => {
543-
cmd.set_autocommit();
544-
545-
// Append the recovery token to the command if we are committing or
546-
// aborting on a sharded transaction.
547-
if is_sharded {
548-
if let Some(ref recovery_token) = session.transaction.recovery_token
549-
{
550-
cmd.set_recovery_token(recovery_token);
551-
}
552-
}
553-
}
554-
_ => {}
555-
}
556-
session.update_last_use();
557-
}
558-
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
559-
return Err(ErrorKind::InvalidArgument {
560-
message: format!("{} does not support sessions", cmd.name),
561-
}
562-
.into());
563-
}
564-
Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => {
565-
return Err(ErrorKind::InvalidArgument {
566-
message: "Cannot use ClientSessions with unacknowledged write concern"
567-
.to_string(),
568-
}
569-
.into());
570-
}
571-
_ => {}
572-
}
573-
574-
let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
575-
let client_cluster_time = self.inner.topology.cluster_time();
576-
let max_cluster_time =
577-
std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
578-
if let Some(cluster_time) = max_cluster_time {
579-
cmd.set_cluster_time(cluster_time);
580-
}
479+
let cmd = self.build_command(
480+
op,
481+
connection,
482+
session,
483+
txn_number,
484+
effective_criteria.clone(),
485+
)?;
581486

582487
let connection_info = connection.info();
583488
let service_id = connection.service_id();
584489
let request_id = next_request_id();
585-
586-
if let Some(ref server_api) = self.inner.options.server_api {
587-
cmd.set_server_api(server_api);
588-
}
589-
590490
let should_redact = cmd.should_redact();
591-
592491
let cmd_name = cmd.name.clone();
593492
let target_db = cmd.target_db.clone();
594493

@@ -627,8 +526,9 @@ impl Client {
627526
let start_time = Instant::now();
628527
let command_result = match connection.send_message(message).await {
629528
Ok(response) => {
630-
self.handle_response(op, session, is_sharded, response)
631-
.await
529+
let is_sharded =
530+
connection.stream_description()?.initial_server_type == ServerType::Mongos;
531+
self.parse_response(op, session, is_sharded, response).await
632532
}
633533
Err(err) => Err(err),
634534
};
@@ -703,6 +603,7 @@ impl Client {
703603
let context = ExecutionContext {
704604
connection,
705605
session: session.as_deref_mut(),
606+
effective_criteria: effective_criteria.clone(),
706607
};
707608

708609
match op.handle_response(response, context).await {
@@ -734,6 +635,128 @@ impl Client {
734635
}
735636
}
736637

638+
fn build_command<T: Operation>(
639+
&self,
640+
op: &mut T,
641+
connection: &mut PooledConnection,
642+
session: &mut Option<&mut ClientSession>,
643+
txn_number: Option<i64>,
644+
effective_criteria: SelectionCriteria,
645+
) -> Result<crate::cmap::Command> {
646+
let stream_description = connection.stream_description()?;
647+
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
648+
let mut cmd = op.build(stream_description)?;
649+
self.inner.topology.update_command_with_read_pref(
650+
connection.address(),
651+
&mut cmd,
652+
&effective_criteria,
653+
);
654+
655+
match session {
656+
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
657+
cmd.set_session(session);
658+
if let Some(txn_number) = txn_number {
659+
cmd.set_txn_number(txn_number);
660+
}
661+
if session
662+
.options()
663+
.and_then(|opts| opts.snapshot)
664+
.unwrap_or(false)
665+
{
666+
if connection
667+
.stream_description()?
668+
.max_wire_version
669+
.unwrap_or(0)
670+
< 13
671+
{
672+
let labels: Option<Vec<_>> = None;
673+
return Err(Error::new(
674+
ErrorKind::IncompatibleServer {
675+
message: "Snapshot reads require MongoDB 5.0 or later".into(),
676+
},
677+
labels,
678+
));
679+
}
680+
cmd.set_snapshot_read_concern(session);
681+
}
682+
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
683+
// Causal consistency defaults to true, unless snapshot is true.
684+
else if session.causal_consistency()
685+
&& matches!(
686+
session.transaction.state,
687+
TransactionState::None | TransactionState::Starting
688+
)
689+
&& op.supports_read_concern(stream_description)
690+
{
691+
cmd.set_after_cluster_time(session);
692+
}
693+
694+
match session.transaction.state {
695+
TransactionState::Starting => {
696+
cmd.set_start_transaction();
697+
cmd.set_autocommit();
698+
if session.causal_consistency() {
699+
cmd.set_after_cluster_time(session);
700+
}
701+
702+
if let Some(ref options) = session.transaction.options {
703+
if let Some(ref read_concern) = options.read_concern {
704+
cmd.set_read_concern_level(read_concern.level.clone());
705+
}
706+
}
707+
if self.is_load_balanced() {
708+
session.pin_connection(connection.pin()?);
709+
} else if is_sharded {
710+
session.pin_mongos(connection.address().clone());
711+
}
712+
session.transaction.state = TransactionState::InProgress;
713+
}
714+
TransactionState::InProgress => cmd.set_autocommit(),
715+
TransactionState::Committed { .. } | TransactionState::Aborted => {
716+
cmd.set_autocommit();
717+
718+
// Append the recovery token to the command if we are committing or aborting
719+
// on a sharded transaction.
720+
if is_sharded {
721+
if let Some(ref recovery_token) = session.transaction.recovery_token {
722+
cmd.set_recovery_token(recovery_token);
723+
}
724+
}
725+
}
726+
_ => {}
727+
}
728+
session.update_last_use();
729+
}
730+
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
731+
return Err(ErrorKind::InvalidArgument {
732+
message: format!("{} does not support sessions", cmd.name),
733+
}
734+
.into());
735+
}
736+
Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => {
737+
return Err(ErrorKind::InvalidArgument {
738+
message: "Cannot use ClientSessions with unacknowledged write concern"
739+
.to_string(),
740+
}
741+
.into());
742+
}
743+
_ => {}
744+
}
745+
746+
let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
747+
let client_cluster_time = self.inner.topology.cluster_time();
748+
let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
749+
if let Some(cluster_time) = max_cluster_time {
750+
cmd.set_cluster_time(cluster_time);
751+
}
752+
753+
if let Some(ref server_api) = self.inner.options.server_api {
754+
cmd.set_server_api(server_api);
755+
}
756+
757+
Ok(cmd)
758+
}
759+
737760
#[cfg(feature = "in-use-encryption")]
738761
fn auto_encrypt<'a>(
739762
&'a self,
@@ -786,7 +809,7 @@ impl Client {
786809
.await
787810
}
788811

789-
async fn handle_response<T: Operation>(
812+
async fn parse_response<T: Operation>(
790813
&self,
791814
op: &T,
792815
session: &mut Option<&mut ClientSession>,

src/operation.rs

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ const OP_MSG_OVERHEAD_BYTES: usize = 1_000;
8989
pub(crate) struct ExecutionContext<'a> {
9090
pub(crate) connection: &'a mut PooledConnection,
9191
pub(crate) session: Option<&'a mut ClientSession>,
92+
pub(crate) effective_criteria: SelectionCriteria,
9293
}
9394

9495
#[derive(Debug, PartialEq, Clone, Copy)]

src/operation/aggregate/change_stream.rs

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ impl OperationWithDefaults for ChangeStreamAggregate {
9494
let inner_context = ExecutionContext {
9595
connection: context.connection,
9696
session: context.session.as_deref_mut(),
97+
effective_criteria: context.effective_criteria,
9798
};
9899
let spec = self.inner.handle_response(response, inner_context)?;
99100

src/operation/bulk_write.rs

+2
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ where
114114
&mut context.session,
115115
txn_number,
116116
Retryability::None,
117+
context.effective_criteria.clone(),
117118
)
118119
.await;
119120

@@ -135,6 +136,7 @@ where
135136
&mut context.session,
136137
txn_number,
137138
Retryability::None,
139+
context.effective_criteria.clone(),
138140
)
139141
.await;
140142
}

0 commit comments

Comments
 (0)