Skip to content

Commit 0c4a964

Browse files
Issue #38 - Commit offset when there"s a processing error
1 parent 03a0547 commit 0c4a964

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

src/app.js

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,36 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
2929
} catch (e) {
3030
logger.error('Invalid message JSON.')
3131
logger.error(e)
32+
3233
// ignore the message
33-
return
34+
return co(function * () {
35+
yield consumer.commitOffset({ topic, partition, offset: m.offset })
36+
})
3437
}
3538

3639
if (messageJSON.topic !== topic) {
3740
logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}.`)
3841
// ignore the message
39-
return
42+
return co(function * () {
43+
yield consumer.commitOffset({ topic, partition, offset: m.offset })
44+
})
4045
}
4146

4247
// Process only messages with scanned status
4348
if (messageJSON.topic === config.AVSCAN_TOPIC && messageJSON.payload.status !== 'scanned') {
4449
logger.debug(`Ignoring message in topic ${messageJSON.topic} with status ${messageJSON.payload.status}`)
4550
// ignore the message
46-
return
51+
return co(function * () {
52+
yield consumer.commitOffset({ topic, partition, offset: m.offset })
53+
})
4754
}
4855

4956
if (topic === config.SUBMISSION_CREATE_TOPIC && messageJSON.payload.fileType === 'url') {
5057
logger.debug(`Ignoring message in topic ${messageJSON.topic} with file type as url`)
5158
// ignore the message
52-
return
59+
return co(function * () {
60+
yield consumer.commitOffset({ topic, partition, offset: m.offset })
61+
})
5362
}
5463

5564
return co(function * () {
@@ -66,11 +75,15 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
6675
})
6776
// commit offset regardless of errors
6877
.then(() => {
69-
consumer.commitOffset({ topic, partition, offset: m.offset })
78+
return co(function * () {
79+
yield consumer.commitOffset({ topic, partition, offset: m.offset })
80+
})
7081
})
7182
.catch((err) => {
7283
logger.error(err)
73-
consumer.commitOffset({ topic, partition, offset: m.offset })
84+
return co(function * () {
85+
yield consumer.commitOffset({ topic, partition, offset: m.offset })
86+
})
7487
})
7588
})
7689

0 commit comments

Comments
 (0)