Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
}
},
"suggest": {
"flix-tech/avro-serde-php": "Is needed for Avro support"
"flix-tech/avro-serde-php": "Is needed for Avro support",
"jobcloud/avro-validator": "Useful for debug purposes in development, not recommended for production use"
},
"extra": {
"branch-alias": {
Expand Down
9 changes: 9 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
parameters:
level: 8
paths: [ src ]
ignoreErrors:
- '#Comparison operation ">" between int and RdKafka\\TopicPartition results in an error.#'
- '#Call to method validate\(\) on an unknown class Jobcloud\\Avro\\Validator\\Validator#'
- '#Call to static method fromSchema\(\) on an unknown class Jobcloud\\Avro\\Validator\\RecordRegistry#'
- '#PHPDoc tag @throws with type [a-zA-Z0-9\\_]+#'
- '#Parameter \#[0-9] [a-z\$]+ of method [a-zA-Z0-9\\_]+::setOffset\(\) expects string, int given.#'
- '#Parameter \#[0-9] [a-z\$]+ of class Jobcloud\\Kafka\\Exception\\AvroValidatorException constructor expects string, string\|false given.#'
- '#Cannot call method to_avro\(\) on AvroSchema\|null.#'
- '#Instantiated class Jobcloud\\Avro\\Validator\\Validator not found.#'
10 changes: 10 additions & 0 deletions src/Exception/AvroValidatorException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

class AvroValidatorException extends \Exception
{

}
78 changes: 67 additions & 11 deletions src/Message/Encoder/AvroEncoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
namespace Jobcloud\Kafka\Message\Encoder;

use AvroSchema;
use FlixTech\AvroSerializer\Objects\Exceptions\AvroEncodingException;
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;
use Jobcloud\Avro\Validator\Exception\RecordRegistryException;
use Jobcloud\Avro\Validator\Exception\ValidatorException;
use Jobcloud\Avro\Validator\RecordRegistry;
use Jobcloud\Avro\Validator\Validator;
use Jobcloud\Kafka\Exception\AvroEncoderException;
use Jobcloud\Kafka\Exception\AvroValidatorException;
use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface;
Expand Down Expand Up @@ -40,8 +46,10 @@ public function __construct(
/**
* @param KafkaProducerMessageInterface $producerMessage
* @return KafkaProducerMessageInterface
* @throws AvroValidatorException
* @throws RecordRegistryException
* @throws SchemaRegistryException
* @throws AvroEncoderException
* @throws ValidatorException
*/
public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
{
Expand All @@ -53,7 +61,10 @@ public function encode(KafkaProducerMessageInterface $producerMessage): KafkaPro
/**
* @param KafkaProducerMessageInterface $producerMessage
* @return KafkaProducerMessageInterface
* @throws AvroValidatorException
* @throws RecordRegistryException
* @throws SchemaRegistryException
* @throws ValidatorException
*/
private function encodeBody(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
{
Expand All @@ -70,19 +81,35 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf

$avroSchema = $this->registry->getBodySchemaForTopic($topicName);

$encodedBody = $this->recordSerializer->encodeRecord(
$avroSchema->getName(),
$this->getAvroSchemaDefinition($avroSchema),
$body
);
$encodedBody = null;
try {
$encodedBody = $this->recordSerializer->encodeRecord(
$avroSchema->getName(),
$this->getAvroSchemaDefinition($avroSchema),
$body
);
} catch (AvroEncodingException $exception) {
if (class_exists(Validator::class)) {
$validationErrors = $this->validateSchema(
$avroSchema->getDefinition()->to_avro(),
$body,
$topicName
);

throw new AvroValidatorException(json_encode($validationErrors));
}
}

return $producerMessage->withBody($encodedBody);
}

/**
* @param KafkaProducerMessageInterface $producerMessage
* @return KafkaProducerMessageInterface
* @throws AvroValidatorException
* @throws RecordRegistryException
* @throws SchemaRegistryException
* @throws ValidatorException
*/
private function encodeKey(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
{
Expand All @@ -99,11 +126,24 @@ private function encodeKey(KafkaProducerMessageInterface $producerMessage): Kafk

$avroSchema = $this->registry->getKeySchemaForTopic($topicName);

$encodedKey = $this->recordSerializer->encodeRecord(
$avroSchema->getName(),
$this->getAvroSchemaDefinition($avroSchema),
$key
);
$encodedKey = null;
try {
$encodedKey = $this->recordSerializer->encodeRecord(
$avroSchema->getName(),
$this->getAvroSchemaDefinition($avroSchema),
$key
);
} catch (AvroEncodingException $exception) {
if (class_exists(Validator::class)) {
$validationErrors = $this->validateSchema(
$avroSchema->getDefinition()->to_avro(),
$key,
$topicName
);

throw new AvroValidatorException(json_encode($validationErrors));
}
}

return $producerMessage->withKey($encodedKey);
}
Expand Down Expand Up @@ -131,4 +171,20 @@ public function getRegistry(): AvroSchemaRegistryInterface
{
return $this->registry;
}

/**
* @param array<mixed> $avroSchema
* @param mixed $data
* @param string $topicName
* @return array<mixed>
* @throws RecordRegistryException
* @throws ValidatorException
*/
private function validateSchema(array $avroSchema, $data, string $topicName): array
{
$recordRegistry = RecordRegistry::fromSchema(json_encode($avroSchema));
$validator = new Validator($recordRegistry);

return $validator->validate(json_encode($data), $topicName);
}
}
131 changes: 129 additions & 2 deletions tests/Unit/Message/Encoder/AvroEncoderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,51 @@

namespace Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder;

use FlixTech\AvroSerializer\Objects\Exceptions\AvroEncodingException;
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use Jobcloud\Kafka\Exception\AvroEncoderException;
use Jobcloud\Kafka\Message\Encoder\AvroEncoderInterface;
use Jobcloud\Kafka\Exception\AvroValidatorException;
use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface;
use PHPStan\Testing\TestCase;
use \AvroSchema;

/**
* @covers \Jobcloud\Kafka\Message\Encoder\AvroEncoder
*/
class AvroEncoderTest extends TestCase
{
private $avroValidatorClass = "./src/Message/Encoder/AvroEncoder.php";

private $originalNamespaces = [
"Jobcloud\Avro\Validator\RecordRegistry",
"Jobcloud\Avro\Validator\Validator"
];

private $replacedNamespaces = [
"Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\RecordRegistry",
"Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\Validator"
];

protected function setUp(): void
{
$avroEncoderContent = file_get_contents($this->avroValidatorClass);

$avroEncoderContent = str_replace($this->originalNamespaces, $this->replacedNamespaces, $avroEncoderContent);

file_put_contents($this->avroValidatorClass, $avroEncoderContent);
}

protected function tearDown(): void
{
$avroEncoderContent = file_get_contents($this->avroValidatorClass);

$avroEncoderContent = str_replace($this->replacedNamespaces, $this->originalNamespaces, $avroEncoderContent);

file_put_contents($this->avroValidatorClass, $avroEncoderContent);
}

public function testEncodeTombstone()
{
$producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class);
Expand Down Expand Up @@ -169,4 +199,101 @@ public function testGetRegistry()

self::assertSame($registry, $encoder->getRegistry());
}

public function testAvroValidatorBodyException()
{
$schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock();

$avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class);
$avroSchema->expects(self::once())->method('getName')->willReturn('schemaName');
$avroSchema->expects(self::exactly(2))->method('getDefinition')->willReturn($schemaDefinition);
$schemaDefinition->method('to_avro')->willReturn(['type' => 'record']);

$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
$registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true);

$producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class);
$producerMessage->expects(self::once())->method('getTopicName')->willReturn('test');
$producerMessage->expects(self::once())->method('getBody')->willReturn(['id' => 123]);

$avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)
->disableOriginalConstructor()
->getMock();
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)
->disableOriginalConstructor()
->getMock();
$recordSerializer
->expects(self::once())
->method('encodeRecord')
->willReturnOnConsecutiveCalls('encodedValue')
->willThrowException($avroEncodingException);

$encoder = new AvroEncoder($registry, $recordSerializer);

self::expectException(AvroValidatorException::class);
self::expectExceptionMessage(json_encode(['test' => 'test']));
self::assertNotSame($producerMessage, $encoder->encode($producerMessage));
}

