Skip to content

Commit 8d94d8d

Browse files
PHPLIB-846: Improved change stream event visibility for C2C Replication (#949)
Spec tests synced with mongodb/specifications@8da1a89 Co-authored-by: Jeremy Mikola <[email protected]>
1 parent b566bca commit 8d94d8d

8 files changed

+572
-6
lines changed

Diff for: docs/includes/apiargs-MongoDBClient-method-watch-option.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ source:
4949
file: apiargs-common-option.yaml
5050
ref: session
5151
---
52+
source:
53+
file: apiargs-method-watch-option.yaml
54+
ref: showExpandedEvents
55+
post: |
56+
.. versionadded:: 1.13
57+
---
5258
source:
5359
file: apiargs-method-watch-option.yaml
5460
ref: startAfter

Diff for: docs/includes/apiargs-MongoDBCollection-method-watch-option.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ source:
4949
file: apiargs-common-option.yaml
5050
ref: session
5151
---
52+
source:
53+
file: apiargs-method-watch-option.yaml
54+
ref: showExpandedEvents
55+
post: |
56+
.. versionadded:: 1.13
57+
---
5258
source:
5359
file: apiargs-method-watch-option.yaml
5460
ref: startAfter

Diff for: docs/includes/apiargs-MongoDBDatabase-method-watch-option.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ source:
4949
file: apiargs-common-option.yaml
5050
ref: session
5151
---
52+
source:
53+
file: apiargs-method-watch-option.yaml
54+
ref: showExpandedEvents
55+
post: |
56+
.. versionadded:: 1.13
57+
---
5258
source:
5359
file: apiargs-method-watch-option.yaml
5460
ref: startAfter

Diff for: docs/includes/apiargs-method-watch-option.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,31 @@ operation: ~
9797
optional: true
9898
---
9999
arg_name: option
100+
name: showExpandedEvents
101+
type: boolean
102+
description: |
103+
If true, instructs the server to include additional DDL events in the change
104+
stream. The additional events that may be included are:
105+
106+
- ``createIndexes``
107+
- ``dropIndexes``
108+
- ``modify``
109+
- ``create``
110+
- ``shardCollection``
111+
- ``reshardCollection`` (server 6.1+)
112+
- ``refineCollectionShardKey`` (server 6.1+)
113+
114+
This is not supported for server versions prior to 6.0 and will result in an
115+
exception at execution time if used.
116+
117+
.. note::
118+
119+
This is an option of the ``$changeStream`` pipeline stage.
120+
interface: phpmethod
121+
operation: ~
122+
optional: true
123+
---
124+
arg_name: option
100125
name: startAfter
101126
type: array|object
102127
description: |

Diff for: src/Operation/Watch.php

+11-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
use function array_unshift;
3939
use function count;
4040
use function is_array;
41+
use function is_bool;
4142
use function is_object;
4243
use function is_string;
4344
use function MongoDB\Driver\Monitoring\addSubscriber;
@@ -161,6 +162,11 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
161162
*
162163
* * session (MongoDB\Driver\Session): Client session.
163164
*
165+
* * showExpandedEvents (boolean): Enables the server to send the expanded
166+
* list of change stream events.
167+
*
168+
* This option is not supported for server versions < 6.0.
169+
*
164170
* * startAfter (document): Specifies the logical starting point for the
165171
* new change stream. Unlike "resumeAfter", this option can be used with
166172
* a resume token from an "invalidate" event.
@@ -229,6 +235,10 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
229235
throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
230236
}
231237

238+
if (isset($options['showExpandedEvents']) && ! is_bool($options['showExpandedEvents'])) {
239+
throw InvalidArgumentException::invalidType('"showExpandedEvents" option', $options['showExpandedEvents'], 'bool');
240+
}
241+
232242
/* In the absence of an explicit session, create one to ensure that the
233243
* initial aggregation and any resume attempts can use the same session
234244
* ("implicit from the user's perspective" per PHPLIB-342). Since this
@@ -244,7 +254,7 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
244254
}
245255

246256
$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'comment' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
247-
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
257+
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'showExpandedEvents' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
248258

249259
// Null database name implies a cluster-wide change stream
250260
if ($databaseName === null) {

Diff for: tests/UnifiedSpecTests/Operation.php

+5-1
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,11 @@ function (IndexInfo $info) {
491491
assertArrayHasKey('to', $args);
492492
assertIsString($args['to']);
493493

494-
return $collection->rename($args['to']);
494+
return $collection->rename(
495+
$args['to'],
496+
null, /* $toDatabaseName */
497+
array_diff_key($args, ['to' => 1])
498+
);
495499

496500
default:
497501
Assert::fail('Unsupported collection operation: ' . $this->name);

