Skip to content

Commit 6fb56ba

Browse files
authored
RUST-663 Support $merge and $out executing on secondaries (#1360)
1 parent 3c241be commit 6fb56ba

File tree

11 files changed

+215
-145
lines changed

11 files changed

+215
-145
lines changed

src/client.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::{
3535
error::{Error, ErrorKind, Result},
3636
event::command::CommandEvent,
3737
id_set::IdSet,
38+
operation::OverrideCriteriaFn,
3839
options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress},
3940
sdam::{
4041
server_selection::{self, attempt_to_select_server},
@@ -446,8 +447,8 @@ impl Client {
446447
&self,
447448
criteria: Option<&SelectionCriteria>,
448449
) -> Result<ServerAddress> {
449-
let server = self
450-
.select_server(criteria, "Test select server", None)
450+
let (server, _) = self
451+
.select_server(criteria, "Test select server", None, |_, _| None)
451452
.await?;
452453
Ok(server.address.clone())
453454
}
@@ -460,7 +461,8 @@ impl Client {
460461
#[allow(unused_variables)] // we only use the operation_name for tracing.
461462
operation_name: &str,
462463
deprioritized: Option<&ServerAddress>,
463-
) -> Result<SelectedServer> {
464+
override_criteria: OverrideCriteriaFn,
465+
) -> Result<(SelectedServer, SelectionCriteria)> {
464466
let criteria =
465467
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
466468

@@ -488,9 +490,16 @@ impl Client {
488490
let mut watcher = self.inner.topology.watch();
489491
loop {
490492
let state = watcher.observe_latest();
491-
493+
let override_slot;
494+
let effective_criteria =
495+
if let Some(oc) = override_criteria(criteria, &state.description) {
496+
override_slot = oc;
497+
&override_slot
498+
} else {
499+
criteria
500+
};
492501
let result = server_selection::attempt_to_select_server(
493-
criteria,
502+
effective_criteria,
494503
&state.description,
495504
&state.servers(),
496505
deprioritized,
@@ -507,7 +516,7 @@ impl Client {
507516
#[cfg(feature = "tracing-unstable")]
508517
event_emitter.emit_succeeded_event(&state.description, &server);
509518

510-
return Ok(server);
519+
return Ok((server, effective_criteria.clone()));
511520
} else {
512521
#[cfg(feature = "tracing-unstable")]
513522
if !emitted_waiting_message {

src/client/executor.rs

Lines changed: 142 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use crate::{
5959
Retryability,
6060
},
6161
options::{ChangeStreamOptions, SelectionCriteria},
62-
sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus},
62+
sdam::{HandshakePhase, ServerType, TopologyType, TransactionSupportStatus},
6363
selection_criteria::ReadPreference,
6464
tracking_arc::TrackingArc,
6565
ClusterTime,
@@ -318,15 +318,16 @@ impl Client {
318318
.and_then(|s| s.transaction.pinned_mongos())
319319
.or_else(|| op.selection_criteria());
320320

321-
let server = match self
321+
let (server, effective_criteria) = match self
322322
.select_server(
323323
selection_criteria,
324324
op.name(),
325325
retry.as_ref().map(|r| &r.first_server),
326+
op.override_criteria(),
326327
)
327328
.await
328329
{
329-
Ok(server) => server,
330+
Ok(out) => out,
330331
Err(mut err) => {
331332
retry.first_error()?;
332333

@@ -398,6 +399,7 @@ impl Client {
398399
&mut session,
399400
txn_number,
400401
retryability,
402+
effective_criteria,
401403
)
402404
.await
403405
{
@@ -471,127 +473,21 @@ impl Client {
471473
session: &mut Option<&mut ClientSession>,
472474
txn_number: Option<i64>,
473475
retryability: Retryability,
476+
effective_criteria: SelectionCriteria,
474477
) -> Result<T::O> {
475478
loop {
476-
let stream_description = connection.stream_description()?;
477-
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
478-
let mut cmd = op.build(stream_description)?;
479-
self.inner.topology.update_command_with_read_pref(
480-
connection.address(),
481-
&mut cmd,
482-
op.selection_criteria(),
483-
);
484-
485-
match session {
486-
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
487-
cmd.set_session(session);
488-
if let Some(txn_number) = txn_number {
489-
cmd.set_txn_number(txn_number);
490-
}
491-
if session
492-
.options()
493-
.and_then(|opts| opts.snapshot)
494-
.unwrap_or(false)
495-
{
496-
if connection
497-
.stream_description()?
498-
.max_wire_version
499-
.unwrap_or(0)
500-
< 13
501-
{
502-
let labels: Option<Vec<_>> = None;
503-
return Err(Error::new(
504-
ErrorKind::IncompatibleServer {
505-
message: "Snapshot reads require MongoDB 5.0 or later".into(),
506-
},
507-
labels,
508-
));
509-
}
510-
cmd.set_snapshot_read_concern(session);
511-
}
512-
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
513-
// Causal consistency defaults to true, unless snapshot is true.
514-
else if session.causal_consistency()
515-
&& matches!(
516-
session.transaction.state,
517-
TransactionState::None | TransactionState::Starting
518-
)
519-
&& op.supports_read_concern(stream_description)
520-
{
521-
cmd.set_after_cluster_time(session);
522-
}
523-
524-
match session.transaction.state {
525-
TransactionState::Starting => {
526-
cmd.set_start_transaction();
527-
cmd.set_autocommit();
528-
if session.causal_consistency() {
529-
cmd.set_after_cluster_time(session);
530-
}
531-
532-
if let Some(ref options) = session.transaction.options {
533-
if let Some(ref read_concern) = options.read_concern {
534-
cmd.set_read_concern_level(read_concern.level.clone());
535-
}
536-
}
537-
if self.is_load_balanced() {
538-
session.pin_connection(connection.pin()?);
539-
} else if is_sharded {
540-
session.pin_mongos(connection.address().clone());
541-
}
542-
session.transaction.state = TransactionState::InProgress;
543-
}
544-
TransactionState::InProgress => cmd.set_autocommit(),
545-
TransactionState::Committed { .. } | TransactionState::Aborted => {
546-
cmd.set_autocommit();
547-
548-
// Append the recovery token to the command if we are committing or
549-
// aborting on a sharded transaction.
550-
if is_sharded {
551-
if let Some(ref recovery_token) = session.transaction.recovery_token
552-
{
553-
cmd.set_recovery_token(recovery_token);
554-
}
555-
}
556-
}
557-
_ => {}
558-
}
559-
session.update_last_use();
560-
}
561-
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
562-
return Err(ErrorKind::InvalidArgument {
563-
message: format!("{} does not support sessions", cmd.name),
564-
}
565-
.into());
566-
}
567-
Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => {
568-
return Err(ErrorKind::InvalidArgument {
569-
message: "Cannot use ClientSessions with unacknowledged write concern"
570-
.to_string(),
571-
}
572-
.into());
573-
}
574-
_ => {}
575-
}
576-
577-
let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
578-
let client_cluster_time = self.inner.topology.cluster_time();
579-
let max_cluster_time =
580-
std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
581-
if let Some(cluster_time) = max_cluster_time {
582-
cmd.set_cluster_time(cluster_time);
583-
}
479+
let cmd = self.build_command(
480+
op,
481+
connection,
482+
session,
483+
txn_number,
484+
effective_criteria.clone(),
485+
)?;
584486

585487
let connection_info = connection.info();
586488
let service_id = connection.service_id();
587489
let request_id = next_request_id();
588-
589-
if let Some(ref server_api) = self.inner.options.server_api {
590-
cmd.set_server_api(server_api);
591-
}
592-
593490
let should_redact = cmd.should_redact();
594-
595491
let cmd_name = cmd.name.clone();
596492
let target_db = cmd.target_db.clone();
597493

@@ -630,8 +526,9 @@ impl Client {
630526
let start_time = Instant::now();
631527
let command_result = match connection.send_message(message).await {
632528
Ok(response) => {
633-
self.handle_response(op, session, is_sharded, response)
634-
.await
529+
let is_sharded =
530+
connection.stream_description()?.initial_server_type == ServerType::Mongos;
531+
self.parse_response(op, session, is_sharded, response).await
635532
}
636533
Err(err) => Err(err),
637534
};
@@ -706,6 +603,7 @@ impl Client {
706603
let context = ExecutionContext {
707604
connection,
708605
session: session.as_deref_mut(),
606+
effective_criteria: effective_criteria.clone(),
709607
};
710608

711609
match op.handle_response(response, context).await {
@@ -737,6 +635,128 @@ impl Client {
737635
}
738636
}
739637

638+
fn build_command<T: Operation>(
639+
&self,
640+
op: &mut T,
641+
connection: &mut PooledConnection,
642+
session: &mut Option<&mut ClientSession>,
643+
txn_number: Option<i64>,
644+
effective_criteria: SelectionCriteria,
645+
) -> Result<crate::cmap::Command> {
646+
let stream_description = connection.stream_description()?;
647+
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
648+
let mut cmd = op.build(stream_description)?;
649+
self.inner.topology.update_command_with_read_pref(
650+
connection.address(),
651+
&mut cmd,
652+
&effective_criteria,
653+
);
654+
655+
match session {
656+
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
657+
cmd.set_session(session);
658+
if let Some(txn_number) = txn_number {
659+
cmd.set_txn_number(txn_number);
660+
}
661+
if session
662+
.options()
663+
.and_then(|opts| opts.snapshot)
664+
.unwrap_or(false)
665+
{
666+
if connection
667+
.stream_description()?
668+
.max_wire_version
669+
.unwrap_or(0)
670+
< 13
671+
{
672+
let labels: Option<Vec<_>> = None;
673+
return Err(Error::new(
674+
ErrorKind::IncompatibleServer {
675+
message: "Snapshot reads require MongoDB 5.0 or later".into(),
676+
},
677+
labels,
678+
));
679+
}
680+
cmd.set_snapshot_read_concern(session);
681+
}
682+
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
683+
// Causal consistency defaults to true, unless snapshot is true.
684+
else if session.causal_consistency()
685+
&& matches!(
686+
session.transaction.state,
687+
TransactionState::None | TransactionState::Starting
688+
)
689+
&& op.supports_read_concern(stream_description)
690+
{
691+
cmd.set_after_cluster_time(session);
692+
}
693+
694+
match session.transaction.state {
695+
TransactionState::Starting => {
696+
cmd.set_start_transaction();
697+
cmd.set_autocommit();
698+
if session.causal_consistency() {
699+
cmd.set_after_cluster_time(session);
700+
}
701+
702+
if let Some(ref options) = session.transaction.options {
703+
if let Some(ref read_concern) = options.read_concern {
704+
cmd.set_read_concern_level(read_concern.level.clone());
705+
}
706+
}
707+
if self.is_load_balanced() {
708+
session.pin_connection(connection.pin()?);
709+
} else if is_sharded {
710+
session.pin_mongos(connection.address().clone());
711+
}
712+
session.transaction.state = TransactionState::InProgress;
713+
}
714+
TransactionState::InProgress => cmd.set_autocommit(),
715+
TransactionState::Committed { .. } | TransactionState::Aborted => {
716+
cmd.set_autocommit();
717+
718+
// Append the recovery token to the command if we are committing or aborting
719+
// on a sharded transaction.
720+
if is_sharded {
721+
if let Some(ref recovery_token) = session.transaction.recovery_token {
722+
cmd.set_recovery_token(recovery_token);
723+
}
724+
}
725+
}
726+
_ => {}
727+
}
728+
session.update_last_use();
729+
}
730+
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
731+
return Err(ErrorKind::InvalidArgument {
732+
message: format!("{} does not support sessions", cmd.name),
733+
}
734+
.into());
735+
}
736+
Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => {
737+
return Err(ErrorKind::InvalidArgument {
738+
message: "Cannot use ClientSessions with unacknowledged write concern"
739+
.to_string(),
740+
}
741+
.into());
742+
}
743+
_ => {}
744+
}
745+
746+
let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
747+
let client_cluster_time = self.inner.topology.cluster_time();
748+
let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
749+
if let Some(cluster_time) = max_cluster_time {
750+
cmd.set_cluster_time(cluster_time);
751+
}
752+
753+
if let Some(ref server_api) = self.inner.options.server_api {
754+
cmd.set_server_api(server_api);
755+
}
756+
757+
Ok(cmd)
758+
}
759+
740760
#[cfg(feature = "in-use-encryption")]
741761
fn auto_encrypt<'a>(
742762
&'a self,
@@ -789,7 +809,7 @@ impl Client {
789809
.await
790810
}
791811

792-
async fn handle_response<T: Operation>(
812+
async fn parse_response<T: Operation>(
793813
&self,
794814
op: &T,
795815
session: &mut Option<&mut ClientSession>,
@@ -864,8 +884,8 @@ impl Client {
864884
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
865885
|| server_type.is_data_bearing()
866886
}));
867-
let _: SelectedServer = self
868-
.select_server(Some(&criteria), operation_name, None)
887+
let _ = self
888+
.select_server(Some(&criteria), operation_name, None, |_, _| None)
869889
.await?;
870890
Ok(())
871891
}

0 commit comments

Comments
 (0)