We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent f4555b9 commit 00b3aa0Copy full SHA for 00b3aa0
src/worker.ts
@@ -305,9 +305,10 @@ export class Worker {
305
const topicCfg = kafkaConfig.topics[topicLabel];
306
const topic = await events.topic(topicCfg.topic);
307
const offSetValue = await this.offsetStore.getOffset(topicCfg.topic);
308
+ logger.info('subscribing to topic with offset value', topicCfg.topic, offSetValue);
309
if (topicCfg.events) {
310
for (let eventName of topicCfg.events) {
- await topic.on(eventName, eventListener);
311
+ await topic.on(eventName, eventListener, { startingOffset: offSetValue });
312
}
313
314
0 commit comments