Skip to content

Commit b02c90f

Browse files
authored
expose original error and exception (#62)
1 parent cb9fca9 commit b02c90f

5 files changed

+59
-8
lines changed

src/Exception/KafkaProducerTransactionAbortException.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77
class KafkaProducerTransactionAbortException extends \Exception
88
{
99
public const TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE =
10-
'Produce failed. You need to abort your current transaction and start a new one';
10+
'Produce failed. You need to abort your current transaction and start a new one (%s)';
1111
}

src/Exception/KafkaProducerTransactionFatalException.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77
class KafkaProducerTransactionFatalException extends \Exception
88
{
99
public const FATAL_TRANSACTION_EXCEPTION_MESSAGE =
10-
'Produce failed with a fatal error. This producer instance cannot be used anymore.';
10+
'Produce failed with a fatal error. This producer instance cannot be used anymore (%s)';
1111
}

src/Exception/KafkaProducerTransactionRetryException.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@
66

77
class KafkaProducerTransactionRetryException extends \Exception
88
{
9-
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried';
9+
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried (%s)';
1010
}

src/Producer/KafkaProducer.php

+18-3
Original file line numberDiff line numberDiff line change
@@ -255,18 +255,33 @@ private function handleTransactionError(RdKafkaErrorException $e): void
255255
{
256256
if (true === $e->isRetriable()) {
257257
throw new KafkaProducerTransactionRetryException(
258-
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE
258+
sprintf(
259+
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE,
260+
$e->getMessage()
261+
),
262+
$e->getCode(),
263+
$e
259264
);
260265
} elseif (true === $e->transactionRequiresAbort()) {
261266
throw new KafkaProducerTransactionAbortException(
262-
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE
267+
sprintf(
268+
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE,
269+
$e->getMessage()
270+
),
271+
$e->getCode(),
272+
$e
263273
);
264274
} else {
265275
$this->transactionInitialized = false;
266276
// according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal
267277
// fatal errors (so stated), need the producer to be destroyed
268278
throw new KafkaProducerTransactionFatalException(
269-
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE
279+
sprintf(
280+
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE,
281+
$e->getMessage()
282+
),
283+
$e->getCode(),
284+
$e
270285
);
271286
}
272287
}

tests/Unit/Producer/KafkaProducerTest.php

+38-2
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,9 @@ public function testBeginTransactionConsecutiveSuccess(): void
372372
public function testBeginTransactionWithRetriableError(): void
373373
{
374374
self::expectException(KafkaProducerTransactionRetryException::class);
375+
self::expectExceptionMessage(
376+
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, '')
377+
);
375378

376379
$errorMock = $this->createMock(RdKafkaErrorException::class);
377380
$errorMock->expects(self::once())->method('isRetriable')->willReturn(true);
@@ -393,6 +396,9 @@ public function testBeginTransactionWithRetriableError(): void
393396
public function testBeginTransactionWithAbortError(): void
394397
{
395398
self::expectException(KafkaProducerTransactionAbortException::class);
399+
self::expectExceptionMessage(
400+
sprintf(KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE, '')
401+
);
396402

397403
$errorMock = $this->createMock(RdKafkaErrorException::class);
398404
$errorMock->expects(self::once())->method('isRetriable')->willReturn(false);
@@ -415,6 +421,9 @@ public function testBeginTransactionWithAbortError(): void
415421
public function testBeginTransactionWithFatalError(): void
416422
{
417423
self::expectException(KafkaProducerTransactionFatalException::class);
424+
self::expectExceptionMessage(
425+
sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '')
426+
);
418427

419428
$errorMock = $this->createMock(RdKafkaErrorException::class);
420429
$errorMock->expects(self::once())->method('isRetriable')->willReturn(false);
@@ -439,6 +448,9 @@ public function testBeginTransactionWithFatalErrorWillTriggerInit(): void
439448
$firstExceptionCaught = false;
440449

441450
self::expectException(KafkaProducerTransactionFatalException::class);
451+
self::expectExceptionMessage(
452+
sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '')
453+
);
442454

443455
$errorMock = $this->createMock(RdKafkaErrorException::class);
444456
$errorMock->expects(self::exactly(2))->method('isRetriable')->willReturn(false);
@@ -481,7 +493,9 @@ public function testAbortTransactionSuccess(): void
481493
public function testAbortTransactionFailure(): void
482494
{
483495
self::expectException(KafkaProducerTransactionRetryException::class);
484-
self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE);
496+
self::expectExceptionMessage(
497+
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test')
498+
);
485499

486500
$exception = new RdKafkaErrorException('test', 1, 'some failure', false, true, false);
487501

@@ -513,7 +527,9 @@ public function testCommitTransactionSuccess(): void
513527
public function testCommitTransactionFailure(): void
514528
{
515529
self::expectException(KafkaProducerTransactionRetryException::class);
516-
self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE);
530+
self::expectExceptionMessage(
531+
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test')
532+
);
517533

518534
$exception = new RdKafkaErrorException('test', 1, 'some failure', false, true, false);
519535

@@ -525,4 +541,24 @@ public function testCommitTransactionFailure(): void
525541

526542
$this->kafkaProducer->commitTransaction(10000);
527543
}
544+
545+
/**
546+
* @return void
547+
*/
548+
public function testCommitTransactionFailurePreviousException(): void
549+
{
550+
$exception = new RdKafkaErrorException('test', 1, 'some failure', false, true, false);
551+
552+
$this->rdKafkaProducerMock
553+
->expects(self::once())
554+
->method('commitTransaction')
555+
->with(10000)
556+
->willThrowException($exception);
557+
558+
try {
559+
$this->kafkaProducer->commitTransaction(10000);
560+
} catch (KafkaProducerTransactionRetryException $e) {
561+
self::assertSame($exception, $e->getPrevious());
562+
}
563+
}
528564
}

0 commit comments

Comments
 (0)