Skip to content

Commit 7e8dc21

Browse files
authored
remove encoder / decoder mode (#27)
* remove mode * fix stan
1 parent ca621a7 commit 7e8dc21

File tree

9 files changed

+132
-80
lines changed

9 files changed

+132
-80
lines changed

src/Message/Decoder/AvroDecoder.php

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,16 @@ final class AvroDecoder implements AvroDecoderInterface
2323
*/
2424
private $recordSerializer;
2525

26-
/**
27-
* @var string
28-
*/
29-
private $decodeMode;
30-
3126
/**
3227
* @param AvroSchemaRegistryInterface $registry
3328
* @param RecordSerializer $recordSerializer
34-
* @param string $decodeMode
3529
*/
3630
public function __construct(
3731
AvroSchemaRegistryInterface $registry,
38-
RecordSerializer $recordSerializer,
39-
string $decodeMode = self::DECODE_ALL
32+
RecordSerializer $recordSerializer
4033
) {
4134
$this->recordSerializer = $recordSerializer;
4235
$this->registry = $registry;
43-
$this->decodeMode = $decodeMode;
4436
}
4537

4638
/**
@@ -68,18 +60,18 @@ public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaCon
6860
*/
6961
private function decodeBody(KafkaConsumerMessageInterface $consumerMessage)
7062
{
71-
$schemaDefinition = null;
7263
$body = $consumerMessage->getBody();
73-
74-
if (self::DECODE_KEY === $this->decodeMode) {
75-
return $body;
76-
}
64+
$topicName = $consumerMessage->getTopicName();
7765

7866
if (null === $body) {
7967
return null;
8068
}
8169

82-
$avroSchema = $this->registry->getBodySchemaForTopic($consumerMessage->getTopicName());
70+
if (false === $this->registry->hasBodySchemaForTopic($topicName)) {
71+
return $body;
72+
}
73+
74+
$avroSchema = $this->registry->getBodySchemaForTopic($topicName);
8375
$schemaDefinition = $avroSchema->getDefinition();
8476

8577
return $this->recordSerializer->decodeMessage($body, $schemaDefinition);
@@ -92,18 +84,18 @@ private function decodeBody(KafkaConsumerMessageInterface $consumerMessage)
9284
*/
9385
private function decodeKey(KafkaConsumerMessageInterface $consumerMessage)
9486
{
95-
$schemaDefinition = null;
9687
$key = $consumerMessage->getKey();
97-
98-
if (self::DECODE_BODY === $this->decodeMode) {
99-
return $key;
100-
}
88+
$topicName = $consumerMessage->getTopicName();
10189

10290
if (null === $key) {
10391
return null;
10492
}
10593

106-
$avroSchema = $this->registry->getKeySchemaForTopic($consumerMessage->getTopicName());
94+
if (false === $this->registry->hasKeySchemaForTopic($topicName)) {
95+
return $key;
96+
}
97+
98+
$avroSchema = $this->registry->getKeySchemaForTopic($topicName);
10799
$schemaDefinition = $avroSchema->getDefinition();
108100

109101
return $this->recordSerializer->decodeMessage($key, $schemaDefinition);

src/Message/Decoder/AvroDecoderInterface.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88

99
interface AvroDecoderInterface extends DecoderInterface
1010
{
11-
public const DECODE_ALL = 'all';
12-
public const DECODE_BODY = 'body';
13-
public const DECODE_KEY = 'key';
14-
1511
/**
1612
* @return AvroSchemaRegistryInterface
1713
*/

src/Message/Encoder/AvroEncoder.php

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,16 @@ final class AvroEncoder implements AvroEncoderInterface
2525
*/
2626
private $recordSerializer;
2727

28-
/**
29-
* @var string
30-
*/
31-
private $encodeMode;
32-
3328
/**
3429
* @param AvroSchemaRegistryInterface $registry
3530
* @param RecordSerializer $recordSerializer
36-
* @param string $encodeMode
3731
*/
3832
public function __construct(
3933
AvroSchemaRegistryInterface $registry,
40-
RecordSerializer $recordSerializer,
41-
string $encodeMode = self::ENCODE_ALL
34+
RecordSerializer $recordSerializer
4235
) {
4336
$this->recordSerializer = $recordSerializer;
4437
$this->registry = $registry;
45-
$this->encodeMode = $encodeMode;
4638
}
4739

4840
/**
@@ -53,62 +45,67 @@ public function __construct(
5345
*/
5446
public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
5547
{
56-
return $producerMessage
57-
->withBody($this->encodeBody($producerMessage))
58-
->withKey($this->encodeKey($producerMessage));
48+
$producerMessage = $this->encodeBody($producerMessage);
49+
50+
return $this->encodeKey($producerMessage);
5951
}
6052

6153
/**
6254
* @param KafkaProducerMessageInterface $producerMessage
63-
* @return mixed
55+
* @return KafkaProducerMessageInterface
6456
* @throws SchemaRegistryException
6557
*/
66-
private function encodeBody(KafkaProducerMessageInterface $producerMessage)
58+
private function encodeBody(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
6759
{
60+
$topicName = $producerMessage->getTopicName();
6861
$body = $producerMessage->getBody();
6962

70-
if (self::ENCODE_KEY === $this->encodeMode) {
71-
return $body;
63+
if (null === $body) {
64+
return $producerMessage;
7265
}
7366

74-
if (null === $body) {
75-
return null;
67+
if (false === $this->registry->hasBodySchemaForTopic($topicName)) {
68+
return $producerMessage;
7669
}
7770

78-
$topicName = $producerMessage->getTopicName();
7971
$avroSchema = $this->registry->getBodySchemaForTopic($topicName);
8072

81-
82-
return $this->recordSerializer->encodeRecord(
73+
$encodedBody = $this->recordSerializer->encodeRecord(
8374
$avroSchema->getName(),
8475
$this->getAvroSchemaDefinition($avroSchema),
8576
$body
8677
);
78+
79+
return $producerMessage->withBody($encodedBody);
8780
}
8881

8982
/**
9083
* @param KafkaProducerMessageInterface $producerMessage
91-
* @return string|null
84+
* @return KafkaProducerMessageInterface
9285
* @throws SchemaRegistryException
9386
*/
94-
private function encodeKey(KafkaProducerMessageInterface $producerMessage): ?string
87+
private function encodeKey(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
9588
{
96-
if (self::ENCODE_BODY === $this->encodeMode) {
97-
return $producerMessage->getKey();
89+
$topicName = $producerMessage->getTopicName();
90+
$key = $producerMessage->getKey();
91+
92+
if (null === $key) {
93+
return $producerMessage;
9894
}
9995

100-
if (null === $producerMessage->getKey()) {
101-
return null;
96+
if (false === $this->registry->hasKeySchemaForTopic($topicName)) {
97+
return $producerMessage;
10298
}
10399

104-
$topicName = $producerMessage->getTopicName();
105100
$avroSchema = $this->registry->getKeySchemaForTopic($topicName);
106101

107-
return $this->recordSerializer->encodeRecord(
102+
$encodedKey = $this->recordSerializer->encodeRecord(
108103
$avroSchema->getName(),
109104
$this->getAvroSchemaDefinition($avroSchema),
110-
$producerMessage->getKey()
105+
$key
111106
);
107+
108+
return $producerMessage->withKey($encodedKey);
112109
}
113110

114111
private function getAvroSchemaDefinition(KafkaAvroSchemaInterface $avroSchema): AvroSchema

src/Message/Encoder/AvroEncoderInterface.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88

99
interface AvroEncoderInterface extends EncoderInterface
1010
{
11-
public const ENCODE_ALL = 'all';
12-
public const ENCODE_BODY = 'body';
13-
public const ENCODE_KEY = 'key';
14-
1511
/**
1612
* @return AvroSchemaRegistryInterface
1713
*/

src/Message/Registry/AvroSchemaRegistry.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,26 @@ public function getKeySchemaForTopic(string $topicName): KafkaAvroSchemaInterfac
7373
return $this->getSchemaForTopicAndType($topicName, self::KEY_IDX);
7474
}
7575

76+
/**
77+
* @param string $topicName
78+
* @return boolean
79+
* @throws SchemaRegistryException
80+
*/
81+
public function hasBodySchemaForTopic(string $topicName): bool
82+
{
83+
return isset($this->schemaMapping[self::BODY_IDX][$topicName]);
84+
}
85+
86+
/**
87+
* @param string $topicName
88+
* @return boolean
89+
* @throws SchemaRegistryException
90+
*/
91+
public function hasKeySchemaForTopic(string $topicName): bool
92+
{
93+
return isset($this->schemaMapping[self::KEY_IDX][$topicName]);
94+
}
95+
7696
/**
7797
* @param string $topicName
7898
* @param string $type

src/Message/Registry/AvroSchemaRegistryInterface.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,18 @@ public function getBodySchemaForTopic(string $topicName): KafkaAvroSchemaInterfa
4747
* @throws SchemaRegistryException
4848
*/
4949
public function getKeySchemaForTopic(string $topicName): KafkaAvroSchemaInterface;
50+
51+
/**
52+
* @param string $topicName
53+
* @return boolean
54+
* @throws SchemaRegistryException
55+
*/
56+
public function hasBodySchemaForTopic(string $topicName): bool;
57+
58+
/**
59+
* @param string $topicName
60+
* @return boolean
61+
* @throws SchemaRegistryException
62+
*/
63+
public function hasKeySchemaForTopic(string $topicName): bool;
5064
}

tests/Unit/Message/Decoder/AvroDecoderTest.php

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ public function testDecodeTombstone()
2323
$message->expects(self::once())->method('getBody')->willReturn(null);
2424

2525
$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
26+
$registry->expects(self::never())->method('hasBodySchemaForTopic');
27+
$registry->expects(self::never())->method('hasKeySchemaForTopic');
28+
2629
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
2730
$recordSerializer->expects(self::never())->method('decodeMessage');
2831

@@ -53,7 +56,8 @@ public function testDecodeWithSchema()
5356
$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
5457
$registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema);
5558
$registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema);
56-
59+
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true);
60+
$registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true);
5761

5862
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
5963
$recordSerializer->expects(self::at(0))->method('decodeMessage')->with($message->getKey(), $schemaDefinition)->willReturn('decoded-key');
@@ -77,7 +81,7 @@ public function testDecodeKeyMode()
7781
$avroSchema->expects(self::once())->method('getDefinition')->willReturn($schemaDefinition);
7882

7983
$message = $this->getMockForAbstractClass(KafkaConsumerMessageInterface::class);
80-
$message->expects(self::exactly(2))->method('getTopicName')->willReturn('test-topic');
84+
$message->expects(self::exactly(3))->method('getTopicName')->willReturn('test-topic');
8185
$message->expects(self::once())->method('getPartition')->willReturn(0);
8286
$message->expects(self::once())->method('getOffset')->willReturn(1);
8387
$message->expects(self::once())->method('getTimestamp')->willReturn(time());
@@ -88,12 +92,14 @@ public function testDecodeKeyMode()
8892
$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
8993
$registry->expects(self::never())->method('getBodySchemaForTopic');
9094
$registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema);
95+
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(false);
96+
$registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true);
9197

9298

9399
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
94100
$recordSerializer->expects(self::once())->method('decodeMessage')->with($message->getKey(), $schemaDefinition)->willReturn('decoded-key');
95101

96-
$decoder = new AvroDecoder($registry, $recordSerializer, AvroDecoderInterface::DECODE_KEY);
102+
$decoder = new AvroDecoder($registry, $recordSerializer);
97103

98104
$result = $decoder->decode($message);
99105

@@ -111,7 +117,7 @@ public function testDecodeBodyMode()
111117
$avroSchema->expects(self::once())->method('getDefinition')->willReturn($schemaDefinition);
112118

113119
$message = $this->getMockForAbstractClass(KafkaConsumerMessageInterface::class);
114-
$message->expects(self::exactly(2))->method('getTopicName')->willReturn('test-topic');
120+
$message->expects(self::exactly(3))->method('getTopicName')->willReturn('test-topic');
115121
$message->expects(self::once())->method('getPartition')->willReturn(0);
116122
$message->expects(self::once())->method('getOffset')->willReturn(1);
117123
$message->expects(self::once())->method('getTimestamp')->willReturn(time());
@@ -122,12 +128,13 @@ public function testDecodeBodyMode()
122128
$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
123129
$registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema);
124130
$registry->expects(self::never())->method('getKeySchemaForTopic');
125-
131+
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true);
132+
$registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(false);
126133

127134
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
128135
$recordSerializer->expects(self::once())->method('decodeMessage')->with($message->getBody(), $schemaDefinition)->willReturn(['test']);
129136

130-
$decoder = new AvroDecoder($registry, $recordSerializer, AvroDecoderInterface::DECODE_BODY);
137+
$decoder = new AvroDecoder($registry, $recordSerializer);
131138

132139
$result = $decoder->decode($message);
133140

0 commit comments

Comments
 (0)