Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async as config option #31

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

Andyandpandy
Copy link
Contributor

Based on this comment it seemed wise to make async as a config option.

#25 (comment)

@Andyandpandy Andyandpandy requested a review from a team as a code owner April 13, 2023 09:11
@CLAassistant
Copy link

CLAassistant commented Apr 13, 2023

CLA assistant check
All committers have signed the CLA.

@Andyandpandy Andyandpandy force-pushed the make-async-config-option branch from d8a8f9a to ac44712 Compare April 13, 2023 10:06
@barryhatfield
Copy link

Thank you

@Andyandpandy
Copy link
Contributor Author

Ready for merge?

@Andyandpandy
Copy link
Contributor Author

Any further changes needed? @RobertIndie

if (enableAsync){
getProducer(topic).newMessage()
.value(s.getBytes())
.sendAsync();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to add a callback to print a log when failing to publish messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This send function is only called inside a try/catch, where it logs "fail to send message" with the exception. See line 225-235.

Copy link
Member

@RobertIndie RobertIndie Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't catch the exception thrown from sendAsync. We need to use CompletableFuture.exceptionally to catch it. Please see: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#exceptionally-java.util.function.Function-

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.. Am I reading this right?

CompletableFuture.exceptionally description:

Returns a new CompletableFuture that is completed when this CompletableFuture completes, with the result of the given function of the exception triggering this CompletableFuture's completion when it completes exceptionally; otherwise, if this CompletableFuture completes normally, then the returned CompletableFuture also completes normally with the same value.

This doesn't throw a custom Exception that inherits from the class Exception as usual? I don't think I understand what this is describing. I'm trying to decipher it, but does it mean that it returns the function that triggered an Exception rather than actually throwing the Exception. In that case I guess I couldn't try/catch? Or does it mean that I am supposed to just provide a function that is throwable, so when CompletableFuture throws, it triggers my function with where it failed?

Copy link
Member

@RobertIndie RobertIndie Jul 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply.

The sendAsync will not throw any exceptions. The producer already catches all the exceptions and places them into the Future. You could check the code here.

Therefore if there are any exceptions thrown in the send operation, the function fn in the exceptionally will be called. And I think we need to print the error log in that function.

Or does it mean that I am supposed to just provide a function that is throwable, so when CompletableFuture throws, it triggers my function with where it failed?

That's correct.

However, you still need to use try/catch here, because we couldn't guarantee that this code won't throw the exception:

getProducer(topic).newMessage()
                .value(s.getBytes())
                .sendAsync();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants