Skip to content

Commit ae31f78

Browse files
committed
working impl
1 parent ff638dc commit ae31f78

File tree

8 files changed

+72
-49
lines changed

8 files changed

+72
-49
lines changed

.evergreen/run-tests.sh

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ if [ "Windows_NT" == "$OS" ]; then
3333
export SSL_CERT_DIR=$(cygpath /etc/ssl/certs --windows)
3434
fi
3535

36-
cargo_test ""
36+
CARGO_OPTIONS+=("--nocapture")
37+
export TEST_FILE=aggregate-write-readPreference.json
38+
cargo_test "test::spec::crud::run_unified"
39+
#cargo_test ""
3740

3841
# cargo-nextest doesn't support doc tests
39-
RUST_BACKTRACE=1 cargo test --doc $(cargo_test_options)
40-
((CARGO_RESULT = ${CARGO_RESULT} || $?))
42+
#RUST_BACKTRACE=1 cargo test --doc $(cargo_test_options)
43+
#((CARGO_RESULT = ${CARGO_RESULT} || $?))
4144

4245
exit $CARGO_RESULT

src/client.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ impl Client {
447447
criteria: Option<&SelectionCriteria>,
448448
) -> Result<ServerAddress> {
449449
let (server, _) = self
450-
.select_server(criteria, "Test select server", None)
450+
.select_server(criteria, "Test select server", None, false)
451451
.await?;
452452
Ok(server.address.clone())
453453
}
@@ -460,6 +460,7 @@ impl Client {
460460
#[allow(unused_variables)] // we only use the operation_name for tracing.
461461
operation_name: &str,
462462
deprioritized: Option<&ServerAddress>,
463+
is_out_or_merge: bool,
463464
) -> Result<(SelectedServer, SelectionCriteria)> {
464465
let criteria =
465466
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
@@ -488,10 +489,15 @@ impl Client {
488489
let mut watcher = self.inner.topology.watch();
489490
loop {
490491
let state = watcher.observe_latest();
491-
for server in state.description.servers.values() {
492-
eprintln!("at selection: {:?}", server.hello_response());
493-
}
494-
let effective_criteria = criteria; // TODO
492+
let override_criteria;
493+
let effective_criteria = if let Some(oc) =
494+
Self::override_criteria(criteria, &state.description, is_out_or_merge)
495+
{
496+
override_criteria = oc;
497+
&override_criteria
498+
} else {
499+
criteria
500+
};
495501
let result = server_selection::attempt_to_select_server(
496502
effective_criteria,
497503
&state.description,
@@ -543,6 +549,38 @@ impl Client {
543549
}
544550
}
545551

552+
/// Check to see if selection criteria need to be overridden. Currently only required for
553+
/// aggregate operations with $merge/$out stages.
554+
fn override_criteria(
555+
criteria: &SelectionCriteria,
556+
desc: &crate::sdam::TopologyDescription,
557+
is_out_or_merge: bool,
558+
) -> Option<SelectionCriteria> {
559+
if is_out_or_merge {
560+
eprintln!("aggregate: checking override");
561+
}
562+
if !is_out_or_merge
563+
|| criteria == &SelectionCriteria::ReadPreference(ReadPreference::Primary)
564+
|| desc.topology_type() == crate::TopologyType::LoadBalanced
565+
{
566+
if is_out_or_merge {
567+
eprintln!("aggregate: skipping override");
568+
}
569+
return None;
570+
}
571+
for server in desc.servers.values() {
572+
let _ = dbg!(server.hello_response());
573+
if let Ok(Some(v)) = server.max_wire_version() {
574+
static SERVER_5_0_0_WIRE_VERSION: i32 = 13;
575+
if v < SERVER_5_0_0_WIRE_VERSION {
576+
eprintln!("aggregate: overriding criteria");
577+
return Some(SelectionCriteria::ReadPreference(ReadPreference::Primary));
578+
}
579+
}
580+
}
581+
return None;
582+
}
583+
546584
#[cfg(all(test, feature = "dns-resolver"))]
547585
pub(crate) fn get_hosts(&self) -> Vec<String> {
548586
let watcher = self.inner.topology.watch();

src/client/executor.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ impl Client {
326326
selection_criteria,
327327
op.name(),
328328
retry.as_ref().map(|r| &r.first_server),
329+
op.is_out_or_merge(),
329330
)
330331
.await
331332
{
@@ -489,15 +490,15 @@ impl Client {
489490
connection: &mut PooledConnection,
490491
session: &mut Option<&mut ClientSession>,
491492
txn_number: Option<i64>,
492-
_effective_critera: SelectionCriteria,
493+
effective_criteria: SelectionCriteria,
493494
) -> Result<Command> {
494495
let stream_description = connection.stream_description()?;
495496
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
496497
let mut cmd = op.build(stream_description)?;
497498
self.inner.topology.update_command_with_read_pref(
498499
connection.address(),
499500
&mut cmd,
500-
op.selection_criteria(),
501+
Some(&effective_criteria),
501502
);
502503

503504
match session {
@@ -854,7 +855,7 @@ impl Client {
854855
|| server_type.is_data_bearing()
855856
}));
856857
let _ = self
857-
.select_server(Some(&criteria), operation_name, None)
858+
.select_server(Some(&criteria), operation_name, None, false)
858859
.await?;
859860
Ok(())
860861
}

src/operation.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ pub(crate) use update::{Update, UpdateOrReplace};
7676

7777
const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
7878
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
79-
const _SERVER_5_0_0_WIRE_VERSION: i32 = 13;
8079
const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
8180
// The maximum number of bytes that may be included in a write payload when auto-encryption is
8281
// enabled.
@@ -149,6 +148,9 @@ pub(crate) trait Operation {
149148
/// Updates this operation as needed for a retry.
150149
fn update_for_retry(&mut self);
151150

151+
/// Returns whether this is a $out or $merge aggregation operation.
152+
fn is_out_or_merge(&self) -> bool;
153+
152154
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
153155

154156
fn name(&self) -> &str;
@@ -236,6 +238,11 @@ pub(crate) trait OperationWithDefaults: Send + Sync {
236238
/// Updates this operation as needed for a retry.
237239
fn update_for_retry(&mut self) {}
238240

241+
/// Returns whether this is a $out or $merge aggregation operation.
242+
fn is_out_or_merge(&self) -> bool {
243+
false
244+
}
245+
239246
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
240247
None
241248
}
@@ -288,6 +295,9 @@ where
288295
fn update_for_retry(&mut self) {
289296
self.update_for_retry()
290297
}
298+
fn is_out_or_merge(&self) -> bool {
299+
self.is_out_or_merge()
300+
}
291301
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
292302
self.pinned_connection()
293303
}

src/operation/aggregate.rs

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -109,33 +109,6 @@ impl OperationWithDefaults for Aggregate {
109109
))
110110
}
111111

112-
/*
113-
fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) {
114-
eprintln!("aggregate: update_for_topology");
115-
if !self.is_out_or_merge()
116-
|| matches!(
117-
self.selection_criteria(),
118-
None | Some(SelectionCriteria::ReadPreference(ReadPreference::Primary))
119-
)
120-
|| topology.topology_type() == TopologyType::LoadBalanced
121-
{
122-
eprintln!("aggregate: skipping topology update");
123-
return;
124-
}
125-
for server in topology.servers.values() {
126-
let _ = dbg!(server.hello_response());
127-
if let Ok(Some(v)) = server.max_wire_version() {
128-
if v < SERVER_5_0_0_WIRE_VERSION {
129-
eprintln!("aggregate: updating topology");
130-
self.options.get_or_insert_default().selection_criteria =
131-
Some(SelectionCriteria::ReadPreference(ReadPreference::Primary));
132-
break;
133-
}
134-
}
135-
}
136-
}
137-
*/
138-
139112
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
140113
self.options
141114
.as_ref()
@@ -161,10 +134,7 @@ impl OperationWithDefaults for Aggregate {
161134
Retryability::Read
162135
}
163136
}
164-
}
165137

