Skip to content

Commit 78d495d

Browse files
GODRIVER-3444 Implement validation
1 parent e9a79e7 commit 78d495d

File tree

8 files changed

+128
-120
lines changed

8 files changed

+128
-120
lines changed

internal/driverutil/operation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func CalculateMaxTimeMS(ctx context.Context, rttMin time.Duration, rttStats stri
4848

4949
// Always round up to the next millisecond value so we never truncate the calculated
5050
// maxTimeMS value (e.g. 400 microseconds evaluates to 1ms, not 0ms).
51-
maxTimeMS := int64((remainingTimeout - rttMin + time.Millisecond - 1) / time.Millisecond)
51+
maxTimeMS := int64((remainingTimeout - rttMin) / time.Millisecond)
5252
if maxTimeMS <= 0 {
5353
return 0, fmt.Errorf(
5454
"remaining time %v until context deadline is less than or equal to min network round-trip time %v (%v): %w",

internal/integration/unified/collection_operation_execution.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,14 +1487,12 @@ func createFindCursor(ctx context.Context, operation *operation) (*cursorResult,
14871487
case "sort":
14881488
opts.SetSort(val.Document())
14891489
case "timeoutMode":
1490-
return nil, newSkipTestError(fmt.Sprintf("timeoutMode is not supported"))
1490+
return nil, newSkipTestError("timeoutMode is not supported")
14911491
case "cursorType":
1492-
fmt.Println("cursorType check", val.String(), strings.ToLower(val.String()), val.String() == "tailableAwait")
14931492
switch strings.ToLower(val.StringValue()) {
14941493
case "tailable":
14951494
opts.SetCursorType(options.Tailable)
14961495
case "tailableawait":
1497-
fmt.Println("gottem")
14981496
opts.SetCursorType(options.TailableAwait)
14991497
case "nontailable":
15001498
opts.SetCursorType(options.NonTailable)

internal/integration/unified/cursor_operation_execution.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,18 @@ func executeIterateOnce(ctx context.Context, operation *operation) (*operationRe
1919
return nil, err
2020
}
2121

22-
fmt.Println("iterate once")
23-
2422
// TryNext will attempt to get the next document, potentially issuing a single 'getMore'.
2523
if cursor.TryNext(ctx) {
2624
// We don't expect the server to return malformed documents, so any errors from Decode here are treated
2725
// as fatal.
2826
var res bson.Raw
2927
if err := cursor.Decode(&res); err != nil {
30-
fmt.Println("err:", err)
3128
return nil, fmt.Errorf("error decoding cursor result: %w", err)
3229
}
3330

34-
fmt.Println("err:", cursor.Err())
35-
3631
return newDocumentResult(res, nil), nil
3732
}
3833

39-
fmt.Println("err:", cursor.Err())
4034
return newErrorResult(cursor.Err()), nil
4135
}
4236

internal/integration/unified/unified_spec_runner.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ var (
8383
"timeoutMS is refreshed for getMore if maxAwaitTimeMS is set": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
8484
"timeoutMS is refreshed for getMore - failure": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
8585

86+
// GODRIVER-3473: the implementation of DRIVERS-2868 makes it clear that the
87+
// Go Driver does not correctly implement the following validation for
88+
// tailable awaitData cursors:
89+
//
90+
// Drivers MUST error if this option is set, timeoutMS is set to a
91+
// non-zero value, and maxAwaitTimeMS is greater than or equal to
92+
// timeoutMS.
93+
//
94+
// Once GODRIVER-3473 is completed, we can continue running these tests.
95+
"error if maxAwaitTimeMS is equal to timeoutMS": "Go Driver does not implement this behavior. See GODRIVER-3473",
96+
"error if maxAwaitTimeMS is greater than timeoutMS": "Go Driver does not implement this behavior. See GODRIVER-3473",
97+
8698
// DRIVERS-2953: This test requires that the driver sends a "getMore"
8799
// with "maxTimeMS" set. However, "getMore" can only include "maxTimeMS"
88100
// for tailable awaitData cursors. Including "maxTimeMS" on "getMore"
@@ -182,14 +194,14 @@ func runTestFile(t *testing.T, filepath string, expectValidFail bool, opts ...*O
182194
mt.Skip("Skipping CSOT spec test because SKIP_CSOT_TESTS=true")
183195
}
184196

185-
//defer func() {
186-
// // catch panics from looking up elements and fail if it's unexpected
187-
// if r := recover(); r != nil {
188-
// if !expectValidFail {
189-
// mt.Fatal(r)
190-
// }
191-
// }
192-
//}()
197+
defer func() {
198+
// catch panics from looking up elements and fail if it's unexpected
199+
if r := recover(); r != nil {
200+
if !expectValidFail {
201+
mt.Fatal(r)
202+
}
203+
}
204+
}()
193205
err := testCase.Run(mt)
194206
if expectValidFail {
195207
if err != nil {

testdata/client-side-operations-timeout/tailable-awaitData.json

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
"client": {
1818
"id": "client",
1919
"uriOptions": {
20-
"heartbeatFrequencyMS": 500,
2120
"timeoutMS": 200
2221
},
2322
"useMultipleMongoses": false,
@@ -113,46 +112,6 @@
113112
}
114113
]
115114
},
116-
{
117-
"description": "some test for us",
118-
"operations": [
119-
{
120-
"name": "wait",
121-
"object": "testRunner",
122-
"arguments": {
123-
"ms": 750
124-
}
125-
},
126-
{
127-
"name": "createFindCursor",
128-
"object": "collection",
129-
"arguments": {
130-
"filter": {},
131-
"cursorType": "tailableAwait",
132-
"batchSize": 1,
133-
"maxAwaitTimeMS": 10
134-
},
135-
"saveResultAsEntity": "tailableCursor"
136-
},
137-
{
138-
"name": "iterateOnce",
139-
"object": "tailableCursor",
140-
"arguments": {
141-
"timeoutMS": 11
142-
}
143-
},
144-
{
145-
"name": "iterateOnce",
146-
"object": "tailableCursor",
147-
"arguments": {
148-
"timeoutMS": 11
149-
},
150-
"expectError": {
151-
"isTimeoutError": false
152-
}
153-
}
154-
]
155-
},
156115
{
157116
"description": "timeoutMS applied to find",
158117
"operations": [
@@ -458,6 +417,60 @@
458417
]
459418
}
460419
]
420+
},
421+
{
422+
"description": "apply remaining timeoutMS if less than maxAwaitTimeMS ",
423+
"operations": [
424+
{
425+
"name": "createFindCursor",
426+
"object": "collection",
427+
"arguments": {
428+
"filter": {},
429+
"cursorType": "tailableAwait",
430+
"batchSize": 1,
431+
"maxAwaitTimeMS": 100
432+
},
433+
"saveResultAsEntity": "tailableCursor"
434+
},
435+
{
436+
"name": "iterateOnce",
437+
"object": "tailableCursor",
438+
"arguments": {
439+
"timeoutMS": 50
440+
}
441+
},
442+
{
443+
"name": "iterateOnce",
444+
"object": "tailableCursor",
445+
"arguments": {
446+
"timeoutMS": 50
447+
}
448+
}
449+
],
450+
"expectEvents": [
451+
{
452+
"client": "client",
453+
"events": [
454+
{
455+
"commandStartedEvent": {
456+
"commandName": "find",
457+
"databaseName": "test"
458+
}
459+
},
460+
{
461+
"commandStartedEvent": {
462+
"commandName": "getMore",
463+
"databaseName": "test",
464+
"command": {
465+
"maxTimeMS": {
466+
"$$lte": 50
467+
}
468+
}
469+
}
470+
}
471+
]
472+
}
473+
]
461474
}
462475
]
463476
}

testdata/client-side-operations-timeout/tailable-awaitData.yml

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@ createEntities:
1212
- client:
1313
id: &client client
1414
uriOptions:
15-
# Use a high heartbeatFrequencyMS to ensure, when needed, the RTT
16-
# monitor has enough samples to produce a minRTT.
17-
heartbeatFrequencyMS: 500
1815
timeoutMS: 200
1916
useMultipleMongoses: false
2017
observeEvents:
@@ -74,36 +71,6 @@ tests:
7471
expectError:
7572
isClientError: true
7673

77-
- description: "some test for us"
78-
operations:
79-
# Wait for enough samples to be added to the RTT Monitor to ensure a
80-
# minimum RTT is available.
81-
- name: wait
82-
object: testRunner
83-
arguments:
84-
ms: 750
85-
- name: createFindCursor
86-
object: *collection
87-
arguments:
88-
filter: {}
89-
cursorType: tailableAwait
90-
batchSize: 1
91-
maxAwaitTimeMS: 10
92-
saveResultAsEntity: &tailableCursor tailableCursor
93-
# Iterate twice to force a getMore.
94-
- name: iterateOnce
95-
object: *tailableCursor
96-
arguments:
97-
timeoutMS: 11
98-
# ms: 1
99-
# timeoutMS: 11
100-
- name: iterateOnce
101-
object: *tailableCursor
102-
arguments:
103-
timeoutMS: 11
104-
expectError:
105-
isTimeoutError: false
106-
10774
- description: "timeoutMS applied to find"
10875
operations:
10976
- name: failPoint
@@ -278,3 +245,34 @@ tests:
278245
command:
279246
getMore: { $$type: ["int", "long"] }
280247
collection: *collectionName
248+
249+
- description: "apply remaining timeoutMS if less than maxAwaitTimeMS "
250+
operations:
251+
- name: createFindCursor
252+
object: *collection
253+
arguments:
254+
filter: {}
255+
cursorType: tailableAwait
256+
batchSize: 1
257+
maxAwaitTimeMS: 100
258+
saveResultAsEntity: &tailableCursor tailableCursor
259+
# Iterate twice to force a getMore.
260+
- name: iterateOnce
261+
object: *tailableCursor
262+
arguments:
263+
timeoutMS: 50
264+
- name: iterateOnce
265+
object: *tailableCursor
266+
arguments:
267+
timeoutMS: 50
268+
expectEvents:
269+
- client: *client
270+
events:
271+
- commandStartedEvent:
272+
commandName: find
273+
databaseName: *databaseName
274+
- commandStartedEvent:
275+
commandName: getMore
276+
databaseName: *databaseName
277+
command:
278+
maxTimeMS: { $$lte: 50 }

x/mongo/driver/batch_cursor.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -379,31 +379,28 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
379379
return
380380
}
381381

