Skip to content

Commit ccf77a0

Browse files
author
Ivan Koryukov
committed
#93909 create topic before subscribing
1 parent 78a4539 commit ccf77a0

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

src/HighLevelConsumer.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,16 @@ protected function shouldBeStopped(int|float $startTime, int $eventsProcessed, C
130130

131131
protected function subscribe(string $topicName): void
132132
{
133-
$this->consumer->newTopic($topicName);
134-
sleep(1);
133+
$attempts = 0;
134+
do {
135+
$topicExists = (bool)$this->getPartitions($topicName);
136+
if (!$topicExists) {
137+
$this->consumer->newTopic($topicName);
138+
sleep(1);
139+
}
140+
$attempts++;
141+
} while (!$topicExists && $attempts < 10);
142+
135143
$this->consumer->subscribe([$topicName]);
136144
}
137145

0 commit comments

Comments
 (0)