Skip to content

Commit 9881b2b

Browse files
committed
switch to callback
1 parent 4b06979 commit 9881b2b

File tree

7 files changed

+55
-64
lines changed

7 files changed

+55
-64
lines changed

src/client.rs

Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::{
3535
error::{Error, ErrorKind, Result},
3636
event::command::CommandEvent,
3737
id_set::IdSet,
38+
operation::OverrideCriteriaFn,
3839
options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress},
3940
sdam::{
4041
server_selection::{self, attempt_to_select_server},
@@ -447,7 +448,7 @@ impl Client {
447448
criteria: Option<&SelectionCriteria>,
448449
) -> Result<ServerAddress> {
449450
let (server, _) = self
450-
.select_server(criteria, "Test select server", None, false)
451+
.select_server(criteria, "Test select server", None, |_, _| None)
451452
.await?;
452453
Ok(server.address.clone())
453454
}
@@ -460,7 +461,7 @@ impl Client {
460461
#[allow(unused_variables)] // we only use the operation_name for tracing.
461462
operation_name: &str,
462463
deprioritized: Option<&ServerAddress>,
463-
is_out_or_merge: bool,
464+
override_criteria: OverrideCriteriaFn,
464465
) -> Result<(SelectedServer, SelectionCriteria)> {
465466
let criteria =
466467
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
@@ -489,15 +490,14 @@ impl Client {
489490
let mut watcher = self.inner.topology.watch();
490491
loop {
491492
let state = watcher.observe_latest();
492-
let override_criteria;
493-
let effective_criteria = if let Some(oc) =
494-
Self::override_criteria(criteria, &state.description, is_out_or_merge)
495-
{
496-
override_criteria = oc;
497-
&override_criteria
498-
} else {
499-
criteria
500-
};
493+
let override_slot;
494+
let effective_criteria =
495+
if let Some(oc) = override_criteria(criteria, &state.description) {
496+
override_slot = oc;
497+
&override_slot
498+
} else {
499+
criteria
500+
};
501501
let result = server_selection::attempt_to_select_server(
502502
effective_criteria,
503503
&state.description,
@@ -549,38 +549,6 @@ impl Client {
549549
}
550550
}
551551

552-
/// Check to see if selection criteria need to be overridden. Currently only required for
553-
/// aggregate operations with $merge/$out stages.
554-
fn override_criteria(
555-
criteria: &SelectionCriteria,
556-
desc: &crate::sdam::TopologyDescription,
557-
is_out_or_merge: bool,
558-
) -> Option<SelectionCriteria> {
559-
if is_out_or_merge {
560-
eprintln!("aggregate: checking override");
561-
}
562-
if !is_out_or_merge
563-
|| criteria == &SelectionCriteria::ReadPreference(ReadPreference::Primary)
564-
|| desc.topology_type() == crate::TopologyType::LoadBalanced
565-
{
566-
if is_out_or_merge {
567-
eprintln!("aggregate: skipping override");
568-
}
569-
return None;
570-
}
571-
for server in desc.servers.values() {
572-
let _ = dbg!(server.hello_response());
573-
if let Ok(Some(v)) = server.max_wire_version() {
574-
static SERVER_5_0_0_WIRE_VERSION: i32 = 13;
575-
if v < SERVER_5_0_0_WIRE_VERSION {
576-
eprintln!("aggregate: overriding criteria");
577-
return Some(SelectionCriteria::ReadPreference(ReadPreference::Primary));
578-
}
579-
}
580-
}
581-
return None;
582-
}
583-
584552
#[cfg(all(test, feature = "dns-resolver"))]
585553
pub(crate) fn get_hosts(&self) -> Vec<String> {
586554
let watcher = self.inner.topology.watch();

src/client/executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ impl Client {
326326
selection_criteria,
327327
op.name(),
328328
retry.as_ref().map(|r| &r.first_server),
329-
op.is_out_or_merge(),
329+
op.override_criteria(),
330330
)
331331
.await
332332
{
@@ -863,7 +863,7 @@ impl Client {
863863
|| server_type.is_data_bearing()
864864
}));
865865
let _ = self
866-
.select_server(Some(&criteria), operation_name, None, false)
866+
.select_server(Some(&criteria), operation_name, None, |_, _| None)
867867
.await?;
868868
Ok(())
869869
}

src/operation.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub(crate) use update::{Update, UpdateOrReplace};
7676

7777
const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
7878
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
79+
const SERVER_5_0_0_WIRE_VERSION: i32 = 13;
7980
const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
8081
// The maximum number of bytes that may be included in a write payload when auto-encryption is
8182
// enabled.
@@ -148,14 +149,18 @@ pub(crate) trait Operation {
148149
/// Updates this operation as needed for a retry.
149150
fn update_for_retry(&mut self);
150151

151-
/// Returns whether this is a $out or $merge aggregation operation.
152-
fn is_out_or_merge(&self) -> bool;
152+
/// Returns a function handle to potentially override selection criteria based on server
153+
/// topology.
154+
fn override_criteria(&self) -> OverrideCriteriaFn;
153155

154156
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
155157

156158
fn name(&self) -> &str;
157159
}
158160

161+
pub(crate) type OverrideCriteriaFn =
162+
fn(&SelectionCriteria, &crate::sdam::TopologyDescription) -> Option<SelectionCriteria>;
163+
159164
// A mirror of the `Operation` trait, with default behavior where appropriate. Should only be
160165
// implemented by operation types that do not delegate to other operations.
161166
pub(crate) trait OperationWithDefaults: Send + Sync {
@@ -238,9 +243,10 @@ pub(crate) trait OperationWithDefaults: Send + Sync {
238243
/// Updates this operation as needed for a retry.
239244
fn update_for_retry(&mut self) {}
240245

241-
/// Returns whether this is a $out or $merge aggregation operation.
242-
fn is_out_or_merge(&self) -> bool {
243-
false
246+
/// Returns a function handle to potentially override selection criteria based on server
247+
/// topology.
248+
fn override_criteria(&self) -> OverrideCriteriaFn {
249+
|_, _| None
244250
}
245251

246252
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
@@ -295,8 +301,8 @@ where
295301
fn update_for_retry(&mut self) {
296302
self.update_for_retry()
297303
}
298-
fn is_out_or_merge(&self) -> bool {
299-
self.is_out_or_merge()
304+
fn override_criteria(&self) -> OverrideCriteriaFn {
305+
self.override_criteria()
300306
}
301307
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
302308
self.pinned_connection()

src/operation/aggregate.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
cursor::CursorSpecification,
88
error::Result,
99
operation::{append_options, remove_empty_write_concern, Retryability},
10-
options::{AggregateOptions, SelectionCriteria, WriteConcern},
10+
options::{AggregateOptions, ReadPreference, SelectionCriteria, WriteConcern},
1111
Namespace,
1212
};
1313

@@ -135,6 +135,30 @@ impl OperationWithDefaults for Aggregate {
135135
}
136136
}
137137

138+
fn override_criteria(&self) -> super::OverrideCriteriaFn {
139+
if !self.is_out_or_merge() {
140+
return |_, _| None;
141+
}
142+
|criteria, topology| {
143+
if criteria == &SelectionCriteria::ReadPreference(ReadPreference::Primary)
144+
|| topology.topology_type() == crate::TopologyType::LoadBalanced
145+
{
146+
return None;
147+
}
148+
for server in topology.servers.values() {
149+
if let Ok(Some(v)) = server.max_wire_version() {
150+
if v < super::SERVER_5_0_0_WIRE_VERSION {
151+
return Some(SelectionCriteria::ReadPreference(ReadPreference::Primary));
152+
}
153+
}
154+
}
155+
None
156+
}
157+
}
158+
}
159+
160+
impl Aggregate {
161+
/// Returns whether this is a $out or $merge aggregation operation.
138162
fn is_out_or_merge(&self) -> bool {
139163
self.pipeline
140164
.last()

src/operation/raw_output.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ impl<Op: Operation> Operation for RawOutput<Op> {
6868
self.0.update_for_retry()
6969
}
7070

71-
fn is_out_or_merge(&self) -> bool {
72-
self.0.is_out_or_merge()
71+
fn override_criteria(&self) -> super::OverrideCriteriaFn {
72+
self.0.override_criteria()
7373
}
7474

7575
fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> {

src/operation/run_cursor_command.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ impl Operation for RunCursorCommand<'_> {
7979
self.run_command.update_for_retry()
8080
}
8181

82-
fn is_out_or_merge(&self) -> bool {
83-
self.run_command.is_out_or_merge()
82+
fn override_criteria(&self) -> super::OverrideCriteriaFn {
83+
self.run_command.override_criteria()
8484
}
8585

8686
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {

src/sdam/description/server.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -386,13 +386,6 @@ impl ServerDescription {
386386
Ok(me)
387387
}
388388

389-
pub(crate) fn hello_response(&self) -> Result<Option<&HelloCommandResponse>> {
390-
self.reply
391-
.as_ref()
392-
.map_err(Clone::clone)
393-
.map(|o| o.as_ref().map(|r| &r.command_response))
394-
}
395-
396389
pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>> {
397390
match self.reply {
398391
Ok(None) => Ok(None),

0 commit comments

Comments
 (0)