382-
dl, ok := ctx.Deadline()
383-
fmt.Println("bc deadline: ", time.Until(dl), ok)
384-
fmt.Println("bc maxAwaitTime: ", *bc.maxAwaitTime)
385-
if bc.maxAwaitTime != nil {
386-
}
387-
388382
bc.err = Operation{
389383
CommandFn: func(dst []byte, _ description.SelectedServer) ([]byte, error) {
390-
conn, err := bc.Server().Connection(context.Background())
391-
if err != nil {
392-
panic(err)
393-
}
394-
395-
rttMonitor := bc.Server().RTTMonitor()
396-
maxTimeMS, err := driverutil.CalculateMaxTimeMS(ctx, rttMonitor.Min(), rttMonitor.Stats(), ErrDeadlineWouldBeExceeded)
397-
if bc.maxAwaitTime != nil && int64(*bc.maxAwaitTime/time.Millisecond) >= maxTimeMS {
398-
fmt.Println(int64(*bc.maxAwaitTime/time.Millisecond), maxTimeMS)
399-
return nil, ErrDeadlineWouldBeExceeded
400-
}
401-
402-
dl, ok := ctx.Deadline()
403-
fmt.Println("bc deadline: ", time.Until(dl), ok)
404-
fmt.Println("min (bc): ", bc.Server().RTTMonitor().Min(), conn.Describer.ID(), conn.Describer.Address(), maxTimeMS, time.Until(dl), ok, bc.maxAwaitTime)
405-
if err != nil {
406-
return nil, err
384+
// If maxAwaitTime > remaining timeoutMS - minRoundTripTime, then use
385+
// send remaining TimeoutMS - minRoundTripTime allowing the server an
386+
// opportunity to respond with an empty batch.
387+
var maxTimeMS int64
388+
if bc.maxAwaitTime != nil {
389+
_, ctxDeadlineSet := ctx.Deadline()
390+
391+
if ctxDeadlineSet {
392+
rttMonitor := bc.Server().RTTMonitor()
393+
394+
var err error
395+
maxTimeMS, err = driverutil.CalculateMaxTimeMS(ctx, rttMonitor.Min(), rttMonitor.Stats(), ErrDeadlineWouldBeExceeded)
396+
if err != nil {
397+
return nil, err
398+
}
399+
}
400+
401+
if !ctxDeadlineSet || bc.maxAwaitTime.Milliseconds() < maxTimeMS {
402+
maxTimeMS = bc.maxAwaitTime.Milliseconds()
403+
}
407404
}
408405

409406
dst = bsoncore.AppendInt64Element(dst, "getMore", bc.id)
@@ -412,10 +409,8 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
412409
dst = bsoncore.AppendInt32Element(dst, "batchSize", numToReturn)
413410
}
414411

415-
if bc.maxAwaitTime != nil && *bc.maxAwaitTime > 0 {
412+
if maxTimeMS > 0 {
416413
dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", maxTimeMS)
417-
418-
//dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", int64(*bc.maxAwaitTime)/int64(time.Millisecond))
419414
}
420415

421416
comment, err := codecutil.MarshalValue(bc.comment, bc.encoderFn)

x/mongo/driver/topology/connection.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,8 +405,6 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) {
405405
return nil, ConnectionError{ConnectionID: c.id, Wrapped: err, message: "failed to set read deadline"}
406406
}
407407

408-
fmt.Println("contextDeadlineUsed", contextDeadlineUsed, time.Until(deadline))
409-
410408
dst, errMsg, err := c.read(ctx)
411409
if err != nil {
412410
if c.awaitRemainingBytes == nil {

0 commit comments

Comments
 (0)