How to properly consume kafka messages concurrently #1925
Replies: 3 comments
-
Unfortunately, Kafka doesn't support general concurrent message consumption. Offsets have to be committed in order, one by one; if messages are processed concurrently and one of them fails, you can't tell which offset position is safe to commit. So you should scale Kafka consumers by partition: a single consumer per partition.
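To illustrate the offset constraint described above, here is a minimal plain-Python sketch. It assumes only that a partition's committed position is a single number meaning "everything below this is done". `OffsetTracker` is a hypothetical helper written for this example, not part of Kafka, aiokafka, or FastStream.

```python
class OffsetTracker:
    """Hypothetical helper: tracks finished offsets for ONE partition."""

    def __init__(self, start_offset):
        # Lowest offset that is not yet known to be finished.
        self._next_to_commit = start_offset
        # Offsets that finished out of order and are waiting for a gap to close.
        self._done = set()

    def mark_done(self, offset):
        """Record a finished offset and return the position that is safe to commit."""
        self._done.add(offset)
        # Advance only across a contiguous run of finished offsets.
        while self._next_to_commit in self._done:
            self._done.remove(self._next_to_commit)
            self._next_to_commit += 1
        return self._next_to_commit


tracker = OffsetTracker(start_offset=0)
# Messages finish out of order, as they would with concurrent handlers.
positions = [tracker.mark_done(offset) for offset in (2, 1, 0, 4, 3)]
print(positions)  # [0, 0, 3, 3, 5]
```

While offset 0 is still in flight, nothing can be committed even though offsets 1 and 2 are finished. If offset 0 failed instead, committing past it would silently skip that message, which is the situation the reply warns about.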
-
Hi! We recently built a solution for this: https://github.com/modern-python/faststream-concurrent-aiokafka
-
Hi, the Kafka subscriber docs (e.g. Basic Subscriber – Concurrent processing, v0.7) describe concurrent processing for max_workers > 1 only for:
My questions: If middleware converts failures into “successful” completion (catches the exception, performs side effects, then returns without re-raising), AckPolicy.NACK_ON_ERROR’s error path would not run for those failures.
My goal is to consume asynchronously from a topic with 8 partitions using one consumer (processing messages one by one within each partition, but concurrently across partitions) using AckPolicy.ACK.
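The goal stated above (sequential within a partition, concurrent across partitions) can be sketched with plain asyncio: one queue and one worker task per partition. This is only an illustration of the pattern, not FastStream's API. `partition_worker`, `dispatch`, and the `(partition, payload)` tuples are hypothetical names, and offset commits are left out entirely.

```python
import asyncio


async def partition_worker(queue, handle):
    """Process one partition's messages strictly one at a time."""
    while True:
        item = await queue.get()
        if item is None:  # sentinel: no more messages for this partition
            return
        await handle(item)


async def dispatch(messages, handle, partitions):
    """Route each (partition, payload) pair to that partition's own worker."""
    queues = [asyncio.Queue() for _ in range(partitions)]
    workers = [asyncio.create_task(partition_worker(q, handle)) for q in queues]
    for partition, payload in messages:
        await queues[partition].put((partition, payload))
    for q in queues:
        await q.put(None)
    # Workers run concurrently with each other; each one is sequential inside.
    await asyncio.gather(*workers)


async def demo():
    order = []

    async def handle(item):
        partition, _payload = item
        # Partition 0 is deliberately slower, so partition 1 can overtake it.
        await asyncio.sleep(0.01 if partition == 0 else 0)
        order.append(item)

    messages = [(0, "a1"), (1, "b1"), (0, "a2"), (1, "b2")]
    await dispatch(messages, handle, partitions=2)
    return order


result = asyncio.run(demo())
print(result)
```

Messages from different partitions may interleave in any order, but each partition's own messages are always handled in the order they arrived.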
-
I need to scale up my Kafka processing, which is basically multiple asynchronous I/O operations (requests). I understand that I can use multiple workers, but this leads to unnecessary resource usage. I wonder whether it is possible to make better use of asyncio with FastStream.
Unless I'm doing something wrong, the subscriber below processes one message at a time.
Log:
I would like something that works like FastAPI's endpoints. The view below accepts multiple requests concurrently.
I made the implementation below, but it loses manual ack control.
What is the correct way to combine async concurrency with Kafka?
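The code from the original post did not survive on this page, so the following is not the poster's implementation. It is a plain-asyncio sketch of one common way ack control gets lost, under the assumption that the framework acknowledges a message as soon as the handler returns. `consume`, `do_io`, and both handlers are hypothetical stand-ins, not FastStream functions.

```python
import asyncio

events = []


async def do_io(msg):
    """Stand-in for the real asynchronous request."""
    await asyncio.sleep(0.01)
    events.append(f"processed {msg}")


async def fire_and_forget_handler(msg, background):
    # Schedules the work and returns immediately, before it has finished.
    task = asyncio.create_task(do_io(msg))
    background.add(task)  # keep a reference so the task is not garbage collected
    task.add_done_callback(background.discard)


async def awaiting_handler(msg, background):
    # Returns only after the work is complete.
    await do_io(msg)


async def consume(handler, msg):
    """Hypothetical consumer loop step: ack once the handler has returned."""
    background = set()
    await handler(msg, background)
    events.append(f"acked {msg}")
    if background:
        await asyncio.gather(*background)  # drain leftover work for the demo


async def main():
    await consume(fire_and_forget_handler, "m1")
    await consume(awaiting_handler, "m2")


asyncio.run(main())
print(events)
# ['acked m1', 'processed m1', 'processed m2', 'acked m2']
```

With the fire-and-forget handler, "m1" is acknowledged before it has been processed, so a later failure can no longer be turned into a nack or a retry. Any concurrency scheme that keeps ack control has to delay the ack until the work for that message has actually finished.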