Skip to content

Commit 29de855

Browse files
authored
Merge pull request #7 from ensi-platform/task-93909-1
#93909 create topic before subscribing
2 parents 188c908 + ccf77a0 commit 29de855

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ Processors in config have the following configuration options:
7373
]
7474

7575
```
76+
77+
**Important!** Some topics have to have different consumer settings, such as start reading topic from the beginning or don't create topic if it is not exists yet.
78+
For such cases you need to configure several consumers and use suitable one.
79+
7680
### Synchronous processors
7781

7882
Most of the time all tou need is a synchronous processor.

src/HighLevelConsumer.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function forceStop(): static
4646
*/
4747
public function listen(string $topicName, ProcessorData $processorData, ConsumerOptions $options): void
4848
{
49-
$this->consumer->subscribe([ $topicName ]);
49+
$this->subscribe($topicName);
5050

5151
[$startTime, $eventsProcessed] = [hrtime(true) / 1e9, 0];
5252

@@ -128,6 +128,21 @@ protected function shouldBeStopped(int|float $startTime, int $eventsProcessed, C
128128
return $this->forceStop;
129129
}
130130

131+
protected function subscribe(string $topicName): void
132+
{
133+
$attempts = 0;
134+
do {
135+
$topicExists = (bool)$this->getPartitions($topicName);
136+
if (!$topicExists) {
137+
$this->consumer->newTopic($topicName);
138+
sleep(1);
139+
}
140+
$attempts++;
141+
} while (!$topicExists && $attempts < 10);
142+
143+
$this->consumer->subscribe([$topicName]);
144+
}
145+
131146
/**
132147
* @param string $topicName
133148
* @return TopicPartition[]

0 commit comments

Comments
 (0)