Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,10 +918,12 @@ def handle_response(self, response):
"falling back to a fresh producer id")
self.transaction_manager._restart_epoch_bump_without_producer_id(
self.request.transaction_timeout_ms, self._result)
elif error_type is Errors.ProducerFencedError:
elif error_type in (Errors.ProducerFencedError, Errors.InvalidProducerEpochError):
# Another producer instance has taken over this transactional_id.
# Fatal--the application must rebuild the producer.
self.fatal_error(error_type())
# Fatal--the application must rebuild the producer. Mirrors the
# Java client, which normalizes INVALID_PRODUCER_EPOCH to
# PRODUCER_FENCED on the non-bump InitProducerId path.
self.fatal_error(Errors.ProducerFencedError())
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
else:
Expand Down Expand Up @@ -969,8 +971,10 @@ def handle_response(self, response):
self.maybe_override_retry_backoff_ms()
self.reenqueue()
return
elif error_type is Errors.InvalidProducerEpochError:
self.fatal_error(error_type())
elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError):
# Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED
# on the txn-coordinator RPC paths (KIP-360).
self.fatal_error(Errors.ProducerFencedError())
return
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
Expand Down Expand Up @@ -1104,8 +1108,10 @@ def handle_response(self, response):
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.reenqueue()
elif error_type is Errors.InvalidProducerEpochError:
self.fatal_error(error_type())
elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError):
# Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED
# on the txn-coordinator RPC paths (KIP-360).
self.fatal_error(Errors.ProducerFencedError())
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
elif error_type is Errors.InvalidTxnStateError:
Expand Down Expand Up @@ -1155,7 +1161,11 @@ def handle_response(self, response):
self.reenqueue()
elif error_type in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
self.reenqueue()
elif error_type in (Errors.InvalidProducerEpochError, Errors.TransactionalIdAuthorizationFailedError):
elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError):
# Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED
# on the txn-coordinator RPC paths (KIP-360).
self.fatal_error(Errors.ProducerFencedError())
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
elif error_type is Errors.GroupAuthorizationFailedError:
self.abortable_error(error_type(self.consumer_group_id))
Expand Down Expand Up @@ -1226,8 +1236,12 @@ def handle_response(self, response):
elif error_type is Errors.GroupAuthorizationFailedError:
self.abortable_error(error_type(self.consumer_group_id))
return
elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError):
# Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED
# on the txn-coordinator RPC paths (KIP-360).
self.fatal_error(Errors.ProducerFencedError())
return
elif error_type in (Errors.TransactionalIdAuthorizationFailedError,
Errors.InvalidProducerEpochError,
Errors.UnsupportedForMessageFormatError):
self.fatal_error(error_type())
return
Expand Down
78 changes: 50 additions & 28 deletions test/producer/test_transaction_manager_mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,15 @@ def test_success_transitions_to_ready(self, broker, client):
assert handler._result.is_done
assert not handler._result.failed

