
Conversation

Contributor

@AHeise AHeise commented Sep 29, 2025

When cancelling a job, we may interrupt the KafkaCommitter. This currently leads to the ERROR log "Transaction ... encountered error and data has been potentially lost."

However, that exception is expected and does not indicate any data loss beyond the normal inconsistencies caused by cancellation. In many cases, the commit has already succeeded. Moreover, a job restart will lead to the data eventually being committed.

Contributor

@fapaul fapaul left a comment


Looks mostly good; left some minor comments.

Comment on lines 137 to 138
// reset the interrupt flag that is set when the Kafka exception is created
Thread.interrupted();
Contributor


Nit: Since we rethrow the interrupted exception, it's not necessary to mark the thread as interrupted again, is it?

Contributor Author

@AHeise AHeise Sep 29, 2025


Yes, you shouldn't even do that; hence I reset the flag here. (Note that this is Thread.interrupted(), not Thread.interrupt().)
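To illustrate the distinction being discussed, here is a minimal, self-contained sketch (not code from this PR): Thread.interrupted() reads and clears the current thread's interrupt flag, while Thread#interrupt() sets it and isInterrupted() only reads it.

```java
public class InterruptedFlagDemo {
    public static void main(String[] args) {
        // Simulate the committer thread having been interrupted,
        // e.g. during job cancellation:
        Thread.currentThread().interrupt();

        // Thread.interrupted() is a static method on the *current* thread:
        // it returns the flag's value AND clears it as a side effect.
        System.out.println(Thread.interrupted());

        // After the call above, the flag is cleared, so isInterrupted()
        // (which only reads the flag) now reports false.
        System.out.println(Thread.currentThread().isInterrupted());
    }
}
```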

new Thread(
        () -> {
            try {
                serverSocket.accept().getInputStream().read();
Contributor


Can't this also be triggered by some kind of metadata request that the internal KafkaProducer makes? I want to avoid interrupting before we actually call commit.

Contributor Author


We only start listening right before commitTransaction, so any other producer initialization is already done by then. However, since we use resumeTransaction with reflection hacks, we don't actually communicate with the broker at all (it's not available in this test by design).

Note that commitTransaction actually issues two requests (one is a metadata request, the other the actual commit). I haven't found a way, or a reason, to wait only on the second without relying on internals. WDYT?
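The listener pattern quoted above can be sketched in isolation as follows. This is a hypothetical, self-contained version (names are illustrative, no Kafka protocol involved): a fake "broker" socket accepts one connection and swallows the request without replying, which is what makes the commit appear stuck so the test can interrupt the committer at a known point.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BlockingBrokerSketch {
    public static void main(String[] args) throws Exception {
        // Bind to an ephemeral port; nothing here speaks real Kafka protocol.
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            Thread listener = new Thread(() -> {
                try {
                    // Accept one connection and read a single byte,
                    // then never respond -- the caller's request hangs.
                    serverSocket.accept().getInputStream().read();
                } catch (IOException ignored) {
                    // Socket closed during test teardown.
                }
            });
            listener.start();

            // A "client" (standing in for the producer) sends one byte
            // that the fake broker consumes but never answers.
            try (Socket client = new Socket("localhost", serverSocket.getLocalPort())) {
                OutputStream out = client.getOutputStream();
                out.write(42);
                out.flush();
            }
            listener.join(1000);
            System.out.println("done");
        }
    }
}
```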

Contributor Author

AHeise commented Sep 29, 2025

Note that I reran the new test 1000 times to make sure it's not flaky (on my machine). I also used breakpoints inside the socket listener to force producer retries.

@AHeise AHeise force-pushed the FLINK-38451-interrupts branch from 71c16ca to c56aebe Compare September 29, 2025 11:18
@AHeise AHeise force-pushed the FLINK-38451-interrupts branch from c56aebe to ddf6e9c Compare September 29, 2025 12:29
Contributor

@fapaul fapaul left a comment


LGTM

@AHeise AHeise merged commit ea0b9ab into apache:main Sep 29, 2025
7 checks passed