Skip to content

Commit ff638dc

Browse files
committed
plumbing
1 parent e92d2aa commit ff638dc

File tree

8 files changed

+146
-126
lines changed

8 files changed

+146
-126
lines changed

src/client.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ impl Client {
446446
&self,
447447
criteria: Option<&SelectionCriteria>,
448448
) -> Result<ServerAddress> {
449-
let server = self
449+
let (server, _) = self
450450
.select_server(criteria, "Test select server", None)
451451
.await?;
452452
Ok(server.address.clone())
@@ -460,7 +460,7 @@ impl Client {
460460
#[allow(unused_variables)] // we only use the operation_name for tracing.
461461
operation_name: &str,
462462
deprioritized: Option<&ServerAddress>,
463-
) -> Result<SelectedServer> {
463+
) -> Result<(SelectedServer, SelectionCriteria)> {
464464
let criteria =
465465
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
466466

@@ -488,9 +488,12 @@ impl Client {
488488
let mut watcher = self.inner.topology.watch();
489489
loop {
490490
let state = watcher.observe_latest();
491-
491+
for server in state.description.servers.values() {
492+
eprintln!("at selection: {:?}", server.hello_response());
493+
}
494+
let effective_criteria = criteria; // TODO
492495
let result = server_selection::attempt_to_select_server(
493-
criteria,
496+
effective_criteria,
494497
&state.description,
495498
&state.servers(),
496499
deprioritized,
@@ -507,7 +510,7 @@ impl Client {
507510
#[cfg(feature = "tracing-unstable")]
508511
event_emitter.emit_succeeded_event(&state.description, &server);
509512

510-
return Ok(server);
513+
return Ok((server, effective_criteria.clone()));
511514
} else {
512515
#[cfg(feature = "tracing-unstable")]
513516
if !emitted_waiting_message {

src/client/executor.rs

Lines changed: 104 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
wire::{next_request_id, Message},
3030
PinnedConnectionHandle,
3131
},
32+
Command,
3233
ConnectionPool,
3334
RawCommandResponse,
3435
},
@@ -58,7 +59,7 @@ use crate::{
5859
Retryability,
5960
},
6061
options::{ChangeStreamOptions, SelectionCriteria},
61-
sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus},
62+
sdam::{HandshakePhase, ServerType, TopologyType, TransactionSupportStatus},
6263
selection_criteria::ReadPreference,
6364
tracking_arc::TrackingArc,
6465
ClusterTime,
@@ -309,7 +310,7 @@ impl Client {
309310
let mut retry: Option<ExecutionRetry> = None;
310311
let mut implicit_session: Option<ClientSession> = None;
311312
loop {
312-
op.update_for_topology(&self.inner.topology.description());
313+
//op.update_for_topology(&self.inner.topology.description());
313314

314315
if retry.is_some() {
315316
op.update_for_retry();
@@ -320,15 +321,15 @@ impl Client {
320321
.and_then(|s| s.transaction.pinned_mongos())
321322
.or_else(|| op.selection_criteria());
322323

323-
let server = match self
324+
let (server, effective_criteria) = match self
324325
.select_server(
325326
selection_criteria,
326327
op.name(),
327328
retry.as_ref().map(|r| &r.first_server),
328329
)
329330
.await
330331
{
331-
Ok(server) => server,
332+
Ok(out) => out,
332333
Err(mut err) => {
333334
retry.first_error()?;
334335

@@ -389,14 +390,11 @@ impl Client {
389390
.and_then(|r| r.prior_txn_number)
390391
.or_else(|| get_txn_number(&mut session, retryability));
391392

393+
let cmd =
394+
self.build_command(op, &mut conn, &mut session, txn_number, effective_criteria)?;
395+
392396
let details = match self
393-
.execute_operation_on_connection(
394-
op,
395-
&mut conn,
396-
&mut session,
397-
txn_number,
398-
retryability,
399-
)
397+
.execute_command_on_connection(cmd, op, &mut conn, &mut session, retryability)
400398
.await
401399
{
402400
Ok(output) => ExecutionDetails {
@@ -485,19 +483,14 @@ impl Client {
485483
}
486484
}
487485

488-
/// Executes an operation on a given connection, optionally using a provided session.
489-
async fn execute_operation_on_connection<T: Operation>(
486+
fn build_command<T: Operation>(
490487
&self,
491488
op: &mut T,
492489
connection: &mut PooledConnection,
493490
session: &mut Option<&mut ClientSession>,
494491
txn_number: Option<i64>,
495-
retryability: Retryability,
496-
) -> Result<T::O> {
497-
if let Some(wc) = op.write_concern() {
498-
wc.validate()?;
499-
}
500-
492+
_effective_critera: SelectionCriteria,
493+
) -> Result<Command> {
501494
let stream_description = connection.stream_description()?;
502495
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
503496
let mut cmd = op.build(stream_description)?;
@@ -605,16 +598,30 @@ impl Client {
605598
cmd.set_cluster_time(cluster_time);
606599
}
607600

608-
let connection_info = connection.info();
609-
let service_id = connection.service_id();
610-
let request_id = next_request_id();
611-
612601
if let Some(ref server_api) = self.inner.options.server_api {
613602
cmd.set_server_api(server_api);
614603
}
615604

616-
let should_redact = cmd.should_redact();
605+
Ok(cmd)
606+
}
617607

608+
/// Executes a command on a given connection, optionally using a provided session.
609+
async fn execute_command_on_connection<T: Operation>(
610+
&self,
611+
cmd: Command,
612+
op: &mut T,
613+
connection: &mut PooledConnection,
614+
session: &mut Option<&mut ClientSession>,
615+
retryability: Retryability,
616+
) -> Result<T::O> {
617+
if let Some(wc) = op.write_concern() {
618+
wc.validate()?;
619+
}
620+
621+
let connection_info = connection.info();
622+
let service_id = connection.service_id();
623+
let request_id = next_request_id();
624+
let should_redact = cmd.should_redact();
618625
let cmd_name = cmd.name.clone();
619626
let target_db = cmd.target_db.clone();
620627

@@ -653,78 +660,9 @@ impl Client {
653660
let start_time = Instant::now();
654661
let command_result = match connection.send_message(message).await {
655662
Ok(response) => {
656-
async fn handle_response<T: Operation>(
657-
client: &Client,
658-
op: &T,
659-
session: &mut Option<&mut ClientSession>,
660-
is_sharded: bool,
661-
response: RawCommandResponse,
662-
) -> Result<RawCommandResponse> {
663-
let raw_doc = RawDocument::from_bytes(response.as_bytes())?;
664-
665-
let ok = match raw_doc.get("ok")? {
666-
Some(b) => crate::bson_util::get_int_raw(b).ok_or_else(|| {
667-
ErrorKind::InvalidResponse {
668-
message: format!(
669-
"expected ok value to be a number, instead got {:?}",
670-
b
671-
),
672-
}
673-
})?,
674-
None => {
675-
return Err(ErrorKind::InvalidResponse {
676-
message: "missing 'ok' value in response".to_string(),
677-
}
678-
.into())
679-
}
680-
};
681-
682-
let cluster_time: Option<ClusterTime> = raw_doc
683-
.get("$clusterTime")?
684-
.and_then(RawBsonRef::as_document)
685-
.map(|d| bson::from_slice(d.as_bytes()))
686-
.transpose()?;
687-
688-
let at_cluster_time = op.extract_at_cluster_time(raw_doc)?;
689-
690-
client
691-
.update_cluster_time(cluster_time, at_cluster_time, session)
692-
.await;
693-
694-
if let (Some(session), Some(ts)) = (
695-
session.as_mut(),
696-
raw_doc
697-
.get("operationTime")?
698-
.and_then(RawBsonRef::as_timestamp),
699-
) {
700-
session.advance_operation_time(ts);
701-
}
702-
703-
if ok == 1 {
704-
if let Some(ref mut session) = session {
705-
if is_sharded && session.in_transaction() {
706-
let recovery_token = raw_doc
707-
.get("recoveryToken")?
708-
.and_then(RawBsonRef::as_document)
709-
.map(|d| bson::from_slice(d.as_bytes()))
710-
.transpose()?;
711-
session.transaction.recovery_token = recovery_token;
712-
}
713-
}
714-
715-
Ok(response)
716-
} else {
717-
Err(response
718-
.body::<CommandErrorBody>()
719-
.map(|error_response| error_response.into())
720-
.unwrap_or_else(|e| {
721-
Error::from(ErrorKind::InvalidResponse {
722-
message: format!("error deserializing command error: {}", e),
723-
})
724-
}))
725-
}
726-
}
727-
handle_response(self, op, session, is_sharded, response).await
663+
let is_sharded =
664+
connection.stream_description()?.initial_server_type == ServerType::Mongos;
665+
Self::parse_response(self, op, session, is_sharded, response).await
728666
}
729667
Err(err) => Err(err),
730668
};
@@ -811,6 +749,75 @@ impl Client {
811749
}
812750
}
813751

752+
async fn parse_response<T: Operation>(
753+
client: &Client,
754+
op: &T,
755+
session: &mut Option<&mut ClientSession>,
756+
is_sharded: bool,
757+
response: RawCommandResponse,
758+
) -> Result<RawCommandResponse> {
759+
let raw_doc = RawDocument::from_bytes(response.as_bytes())?;
760+
761+
let ok = match raw_doc.get("ok")? {
762+
Some(b) => {
763+
crate::bson_util::get_int_raw(b).ok_or_else(|| ErrorKind::InvalidResponse {
764+
message: format!("expected ok value to be a number, instead got {:?}", b),
765+
})?
766+
}
767+
None => {
768+
return Err(ErrorKind::InvalidResponse {
769+
message: "missing 'ok' value in response".to_string(),
770+
}
771+
.into())
772+
}
773+
};
774+
775+
let cluster_time: Option<ClusterTime> = raw_doc
776+
.get("$clusterTime")?
777+
.and_then(RawBsonRef::as_document)
778+
.map(|d| bson::from_slice(d.as_bytes()))
779+
.transpose()?;
780+
781+
let at_cluster_time = op.extract_at_cluster_time(raw_doc)?;
782+
783+
client
784+
.update_cluster_time(cluster_time, at_cluster_time, session)
785+
.await;
786+
787+
if let (Some(session), Some(ts)) = (
788+
session.as_mut(),
789+
raw_doc
790+
.get("operationTime")?
791+
.and_then(RawBsonRef::as_timestamp),
792+
) {
793+
session.advance_operation_time(ts);
794+
}
795+
796+
if ok == 1 {
797+
if let Some(ref mut session) = session {
798+
if is_sharded && session.in_transaction() {
799+
let recovery_token = raw_doc
800+
.get("recoveryToken")?
801+
.and_then(RawBsonRef::as_document)
802+
.map(|d| bson::from_slice(d.as_bytes()))
803+
.transpose()?;
804+
session.transaction.recovery_token = recovery_token;
805+
}
806+
}
807+
808+
Ok(response)
809+
} else {
810+
Err(response
811+
.body::<CommandErrorBody>()
812+
.map(|error_response| error_response.into())
813+
.unwrap_or_else(|e| {
814+
Error::from(ErrorKind::InvalidResponse {
815+
message: format!("error deserializing command error: {}", e),
816+
})
817+
}))
818+
}
819+
}
820+
814821
#[cfg(feature = "in-use-encryption")]
815822
fn auto_encrypt<'a>(
816823
&'a self,
@@ -846,7 +853,7 @@ impl Client {
846853
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
847854
|| server_type.is_data_bearing()
848855
}));
849-
let _: SelectedServer = self
856+
let _ = self
850857
.select_server(Some(&criteria), operation_name, None)
851858
.await?;
852859
Ok(())

src/operation.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use crate::{
5252
WriteFailure,
5353
},
5454
options::WriteConcern,
55-
sdam::TopologyDescription,
5655
selection_criteria::SelectionCriteria,
5756
BoxFuture,
5857
ClientSession,
@@ -77,6 +76,7 @@ pub(crate) use update::{Update, UpdateOrReplace};
7776

7877
const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
7978
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
79+
const _SERVER_5_0_0_WIRE_VERSION: i32 = 13;
8080
const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
8181
// The maximum number of bytes that may be included in a write payload when auto-encryption is
8282
// enabled.
@@ -149,9 +149,6 @@ pub(crate) trait Operation {
149149
/// Updates this operation as needed for a retry.
150150
fn update_for_retry(&mut self);
151151

152-
/// Updates this operation based on server topology.
153-
fn update_for_topology(&mut self, _topology: &TopologyDescription);
154-
155152
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
156153

157154
fn name(&self) -> &str;
@@ -239,9 +236,6 @@ pub(crate) trait OperationWithDefaults: Send + Sync {
239236
/// Updates this operation as needed for a retry.
240237
fn update_for_retry(&mut self) {}
241238

242-
/// Updates this operation based on server topology.
243-
fn update_for_topology(&mut self, _topology: &TopologyDescription) {}
244-
245239
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
246240
None
247241
}
@@ -294,9 +288,6 @@ where
294288
fn update_for_retry(&mut self) {
295289
self.update_for_retry()
296290
}
297-
fn update_for_topology(&mut self, topology: &TopologyDescription) {
298-
self.update_for_topology(topology)
299-
}
300291
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
301292
self.pinned_connection()
302293
}

0 commit comments

Comments
 (0)