Skip to content

improve transaction exceptions #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Exception/KafkaProducerTransactionAbortException.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
class KafkaProducerTransactionAbortException extends \Exception
{
public const TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE =
'Produce failed. You need to abort your current transaction and start a new one';
'Produce failed. You need to abort your current transaction and start a new one (%s)';
}
2 changes: 1 addition & 1 deletion src/Exception/KafkaProducerTransactionFatalException.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
class KafkaProducerTransactionFatalException extends \Exception
{
public const FATAL_TRANSACTION_EXCEPTION_MESSAGE =
'Produce failed with a fatal error. This producer instance cannot be used anymore.';
'Produce failed with a fatal error. This producer instance cannot be used anymore (%s)';
}
2 changes: 1 addition & 1 deletion src/Exception/KafkaProducerTransactionRetryException.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

class KafkaProducerTransactionRetryException extends \Exception
{
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried';
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried (%s)';
}
21 changes: 18 additions & 3 deletions src/Producer/KafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,33 @@ private function handleTransactionError(SkcErrorException $e): void
{
if (true === $e->isRetriable()) {
throw new KafkaProducerTransactionRetryException(
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE
sprintf(
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE,
$e->getMessage()
),
$e->getCode(),
$e
);
} elseif (true === $e->transactionRequiresAbort()) {
throw new KafkaProducerTransactionAbortException(
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE
sprintf(
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE,
$e->getMessage()
),
$e->getCode(),
$e
);
} else {
$this->transactionInitialized = false;
// according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal
// fatal errors (so stated), need the producer to be destroyed
throw new KafkaProducerTransactionFatalException(
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE
sprintf(
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE,
$e->getMessage()
),
$e->getCode(),
$e
);
}
}
Expand Down
41 changes: 39 additions & 2 deletions tests/Unit/Producer/KafkaProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ public function testBeginTransactionConsecutiveSuccess(): void
public function testBeginTransactionWithRetriableError(): void
{
self::expectException(KafkaProducerTransactionRetryException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::once())->method('isRetriable')->willReturn(true);
Expand All @@ -389,6 +392,9 @@ public function testBeginTransactionWithRetriableError(): void
public function testBeginTransactionWithAbortError(): void
{
self::expectException(KafkaProducerTransactionAbortException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::once())->method('isRetriable')->willReturn(false);
Expand All @@ -411,6 +417,9 @@ public function testBeginTransactionWithAbortError(): void
public function testBeginTransactionWithFatalError(): void
{
self::expectException(KafkaProducerTransactionFatalException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::once())->method('isRetriable')->willReturn(false);
Expand All @@ -435,6 +444,9 @@ public function testBeginTransactionWithFatalErrorWillTriggerInit(): void
$firstExceptionCaught = false;

self::expectException(KafkaProducerTransactionFatalException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::exactly(2))->method('isRetriable')->willReturn(false);
Expand Down Expand Up @@ -476,7 +488,9 @@ public function testAbortTransactionSuccess(): void
public function testAbortTransactionFailure(): void
{
self::expectException(KafkaProducerTransactionRetryException::class);
self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test')
);

$exception = new SkcErrorException('test', 1, 'some failure', false, true, false);

Expand Down Expand Up @@ -507,7 +521,9 @@ public function testCommitTransactionSuccess(): void
public function testCommitTransactionFailure(): void
{
self::expectException(KafkaProducerTransactionRetryException::class);
self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test')
);

$exception = new SkcErrorException('test', 1, 'some failure', false, true, false);

Expand All @@ -519,4 +535,25 @@ public function testCommitTransactionFailure(): void

$this->kafkaProducer->commitTransaction(10000);
}

/**
* @return void
*/
public function testCommitTransactionFailurePreviousException(): void
{
$exception = new SkcErrorException('test', 1, 'some failure', false, true, false);

$this->rdKafkaProducerMock
->expects(self::once())
->method('commitTransaction')
->with(10000)
->willThrowException($exception);

try {
$this->kafkaProducer->commitTransaction(10000);
} catch (KafkaProducerTransactionRetryException $e) {
self::assertSame($exception, $e->getPrevious());
}

}
}