Skip to content

Commit 7a7ec5b

Browse files
authored
fix(NODE-6171): RTT set to zero when serverMonitoringMode=stream (#4110)
1 parent 7c91272 commit 7a7ec5b

File tree

2 files changed

+120
-9
lines changed

2 files changed

+120
-9
lines changed

Diff for: src/sdam/monitor.ts

+7-9
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
219219
return this.rttSampler.min();
220220
}
221221

222-
get latestRtt(): number {
223-
return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable
222+
get latestRtt(): number | null {
223+
return this.rttSampler.last;
224224
}
225225

226226
addRttSample(rtt: number) {
@@ -304,7 +304,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
304304
}
305305

306306
// NOTE: here we use the latestRtt as this measurement corresponds with the value
307-
// obtained for this successful heartbeat
307+
// obtained for this successful heartbeat, if there is no latestRtt, then we calculate the
308+
// duration
308309
const duration =
309310
isAwaitable && monitor.rttPinger
310311
? monitor.rttPinger.latestRtt ?? calculateDurationInMs(start)
@@ -498,7 +499,7 @@ export class RTTPinger {
498499
this[kCancellationToken] = monitor[kCancellationToken];
499500
this.closed = false;
500501
this.monitor = monitor;
501-
this.latestRtt = monitor.latestRtt;
502+
this.latestRtt = monitor.latestRtt ?? undefined;
502503

503504
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
504505
this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
@@ -520,10 +521,7 @@ export class RTTPinger {
520521
this.connection = undefined;
521522
}
522523

523-
private measureAndReschedule(start?: number, conn?: Connection) {
524-
if (start == null) {
525-
start = now();
526-
}
524+
private measureAndReschedule(start: number, conn?: Connection) {
527525
if (this.closed) {
528526
conn?.destroy();
529527
return;
@@ -565,7 +563,7 @@ export class RTTPinger {
565563
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
566564
// eslint-disable-next-line github/no-then
567565
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
568-
() => this.measureAndReschedule(),
566+
() => this.measureAndReschedule(start),
569567
() => {
570568
this.connection?.destroy();
571569
this.connection = undefined;

Diff for: test/integration/server-discovery-and-monitoring/server_discover_and_monitoring.test.ts

+113
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
import { setTimeout } from 'node:timers/promises';
2+
3+
import { expect } from 'chai';
4+
import * as sinon from 'sinon';
5+
6+
import {
7+
Connection,
8+
type MongoClient,
9+
promiseWithResolvers,
10+
type ServerHeartbeatSucceededEvent
11+
} from '../../mongodb';
112
import { loadSpecTests } from '../../spec';
213
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
314

@@ -8,3 +19,105 @@ describe('SDAM Unified Tests (Node Driver)', function () {
819
);
920
runUnifiedSuite(clonedAndAlteredSpecTests);
1021
});
22+
23+
describe('Monitoring rtt tests', function () {
24+
let client: MongoClient;
25+
let heartbeatDurations: Record<string, number[]>;
26+
const HEARTBEATS_TO_COLLECT_PER_NODE = 65;
27+
const IGNORE_SIZE = 5;
28+
const DELAY_MS = 10;
29+
30+
beforeEach(function () {
31+
heartbeatDurations = Object.create(null);
32+
});
33+
34+
afterEach(async function () {
35+
if (client) {
36+
await client.close();
37+
}
38+
sinon.restore();
39+
});
40+
41+
for (const serverMonitoringMode of ['poll', 'stream']) {
42+
context(`when serverMonitoringMode is set to '${serverMonitoringMode}'`, function () {
43+
context('after collecting a number of heartbeats', function () {
44+
beforeEach(async function () {
45+
client = this.configuration.newClient({
46+
heartbeatFrequencyMS: 100,
47+
serverMonitoringMode
48+
});
49+
50+
// make sendCommand delay for DELAY_MS ms to ensure that the actual time between sending
51+
// a heartbeat and receiving a response don't drop below 1ms. This is done since our
52+
// testing is colocated with its mongo deployment so network latency is very low
53+
const stub = sinon
54+
// @ts-expect-error accessing private method
55+
.stub(Connection.prototype, 'sendCommand')
56+
.callsFake(async function* (...args) {
57+
await setTimeout(DELAY_MS);
58+
yield* stub.wrappedMethod.call(this, ...args);
59+
});
60+
await client.connect();
61+
62+
const { promise, resolve } = promiseWithResolvers<void>();
63+
client.on('serverHeartbeatSucceeded', (ev: ServerHeartbeatSucceededEvent) => {
64+
heartbeatDurations[ev.connectionId] ??= [];
65+
if (
66+
heartbeatDurations[ev.connectionId].length <
67+
HEARTBEATS_TO_COLLECT_PER_NODE + IGNORE_SIZE
68+
)
69+
heartbeatDurations[ev.connectionId].push(ev.duration);
70+
71+
// We ignore the first few heartbeats since the problem reported in NODE-6172 showed that the
72+
// first few heartbeats were recorded properly
73+
if (
74+
Object.keys(heartbeatDurations).length === client.topology.s.servers.size &&
75+
Object.values(heartbeatDurations).every(
76+
d => d.length === HEARTBEATS_TO_COLLECT_PER_NODE + IGNORE_SIZE
77+
)
78+
) {
79+
client.removeAllListeners('serverHeartbeatSucceeded');
80+
resolve();
81+
}
82+
});
83+
await promise;
84+
});
85+
86+
it(
87+
'heartbeat duration is not incorrectly reported as zero on ServerHeartbeatSucceededEvents',
88+
{
89+
metadata: {
90+
requires: { topology: '!load-balanced' }
91+
},
92+
test: async function () {
93+
for (const durations of Object.values(heartbeatDurations)) {
94+
const relevantDurations = durations.slice(IGNORE_SIZE);
95+
expect(relevantDurations).to.have.length.gt(0);
96+
const averageDuration =
97+
relevantDurations.reduce((acc, x) => acc + x) / relevantDurations.length;
98+
expect(averageDuration).to.be.gt(DELAY_MS);
99+
}
100+
}
101+
}
102+
);
103+
104+
it('ServerDescription.roundTripTime is not incorrectly reported as zero', {
105+
metadata: {
106+
requires: { topology: '!load-balanced' }
107+
},
108+
test: async function () {
109+
for (const [server, durations] of Object.entries(heartbeatDurations)) {
110+
const relevantDurations = durations.slice(IGNORE_SIZE);
111+
expect(relevantDurations).to.have.length.gt(0);
112+
const averageDuration =
113+
relevantDurations.reduce((acc, x) => acc + x) / relevantDurations.length;
114+
const rtt = client.topology.description.servers.get(server).roundTripTime;
115+
expect(rtt).to.not.equal(0);
116+
expect(rtt).to.be.approximately(averageDuration, 3);
117+
}
118+
}
119+
});
120+
});
121+
});
122+
}
123+
});

0 commit comments

Comments
 (0)