Diff for: tests/UnifiedSpecTests/Util.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ final class Util
5555
'loop' => ['operations', 'storeErrorsAsEntity', 'storeFailuresAsEntity', 'storeSuccessesAsEntity', 'storeIterationsAsEntity'],
5656
],
5757
Client::class => [
58-
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
58+
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS', 'showExpandedEvents'],
5959
'listDatabaseNames' => ['authorizedDatabases', 'filter', 'maxTimeMS', 'session'],
6060
'listDatabases' => ['authorizedDatabases', 'filter', 'maxTimeMS', 'session'],
6161
],
6262
Database::class => [
6363
'aggregate' => ['pipeline', 'session', 'useCursor', 'allowDiskUse', 'batchSize', 'bypassDocumentValidation', 'collation', 'comment', 'explain', 'hint', 'let', 'maxAwaitTimeMS', 'maxTimeMS'],
64-
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
64+
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS', 'showExpandedEvents'],
6565
'createCollection' => ['collection', 'session', 'autoIndexId', 'capped', 'changeStreamPreAndPostImages', 'clusteredIndex', 'collation', 'expireAfterSeconds', 'flags', 'indexOptionDefaults', 'max', 'maxTimeMS', 'pipeline', 'size', 'storageEngine', 'timeseries', 'validationAction', 'validationLevel', 'validator', 'viewOn'],
6666
'dropCollection' => ['collection', 'session'],
6767
'listCollectionNames' => ['authorizedCollections', 'filter', 'maxTimeMS', 'session'],
@@ -73,7 +73,7 @@ final class Util
7373
Collection::class => [
7474
'aggregate' => ['pipeline', 'session', 'useCursor', 'allowDiskUse', 'batchSize', 'bypassDocumentValidation', 'collation', 'comment', 'explain', 'hint', 'let', 'maxAwaitTimeMS', 'maxTimeMS'],
7575
'bulkWrite' => ['let', 'requests', 'session', 'ordered', 'bypassDocumentValidation', 'comment'],
76-
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'fullDocumentBeforeChange', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS', 'comment'],
76+
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'fullDocumentBeforeChange', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS', 'comment', 'showExpandedEvents'],
7777
'createFindCursor' => ['filter', 'session', 'allowDiskUse', 'allowPartialResults', 'batchSize', 'collation', 'comment', 'cursorType', 'hint', 'limit', 'max', 'maxAwaitTimeMS', 'maxScan', 'maxTimeMS', 'min', 'modifiers', 'noCursorTimeout', 'oplogReplay', 'projection', 'returnKey', 'showRecordId', 'skip', 'snapshot', 'sort'],
7878
'createIndex' => ['keys', 'commitQuorum', 'maxTimeMS', 'name', 'session', 'comment'],
7979
'dropIndex' => ['name', 'session', 'maxTimeMS', 'comment'],
@@ -88,7 +88,7 @@ final class Util
8888
'find' => ['let', 'filter', 'session', 'allowDiskUse', 'allowPartialResults', 'batchSize', 'collation', 'comment', 'cursorType', 'hint', 'limit', 'max', 'maxAwaitTimeMS', 'maxScan', 'maxTimeMS', 'min', 'modifiers', 'noCursorTimeout', 'oplogReplay', 'projection', 'returnKey', 'showRecordId', 'skip', 'snapshot', 'sort'],
8989
'findOne' => ['let', 'filter', 'session', 'allowDiskUse', 'allowPartialResults', 'batchSize', 'collation', 'comment', 'cursorType', 'hint', 'max', 'maxAwaitTimeMS', 'maxScan', 'maxTimeMS', 'min', 'modifiers', 'noCursorTimeout', 'oplogReplay', 'projection', 'returnKey', 'showRecordId', 'skip', 'snapshot', 'sort'],
9090
'findOneAndReplace' => ['let', 'returnDocument', 'filter', 'replacement', 'session', 'projection', 'returnDocument', 'upsert', 'arrayFilters', 'bypassDocumentValidation', 'collation', 'hint', 'maxTimeMS', 'new', 'remove', 'sort', 'comment'],
91-
'rename' => ['to', 'comment'],
91+
'rename' => ['to', 'comment', 'dropTarget'],
9292
'replaceOne' => ['let', 'filter', 'replacement', 'session', 'upsert', 'arrayFilters', 'bypassDocumentValidation', 'collation', 'hint', 'comment'],
9393
'findOneAndUpdate' => ['let', 'returnDocument', 'filter', 'update', 'session', 'upsert', 'projection', 'remove', 'arrayFilters', 'bypassDocumentValidation', 'collation', 'hint', 'maxTimeMS', 'sort', 'comment'],
9494
'updateMany' => ['let', 'filter', 'update', 'session', 'upsert', 'arrayFilters', 'bypassDocumentValidation', 'collation', 'hint', 'comment'],

0 commit comments

Comments
 (0)