Allow SubscribableKafkaMessageSource option of manually committing offsets #555

Closed
bradskuse opened this issue Mar 17, 2025 · 5 comments · Fixed by #556

@bradskuse
Contributor

bradskuse commented Mar 17, 2025

Enhancement Description

Currently, when using SubscribableKafkaMessageSource as the consumer, there is no way to commit offsets programmatically. It depends on enable-auto-commit: true being set.

The problem is that when a batch of consumer records is processed (let's say a batch size of 50), processing can fail on the 15th message while, in the background, the consumer commits offsets through to the 50th event. When the consumer then resubscribes, events 15 through 50 have been missed.
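To make the numbers concrete, here is a small toy model of that scenario. It does not use Kafka at all; the class and method names are hypothetical, and it assumes the worst case in which auto-commit stores the position after the entire polled batch even though processing stopped part-way through.

```java
/** Toy model of the scenario above; no Kafka involved, every name here is hypothetical. */
final class OffsetLossDemo {

    /** Offset the consumer resumes from after a batch that failed part-way through. */
    static long resumeOffset(long batchStart, int batchSize, boolean autoCommit) {
        // Auto-commit, worst case: the position after the whole polled batch gets committed.
        // Commit-after-processing: the commit is never reached, so the old offset stays.
        return autoCommit ? batchStart + batchSize : batchStart;
    }

    /** Records that are never handled successfully because the consumer resumes past them. */
    static long lostRecords(long batchStart, int batchSize, long failingOffset, boolean autoCommit) {
        return Math.max(0, resumeOffset(batchStart, batchSize, autoCommit) - failingOffset);
    }

    /** Records that were already handled once and are delivered again after the restart. */
    static long redeliveredRecords(long batchStart, int batchSize, long failingOffset, boolean autoCommit) {
        return Math.max(0, failingOffset - resumeOffset(batchStart, batchSize, autoCommit));
    }

    public static void main(String[] args) {
        // Batch of 50 starting at offset 0; the 15th record (offset 14) fails.
        System.out.println("auto-commit, lost: " + lostRecords(0, 50, 14, true));
        System.out.println("manual commit, lost: " + lostRecords(0, 50, 14, false));
        System.out.println("manual commit, redelivered: " + redeliveredRecords(0, 50, 14, false));
    }
}
```

Under these assumptions, auto-commit loses records 15 through 50 (36 records), while committing only after the batch has been processed loses none and instead redelivers the 14 records that were already handled. So the trade-off is at-least-once delivery in place of silent loss.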

I'm proposing an option to allow FetchEventsTask to manually commit the offsets.

Current Behaviour

When handling an event throws an exception, the consumer can still commit offsets past the failed record, so the remaining records are missed.

Wanted Behaviour

Maybe after the processRecords(...) shown below, the offsets could be committed if auto commit is disabled.

@Override
public void run() {
    try {
        while (running.get()) {
            ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
            logger.debug("Fetched [{}] number of ConsumerRecords", records.count());
            processRecords(records);
        }
    } catch (Exception e) {
        logger.warn("Encountered an exception fetching ConsumerRecords", e);
        runtimeErrorHandler.handle(new FetchEventException(
                "Cannot proceed with fetching ConsumerRecords since we encountered an exception",
                e));
    } finally {
        running.set(false);
        closeHandler.accept(this);
        consumer.close();
        logger.info("Fetch events task and used Consumer instance [{}] have been closed", consumer);
    }
}
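As a rough sketch of that idea, the step "process the batch, then commit" could look like the following. This is not the Axon implementation: OffsetCommitter is a hypothetical stand-in for the consumer's commitSync() and commitAsync() calls so the example runs without a Kafka dependency, and CommitMode is an assumed setting that loosely follows the synchronous versus asynchronous option discussed later in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch only: commit after a batch has been processed. None of these types are Axon's. */
final class CommitAfterProcessingSketch {

    /** Hypothetical stand-in for the two commit calls of a Kafka consumer. */
    interface OffsetCommitter {
        void commitSync();
        void commitAsync();
    }

    /** Assumed setting: leave commits to auto-commit, or commit manually after each batch. */
    enum CommitMode { AUTO, SYNC, ASYNC }

    /**
     * Handles every record of one polled batch, then commits according to the mode.
     * If the handler throws, the exception propagates and the commit is never reached.
     */
    static <R> void processAndCommit(List<R> batch, Consumer<R> handler,
                                     OffsetCommitter committer, CommitMode mode) {
        batch.forEach(handler);
        switch (mode) {
            case SYNC:
                committer.commitSync();
                break;
            case ASYNC:
                committer.commitAsync();
                break;
            default:
                break; // AUTO: the client commits in the background
        }
    }

    /** Test double that records which commit calls were made. */
    static final class RecordingCommitter implements OffsetCommitter {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commitSync() {
            calls.add("sync");
        }

        @Override
        public void commitAsync() {
            calls.add("async");
        }
    }

    public static void main(String[] args) {
        RecordingCommitter committer = new RecordingCommitter();
        // Successful batch: one synchronous commit is recorded.
        processAndCommit(List.of("a", "b"), r -> { }, committer, CommitMode.SYNC);
        try {
            // Failing batch: the handler throws, so no commit is recorded.
            processAndCommit(List.of("c", "d"), r -> {
                throw new IllegalStateException("handler failed");
            }, committer, CommitMode.SYNC);
        } catch (IllegalStateException expected) {
            // In the real task this would end up in the runtime error handler.
        }
        System.out.println(committer.calls); // prints [sync]
    }
}
```

The point of the design is the ordering: because the commit comes after the handler loop, a failure anywhere in the batch leaves the previously committed offset untouched, and the batch is fetched again on resubscribe.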

Possible Workarounds

I don't think a workaround is possible.

@bradskuse
Contributor Author

bradskuse commented Mar 17, 2025

Looking at the auto-created fix above, I don't think the configuration is available from the consumer. A new builder method could be added, e.g. alongside pollTimeout:

AsyncFetcher.<String, String, EventMessage<?>>builder()
            .pollTimeout(properties.getFetcher().getPollTimeout())
            // proposed new builder method
            .commitOnProcessed(!properties.getConsumer().isEnableAutoCommit())
            .build();

@smcvb
Member

smcvb commented Mar 19, 2025

Thanks for opening this issue, and the subsequent pull request, with us, @bradskuse!
I wanted to reply here to assure you that you're being heard, but also to share that we're currently working towards a deadline to get a milestone for Axon Framework 5 out.
As such, I don't expect anyone from the team to be able to review your solution in the upcoming 2 to 3 weeks.
After that, I'll be sure to give it a look.

@bradskuse
Contributor Author

All good @smcvb

Thank you for the update, I appreciate you must be pretty busy with a release coming up.

Hopefully my understanding of the problem is correct. Once the release is all done and the team is freed up, feel free to ping me any questions.

In the meantime, I'll push another change to the PR to add the option of synchronous vs. asynchronous commits (which could be useful for others).

@smcvb
Member

smcvb commented Apr 18, 2025

Sorry for the long wait here, @bradskuse! We've released Axon Framework 5, milestone 1, last Wednesday.
So, we can finally start taking a look again at the issues and pull requests that came in for Axon Framework 4.
As such, I've just reviewed the pull request you've provided for this issue.
Conceptually it looks fine, but I have some minor clean-up requests.
Once those are ironed out, I think we're good to go!

smcvb added a commit that referenced this issue Apr 24, 2025
@smcvb
Member

smcvb commented Apr 24, 2025

Thank you for the issue and the pull request, @bradskuse! I have just approved your PR, saw the build turn green, and merged it.

I will be releasing this extension right now as well. That way, I can pair it up with some fixes for the Mongo Extension and Axon Framework 4.11.
Thus, you should be able to use your addition in production soon! :-)