@pytest.mark.parametrize("error", [
Errors.ProducerFencedError,
Errors.TransactionalIdAuthorizationFailedError,
@pytest.mark.parametrize("error,expected", [
(Errors.ProducerFencedError, Errors.ProducerFencedError),
# Java client normalizes non-bump INVALID_PRODUCER_EPOCH to
# PRODUCER_FENCED on the InitProducerId path (KIP-360).
(Errors.InvalidProducerEpochError, Errors.ProducerFencedError),
(Errors.TransactionalIdAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError),
])
def test_fatal_error(self, broker, client, error):
def test_fatal_error(self, broker, client, error, expected):
tm = _make_manager(client)
tm._current_state = TransactionState.INITIALIZING
handler = self._enqueue_init(tm)
Expand All @@ -239,7 +243,7 @@ def test_fatal_error(self, broker, client, error):

assert tm._current_state == TransactionState.FATAL_ERROR
assert tm.has_fatal_error()
assert isinstance(tm.last_error, error)
assert isinstance(tm.last_error, expected)
assert handler._result.failed
assert handler not in _pending_handlers(tm)

Expand Down Expand Up @@ -394,13 +398,17 @@ def test_success_marks_partition_added(self, broker, client):
assert handler not in _pending_handlers(tm)
assert handler._result.is_done and not handler._result.failed

@pytest.mark.parametrize("error", [
Errors.InvalidProducerEpochError,
Errors.TransactionalIdAuthorizationFailedError,
Errors.InvalidProducerIdMappingError,
Errors.InvalidTxnStateError,
@pytest.mark.parametrize("error,expected", [
# Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to
# PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360).
(Errors.InvalidProducerEpochError, Errors.ProducerFencedError),
(Errors.ProducerFencedError, Errors.ProducerFencedError),
(Errors.TransactionalIdAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError),
(Errors.InvalidProducerIdMappingError, Errors.KafkaError),
(Errors.InvalidTxnStateError, Errors.KafkaError),
])
def test_fatal_partition_error(self, broker, client, error):
def test_fatal_partition_error(self, broker, client, error, expected):
tm = _make_manager(client)
handler, tp = self._enqueue_add_partitions(tm)

Expand All @@ -412,6 +420,7 @@ def test_fatal_partition_error(self, broker, client, error):

assert tm._current_state == TransactionState.FATAL_ERROR
assert tm.has_fatal_error()
assert isinstance(tm.last_error, expected)
assert handler._result.failed
assert handler not in _pending_handlers(tm)

Expand Down Expand Up @@ -625,12 +634,16 @@ def test_success_completes_transaction(self, broker, client):
assert not tm._partitions_in_transaction
assert handler._result.is_done and not handler._result.failed

@pytest.mark.parametrize("error", [
Errors.InvalidProducerEpochError,
Errors.TransactionalIdAuthorizationFailedError,
Errors.InvalidTxnStateError,
@pytest.mark.parametrize("error,expected", [
# Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to
# PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360).
(Errors.InvalidProducerEpochError, Errors.ProducerFencedError),
(Errors.ProducerFencedError, Errors.ProducerFencedError),
(Errors.TransactionalIdAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError),
(Errors.InvalidTxnStateError, Errors.InvalidTxnStateError),
])
def test_fatal_error(self, broker, client, error):
def test_fatal_error(self, broker, client, error, expected):
tm = _make_manager(client)
handler = self._enqueue_end_txn(tm)

Expand All @@ -641,7 +654,7 @@ def test_fatal_error(self, broker, client, error):

assert tm._current_state == TransactionState.FATAL_ERROR
assert tm.has_fatal_error()
assert isinstance(tm.last_error, error)
assert isinstance(tm.last_error, expected)
assert handler._result.failed
assert handler not in _pending_handlers(tm)

Expand Down Expand Up @@ -736,11 +749,15 @@ def test_success_enqueues_offset_commit(self, broker, client):
assert not handler._result.is_done
assert tm._pending_txn_offset_commits == offsets

@pytest.mark.parametrize("error", [
Errors.InvalidProducerEpochError,
Errors.TransactionalIdAuthorizationFailedError,
@pytest.mark.parametrize("error,expected", [
# Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to
# PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360).
(Errors.InvalidProducerEpochError, Errors.ProducerFencedError),
(Errors.ProducerFencedError, Errors.ProducerFencedError),
(Errors.TransactionalIdAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError),
])
def test_fatal_error(self, broker, client, error):
def test_fatal_error(self, broker, client, error, expected):
tm = _make_manager(client)
handler, _, _ = self._enqueue_add_offsets(tm)

Expand All @@ -751,7 +768,7 @@ def test_fatal_error(self, broker, client, error):
_poll_for_future(client, future)

assert tm._current_state == TransactionState.FATAL_ERROR
assert isinstance(tm.last_error, error)
assert isinstance(tm.last_error, expected)
assert handler._result.failed

def test_group_authz_failed_is_abortable(self, broker, client):
Expand Down Expand Up @@ -872,12 +889,17 @@ def test_success_completes_result(self, broker, client):
assert handler._result.is_done and not handler._result.failed
assert tp not in tm._pending_txn_offset_commits

@pytest.mark.parametrize("error", [
Errors.TransactionalIdAuthorizationFailedError,
Errors.InvalidProducerEpochError,
Errors.UnsupportedForMessageFormatError,
@pytest.mark.parametrize("error,expected", [
# Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to
# PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360).
(Errors.InvalidProducerEpochError, Errors.ProducerFencedError),
(Errors.ProducerFencedError, Errors.ProducerFencedError),
(Errors.TransactionalIdAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError),
(Errors.UnsupportedForMessageFormatError,
Errors.UnsupportedForMessageFormatError),
])
def test_fatal_partition_error(self, broker, client, error):
def test_fatal_partition_error(self, broker, client, error, expected):
tm = _make_manager(client)
handler, tp = self._enqueue_offset_commit(tm)

Expand All @@ -889,7 +911,7 @@ def test_fatal_partition_error(self, broker, client, error):
_poll_for_future(client, future)

assert tm._current_state == TransactionState.FATAL_ERROR
assert isinstance(tm.last_error, error)
assert isinstance(tm.last_error, expected)
assert handler._result.failed

def test_group_authz_failed_is_abortable(self, broker, client):
Expand Down
Loading