diff --git a/.evergreen/config/generated/build/build-extension.yml b/.evergreen/config/generated/build/build-extension.yml index 993396ec7..294711013 100644 --- a/.evergreen/config/generated/build/build-extension.yml +++ b/.evergreen/config/generated/build/build-extension.yml @@ -9,7 +9,10 @@ tasks: - func: "compile extension" # TODO: remove once 2.0.0 is released vars: - EXTENSION_BRANCH: "v2.x" + # TODO: replace with "v2.x" once mongodb/mongo-php-driver#1790 is merged + # EXTENSION_BRANCH: "v2.x" + EXTENSION_REPO: "https://github.com/jmikola/mongo-php-driver.git" + EXTENSION_BRANCH: "2.x-bulkwrite" - func: "upload extension" # TODO: re-enable once 2.0.0 is released # - name: "build-php-8.4-lowest" @@ -51,7 +54,10 @@ tasks: - func: "compile extension" # TODO: remove once 2.0.0 is released vars: - EXTENSION_BRANCH: "v2.x" + # TODO: replace with "v2.x" once mongodb/mongo-php-driver#1790 is merged + # EXTENSION_BRANCH: "v2.x" + EXTENSION_REPO: "https://github.com/jmikola/mongo-php-driver.git" + EXTENSION_BRANCH: "2.x-bulkwrite" - func: "upload extension" # TODO: re-enable once 2.0.0 is released # - name: "build-php-8.3-lowest" @@ -93,7 +99,10 @@ tasks: - func: "compile extension" # TODO: remove once 2.0.0 is released vars: - EXTENSION_BRANCH: "v2.x" + # TODO: replace with "v2.x" once mongodb/mongo-php-driver#1790 is merged + # EXTENSION_BRANCH: "v2.x" + EXTENSION_REPO: "https://github.com/jmikola/mongo-php-driver.git" + EXTENSION_BRANCH: "2.x-bulkwrite" - func: "upload extension" # TODO: re-enable once 2.0.0 is released # - name: "build-php-8.2-lowest" @@ -135,7 +144,10 @@ tasks: - func: "compile extension" # TODO: remove once 2.0.0 is released vars: - EXTENSION_BRANCH: "v2.x" + # TODO: replace with "v2.x" once mongodb/mongo-php-driver#1790 is merged + # EXTENSION_BRANCH: "v2.x" + EXTENSION_REPO: "https://github.com/jmikola/mongo-php-driver.git" + EXTENSION_BRANCH: "2.x-bulkwrite" - func: "upload extension" # TODO: re-enable once 2.0.0 is released # - name: "build-php-8.1-lowest" diff --git a/.evergreen/config/templates/build/build-extension.yml b/.evergreen/config/templates/build/build-extension.yml index 1599967ab..ed6a0984b 100644 --- a/.evergreen/config/templates/build/build-extension.yml +++ b/.evergreen/config/templates/build/build-extension.yml @@ -7,7 +7,10 @@ - func: "compile extension" # TODO: remove once 2.0.0 is released vars: - EXTENSION_BRANCH: "v2.x" + # TODO: replace with "v2.x" once mongodb/mongo-php-driver#1790 is merged + # EXTENSION_BRANCH: "v2.x" + EXTENSION_REPO: "https://github.com/jmikola/mongo-php-driver.git" + EXTENSION_BRANCH: "2.x-bulkwrite" - func: "upload extension" # TODO: re-enable once 2.0.0 is released # - name: "build-php-%phpVersion%-lowest" diff --git a/.github/workflows/coding-standards.yml b/.github/workflows/coding-standards.yml index b03f7779f..ae33be3ad 100644 --- a/.github/workflows/coding-standards.yml +++ b/.github/workflows/coding-standards.yml @@ -15,7 +15,9 @@ env: PHP_VERSION: "8.2" # TODO: change to "stable" once 2.0.0 is released # DRIVER_VERSION: "stable" - DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + # TODO: change to "mongodb/mongo-php-driver@v2.x" once mongodb/mongo-php-driver#1790 is merged + # DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + DRIVER_VERSION: "jmikola/mongo-php-driver@2.x-bulkwrite" jobs: phpcs: diff --git a/.github/workflows/generator.yml b/.github/workflows/generator.yml index 711befabd..2e13f3245 100644 --- a/.github/workflows/generator.yml +++ b/.github/workflows/generator.yml @@ -15,7 +15,9 @@ env: PHP_VERSION: "8.2" # TODO: change to "stable" once 2.0.0 is released # DRIVER_VERSION: "stable" - DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + # TODO: change to "mongodb/mongo-php-driver@v2.x" once mongodb/mongo-php-driver#1790 is merged + # DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + DRIVER_VERSION: "jmikola/mongo-php-driver@2.x-bulkwrite" jobs: psalm: diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml index 82919098f..d3b9466a6 100644 --- a/.github/workflows/static-analysis.yml +++ b/.github/workflows/static-analysis.yml @@ -21,7 +21,9 @@ env: PHP_VERSION: "8.2" # TODO: change to "stable" once 2.0.0 is released # DRIVER_VERSION: "stable" - DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + # TODO: change to "mongodb/mongo-php-driver@v2.x" once mongodb/mongo-php-driver#1790 is merged + # DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + DRIVER_VERSION: "jmikola/mongo-php-driver@2.x-bulkwrite" jobs: psalm: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b7738ca3c..8c010e2db 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -14,7 +14,9 @@ on: env: # TODO: change to "stable" once 2.0.0 is released # DRIVER_VERSION: "stable" - DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + # TODO: change to "mongodb/mongo-php-driver@v2.x" once mongodb/mongo-php-driver#1790 is merged + # DRIVER_VERSION: "mongodb/mongo-php-driver@v2.x" + DRIVER_VERSION: "jmikola/mongo-php-driver@2.x-bulkwrite" jobs: phpunit: diff --git a/psalm-baseline.xml b/psalm-baseline.xml index 005534e0a..a6dd2d5bf 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -235,6 +235,12 @@ + + + + + + @@ -545,6 +551,14 @@ + + + + + + executeBulkWriteCommand($this->bulkWriteCommand, $options)]]> + + diff --git a/psalm.xml.dist b/psalm.xml.dist index 57b29cd71..28efbec86 100644 --- a/psalm.xml.dist +++ b/psalm.xml.dist @@ -20,6 +20,9 @@ + + + diff --git a/src/Client.php b/src/Client.php index a6b96462b..dce14446e 100644 --- a/src/Client.php +++ b/src/Client.php @@ -24,6 +24,8 @@ use MongoDB\Builder\BuilderEncoder; use MongoDB\Builder\Pipeline; use MongoDB\Codec\Encoder; +use MongoDB\Driver\BulkWriteCommand; +use MongoDB\Driver\BulkWriteCommandResult; use MongoDB\Driver\ClientEncryption; use MongoDB\Driver\Exception\InvalidArgumentException as DriverInvalidArgumentException; use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException; @@ -39,6 +41,7 @@ use MongoDB\Model\BSONArray; use MongoDB\Model\BSONDocument; use MongoDB\Model\DatabaseInfo; +use MongoDB\Operation\ClientBulkWriteCommand; use MongoDB\Operation\DropDatabase; use MongoDB\Operation\ListDatabaseNames; use MongoDB\Operation\ListDatabases; @@ -189,6 +192,32 @@ final public function addSubscriber(Subscriber $subscriber): void $this->manager->addSubscriber($subscriber); } + /** + * Executes multiple write operations across multiple namespaces. + * + * @param BulkWriteCommand|ClientBulkWrite $bulk Assembled bulk write command or builder + * @param array $options Additional options + * @throws UnsupportedException if options are unsupported on the selected server + * @throws InvalidArgumentException for parameter/option parsing errors + * @throws DriverRuntimeException for other driver errors (e.g. connection errors) + * @see ClientBulkWriteCommand::__construct() for supported options + */ + public function bulkWrite(BulkWriteCommand|ClientBulkWrite $bulk, array $options = []): BulkWriteCommandResult + { + if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { + $options['writeConcern'] = $this->writeConcern; + } + + if ($bulk instanceof ClientBulkWrite) { + $bulk = $bulk->bulkWriteCommand; + } + + $operation = new ClientBulkWriteCommand($bulk, $options); + $server = select_server_for_write($this->manager, $options); + + return $operation->execute($server); + } + /** * Returns a ClientEncryption instance for explicit encryption and decryption * diff --git a/src/ClientBulkWrite.php b/src/ClientBulkWrite.php new file mode 100644 index 000000000..02c6c16f8 --- /dev/null +++ b/src/ClientBulkWrite.php @@ -0,0 +1,246 @@ + */ + private Encoder $builderEncoder, + private ?DocumentCodec $codec, + ) { + } + + public static function createWithCollection(Collection $collection, array $options = []): self + { + $options += ['ordered' => true]; + + if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) { + throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean'); + } + + if (isset($options['let']) && ! is_document($options['let'])) { + throw InvalidArgumentException::expectedDocumentType('"let" option', $options['let']); + } + + if (! is_bool($options['ordered'])) { + throw InvalidArgumentException::invalidType('"ordered" option', $options['ordered'], 'boolean'); + } + + if (isset($options['verboseResults']) && ! is_bool($options['verboseResults'])) { + throw InvalidArgumentException::invalidType('"verboseResults" option', $options['verboseResults'], 'boolean'); + } + + return new self( + new BulkWriteCommand($options), + $collection->getManager(), + $collection->getNamespace(), + $collection->getBuilderEncoder(), + $collection->getCodec(), + ); + } + + public function deleteMany(array|object $filter, array $options = []): self + { + $filter = $this->builderEncoder->encodeIfSupported($filter); + + if (isset($options['collation']) && ! is_document($options['collation'])) { + throw InvalidArgumentException::expectedDocumentType('"collation" option', $options['collation']); + } + + if (isset($options['hint']) && ! is_string($options['hint']) && ! is_document($options['hint'])) { + throw InvalidArgumentException::expectedDocumentOrStringType('"hint" option', $options['hint']); + } + + $this->bulkWriteCommand->deleteMany($this->namespace, $filter, $options); + + return $this; + } + + public function deleteOne(array|object $filter, array $options = []): self + { + $filter = $this->builderEncoder->encodeIfSupported($filter); + + if (isset($options['collation']) && ! is_document($options['collation'])) { + throw InvalidArgumentException::expectedDocumentType('"collation" option', $options['collation']); + } + + if (isset($options['hint']) && ! is_string($options['hint']) && ! is_document($options['hint'])) { + throw InvalidArgumentException::expectedDocumentOrStringType('"hint" option', $options['hint']); + } + + $this->bulkWriteCommand->deleteOne($this->namespace, $filter, $options); + + return $this; + } + + public function insertOne(array|object $document, mixed &$id = null): self + { + if ($this->codec) { + $document = $this->codec->encode($document); + } + + // Capture the document's _id, which may have been generated, in an optional output variable + /** @var mixed */ + $id = $this->bulkWriteCommand->insertOne($this->namespace, $document); + + return $this; + } + + public function replaceOne(array|object $filter, array|object $replacement, array $options = []): self + { + $filter = $this->builderEncoder->encodeIfSupported($filter); + + if ($this->codec) { + $replacement = $this->codec->encode($replacement); + } + + // Treat empty arrays as replacement documents for BC + if ($replacement === []) { + $replacement = (object) $replacement; + } + + if (is_first_key_operator($replacement)) { + throw new InvalidArgumentException('First key in $replacement is an update operator'); + } + + if (is_pipeline($replacement, true)) { + throw new InvalidArgumentException('$replacement is an update pipeline'); + } + + if (isset($options['collation']) && ! is_document($options['collation'])) { + throw InvalidArgumentException::expectedDocumentType('"collation" option', $options['collation']); + } + + if (isset($options['hint']) && ! is_string($options['hint']) && ! is_document($options['hint'])) { + throw InvalidArgumentException::expectedDocumentOrStringType('"hint" option', $options['hint']); + } + + if (isset($options['sort']) && ! is_document($options['sort'])) { + throw InvalidArgumentException::expectedDocumentType('"sort" option', $options['sort']); + } + + if (isset($options['upsert']) && ! is_bool($options['upsert'])) { + throw InvalidArgumentException::invalidType('"upsert" option', $options['upsert'], 'boolean'); + } + + $this->bulkWriteCommand->replaceOne($this->namespace, $filter, $replacement, $options); + + return $this; + } + + public function updateMany(array|object $filter, array|object $update, array $options = []): self + { + $filter = $this->builderEncoder->encodeIfSupported($filter); + $update = $this->builderEncoder->encodeIfSupported($update); + + if (! is_first_key_operator($update) && ! is_pipeline($update)) { + throw new InvalidArgumentException('Expected update operator(s) or non-empty pipeline for $update'); + } + + if (isset($options['arrayFilters']) && ! is_array($options['arrayFilters'])) { + throw InvalidArgumentException::invalidType('"arrayFilters" option', $options['arrayFilters'], 'array'); + } + + if (isset($options['collation']) && ! is_document($options['collation'])) { + throw InvalidArgumentException::expectedDocumentType('"collation" option', $options['collation']); + } + + if (isset($options['hint']) && ! is_string($options['hint']) && ! is_document($options['hint'])) { + throw InvalidArgumentException::expectedDocumentOrStringType('"hint" option', $options['hint']); + } + + if (isset($options['upsert']) && ! is_bool($options['upsert'])) { + throw InvalidArgumentException::invalidType('"upsert" option', $options['upsert'], 'boolean'); + } + + $this->bulkWriteCommand->updateMany($this->namespace, $filter, $update, $options); + + return $this; + } + + public function updateOne(array|object $filter, array|object $update, array $options = []): self + { + $filter = $this->builderEncoder->encodeIfSupported($filter); + $update = $this->builderEncoder->encodeIfSupported($update); + + if (! is_first_key_operator($update) && ! is_pipeline($update)) { + throw new InvalidArgumentException('Expected update operator(s) or non-empty pipeline for $update'); + } + + if (isset($options['arrayFilters']) && ! is_array($options['arrayFilters'])) { + throw InvalidArgumentException::invalidType('"arrayFilters" option', $options['arrayFilters'], 'array'); + } + + if (isset($options['collation']) && ! is_document($options['collation'])) { + throw InvalidArgumentException::expectedDocumentType('"collation" option', $options['collation']); + } + + if (isset($options['hint']) && ! is_string($options['hint']) && ! is_document($options['hint'])) { + throw InvalidArgumentException::expectedDocumentOrStringType('"hint" option', $options['hint']); + } + + if (isset($options['sort']) && ! is_document($options['sort'])) { + throw InvalidArgumentException::expectedDocumentType('"sort" option', $options['sort']); + } + + if (isset($options['upsert']) && ! is_bool($options['upsert'])) { + throw InvalidArgumentException::invalidType('"upsert" option', $options['upsert'], 'boolean'); + } + + $this->bulkWriteCommand->updateOne($this->namespace, $filter, $update, $options); + + return $this; + } + + public function withCollection(Collection $collection): self + { + /* Prohibit mixing Collections associated with different Manager + * objects. This is not technically necessary, since the Collection is + * only used to derive a namespace and encoding options; however, it + * may prevent a user from inadvertently mixing writes destined for + * different deployments. */ + if ($this->manager !== $collection->getManager()) { + throw new InvalidArgumentException('$collection is associated with a different MongoDB\Driver\Manager'); + } + + return new self( + $this->bulkWriteCommand, + $this->manager, + $collection->getNamespace(), + $collection->getBuilderEncoder(), + $collection->getCodec(), + ); + } +} diff --git a/src/Collection.php b/src/Collection.php index 34d82544a..04a61981c 100644 --- a/src/Collection.php +++ b/src/Collection.php @@ -753,6 +753,17 @@ public function findOneAndUpdate(array|object $filter, array|object $update, arr return $operation->execute(select_server_for_write($this->manager, $options)); } + /** @psalm-return Encoder */ + public function getBuilderEncoder(): Encoder + { + return $this->builderEncoder; + } + + public function getCodec(): ?DocumentCodec + { + return $this->codec; + } + /** * Return the collection name. */ diff --git a/src/Exception/UnsupportedException.php b/src/Exception/UnsupportedException.php index 1a5273715..a2204b886 100644 --- a/src/Exception/UnsupportedException.php +++ b/src/Exception/UnsupportedException.php @@ -47,7 +47,7 @@ public static function hintNotSupported(): self */ public static function readConcernNotSupportedInTransaction(): self { - return new self('The "readConcern" option cannot be specified within a transaction. Instead, specify it when starting the transaction.'); + return new self('Cannot set read concern after starting a transaction. Instead, specify the "readConcern" option when starting the transaction.'); } /** @@ -57,6 +57,6 @@ public static function readConcernNotSupportedInTransaction(): self */ public static function writeConcernNotSupportedInTransaction(): self { - return new self('The "writeConcern" option cannot be specified within a transaction. Instead, specify it when starting the transaction.'); + return new self('Cannot set write concern after starting a transaction. Instead, specify the "writeConcern" option when starting the transaction.'); } } diff --git a/src/Operation/ClientBulkWriteCommand.php b/src/Operation/ClientBulkWriteCommand.php new file mode 100644 index 000000000..9bf9c6d2a --- /dev/null +++ b/src/Operation/ClientBulkWriteCommand.php @@ -0,0 +1,95 @@ +options['session']) && ! $this->options['session'] instanceof Session) { + throw InvalidArgumentException::invalidType('"session" option', $this->options['session'], Session::class); + } + + if (isset($this->options['writeConcern']) && ! $this->options['writeConcern'] instanceof WriteConcern) { + throw InvalidArgumentException::invalidType('"writeConcern" option', $this->options['writeConcern'], WriteConcern::class); + } + + if (isset($this->options['writeConcern']) && $this->options['writeConcern']->isDefault()) { + unset($this->options['writeConcern']); + } + } + + /** + * Execute the operation. + * + * @throws UnsupportedException if write concern is used and unsupported + * @throws DriverRuntimeException for other driver errors (e.g. connection errors) + */ + public function execute(Server $server): BulkWriteCommandResult + { + $inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction(); + if ($inTransaction && isset($this->options['writeConcern'])) { + throw UnsupportedException::writeConcernNotSupportedInTransaction(); + } + + $options = array_filter($this->options, fn ($value) => isset($value)); + + return $server->executeBulkWriteCommand($this->bulkWriteCommand, $options); + } +} diff --git a/stubs/Driver/BulkWriteCommand.stub.php b/stubs/Driver/BulkWriteCommand.stub.php new file mode 100644 index 000000000..464b3f1ae --- /dev/null +++ b/stubs/Driver/BulkWriteCommand.stub.php @@ -0,0 +1,40 @@ + $this->createMock(Encoder::class)]; + $collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $collectionOptions); + + $this->assertSame($collectionOptions['builderEncoder'], $collection->getBuilderEncoder()); + } + + public function testGetCodec(): void + { + $collectionOptions = ['codec' => $this->createMock(DocumentCodec::class)]; + $collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $collectionOptions); + + $this->assertSame($collectionOptions['codec'], $collection->getCodec()); + } + public function testGetManager(): void { $this->assertSame($this->manager, $this->collection->getManager()); @@ -718,7 +734,7 @@ public function testMethodInTransactionWithWriteConcernOption($method): void $session->startTransaction(); $this->expectException(UnsupportedException::class); - $this->expectExceptionMessage('"writeConcern" option cannot be specified within a transaction'); + $this->expectExceptionMessage('Cannot set write concern after starting a transaction'); try { call_user_func($method, $this->collection, $session, ['writeConcern' => new WriteConcern(1)]); @@ -738,7 +754,7 @@ public function testMethodInTransactionWithReadConcernOption($method): void $session->startTransaction(); $this->expectException(UnsupportedException::class); - $this->expectExceptionMessage('"readConcern" option cannot be specified within a transaction'); + $this->expectExceptionMessage('Cannot set read concern after starting a transaction'); try { call_user_func($method, $this->collection, $session, ['readConcern' => new ReadConcern(ReadConcern::LOCAL)]); diff --git a/tests/PedantryTest.php b/tests/PedantryTest.php index 3d89f259a..4a5cd7096 100644 --- a/tests/PedantryTest.php +++ b/tests/PedantryTest.php @@ -12,6 +12,7 @@ use function array_filter; use function array_map; +use function array_values; use function in_array; use function realpath; use function str_contains; @@ -39,11 +40,12 @@ public function testMethodsAreOrderedAlphabeticallyByVisibility($className): voi $class = new ReflectionClass($className); $methods = $class->getMethods(); - $methods = array_filter( + $methods = array_values(array_filter( $methods, fn (ReflectionMethod $method) => $method->getDeclaringClass() == $class // Exclude inherited methods - && $method->getFileName() === $class->getFileName(), // Exclude methods inherited from traits - ); + && $method->getFileName() === $class->getFileName() // Exclude methods inherited from traits + && ! ($method->isConstructor() && ! $method->isPublic()), // Exclude non-public constructors + )); $getSortValue = function (ReflectionMethod $method) { $prefix = $method->isPrivate() ? '2' : ($method->isProtected() ? '1' : '0'); diff --git a/tests/SpecTests/Crud/Prose11_BulkWriteBatchSplitsWhenNamespaceExceedsMessageSizeTest.php b/tests/SpecTests/Crud/Prose11_BulkWriteBatchSplitsWhenNamespaceExceedsMessageSizeTest.php new file mode 100644 index 000000000..a44986217 --- /dev/null +++ b/tests/SpecTests/Crud/Prose11_BulkWriteBatchSplitsWhenNamespaceExceedsMessageSizeTest.php @@ -0,0 +1,127 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $this->client = self::createTestClient(); + + $hello = $this->getPrimaryServer()->getInfo(); + self::assertIsInt($maxBsonObjectSize = $hello['maxBsonObjectSize'] ?? null); + self::assertIsInt($maxMessageSizeBytes = $hello['maxMessageSizeBytes'] ?? null); + + $opsBytes = $maxMessageSizeBytes - 1122; + $this->numModels = (int) ($opsBytes / $maxBsonObjectSize); + $remainderBytes = $opsBytes % $maxBsonObjectSize; + + // Use namespaces specific to the test, as they are relevant to batch calculations + $this->dropCollection('db', 'coll'); + $collection = $this->client->selectCollection('db', 'coll'); + + $this->bulkWrite = ClientBulkWrite::createWithCollection($collection); + + for ($i = 0; $i < $this->numModels; ++$i) { + $this->bulkWrite->insertOne(['a' => str_repeat('b', $maxBsonObjectSize - 57)]); + } + + if ($remainderBytes >= 217) { + ++$this->numModels; + $this->bulkWrite->insertOne(['a' => str_repeat('b', $remainderBytes - 57)]); + } + } + + public function testNoBatchSplittingRequired(): void + { + $subscriber = $this->createSubscriber(); + $this->client->addSubscriber($subscriber); + + $this->bulkWrite->insertOne(['a' => 'b']); + + $result = $this->client->bulkWrite($this->bulkWrite); + + self::assertSame($this->numModels + 1, $result->getInsertedCount()); + self::assertCount(1, $subscriber->commandStartedEvents); + $command = $subscriber->commandStartedEvents[0]->getCommand(); + self::assertCount($this->numModels + 1, $command->ops); + self::assertCount(1, $command->nsInfo); + self::assertSame('db.coll', $command->nsInfo[0]->ns ?? null); + } + + public function testBatchSplittingRequired(): void + { + $subscriber = $this->createSubscriber(); + $this->client->addSubscriber($subscriber); + + $secondCollectionName = str_repeat('c', 200); + $this->dropCollection('db', $secondCollectionName); + $secondCollection = $this->client->selectCollection('db', $secondCollectionName); + $this->bulkWrite->withCollection($secondCollection)->insertOne(['a' => 'b']); + + $result = $this->client->bulkWrite($this->bulkWrite); + + self::assertSame($this->numModels + 1, $result->getInsertedCount()); + self::assertCount(2, $subscriber->commandStartedEvents); + [$firstEvent, $secondEvent] = $subscriber->commandStartedEvents; + + $firstCommand = $firstEvent->getCommand(); + self::assertCount($this->numModels, $firstCommand->ops); + self::assertCount(1, $firstCommand->nsInfo); + self::assertSame('db.coll', $firstCommand->nsInfo[0]->ns ?? null); + + $secondCommand = $secondEvent->getCommand(); + self::assertCount(1, $secondCommand->ops); + self::assertCount(1, $secondCommand->nsInfo); + self::assertSame($secondCollection->getNamespace(), $secondCommand->nsInfo[0]->ns ?? null); + } + + private function createSubscriber(): CommandSubscriber + { + return new class implements CommandSubscriber { + public array $commandStartedEvents = []; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'bulkWrite') { + $this->commandStartedEvents[] = $event; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + } +} diff --git a/tests/SpecTests/Crud/Prose12_BulkWriteExceedsMaxMessageSizeBytesTest.php b/tests/SpecTests/Crud/Prose12_BulkWriteExceedsMaxMessageSizeBytesTest.php new file mode 100644 index 000000000..854c101d0 --- /dev/null +++ b/tests/SpecTests/Crud/Prose12_BulkWriteExceedsMaxMessageSizeBytesTest.php @@ -0,0 +1,76 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + } + + public function testDocumentTooLarge(): void + { + $client = self::createTestClient(); + + $maxMessageSizeBytes = $this->getPrimaryServer()->getInfo()['maxMessageSizeBytes'] ?? null; + self::assertIsInt($maxMessageSizeBytes); + + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $bulkWrite = ClientBulkWrite::createWithCollection($collection); + $bulkWrite->insertOne(['a' => str_repeat('b', $maxMessageSizeBytes)]); + + try { + $client->bulkWrite($bulkWrite); + self::fail('Exception was not thrown'); + } catch (BulkWriteCommandException $e) { + /* Note: although the client-side error occurs on the first operation, libmongoc still populates the partial + * result (see: CDRIVER-5969). This causes PHPC to proxy the underlying InvalidArgumentException behind + * BulkWriteCommandException. Until this is addressed, unwrap the error and check the partial result. */ + self::assertInstanceOf(InvalidArgumentException::class, $e->getPrevious()); + self::assertSame(0, $e->getPartialResult()->getInsertedCount()); + } + } + + public function testNamespaceTooLarge(): void + { + $client = self::createTestClient(); + + $maxMessageSizeBytes = $this->getPrimaryServer()->getInfo()['maxMessageSizeBytes'] ?? null; + self::assertIsInt($maxMessageSizeBytes); + + $collectionName = str_repeat('c', $maxMessageSizeBytes); + $collection = $client->selectCollection($this->getDatabaseName(), $collectionName); + $bulkWrite = ClientBulkWrite::createWithCollection($collection); + $bulkWrite->insertOne(['a' => 'b']); + + try { + $client->bulkWrite($bulkWrite); + self::fail('Exception was not thrown'); + } catch (BulkWriteCommandException $e) { + /* Note: although the client-side error occurs on the first operation, libmongoc still populates the partial + * result (see: CDRIVER-5969). This causes PHPC to proxy the underlying InvalidArgumentException behind + * BulkWriteCommandException. Until this is addressed, unwrap the error and check the partial result. */ + self::assertInstanceOf(InvalidArgumentException::class, $e->getPrevious()); + self::assertSame(0, $e->getPartialResult()->getInsertedCount()); + } + } +} diff --git a/tests/SpecTests/Crud/Prose13_BulkWriteUnsupportedForAutoEncryptionTest.php b/tests/SpecTests/Crud/Prose13_BulkWriteUnsupportedForAutoEncryptionTest.php new file mode 100644 index 000000000..d9b6b70a7 --- /dev/null +++ b/tests/SpecTests/Crud/Prose13_BulkWriteUnsupportedForAutoEncryptionTest.php @@ -0,0 +1,44 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $this->skipIfClientSideEncryptionIsNotSupported(); + + $client = self::createTestClient(null, [], [ + 'autoEncryption' => [ + 'keyVaultNamespace' => $this->getNamespace(), + 'kmsProviders' => ['aws' => ['accessKeyId' => 'foo', 'secretAccessKey' => 'bar']], + ], + ]); + + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $bulkWrite = ClientBulkWrite::createWithCollection($collection); + $bulkWrite->insertOne(['a' => 'b']); + + try { + $client->bulkWrite($bulkWrite); + self::fail('InvalidArgumentException was not thrown'); + } catch (InvalidArgumentException $e) { + self::assertStringContainsString('bulkWrite does not currently support automatic encryption', $e->getMessage()); + } + } +} diff --git a/tests/SpecTests/Crud/Prose15_BulkWriteUnacknowledgedWriteConcernTest.php b/tests/SpecTests/Crud/Prose15_BulkWriteUnacknowledgedWriteConcernTest.php new file mode 100644 index 000000000..f35433dac --- /dev/null +++ b/tests/SpecTests/Crud/Prose15_BulkWriteUnacknowledgedWriteConcernTest.php @@ -0,0 +1,87 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $client = self::createTestClient(); + + $hello = $this->getPrimaryServer()->getInfo(); + self::assertIsInt($maxBsonObjectSize = $hello['maxBsonObjectSize'] ?? null); + self::assertIsInt($maxMessageSizeBytes = $hello['maxMessageSizeBytes'] ?? null); + + // Explicitly create the collection to work around SERVER-95537 + $this->createCollection($this->getDatabaseName(), $this->getCollectionName()); + + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $bulkWrite = ClientBulkWrite::createWithCollection($collection, ['ordered' => false]); + + $numModels = (int) ($maxMessageSizeBytes / $maxBsonObjectSize) + 1; + + for ($i = 0; $i < $numModels; ++$i) { + $bulkWrite->insertOne(['a' => str_repeat('b', $maxBsonObjectSize - 500)]); + } + + $subscriber = new class implements CommandSubscriber { + public array $commandStartedEvents = []; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'bulkWrite') { + $this->commandStartedEvents[] = $event; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + + $client->addSubscriber($subscriber); + + $result = $client->bulkWrite($bulkWrite, ['writeConcern' => new WriteConcern(0)]); + + self::assertFalse($result->isAcknowledged()); + self::assertCount(2, $subscriber->commandStartedEvents); + [$firstEvent, $secondEvent] = $subscriber->commandStartedEvents; + + $firstCommand = $firstEvent->getCommand(); + self::assertIsArray($firstCommand->ops ?? null); + self::assertCount($numModels - 1, $firstCommand->ops); + self::assertSame(0, $firstCommand->writeConcern->w ?? null); + + $secondCommand = $secondEvent->getCommand(); + self::assertIsArray($secondCommand->ops ?? null); + self::assertCount(1, $secondCommand->ops); + self::assertSame(0, $secondCommand->writeConcern->w ?? null); + + self::assertSame($numModels, $collection->countDocuments()); + } +} diff --git a/tests/SpecTests/Crud/Prose3_BulkWriteSplitsOnMaxWriteBatchSizeTest.php b/tests/SpecTests/Crud/Prose3_BulkWriteSplitsOnMaxWriteBatchSizeTest.php new file mode 100644 index 000000000..94e158423 --- /dev/null +++ b/tests/SpecTests/Crud/Prose3_BulkWriteSplitsOnMaxWriteBatchSizeTest.php @@ -0,0 +1,72 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $client = self::createTestClient(); + + $maxWriteBatchSize = $this->getPrimaryServer()->getInfo()['maxWriteBatchSize'] ?? null; + self::assertIsInt($maxWriteBatchSize); + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $bulkWrite = ClientBulkWrite::createWithCollection($collection); + + for ($i = 0; $i < $maxWriteBatchSize + 1; ++$i) { + $bulkWrite->insertOne(['a' => 'b']); + } + + $subscriber = new class implements CommandSubscriber { + public array $commandStartedEvents = []; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'bulkWrite') { + $this->commandStartedEvents[] = $event; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + + $client->addSubscriber($subscriber); + + $result = $client->bulkWrite($bulkWrite); + + self::assertSame($maxWriteBatchSize + 1, $result->getInsertedCount()); + self::assertCount(2, $subscriber->commandStartedEvents); + [$firstEvent, $secondEvent] = $subscriber->commandStartedEvents; + self::assertIsArray($firstCommandOps = $firstEvent->getCommand()->ops ?? null); + self::assertCount($maxWriteBatchSize, $firstCommandOps); + self::assertIsArray($secondCommandOps = $secondEvent->getCommand()->ops ?? null); + self::assertCount(1, $secondCommandOps); + self::assertEquals($firstEvent->getOperationId(), $secondEvent->getOperationId()); + } +} diff --git a/tests/SpecTests/Crud/Prose4_BulkWriteSplitsOnMaxMessageSizeBytesTest.php b/tests/SpecTests/Crud/Prose4_BulkWriteSplitsOnMaxMessageSizeBytesTest.php new file mode 100644 index 000000000..2b45a5999 --- /dev/null +++ b/tests/SpecTests/Crud/Prose4_BulkWriteSplitsOnMaxMessageSizeBytesTest.php @@ -0,0 +1,78 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $client = self::createTestClient(); + + $hello = $this->getPrimaryServer()->getInfo(); + self::assertIsInt($maxBsonObjectSize = $hello['maxBsonObjectSize'] ?? null); + self::assertIsInt($maxMessageSizeBytes = $hello['maxMessageSizeBytes'] ?? null); + + $numModels = (int) ($maxMessageSizeBytes / $maxBsonObjectSize + 1); + $document = ['a' => str_repeat('b', $maxBsonObjectSize - 500)]; + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $bulkWrite = ClientBulkWrite::createWithCollection($collection); + + for ($i = 0; $i < $numModels; ++$i) { + $bulkWrite->insertOne($document); + } + + $subscriber = new class implements CommandSubscriber { + public array $commandStartedEvents = []; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'bulkWrite') { + $this->commandStartedEvents[] = $event; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + + $client->addSubscriber($subscriber); + + $result = $client->bulkWrite($bulkWrite); + + self::assertSame($numModels, $result->getInsertedCount()); + self::assertCount(2, $subscriber->commandStartedEvents); + [$firstEvent, $secondEvent] = $subscriber->commandStartedEvents; + self::assertIsArray($firstCommandOps = $firstEvent->getCommand()->ops ?? null); + self::assertCount($numModels - 1, $firstCommandOps); + self::assertIsArray($secondCommandOps = $secondEvent->getCommand()->ops ?? null); + self::assertCount(1, $secondCommandOps); + self::assertEquals($firstEvent->getOperationId(), $secondEvent->getOperationId()); + } +} diff --git a/tests/SpecTests/Crud/Prose5_BulkWriteCollectsWriteConcernErrorsAcrossBatchesTest.php b/tests/SpecTests/Crud/Prose5_BulkWriteCollectsWriteConcernErrorsAcrossBatchesTest.php new file mode 100644 index 000000000..727347a95 --- /dev/null +++ b/tests/SpecTests/Crud/Prose5_BulkWriteCollectsWriteConcernErrorsAcrossBatchesTest.php @@ -0,0 +1,85 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $client = self::createTestClient(null, ['retryWrites' => false]); + + $maxWriteBatchSize = $this->getPrimaryServer()->getInfo()['maxWriteBatchSize'] ?? null; + self::assertIsInt($maxWriteBatchSize); + + $this->configureFailPoint([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => 2], + 'data' => [ + 'failCommands' => ['bulkWrite'], + 'writeConcernError' => [ + 'code' => 91, // ShutdownInProgress + 'errmsg' => 'Replication is being shut down', + ], + ], + ]); + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $bulkWrite = ClientBulkWrite::createWithCollection($collection); + + for ($i = 0; $i < $maxWriteBatchSize + 1; ++$i) { + $bulkWrite->insertOne(['a' => 'b']); + } + + $subscriber = new class implements CommandSubscriber { + public int $numBulkWriteObserved = 0; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'bulkWrite') { + ++$this->numBulkWriteObserved; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + + $client->addSubscriber($subscriber); + + try { + $client->bulkWrite($bulkWrite); + self::fail('BulkWriteCommandException was not thrown'); + } catch (BulkWriteCommandException $e) { + self::assertCount(2, $e->getWriteConcernErrors()); + $partialResult = $e->getPartialResult(); + self::assertNotNull($partialResult); + self::assertSame($maxWriteBatchSize + 1, $partialResult->getInsertedCount()); + self::assertSame(2, $subscriber->numBulkWriteObserved); + } + } +} diff --git a/tests/SpecTests/Crud/Prose6_BulkWriteHandlesWriteErrorsAcrossBatchesTest.php b/tests/SpecTests/Crud/Prose6_BulkWriteHandlesWriteErrorsAcrossBatchesTest.php new file mode 100644 index 000000000..975147e4f --- /dev/null +++ b/tests/SpecTests/Crud/Prose6_BulkWriteHandlesWriteErrorsAcrossBatchesTest.php @@ -0,0 +1,110 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + } + + public function testOrdered(): void + { + $client = self::createTestClient(); + + $maxWriteBatchSize = $this->getPrimaryServer()->getInfo()['maxWriteBatchSize'] ?? null; + self::assertIsInt($maxWriteBatchSize); + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection->insertOne(['_id' => 1]); + + $bulkWrite = ClientBulkWrite::createWithCollection($collection, ['ordered' => true]); + + for ($i = 0; $i < $maxWriteBatchSize + 1; ++$i) { + $bulkWrite->insertOne(['_id' => 1]); + } + + $subscriber = $this->createSubscriber(); + $client->addSubscriber($subscriber); + + try { + $client->bulkWrite($bulkWrite); + self::fail('BulkWriteCommandException was not thrown'); + } catch (BulkWriteCommandException $e) { + self::assertCount(1, $e->getWriteErrors()); + self::assertSame(1, $subscriber->numBulkWriteObserved); + } + } + + public function testUnordered(): void + { + $client = self::createTestClient(); + + $maxWriteBatchSize = $this->getPrimaryServer()->getInfo()['maxWriteBatchSize'] ?? null; + self::assertIsInt($maxWriteBatchSize); + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection->insertOne(['_id' => 1]); + + $bulkWrite = ClientBulkWrite::createWithCollection($collection, ['ordered' => false]); + + for ($i = 0; $i < $maxWriteBatchSize + 1; ++$i) { + $bulkWrite->insertOne(['_id' => 1]); + } + + $subscriber = $this->createSubscriber(); + $client->addSubscriber($subscriber); + + try { + $client->bulkWrite($bulkWrite); + self::fail('BulkWriteCommandException was not thrown'); + } catch (BulkWriteCommandException $e) { + self::assertCount($maxWriteBatchSize + 1, $e->getWriteErrors()); + self::assertSame(2, $subscriber->numBulkWriteObserved); + } + } + + private function createSubscriber(): CommandSubscriber + { + return new class implements CommandSubscriber { + public int $numBulkWriteObserved = 0; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'bulkWrite') { + ++$this->numBulkWriteObserved; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + } +} diff --git a/tests/SpecTests/Crud/Prose7_BulkWriteHandlesCursorRequiringGetMoreTest.php b/tests/SpecTests/Crud/Prose7_BulkWriteHandlesCursorRequiringGetMoreTest.php new file mode 100644 index 000000000..1171c3998 --- /dev/null +++ b/tests/SpecTests/Crud/Prose7_BulkWriteHandlesCursorRequiringGetMoreTest.php @@ -0,0 +1,76 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $client = self::createTestClient(); + + $maxBsonObjectSize = $this->getPrimaryServer()->getInfo()['maxBsonObjectSize'] ?? null; + self::assertIsInt($maxBsonObjectSize); + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + + $bulkWrite = ClientBulkWrite::createWithCollection($collection, ['verboseResults' => true]); + $bulkWrite->updateOne( + ['_id' => str_repeat('a', (int) ($maxBsonObjectSize / 2))], + ['$set' => ['x' => 1]], + ['upsert' => true], + ); + $bulkWrite->updateOne( + ['_id' => str_repeat('b', (int) ($maxBsonObjectSize / 2))], + ['$set' => ['x' => 1]], + ['upsert' => true], + ); + + $subscriber = new class implements CommandSubscriber { + public int $numGetMoreObserved = 0; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'getMore') { + ++$this->numGetMoreObserved; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + + $client->addSubscriber($subscriber); + + $result = $client->bulkWrite($bulkWrite); + + self::assertSame(2, $result->getUpsertedCount()); + self::assertCount(2, $result->getUpdateResults()); + self::assertSame(1, $subscriber->numGetMoreObserved); + } +} diff --git a/tests/SpecTests/Crud/Prose8_BulkWriteHandlesCursorRequiringGetMoreWithinTransactionTest.php b/tests/SpecTests/Crud/Prose8_BulkWriteHandlesCursorRequiringGetMoreWithinTransactionTest.php new file mode 100644 index 000000000..d6b3290ae --- /dev/null +++ b/tests/SpecTests/Crud/Prose8_BulkWriteHandlesCursorRequiringGetMoreWithinTransactionTest.php @@ -0,0 +1,82 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + $this->skipIfTransactionsAreNotSupported(); + + $client = self::createTestClient(); + + $maxBsonObjectSize = $this->getPrimaryServer()->getInfo()['maxBsonObjectSize'] ?? null; + self::assertIsInt($maxBsonObjectSize); + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + + $bulkWrite = ClientBulkWrite::createWithCollection($collection, ['verboseResults' => true]); + $bulkWrite->updateOne( + ['_id' => str_repeat('a', (int) ($maxBsonObjectSize / 2))], + ['$set' => ['x' => 1]], + ['upsert' => true], + ); + $bulkWrite->updateOne( + ['_id' => str_repeat('b', (int) ($maxBsonObjectSize / 2))], + ['$set' => ['x' => 1]], + ['upsert' => true], + ); + + $subscriber = new class implements CommandSubscriber { + public int $numGetMoreObserved = 0; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'getMore') { + ++$this->numGetMoreObserved; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + + $client->addSubscriber($subscriber); + + $session = $client->startSession(); + $session->startTransaction(); + + /* Note: the prose test does not call for committing the transaction. + * The transaction will be aborted when the Session object is freed. */ + $result = $client->bulkWrite($bulkWrite, ['session' => $session]); + + self::assertSame(2, $result->getUpsertedCount()); + self::assertCount(2, $result->getUpdateResults()); + self::assertSame(1, $subscriber->numGetMoreObserved); + } +} diff --git a/tests/SpecTests/Crud/Prose9_BulkWriteHandlesGetMoreErrorTest.php b/tests/SpecTests/Crud/Prose9_BulkWriteHandlesGetMoreErrorTest.php new file mode 100644 index 000000000..d65cf0b05 --- /dev/null +++ b/tests/SpecTests/Crud/Prose9_BulkWriteHandlesGetMoreErrorTest.php @@ -0,0 +1,104 @@ +isServerless()) { + $this->markTestSkipped('bulkWrite command is not supported'); + } + + $this->skipIfServerVersion('<', '8.0', 'bulkWrite command is not supported'); + + $client = self::createTestClient(); + + $maxBsonObjectSize = $this->getPrimaryServer()->getInfo()['maxBsonObjectSize'] ?? null; + self::assertIsInt($maxBsonObjectSize); + + $this->configureFailPoint([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => 1], + 'data' => [ + 'failCommands' => ['getMore'], + 'errorCode' => self::UNKNOWN_ERROR, + ], + ]); + + $this->dropCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + + $bulkWrite = ClientBulkWrite::createWithCollection($collection, ['verboseResults' => true]); + $bulkWrite->updateOne( + ['_id' => str_repeat('a', (int) ($maxBsonObjectSize / 2))], + ['$set' => ['x' => 1]], + ['upsert' => true], + ); + $bulkWrite->updateOne( + ['_id' => str_repeat('b', (int) ($maxBsonObjectSize / 2))], + ['$set' => ['x' => 1]], + ['upsert' => true], + ); + + $subscriber = new class implements CommandSubscriber { + public int $numGetMoreObserved = 0; + public int $numKillCursorsObserved = 0; + + public function commandStarted(CommandStartedEvent $event): void + { + if ($event->getCommandName() === 'getMore') { + ++$this->numGetMoreObserved; + } elseif ($event->getCommandName() === 'killCursors') { + ++$this->numKillCursorsObserved; + } + } + + public function commandSucceeded(CommandSucceededEvent $event): void + { + } + + public function commandFailed(CommandFailedEvent $event): void + { + } + }; + + $client->addSubscriber($subscriber); + + try { + $client->bulkWrite($bulkWrite); + self::fail('BulkWriteCommandException was not thrown'); + } catch (BulkWriteCommandException $e) { + $errorReply = $e->getErrorReply(); + $this->assertNotNull($errorReply); + $this->assertSame(self::UNKNOWN_ERROR, $errorReply['code'] ?? null); + + // PHPC will also apply the top-level error code to BulkWriteCommandException + $this->assertSame(self::UNKNOWN_ERROR, $e->getCode()); + + $partialResult = $e->getPartialResult(); + self::assertNotNull($partialResult); + self::assertSame(2, $partialResult->getUpsertedCount()); + self::assertCount(1, $partialResult->getUpdateResults()); + self::assertSame(1, $subscriber->numGetMoreObserved); + self::assertSame(1, $subscriber->numKillCursorsObserved); + } + } +} diff --git a/tests/UnifiedSpecTests/Constraint/Matches.php b/tests/UnifiedSpecTests/Constraint/Matches.php index 9a527dcc7..a6bda1420 100644 --- a/tests/UnifiedSpecTests/Constraint/Matches.php +++ b/tests/UnifiedSpecTests/Constraint/Matches.php @@ -5,6 +5,7 @@ use LogicException; use MongoDB\BSON\Document; use MongoDB\BSON\Int64; +use MongoDB\BSON\PackedArray; use MongoDB\BSON\Serializable; use MongoDB\BSON\Type; use MongoDB\Model\BSONArray; @@ -457,8 +458,14 @@ private static function prepare(mixed $bson): mixed return self::prepare($bson->bsonSerialize()); } - /* Serializable has already been handled, so any remaining instances of - * Type will not serialize as BSON arrays or objects */ + // Recurse on the PHP representation of Document and PackedArray types + if ($bson instanceof Document || $bson instanceof PackedArray) { + return self::prepare($bson->toPHP()); + } + + /* Serializable, Document, and PackedArray have already been handled. + * Any remaining Type instances will not serialize as BSON arrays or + * objects. */ if ($bson instanceof Type) { return $bson; } diff --git a/tests/UnifiedSpecTests/ExpectedError.php b/tests/UnifiedSpecTests/ExpectedError.php index c7ace6d7f..092701479 100644 --- a/tests/UnifiedSpecTests/ExpectedError.php +++ b/tests/UnifiedSpecTests/ExpectedError.php @@ -2,6 +2,7 @@ namespace MongoDB\Tests\UnifiedSpecTests; +use MongoDB\Driver\Exception\BulkWriteCommandException; use MongoDB\Driver\Exception\BulkWriteException; use MongoDB\Driver\Exception\CommandException; use MongoDB\Driver\Exception\ExecutionTimeoutException; @@ -12,6 +13,7 @@ use stdClass; use Throwable; +use function count; use function PHPUnit\Framework\assertArrayHasKey; use function PHPUnit\Framework\assertContainsOnly; use function PHPUnit\Framework\assertCount; @@ -56,7 +58,7 @@ final class ExpectedError private ?string $codeName = null; - private ?Matches $matchesResultDocument = null; + private ?Matches $matchesErrorResponse = null; private array $includedLabels = []; @@ -64,6 +66,10 @@ final class ExpectedError private ?ExpectedResult $expectedResult = null; + private ?array $writeErrors = null; + + private ?array $writeConcernErrors = null; + public function __construct(?stdClass $o, EntityMap $entityMap) { if ($o === null) { @@ -102,7 +108,7 @@ public function __construct(?stdClass $o, EntityMap $entityMap) if (isset($o->errorResponse)) { assertIsObject($o->errorResponse); - $this->matchesResultDocument = new Matches($o->errorResponse, $entityMap); + $this->matchesErrorResponse = new Matches($o->errorResponse, $entityMap); } if (isset($o->errorLabelsContain)) { @@ -120,6 +126,24 @@ public function __construct(?stdClass $o, EntityMap $entityMap) if (property_exists($o, 'expectResult')) { $this->expectedResult = new ExpectedResult($o, $entityMap); } + + if (isset($o->writeErrors)) { + assertIsObject($o->writeErrors); + assertContainsOnly('object', (array) $o->writeErrors); + + foreach ($o->writeErrors as $i => $writeError) { + $this->writeErrors[$i] = new Matches($writeError, $entityMap); + } + } + + if (isset($o->writeConcernErrors)) { + assertIsArray($o->writeConcernErrors); + assertContainsOnly('object', $o->writeConcernErrors); + + foreach ($o->writeConcernErrors as $i => $writeConcernError) { + $this->writeConcernErrors[$i] = new Matches($writeConcernError, $entityMap); + } + } } /** @@ -159,15 +183,21 @@ public function assert(?Throwable $e = null): void $this->assertCodeName($e); } - if (isset($this->matchesResultDocument)) { - assertThat($e, logicalOr(isInstanceOf(CommandException::class), isInstanceOf(BulkWriteException::class))); + if (isset($this->matchesErrorResponse)) { + assertThat($e, logicalOr( + isInstanceOf(CommandException::class), + isInstanceOf(BulkWriteException::class), + isInstanceOf(BulkWriteCommandException::class), + )); if ($e instanceof CommandException) { - assertThat($e->getResultDocument(), $this->matchesResultDocument, 'CommandException result document matches'); + assertThat($e->getResultDocument(), $this->matchesErrorResponse, 'CommandException result document matches expected errorResponse'); + } elseif ($e instanceof BulkWriteCommandException) { + assertThat($e->getErrorReply(), $this->matchesErrorResponse, 'BulkWriteCommandException error reply matches expected errorResponse'); } elseif ($e instanceof BulkWriteException) { $writeErrors = $e->getWriteResult()->getErrorReplies(); assertCount(1, $writeErrors); - assertThat($writeErrors[0], $this->matchesResultDocument, 'BulkWriteException result document matches'); + assertThat($writeErrors[0], $this->matchesErrorResponse, 'BulkWriteException first error reply matches expected errorResponse'); } } @@ -184,16 +214,34 @@ public function assert(?Throwable $e = null): void } if (isset($this->expectedResult)) { - assertInstanceOf(BulkWriteException::class, $e); - $this->expectedResult->assert($e->getWriteResult()); + assertThat($e, logicalOr( + isInstanceOf(BulkWriteException::class), + isInstanceOf(BulkWriteCommandException::class), + )); + + if ($e instanceof BulkWriteCommandException) { + $this->expectedResult->assert($e->getPartialResult()); + } elseif ($e instanceof BulkWriteException) { + $this->expectedResult->assert($e->getWriteResult()); + } + } + + if (isset($this->writeErrors)) { + assertInstanceOf(BulkWriteCommandException::class, $e); + $this->assertWriteErrors($e->getWriteErrors()); + } + + if (isset($this->writeConcernErrors)) { + assertInstanceOf(BulkWriteCommandException::class, $e); + $this->assertWriteConcernErrors($e->getWriteConcernErrors()); } } private function assertIsClientError(Throwable $e): void { - /* Note: BulkWriteException may proxy a previous exception. Unwrap it - * to check the original error. */ - if ($e instanceof BulkWriteException && $e->getPrevious() !== null) { + /* Note: BulkWriteException and BulkWriteCommandException may proxy a + * previous exception. Unwrap it to check the original error. */ + if (($e instanceof BulkWriteException || $e instanceof BulkWriteCommandException) && $e->getPrevious() !== null) { $e = $e->getPrevious(); } @@ -230,4 +278,47 @@ private function assertCodeName(ServerException $e): void assertObjectHasProperty('codeName', $result); assertSame($this->codeName, $result->codeName); } + + private function assertWriteErrors(array $writeErrors): void + { + assertCount(count($this->writeErrors), $writeErrors); + + foreach ($this->writeErrors as $i => $matchesWriteError) { + assertArrayHasKey($i, $writeErrors); + $writeError = $writeErrors[$i]; + + // Not required by the spec test, but asserts PHPC correctness + assertSame((int) $i, $writeError->getIndex()); + + /* Convert the WriteError into a document for matching. These + * field names are derived from the CRUD spec. */ + $writeErrorDocument = [ + 'code' => $writeError->getCode(), + 'message' => $writeError->getMessage(), + 'details' => $writeError->getInfo(), + ]; + + assertThat($writeErrorDocument, $matchesWriteError); + } + } + + private function assertWriteConcernErrors(array $writeConcernErrors): void + { + assertCount(count($this->writeConcernErrors), $writeConcernErrors); + + foreach ($this->writeConcernErrors as $i => $matchesWriteConcernError) { + assertArrayHasKey($i, $writeConcernErrors); + $writeConcernError = $writeConcernErrors[$i]; + + /* Convert the WriteConcernError into a document for matching. + * These field names are derived from the CRUD spec. */ + $writeConcernErrorDocument = [ + 'code' => $writeConcernError->getCode(), + 'message' => $writeConcernError->getMessage(), + 'details' => $writeConcernError->getInfo(), + ]; + + assertThat($writeConcernErrorDocument, $matchesWriteConcernError); + } + } } diff --git a/tests/UnifiedSpecTests/ExpectedResult.php b/tests/UnifiedSpecTests/ExpectedResult.php index 5edc6e3ce..29871c289 100644 --- a/tests/UnifiedSpecTests/ExpectedResult.php +++ b/tests/UnifiedSpecTests/ExpectedResult.php @@ -4,6 +4,7 @@ use MongoDB\BulkWriteResult; use MongoDB\DeleteResult; +use MongoDB\Driver\BulkWriteCommandResult; use MongoDB\Driver\WriteResult; use MongoDB\InsertManyResult; use MongoDB\InsertOneResult; @@ -11,6 +12,7 @@ use MongoDB\UpdateResult; use stdClass; +use function array_filter; use function is_object; use function PHPUnit\Framework\assertThat; use function property_exists; @@ -57,6 +59,10 @@ private static function prepare($value) return $value; } + if ($value instanceof BulkWriteCommandResult) { + return self::prepareBulkWriteCommandResult($value); + } + if ( $value instanceof BulkWriteResult || $value instanceof WriteResult || @@ -71,7 +77,37 @@ private static function prepare($value) return $value; } - private static function prepareWriteResult($value) + private static function prepareBulkWriteCommandResult(BulkWriteCommandResult $result): array + { + $retval = ['acknowledged' => $result->isAcknowledged()]; + + if (! $retval['acknowledged']) { + return $retval; + } + + $retval = [ + 'deletedCount' => $result->getDeletedCount(), + 'insertedCount' => $result->getInsertedCount(), + 'matchedCount' => $result->getMatchedCount(), + 'modifiedCount' => $result->getModifiedCount(), + 'upsertedCount' => $result->getUpsertedCount(), + ]; + + /* Tests use $$unsetOrMatches to expect either no key or an empty + * document when verboseResults=false, so filter out null values. */ + $retval += array_filter( + [ + 'deleteResults' => $result->getDeleteResults()?->toPHP(), + 'insertResults' => $result->getInsertResults()?->toPHP(), + 'updateResults' => $result->getUpdateResults()?->toPHP(), + ], + fn ($value) => $value !== null, + ); + + return $retval; + } + + private static function prepareWriteResult($value): array { $result = ['acknowledged' => $value->isAcknowledged()]; diff --git a/tests/UnifiedSpecTests/Operation.php b/tests/UnifiedSpecTests/Operation.php index 96e1703ce..8cb4509dd 100644 --- a/tests/UnifiedSpecTests/Operation.php +++ b/tests/UnifiedSpecTests/Operation.php @@ -7,6 +7,7 @@ use MongoDB\Client; use MongoDB\Collection; use MongoDB\Database; +use MongoDB\Driver\BulkWriteCommand; use MongoDB\Driver\ClientEncryption; use MongoDB\Driver\Cursor; use MongoDB\Driver\Server; @@ -87,10 +88,7 @@ final class Operation 'assertNumberConnectionsCheckedOut' => 'PHP does not implement CMAP', 'createEntities' => 'createEntities is not implemented (PHPC-1760)', ], - Client::class => [ - 'clientBulkWrite' => 'clientBulkWrite is not implemented (PHPLIB-847)', - 'listDatabaseObjects' => 'listDatabaseObjects is not implemented', - ], + Client::class => ['listDatabaseObjects' => 'listDatabaseObjects is not implemented'], Cursor::class => ['iterateOnce' => 'iterateOnce is not implemented (PHPC-1760)'], Database::class => [ 'createCommandCursor' => 'commandCursor API is not yet implemented (PHPLIB-1077)', @@ -257,6 +255,18 @@ private function executeForClient(Client $client) Util::assertArgumentsBySchema(Client::class, $this->name, $args); switch ($this->name) { + case 'clientBulkWrite': + assertArrayHasKey('models', $args); + assertIsArray($args['models']); + + // Options for ClientBulkWriteCommand and Server::executeBulkWriteCommand() will be mixed + $options = array_diff_key($args, ['models' => 1]); + + return $client->bulkWrite( + self::prepareBulkWriteCommand($args['models'], $options), + $options, + ); + case 'createChangeStream': assertArrayHasKey('pipeline', $args); assertIsArray($args['pipeline']); @@ -1001,6 +1011,82 @@ private function skipIfOperationIsNotSupported(string $executingObjectName): voi Assert::markTestSkipped($skipReason); } + private static function prepareBulkWriteCommand(array $models, array $options): BulkWriteCommand + { + $bulk = new BulkWriteCommand($options); + + foreach ($models as $model) { + $model = (array) $model; + assertCount(1, $model); + + $type = key($model); + $args = current($model); + assertIsObject($args); + $args = (array) $args; + + assertArrayHasKey('namespace', $args); + assertIsString($args['namespace']); + + switch ($type) { + case 'deleteMany': + case 'deleteOne': + assertArrayHasKey('filter', $args); + assertInstanceOf(stdClass::class, $args['filter']); + + $bulk->{$type}( + $args['namespace'], + $args['filter'], + array_diff_key($args, ['namespace' => 1, 'filter' => 1]), + ); + break; + + case 'insertOne': + assertArrayHasKey('document', $args); + assertInstanceOf(stdClass::class, $args['document']); + + $bulk->insertOne( + $args['namespace'], + $args['document'], + ); + break; + + case 'replaceOne': + assertArrayHasKey('filter', $args); + assertArrayHasKey('replacement', $args); + assertInstanceOf(stdClass::class, $args['filter']); + assertInstanceOf(stdClass::class, $args['replacement']); + + $bulk->replaceOne( + $args['namespace'], + $args['filter'], + $args['replacement'], + array_diff_key($args, ['namespace' => 1, 'filter' => 1, 'replacement' => 1]), + ); + break; + + case 'updateMany': + case 'updateOne': + assertArrayHasKey('filter', $args); + assertArrayHasKey('update', $args); + assertInstanceOf(stdClass::class, $args['filter']); + assertThat($args['update'], logicalOr(new IsType('array'), new IsType('object'))); + + $bulk->{$type}( + $args['namespace'], + $args['filter'], + $args['update'], + array_diff_key($args, ['namespace' => 1, 'filter' => 1, 'update' => 1]), + ); + break; + + default: + Assert::fail('Unsupported bulk write model: ' . $type); + } + } + + return $bulk; + } + private static function prepareBulkWriteRequest(stdClass $request): array { $request = (array) $request; @@ -1026,6 +1112,7 @@ private static function prepareBulkWriteRequest(stdClass $request): array case 'insertOne': assertArrayHasKey('document', $args); + assertInstanceOf(stdClass::class, $args['document']); return ['insertOne' => [$args['document']]]; diff --git a/tests/UnifiedSpecTests/UnifiedSpecTest.php b/tests/UnifiedSpecTests/UnifiedSpecTest.php index 3552699e9..d057c5420 100644 --- a/tests/UnifiedSpecTests/UnifiedSpecTest.php +++ b/tests/UnifiedSpecTests/UnifiedSpecTest.php @@ -29,6 +29,8 @@ class UnifiedSpecTest extends FunctionalTestCase * @var array */ private static array $incompleteTestGroups = [ + // Spec tests for named KMS providers depends on unimplemented functionality from UTF schema 1.18 + 'client-side-encryption/namedKMS' => 'UTF schema 1.18 is not supported (PHPLIB-1328)', // Many load balancer tests use CMAP events and/or assertNumberConnectionsCheckedOut 'load-balancers/cursors are correctly pinned to connections for load-balanced clusters' => 'PHPC does not implement CMAP', 'load-balancers/transactions are correctly pinned to connections for load-balanced clusters' => 'PHPC does not implement CMAP', diff --git a/tests/UnifiedSpecTests/UnifiedTestRunner.php b/tests/UnifiedSpecTests/UnifiedTestRunner.php index 78e5772a5..6b700f49e 100644 --- a/tests/UnifiedSpecTests/UnifiedTestRunner.php +++ b/tests/UnifiedSpecTests/UnifiedTestRunner.php @@ -63,8 +63,13 @@ final class UnifiedTestRunner * - 1.11: Not implemented, but CMAP is not applicable * - 1.13: Only $$matchAsDocument and $$matchAsRoot is implemented * - 1.14: Not implemented + * - 1.16: Not implemented + * - 1.17: Not implemented + * - 1.18: Not implemented + * - 1.19: Not implemented + * - 1.20: Not implemented */ - public const MAX_SCHEMA_VERSION = '1.15'; + public const MAX_SCHEMA_VERSION = '1.21'; private Client $internalClient; diff --git a/tests/UnifiedSpecTests/Util.php b/tests/UnifiedSpecTests/Util.php index 12cca41df..e134cb109 100644 --- a/tests/UnifiedSpecTests/Util.php +++ b/tests/UnifiedSpecTests/Util.php @@ -58,6 +58,7 @@ final class Util 'loop' => ['operations', 'storeErrorsAsEntity', 'storeFailuresAsEntity', 'storeSuccessesAsEntity', 'storeIterationsAsEntity'], ], Client::class => [ + 'clientBulkWrite' => ['models', 'bypassDocumentValidation', 'comment', 'let', 'ordered', 'session', 'verboseResults', 'writeConcern'], 'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS', 'showExpandedEvents'], 'listDatabaseNames' => ['authorizedDatabases', 'filter', 'maxTimeMS', 'session'], 'listDatabases' => ['authorizedDatabases', 'filter', 'maxTimeMS', 'session'],