We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 158eb45 commit 45b26d9Copy full SHA for 45b26d9
cwm_worker_operator/kafka_streamer.py
@@ -95,6 +95,7 @@ def commit(topic, consumer, domains_config, agg_data, no_kafka_commit=False):
95
raise NotImplementedError(f"topic {topic} is not supported")
96
if not no_kafka_commit:
97
consumer.commit()
98
+ agg_data.clear()
99
100
101
def delete_records(topic, latest_partition_offset):
0 commit comments