public function testAvroValidatorKeyException()
{
$schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock();

$avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class);
$avroSchema->expects(self::exactly(4))->method('getName')->willReturn('schemaName');
$avroSchema->expects(self::never())->method('getVersion');
$avroSchema->expects(self::exactly(5))->method('getDefinition')->willReturn($schemaDefinition);
$schemaDefinition->method('to_avro')->willReturn([]);

$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
$registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true);
$registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true);

$producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class);
$producerMessage->expects(self::exactly(2))->method('getTopicName')->willReturn('test');
$producerMessage->expects(self::once())->method('getBody')->willReturn([]);
$producerMessage->expects(self::once())->method('getKey')->willReturn('test-key');
$producerMessage->expects(self::once())->method('withBody')->with('encodedValue')->willReturn($producerMessage);

$avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock();

$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$recordSerializer
->expects(self::exactly(2))
->method('encodeRecord')
->withConsecutive(
[$avroSchema->getName(), $avroSchema->getDefinition(), []],
[$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key']
)
->willReturnOnConsecutiveCalls('encodedValue', $this->throwException($avroEncodingException));

$encoder = new AvroEncoder($registry, $recordSerializer);

self::expectException(AvroValidatorException::class);
self::expectExceptionMessage(json_encode(['test' => 'test']));
self::assertNotSame($producerMessage, $encoder->encode($producerMessage));
}
}

class RecordRegistry {
public function fromSchema(string $schema): string
{
return $schema;
}
}

class Validator {
public function validate(): array
{
return [
'test' => 'test',
];
}
}

class AvroValidationException {

}