Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 71 additions & 78 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ def producer_epoch(self):
return self.transaction_manager.producer_id_and_epoch.epoch

def fatal_error(self, exc):
log.error(f'Fatal Error handling request {self.request.name if self.request else "none"}: {exc}')
self.transaction_manager.transition_to_fatal_error(exc)
self._result.done(error=exc)

Expand Down Expand Up @@ -895,21 +896,20 @@ def priority(self):
return Priority.INIT_PRODUCER_ID

def handle_response(self, response):
error = Errors.for_code(response.error_code)
error_type = Errors.for_code(response.error_code)

if error is Errors.NoError:
if error_type is Errors.NoError:
self.transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(response.producer_id, response.producer_epoch))
if self._is_epoch_bump:
self.transaction_manager._complete_epoch_bump()
else:
self.transaction_manager._transition_to(TransactionState.READY)
self._result.done()
elif error in (Errors.NotCoordinatorError, Errors.CoordinatorNotAvailableError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.reenqueue()
elif error in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
elif getattr(error_type, 'retriable', False):
if error_type in (Errors.NotCoordinatorError, Errors.CoordinatorNotAvailableError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.reenqueue()
elif error is Errors.InvalidProducerEpochError and self._is_epoch_bump:
elif error_type is Errors.InvalidProducerEpochError and self._is_epoch_bump:
# KIP-360: our (producer_id, epoch) are stale--the broker no
# longer recognizes them. Fall back to allocating a fresh
# producer_id by reissuing InitProducerIdRequest without
Expand All @@ -918,14 +918,14 @@ def handle_response(self, response):
"falling back to a fresh producer id")
self.transaction_manager._restart_epoch_bump_without_producer_id(
self.request.transaction_timeout_ms, self._result)
elif error is Errors.ProducerFencedError:
elif error_type is Errors.ProducerFencedError:
# Another producer instance has taken over this transactional_id.
# Fatal--the application must rebuild the producer.
self.fatal_error(error())
elif error is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error())
self.fatal_error(error_type())
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
else:
self.fatal_error(Errors.KafkaError("Unexpected error in InitProducerIdResponse: %s" % (error())))
self.fatal_error(Errors.KafkaError("Unexpected error in InitProducerIdResponse: %s" % (error_type())))

class AddPartitionsToTxnHandler(TxnRequestHandler):
def __init__(self, transaction_manager, topic_partitions):
Expand Down Expand Up @@ -959,37 +959,33 @@ def handle_response(self, response):
for topic, partition_data in response.results_by_topic_v3_and_below
for partition, error_code in partition_data}

for tp, error in results.items():
if error is Errors.NoError:
for tp, error_type in results.items():
if error_type is Errors.NoError:
continue
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
elif getattr(error_type, 'retriable', False):
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
elif error_type is Errors.ConcurrentTransactionsError:
self.maybe_override_retry_backoff_ms()
self.reenqueue()
return
elif error is Errors.ConcurrentTransactionsError:
self.maybe_override_retry_backoff_ms()
self.reenqueue()
return
elif error in (Errors.CoordinatorLoadInProgressError, Errors.UnknownTopicOrPartitionError):
self.reenqueue()
elif error_type is Errors.InvalidProducerEpochError:
self.fatal_error(error_type())
return
elif error is Errors.InvalidProducerEpochError:
self.fatal_error(error())
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
return
elif error is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error())
elif error_type in (Errors.InvalidProducerIdMappingError, Errors.InvalidTxnStateError):
self.fatal_error(Errors.KafkaError(error_type()))
return
elif error in (Errors.InvalidProducerIdMappingError, Errors.InvalidTxnStateError):
self.fatal_error(Errors.KafkaError(error()))
return
elif error is Errors.TopicAuthorizationFailedError:
elif error_type is Errors.TopicAuthorizationFailedError:
unauthorized_topics.add(tp.topic)
elif error is Errors.OperationNotAttemptedError:
elif error_type is Errors.OperationNotAttemptedError:
log.debug("Did not attempt to add partition %s to transaction because other partitions in the"
" batch had errors.", tp)
has_partition_errors = True
else:
log.error("Could not add partition %s due to unexpected error %s", tp, error())
log.error("Could not add partition %s due to unexpected error %s", tp, error_type())
has_partition_errors = True

partitions = set(results)
Expand Down Expand Up @@ -1056,26 +1052,26 @@ def coordinator_key(self):
return None

def handle_response(self, response):
error = Errors.for_code(response.error_code)
error_type = Errors.for_code(response.error_code)

