Skip to content

Commit

Permalink
Support producer transactions (#18)
Browse files Browse the repository at this point in the history
* save work on producer transactions

* update code

* save work

* up coverage

* update readme

* update readme

* update readme

* udpate readme

* update readme

* update readme
  • Loading branch information
nick-zh authored Feb 9, 2021
1 parent 3fb81db commit 5addccf
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 0 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ can help out to understand the internals of this library.
- ext-rdkafka: >=4.0.0
- librdkafka: >=0.11.6 (if you use `<librdkafka:1.x` please define your own error callback)

:warning: To use the transactional producer you'll need:
- ext-rdkafka: >=4.1.0
- librdkafka: >=1.4

## Installation
```
composer require jobcloud/php-kafka-lib "~1.0"
Expand Down Expand Up @@ -57,6 +61,41 @@ $producer->produce($message);
// Shutdown producer, flush messages that are in queue. Give up after 20s
$result = $producer->flush(20000);
```

##### Transactional producer (needs >=php-rdkafka:4.1 and >=librdkafka:1.4)
```php
<?php

use Jobcloud\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;

$producer = KafkaProducerBuilder::create()
->withAdditionalBroker('localhost:9092')
->build();

$message = KafkaProducerMessage::create('test-topic', 0)
->withKey('asdf-asdf-asfd-asdf')
->withBody('some test message payload')
->withHeaders([ 'key' => 'value' ]);
try {
$producer->beginTransaction(10000);
$producer->produce($message);
$producer->commitTransaction(10000);
} catch (KafkaProducerTransactionRetryException $e) {
// something went wrong but you can retry the failed call (either beginTransaction or commitTransaction)
} catch (KafkaProducerTransactionAbortException $e) {
// you need to call $producer->abortTransaction(10000); and try again
} catch (KafkaProducerTransactionFatalException $e) {
// something went very wrong, re-create your producer, otherwise you could jeopardize the idempotency guarantees
}

// Shutdown producer, flush messages that are in queue. Give up after 20s
$result = $producer->flush(20000);
```

##### Avro Producer
To create an avro prodcuer add the avro encoder.

Expand Down
11 changes: 11 additions & 0 deletions src/Exception/KafkaProducerTransactionAbortException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

class KafkaProducerTransactionAbortException extends \Exception
{
public const TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE =
'Produce failed. You need to abort your current transaction and start a new one';
}
11 changes: 11 additions & 0 deletions src/Exception/KafkaProducerTransactionFatalException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

class KafkaProducerTransactionFatalException extends \Exception
{
public const FATAL_TRANSACTION_EXCEPTION_MESSAGE =
'Produce failed with a fatal error. This producer instance cannot be used anymore.';
}
10 changes: 10 additions & 0 deletions src/Exception/KafkaProducerTransactionRetryException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

class KafkaProducerTransactionRetryException extends \Exception
{
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried';
}
98 changes: 98 additions & 0 deletions src/Producer/KafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

namespace Jobcloud\Kafka\Producer;

use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use Jobcloud\Kafka\Message\Encoder\EncoderInterface;
use Jobcloud\Kafka\Conf\KafkaConfiguration;
use RdKafka\Producer as RdKafkaProducer;
use RdKafka\ProducerTopic as RdKafkaProducerTopic;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;
use RdKafka\Exception as RdKafkaException;
use RdKafka\KafkaErrorException as RdKafkaErrorException;

final class KafkaProducer implements KafkaProducerInterface
{
Expand All @@ -35,6 +39,11 @@ final class KafkaProducer implements KafkaProducerInterface
*/
protected $encoder;

/**
* @var bool
*/
private $transactionInitialized = false;

/**
* KafkaProducer constructor.
* @param RdKafkaProducer $producer
Expand Down Expand Up @@ -160,6 +169,68 @@ public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000):
->current();
}

/**
* Start a producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function beginTransaction(int $timeoutMs): void
{
try {
if (false === $this->transactionInitialized) {
$this->producer->initTransactions($timeoutMs);
$this->transactionInitialized = true;
}

$this->producer->beginTransaction();
} catch (RdKafkaErrorException $e) {
$this->handleTransactionError($e);
}
}

/**
* Commit the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function commitTransaction(int $timeoutMs): void
{
try {
$this->producer->commitTransaction($timeoutMs);
} catch (RdKafkaErrorException $e) {
$this->handleTransactionError($e);
}
}

/**
* Abort the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function abortTransaction(int $timeoutMs): void
{
try {
$this->producer->abortTransaction($timeoutMs);
} catch (RdKafkaErrorException $e) {
$this->handleTransactionError($e);
}
}

/**
* @param string $topic
* @return RdKafkaProducerTopic
Expand All @@ -172,4 +243,31 @@ private function getProducerTopicForTopic(string $topic): RdKafkaProducerTopic

return $this->producerTopics[$topic];
}

/**
* @param RdKafkaErrorException $e
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
private function handleTransactionError(RdKafkaErrorException $e): void
{
if (true === $e->isRetriable()) {
throw new KafkaProducerTransactionRetryException(
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE
);
} elseif (true === $e->transactionRequiresAbort()) {
throw new KafkaProducerTransactionAbortException(
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE
);
} else {
$this->transactionInitialized = false;
// according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal
// fatal errors (so stated), need the producer to be destroyed
throw new KafkaProducerTransactionFatalException(
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE
);
}
}
}
39 changes: 39 additions & 0 deletions src/Producer/KafkaProducerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

namespace Jobcloud\Kafka\Producer;

use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;

Expand Down Expand Up @@ -71,4 +74,40 @@ public function flush(int $timeoutMs): int;
* @return RdKafkaMetadataTopic
*/
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic;

/**
* Start a producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function beginTransaction(int $timeoutMs): void;

/**
* Commit the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function commitTransaction(int $timeoutMs): void;

/**
* Abort the current producer transaction
*
* @param int $timeoutMs
* @return void
*
* @throws KafkaProducerTransactionAbortException
* @throws KafkaProducerTransactionFatalException
* @throws KafkaProducerTransactionRetryException
*/
public function abortTransaction(int $timeoutMs): void;
}
Loading

0 comments on commit 5addccf

Please sign in to comment.