diff --git a/src/Exception/KafkaProducerTransactionAbortException.php b/src/Exception/KafkaProducerTransactionAbortException.php index 862cb86..8cd9804 100644 --- a/src/Exception/KafkaProducerTransactionAbortException.php +++ b/src/Exception/KafkaProducerTransactionAbortException.php @@ -7,5 +7,5 @@ class KafkaProducerTransactionAbortException extends \Exception { public const TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE = - 'Produce failed. You need to abort your current transaction and start a new one'; + 'Produce failed. You need to abort your current transaction and start a new one (%s)'; } diff --git a/src/Exception/KafkaProducerTransactionFatalException.php b/src/Exception/KafkaProducerTransactionFatalException.php index eb99713..6c48694 100644 --- a/src/Exception/KafkaProducerTransactionFatalException.php +++ b/src/Exception/KafkaProducerTransactionFatalException.php @@ -7,5 +7,5 @@ class KafkaProducerTransactionFatalException extends \Exception { public const FATAL_TRANSACTION_EXCEPTION_MESSAGE = - 'Produce failed with a fatal error. This producer instance cannot be used anymore.'; + 'Produce failed with a fatal error. This producer instance cannot be used anymore (%s)'; } diff --git a/src/Exception/KafkaProducerTransactionRetryException.php b/src/Exception/KafkaProducerTransactionRetryException.php index 935914f..cc32fda 100644 --- a/src/Exception/KafkaProducerTransactionRetryException.php +++ b/src/Exception/KafkaProducerTransactionRetryException.php @@ -6,5 +6,5 @@ class KafkaProducerTransactionRetryException extends \Exception { - public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried'; + public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried (%s)'; } diff --git a/src/Producer/KafkaProducer.php b/src/Producer/KafkaProducer.php index 7638a69..93585eb 100644 --- a/src/Producer/KafkaProducer.php +++ b/src/Producer/KafkaProducer.php @@ -255,18 +255,33 @@ private function handleTransactionError(RdKafkaErrorException $e): void { if (true === $e->isRetriable()) { throw new KafkaProducerTransactionRetryException( - KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE + sprintf( + KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, + $e->getMessage() + ), + $e->getCode(), + $e ); } elseif (true === $e->transactionRequiresAbort()) { throw new KafkaProducerTransactionAbortException( - KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE + sprintf( + KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE, + $e->getMessage() + ), + $e->getCode(), + $e ); } else { $this->transactionInitialized = false; // according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal // fatal errors (so stated), need the producer to be destroyed throw new KafkaProducerTransactionFatalException( - KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE + sprintf( + KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, + $e->getMessage() + ), + $e->getCode(), + $e ); } } diff --git a/tests/Unit/Producer/KafkaProducerTest.php b/tests/Unit/Producer/KafkaProducerTest.php index 8944c01..623d4d4 100644 --- a/tests/Unit/Producer/KafkaProducerTest.php +++ b/tests/Unit/Producer/KafkaProducerTest.php @@ -372,6 +372,9 @@ public function testBeginTransactionConsecutiveSuccess(): void public function testBeginTransactionWithRetriableError(): void { self::expectException(KafkaProducerTransactionRetryException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(RdKafkaErrorException::class); $errorMock->expects(self::once())->method('isRetriable')->willReturn(true); @@ -393,6 +396,9 @@ public function testBeginTransactionWithRetriableError(): void public function testBeginTransactionWithAbortError(): void { self::expectException(KafkaProducerTransactionAbortException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(RdKafkaErrorException::class); $errorMock->expects(self::once())->method('isRetriable')->willReturn(false); @@ -415,6 +421,9 @@ public function testBeginTransactionWithAbortError(): void public function testBeginTransactionWithFatalError(): void { self::expectException(KafkaProducerTransactionFatalException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(RdKafkaErrorException::class); $errorMock->expects(self::once())->method('isRetriable')->willReturn(false); @@ -439,6 +448,9 @@ public function testBeginTransactionWithFatalErrorWillTriggerInit(): void $firstExceptionCaught = false; self::expectException(KafkaProducerTransactionFatalException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(RdKafkaErrorException::class); $errorMock->expects(self::exactly(2))->method('isRetriable')->willReturn(false); @@ -481,7 +493,9 @@ public function testAbortTransactionSuccess(): void public function testAbortTransactionFailure(): void { self::expectException(KafkaProducerTransactionRetryException::class); - self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test') + ); $exception = new RdKafkaErrorException('test', 1, 'some failure', false, true, false); @@ -513,7 +527,9 @@ public function testCommitTransactionSuccess(): void public function testCommitTransactionFailure(): void { self::expectException(KafkaProducerTransactionRetryException::class); - self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test') + ); $exception = new RdKafkaErrorException('test', 1, 'some failure', false, true, false); @@ -525,4 +541,24 @@ public function testCommitTransactionFailure(): void $this->kafkaProducer->commitTransaction(10000); } + + /** + * @return void + */ + public function testCommitTransactionFailurePreviousException(): void + { + $exception = new RdKafkaErrorException('test', 1, 'some failure', false, true, false); + + $this->rdKafkaProducerMock + ->expects(self::once()) + ->method('commitTransaction') + ->with(10000) + ->willThrowException($exception); + + try { + $this->kafkaProducer->commitTransaction(10000); + } catch (KafkaProducerTransactionRetryException $e) { + self::assertSame($exception, $e->getPrevious()); + } + } }