File tree Expand file tree Collapse file tree 2 files changed +9
-1
lines changed Expand file tree Collapse file tree 2 files changed +9
-1
lines changed Original file line number Diff line number Diff line change @@ -102,6 +102,10 @@ class UnsupportedCodecError(KafkaError):
102
102
pass
103
103
104
104
105
+ class TransactionAbortedError (KafkaError ):
106
+ pass
107
+
108
+
105
109
class BrokerResponseError (KafkaError ):
106
110
errno = None
107
111
message = None
Original file line number Diff line number Diff line change @@ -166,7 +166,11 @@ def run_once(self):
166
166
self ._client .poll (timeout_ms = self .config ['retry_backoff_ms' ])
167
167
return
168
168
elif self ._transaction_manager .has_abortable_error ():
169
- self ._accumulator .abort_undrained_batches (self ._transaction_manager .last_error )
169
+ # Attempt to get the last error that caused this abort.
170
+ # If there was no error, but we are still aborting,
171
+ # then this is most likely a case where there was no fatal error.
172
+ exception = self ._transaction_manager .last_error or Errors .TransactionAbortedError ()
173
+ self ._accumulator .abort_undrained_batches (exception )
170
174
171
175
except Errors .SaslAuthenticationFailedError as e :
172
176
# This is already logged as error, but propagated here to perform any clean ups.
You can’t perform that action at this time.
0 commit comments