166-
impl Aggregate {
167-
/// Returns whether this is a $out or $merge aggregation operation.
168138
fn is_out_or_merge(&self) -> bool {
169139
self.pipeline
170140
.last()

src/operation/raw_output.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ impl<Op: Operation> Operation for RawOutput<Op> {
6868
self.0.update_for_retry()
6969
}
7070

71+
fn is_out_or_merge(&self) -> bool {
72+
self.0.is_out_or_merge()
73+
}
74+
7175
fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> {
7276
self.0.pinned_connection()
7377
}

src/operation/run_cursor_command.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ impl Operation for RunCursorCommand<'_> {
7979
self.run_command.update_for_retry()
8080
}
8181

82+
fn is_out_or_merge(&self) -> bool {
83+
self.run_command.is_out_or_merge()
84+
}
85+
8286
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
8387
self.run_command.pinned_connection()
8488
}

src/test/spec/crud.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,6 @@ async fn run_unified() {
4343
pre-5.0 server",
4444
"Requesting unacknowledged write with verboseResults is a client-side error",
4545
"Requesting unacknowledged write with ordered is a client-side error",
46-
// TODO RUST-663: Unskip these tests.
47-
"Aggregate with $out includes read preference for 5.0+ server",
48-
"Aggregate with $out omits read preference for pre-5.0 server",
49-
"Aggregate with $merge includes read preference for 5.0+ server",
50-
"Aggregate with $merge omits read preference for pre-5.0 server",
51-
"Database-level aggregate with $out omits read preference for pre-5.0 server",
52-
"Database-level aggregate with $merge omits read preference for pre-5.0 server",
5346
];
5447
// TODO: remove this manual skip when this test is fixed to skip on serverless
5548
if *SERVERLESS {

0 commit comments

Comments
 (0)