if error is Errors.NoError:
if error_type is Errors.NoError:
coordinator_id = self.transaction_manager._metadata.add_coordinator(
response, self._coord_type, self._coord_key)
if self._coord_type == 'group':
self.transaction_manager._consumer_group_coordinator = coordinator_id
elif self._coord_type == 'transaction':
self.transaction_manager._transaction_coordinator = coordinator_id
self._result.done()
elif error is Errors.CoordinatorNotAvailableError:
elif getattr(error_type, 'retriable', False):
self.reenqueue()
elif error is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error())
elif error is Errors.GroupAuthorizationFailedError:
self.abortable_error(error(self._coord_key))
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
elif error_type is Errors.GroupAuthorizationFailedError:
self.abortable_error(error_type(self._coord_key))
else:
self.fatal_error(Errors.KafkaError(
"Could not find a coordinator with type %s with key %s due to"
" unexpected error: %s" % (self._coord_type, self._coord_key, error())))
" unexpected error: %s" % (self._coord_type, self._coord_key, error_type())))


class EndTxnHandler(TxnRequestHandler):
Expand All @@ -1099,24 +1095,23 @@ def priority(self):
return Priority.END_TXN

def handle_response(self, response):
error = Errors.for_code(response.error_code)
error_type = Errors.for_code(response.error_code)

if error is Errors.NoError:
if error_type is Errors.NoError:
self.transaction_manager._complete_transaction()
self._result.done()
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.reenqueue()
elif error in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
elif getattr(error_type, 'retriable', False):
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.reenqueue()
elif error is Errors.InvalidProducerEpochError:
self.fatal_error(error())
elif error is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error())
elif error is Errors.InvalidTxnStateError:
self.fatal_error(error())
elif error_type is Errors.InvalidProducerEpochError:
self.fatal_error(error_type())
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
elif error_type is Errors.InvalidTxnStateError:
self.fatal_error(error_type())
else:
self.fatal_error(Errors.KafkaError("Unhandled error in EndTxnResponse: %s" % (error())))
self.fatal_error(Errors.KafkaError("Unhandled error in EndTxnResponse: %s" % (error_type())))


class AddOffsetsToTxnHandler(TxnRequestHandler):
Expand All @@ -1142,9 +1137,9 @@ def priority(self):
return Priority.ADD_PARTITIONS_OR_OFFSETS

def handle_response(self, response):
error = Errors.for_code(response.error_code)
error_type = Errors.for_code(response.error_code)

if error is Errors.NoError:
if error_type is Errors.NoError:
log.debug("Successfully added partition for consumer group %s to transaction", self.consumer_group_id)

# note the result is not completed until the TxnOffsetCommit returns
Expand All @@ -1154,19 +1149,18 @@ def handle_response(self, response):
self.transaction_manager._pending_txn_offset_commits, self._result)
self.transaction_manager._enqueue_request(handler)
self.transaction_manager._transaction_started = True
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
elif getattr(error_type, 'retriable', False):
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.reenqueue()
elif error in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
elif error_type in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
self.reenqueue()
elif error is Errors.InvalidProducerEpochError:
self.fatal_error(error())
elif error is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error())
elif error is Errors.GroupAuthorizationFailedError:
self.abortable_error(error(self.consumer_group_id))
elif error_type in (Errors.InvalidProducerEpochError, Errors.TransactionalIdAuthorizationFailedError):
self.fatal_error(error_type())
elif error_type is Errors.GroupAuthorizationFailedError:
self.abortable_error(error_type(self.consumer_group_id))
else:
self.fatal_error(Errors.KafkaError("Unexpected error in AddOffsetsToTxnResponse: %s" % (error())))
self.fatal_error(Errors.KafkaError("Unexpected error in AddOffsetsToTxnResponse: %s" % (error_type())))


class TxnOffsetCommitHandler(TxnRequestHandler):
Expand Down Expand Up @@ -1220,26 +1214,25 @@ def handle_response(self, response):
for topic, partition_data in response.topics
for partition, error_code in partition_data}

for tp, error in errors.items():
if error is Errors.NoError:
for tp, error_type in errors.items():
if error_type is Errors.NoError:
log.debug("Successfully added offsets for %s from consumer group %s to transaction.",
tp, self.consumer_group_id)
del self.transaction_manager._pending_txn_offset_commits[tp]
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError):
retriable_failure = True
lookup_coordinator = True
elif error is Errors.UnknownTopicOrPartitionError:
elif getattr(error_type, 'retriable', False):
retriable_failure = True
elif error is Errors.GroupAuthorizationFailedError:
self.abortable_error(error(self.consumer_group_id))
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError):
lookup_coordinator = True
elif error_type is Errors.GroupAuthorizationFailedError:
self.abortable_error(error_type(self.consumer_group_id))
return
elif error in (Errors.TransactionalIdAuthorizationFailedError,
Errors.InvalidProducerEpochError,
Errors.UnsupportedForMessageFormatError):
self.fatal_error(error())
elif error_type in (Errors.TransactionalIdAuthorizationFailedError,
Errors.InvalidProducerEpochError,
Errors.UnsupportedForMessageFormatError):
self.fatal_error(error_type())
return
else:
self.fatal_error(Errors.KafkaError("Unexpected error in TxnOffsetCommitResponse: %s" % (error())))
self.fatal_error(Errors.KafkaError("Unexpected error in TxnOffsetCommitResponse: %s" % (error_type())))
return

if lookup_coordinator:
Expand Down
Loading
Loading