Skip to content

Commit 7598b6a

Browse files
committed
Removed BC break and moved $transportName to correct place again
1 parent 74111e7 commit 7598b6a

File tree

4 files changed

+72
-29
lines changed

4 files changed

+72
-29
lines changed

src/Service/Sns/SnsConsumer.php

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
use Bref\Context\Context;
66
use Bref\Event\Sns\SnsEvent;
77
use Bref\Event\Sns\SnsHandler;
8+
use Bref\Event\Sns\SnsRecord;
89
use Bref\Symfony\Messenger\Service\BusDriver;
10+
use LogicException;
911
use Symfony\Component\Messenger\MessageBusInterface;
1012
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1113

@@ -15,19 +17,19 @@ final class SnsConsumer extends SnsHandler
1517
private $bus;
1618
/** @var SerializerInterface */
1719
protected $serializer;
18-
/** @var SnsTransportNameResolver */
19-
private $transportNameResolver;
20-
/** @var BusDriver */
21-
private $busDriver;
2220
/** @var string|null */
2321
private $transportName;
22+
/** @var BusDriver */
23+
private $busDriver;
24+
/** @var SnsTransportNameResolver|null */
25+
private $transportNameResolver;
2426

2527
public function __construct(
2628
BusDriver $busDriver,
2729
MessageBusInterface $bus,
2830
SerializerInterface $serializer,
29-
SnsTransportNameResolver $transportNameResolver,
30-
string $transportName = null
31+
string $transportName = null,
32+
SnsTransportNameResolver $transportNameResolver = null,
3133
) {
3234
$this->busDriver = $busDriver;
3335
$this->bus = $bus;
@@ -43,7 +45,16 @@ public function handleSns(SnsEvent $event, Context $context): void
4345
$headers = isset($attributes['Headers']) ? $attributes['Headers']->getValue() : '[]';
4446
$envelope = $this->serializer->decode(['body' => $record->getMessage(), 'headers' => json_decode($headers, true)]);
4547

46-
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->transportName ?? ($this->transportNameResolver)($record));
48+
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->resolveTransportName($record));
4749
}
4850
}
51+
52+
private function resolveTransportName(SnsRecord $record): string
53+
{
54+
if (null === $this->transportName && null === $this->transportNameResolver) {
55+
throw new LogicException('You need to set $transportNameResolver or $transportName.');
56+
}
57+
58+
return $this->transportName ?? ($this->transportNameResolver)($record);
59+
}
4960
}

src/Service/Sqs/SqsConsumer.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Bref\Event\Sqs\SqsHandler;
88
use Bref\Event\Sqs\SqsRecord;
99
use Bref\Symfony\Messenger\Service\BusDriver;
10+
use LogicException;
1011
use Psr\Log\LoggerInterface;
1112
use Psr\Log\NullLogger;
1213
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
@@ -38,18 +39,18 @@ public function __construct(
3839
BusDriver $busDriver,
3940
MessageBusInterface $bus,
4041
SerializerInterface $serializer,
41-
SqsTransportNameResolver $transportNameResolver,
4242
string $transportName = null,
4343
LoggerInterface $logger = null,
44-
bool $partialBatchFailure = false
44+
bool $partialBatchFailure = false,
45+
SqsTransportNameResolver $transportNameResolver = null
4546
) {
4647
$this->busDriver = $busDriver;
4748
$this->bus = $bus;
4849
$this->serializer = $serializer;
49-
$this->transportNameResolver = $transportNameResolver;
5050
$this->transportName = $transportName;
5151
$this->logger = $logger ?? new NullLogger();
5252
$this->partialBatchFailure = $partialBatchFailure;
53+
$this->transportNameResolver = $transportNameResolver;
5354
}
5455

5556
public function handleSqs(SqsEvent $event, Context $context): void
@@ -97,7 +98,7 @@ public function handleSqs(SqsEvent $event, Context $context): void
9798
if ('' !== $context->getTraceId()) {
9899
$stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId());
99100
}
100-
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName ?? ($this->transportNameResolver)($record));
101+
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->resolveTransportName($record));
101102
} catch (UnrecoverableExceptionInterface $exception) {
102103
$this->logger->error(sprintf('SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.', $record->getMessageId()));
103104
$this->logger->error($exception);
@@ -120,4 +121,13 @@ private function readMessageGroupIdOfRecord(SqsRecord $record): ?string
120121
$recordAsArray = $record->toArray();
121122
return $recordAsArray['attributes']['MessageGroupId'] ?? null;
122123
}
124+
125+
private function resolveTransportName(SqsRecord $record): string
126+
{
127+
if (null === $this->transportName && null === $this->transportNameResolver) {
128+
throw new LogicException('You need to set $transportNameResolver or $transportName.');
129+
}
130+
131+
return $this->transportName ?? ($this->transportNameResolver)($record);
132+
}
123133
}

tests/Unit/Service/Sns/SnsConsumerTest.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Bref\Symfony\Messenger\Service\BusDriver;
88
use Bref\Symfony\Messenger\Service\Sns\SnsConsumer;
99
use Bref\Symfony\Messenger\Service\Sns\SnsTransportNameResolver;
10+
use LogicException;
1011
use PHPUnit\Framework\TestCase;
1112
use Symfony\Component\Messenger\Envelope;
1213
use Symfony\Component\Messenger\MessageBus;
@@ -60,6 +61,7 @@ public function test_serializer()
6061
$this->busDriver,
6162
$this->bus,
6263
$this->serializer,
64+
null,
6365
$this->snsTransportNameResolver
6466
);
6567

@@ -86,6 +88,7 @@ public function test_event_with_transport_detection(): void
8688
$this->busDriver,
8789
$this->bus,
8890
$this->serializer,
91+
null,
8992
$this->snsTransportNameResolver
9093
);
9194

@@ -111,8 +114,8 @@ public function test_event_with_manually_set_transport(): void
111114
$this->busDriver,
112115
$this->bus,
113116
$this->serializer,
114-
$this->snsTransportNameResolver,
115-
'async'
117+
'async',
118+
$this->snsTransportNameResolver
116119
);
117120

118121
$event = $this->snsEvent($body, $headers);

tests/Unit/Service/Sqs/SqsConsumerTest.php

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,15 @@ public function test_batch_events()
6363
),
6464
];
6565

66-
$consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $this->sqsTransportNameResolver->reveal());
66+
$consumer = new SqsConsumer(
67+
$this->busDriver->reveal(),
68+
$this->bus,
69+
$this->serializer->reveal(),
70+
null,
71+
null,
72+
false,
73+
$this->sqsTransportNameResolver->reveal()
74+
);
6775
$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));
6876
$this->assertEmpty($failures);
6977
}
@@ -84,6 +92,9 @@ public function test_event_with_transport_detection(): void
8492
$this->busDriver->reveal(),
8593
$this->bus,
8694
$this->serializer->reveal(),
95+
null,
96+
null,
97+
false,
8798
$this->sqsTransportNameResolver->reveal()
8899
);
89100

@@ -108,8 +119,10 @@ public function test_event_with_manually_set_transport(): void
108119
$this->busDriver->reveal(),
109120
$this->bus,
110121
$this->serializer->reveal(),
122+
'async_test',
123+
null,
124+
false,
111125
$this->sqsTransportNameResolver->reveal(),
112-
'async_test'
113126
);
114127

115128
$consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));
@@ -143,6 +156,9 @@ public function test_batch_events_with_failure()
143156
$this->busDriver->reveal(),
144157
$this->bus,
145158
$this->serializer->reveal(),
159+
$transport,
160+
null,
161+
false,
146162
$this->sqsTransportNameResolver->reveal()
147163
);
148164

@@ -178,10 +194,10 @@ public function test_batch_events_failure_with_partial_batch_failure_enabled()
178194
$this->busDriver->reveal(),
179195
$this->bus,
180196
$this->serializer->reveal(),
181-
$this->sqsTransportNameResolver->reveal(),
182197
null,
183198
null,
184-
true
199+
true,
200+
$this->sqsTransportNameResolver->reveal()
185201
);
186202

187203
$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));
@@ -221,10 +237,10 @@ public function test_batch_events_failure_on_fifo_queue_with_partial_batch_failu
221237
$this->busDriver->reveal(),
222238
$this->bus,
223239
$this->serializer->reveal(),
224-
$this->sqsTransportNameResolver->reveal(),
225-
null,
240+
$transport,
226241
null,
227-
true
242+
true,
243+
$this->sqsTransportNameResolver->reveal()
228244
);
229245

230246
$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));
@@ -266,6 +282,9 @@ public function test_x_ray_header_is_dispatched_on_bus()
266282
$this->busDriver->reveal(),
267283
$this->bus,
268284
$this->serializer->reveal(),
285+
$transport,
286+
null,
287+
false,
269288
$this->sqsTransportNameResolver->reveal()
270289
);
271290
$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', $xrayTraceId));
@@ -305,10 +324,10 @@ public function test_unrecoverable_exception_during_batch()
305324
$this->busDriver->reveal(),
306325
$this->bus,
307326
$this->serializer->reveal(),
308-
$this->sqsTransportNameResolver->reveal(),
309-
null,
327+
$transport,
310328
null,
311-
true
329+
true,
330+
$this->sqsTransportNameResolver->reveal(),
312331
);
313332

314333
$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));
@@ -356,10 +375,10 @@ public function test_message_group_id_during_batch_of_fifo_queue()
356375
$this->busDriver->reveal(),
357376
$this->bus,
358377
$this->serializer->reveal(),
359-
$this->sqsTransportNameResolver->reveal(),
360-
null,
378+
$transport,
361379
null,
362-
true
380+
true,
381+
$this->sqsTransportNameResolver->reveal(),
363382
);
364383

365384
$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));
@@ -410,10 +429,10 @@ public function test_different_message_group_id_failed_during_batch_of_fifo_queu
410429
$this->busDriver->reveal(),
411430
$this->bus,
412431
$this->serializer->reveal(),
413-
$this->sqsTransportNameResolver->reveal(),
414-
null,
432+
$transport,
415433
null,
416-
true
434+
true,
435+
$this->sqsTransportNameResolver->reveal(),
417436
);
418437

419438
$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));

0 commit comments

Comments
 (0)