diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 28b3a60b6..96f045254 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -918,10 +918,12 @@ def handle_response(self, response): "falling back to a fresh producer id") self.transaction_manager._restart_epoch_bump_without_producer_id( self.request.transaction_timeout_ms, self._result) - elif error_type is Errors.ProducerFencedError: + elif error_type in (Errors.ProducerFencedError, Errors.InvalidProducerEpochError): # Another producer instance has taken over this transactional_id. - # Fatal--the application must rebuild the producer. - self.fatal_error(error_type()) + # Fatal--the application must rebuild the producer. Mirrors the + # Java client, which normalizes INVALID_PRODUCER_EPOCH to + # PRODUCER_FENCED on the non-bump InitProducerId path. + self.fatal_error(Errors.ProducerFencedError()) elif error_type is Errors.TransactionalIdAuthorizationFailedError: self.fatal_error(error_type()) else: @@ -969,8 +971,10 @@ def handle_response(self, response): self.maybe_override_retry_backoff_ms() self.reenqueue() return - elif error_type is Errors.InvalidProducerEpochError: - self.fatal_error(error_type()) + elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError): + # Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED + # on the txn-coordinator RPC paths (KIP-360). + self.fatal_error(Errors.ProducerFencedError()) return elif error_type is Errors.TransactionalIdAuthorizationFailedError: self.fatal_error(error_type()) @@ -1104,8 +1108,10 @@ def handle_response(self, response): if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): self.transaction_manager._lookup_coordinator('transaction', self.transactional_id) self.reenqueue() - elif error_type is Errors.InvalidProducerEpochError: - self.fatal_error(error_type()) + elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError): + # Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED + # on the txn-coordinator RPC paths (KIP-360). + self.fatal_error(Errors.ProducerFencedError()) elif error_type is Errors.TransactionalIdAuthorizationFailedError: self.fatal_error(error_type()) elif error_type is Errors.InvalidTxnStateError: @@ -1155,7 +1161,11 @@ def handle_response(self, response): self.reenqueue() elif error_type in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError): self.reenqueue() - elif error_type in (Errors.InvalidProducerEpochError, Errors.TransactionalIdAuthorizationFailedError): + elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError): + # Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED + # on the txn-coordinator RPC paths (KIP-360). + self.fatal_error(Errors.ProducerFencedError()) + elif error_type is Errors.TransactionalIdAuthorizationFailedError: self.fatal_error(error_type()) elif error_type is Errors.GroupAuthorizationFailedError: self.abortable_error(error_type(self.consumer_group_id)) @@ -1226,8 +1236,12 @@ def handle_response(self, response): elif error_type is Errors.GroupAuthorizationFailedError: self.abortable_error(error_type(self.consumer_group_id)) return + elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError): + # Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED + # on the txn-coordinator RPC paths (KIP-360). + self.fatal_error(Errors.ProducerFencedError()) + return elif error_type in (Errors.TransactionalIdAuthorizationFailedError, - Errors.InvalidProducerEpochError, Errors.UnsupportedForMessageFormatError): self.fatal_error(error_type()) return diff --git a/test/producer/test_transaction_manager_mock_broker.py b/test/producer/test_transaction_manager_mock_broker.py index 225e2ed96..459a5dd15 100644 --- a/test/producer/test_transaction_manager_mock_broker.py +++ b/test/producer/test_transaction_manager_mock_broker.py @@ -221,11 +221,15 @@ def test_success_transitions_to_ready(self, broker, client): assert handler._result.is_done assert not handler._result.failed - @pytest.mark.parametrize("error", [ - Errors.ProducerFencedError, - Errors.TransactionalIdAuthorizationFailedError, + @pytest.mark.parametrize("error,expected", [ + (Errors.ProducerFencedError, Errors.ProducerFencedError), + # Java client normalizes non-bump INVALID_PRODUCER_EPOCH to + # PRODUCER_FENCED on the InitProducerId path (KIP-360). + (Errors.InvalidProducerEpochError, Errors.ProducerFencedError), + (Errors.TransactionalIdAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError), ]) - def test_fatal_error(self, broker, client, error): + def test_fatal_error(self, broker, client, error, expected): tm = _make_manager(client) tm._current_state = TransactionState.INITIALIZING handler = self._enqueue_init(tm) @@ -239,7 +243,7 @@ def test_fatal_error(self, broker, client, error): assert tm._current_state == TransactionState.FATAL_ERROR assert tm.has_fatal_error() - assert isinstance(tm.last_error, error) + assert isinstance(tm.last_error, expected) assert handler._result.failed assert handler not in _pending_handlers(tm) @@ -394,13 +398,17 @@ def test_success_marks_partition_added(self, broker, client): assert handler not in _pending_handlers(tm) assert handler._result.is_done and not handler._result.failed - @pytest.mark.parametrize("error", [ - Errors.InvalidProducerEpochError, - Errors.TransactionalIdAuthorizationFailedError, - Errors.InvalidProducerIdMappingError, - Errors.InvalidTxnStateError, + @pytest.mark.parametrize("error,expected", [ + # Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to + # PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360). + (Errors.InvalidProducerEpochError, Errors.ProducerFencedError), + (Errors.ProducerFencedError, Errors.ProducerFencedError), + (Errors.TransactionalIdAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError), + (Errors.InvalidProducerIdMappingError, Errors.KafkaError), + (Errors.InvalidTxnStateError, Errors.KafkaError), ]) - def test_fatal_partition_error(self, broker, client, error): + def test_fatal_partition_error(self, broker, client, error, expected): tm = _make_manager(client) handler, tp = self._enqueue_add_partitions(tm) @@ -412,6 +420,7 @@ def test_fatal_partition_error(self, broker, client, error): assert tm._current_state == TransactionState.FATAL_ERROR assert tm.has_fatal_error() + assert isinstance(tm.last_error, expected) assert handler._result.failed assert handler not in _pending_handlers(tm) @@ -625,12 +634,16 @@ def test_success_completes_transaction(self, broker, client): assert not tm._partitions_in_transaction assert handler._result.is_done and not handler._result.failed - @pytest.mark.parametrize("error", [ - Errors.InvalidProducerEpochError, - Errors.TransactionalIdAuthorizationFailedError, - Errors.InvalidTxnStateError, + @pytest.mark.parametrize("error,expected", [ + # Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to + # PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360). + (Errors.InvalidProducerEpochError, Errors.ProducerFencedError), + (Errors.ProducerFencedError, Errors.ProducerFencedError), + (Errors.TransactionalIdAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError), + (Errors.InvalidTxnStateError, Errors.InvalidTxnStateError), ]) - def test_fatal_error(self, broker, client, error): + def test_fatal_error(self, broker, client, error, expected): tm = _make_manager(client) handler = self._enqueue_end_txn(tm) @@ -641,7 +654,7 @@ def test_fatal_error(self, broker, client, error): assert tm._current_state == TransactionState.FATAL_ERROR assert tm.has_fatal_error() - assert isinstance(tm.last_error, error) + assert isinstance(tm.last_error, expected) assert handler._result.failed assert handler not in _pending_handlers(tm) @@ -736,11 +749,15 @@ def test_success_enqueues_offset_commit(self, broker, client): assert not handler._result.is_done assert tm._pending_txn_offset_commits == offsets - @pytest.mark.parametrize("error", [ - Errors.InvalidProducerEpochError, - Errors.TransactionalIdAuthorizationFailedError, + @pytest.mark.parametrize("error,expected", [ + # Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to + # PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360). + (Errors.InvalidProducerEpochError, Errors.ProducerFencedError), + (Errors.ProducerFencedError, Errors.ProducerFencedError), + (Errors.TransactionalIdAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError), ]) - def test_fatal_error(self, broker, client, error): + def test_fatal_error(self, broker, client, error, expected): tm = _make_manager(client) handler, _, _ = self._enqueue_add_offsets(tm) @@ -751,7 +768,7 @@ def test_fatal_error(self, broker, client, error): _poll_for_future(client, future) assert tm._current_state == TransactionState.FATAL_ERROR - assert isinstance(tm.last_error, error) + assert isinstance(tm.last_error, expected) assert handler._result.failed def test_group_authz_failed_is_abortable(self, broker, client): @@ -872,12 +889,17 @@ def test_success_completes_result(self, broker, client): assert handler._result.is_done and not handler._result.failed assert tp not in tm._pending_txn_offset_commits - @pytest.mark.parametrize("error", [ - Errors.TransactionalIdAuthorizationFailedError, - Errors.InvalidProducerEpochError, - Errors.UnsupportedForMessageFormatError, + @pytest.mark.parametrize("error,expected", [ + # Java client normalizes INVALID_PRODUCER_EPOCH and PRODUCER_FENCED to + # PRODUCER_FENCED on the txn-coordinator RPC paths (KIP-360). + (Errors.InvalidProducerEpochError, Errors.ProducerFencedError), + (Errors.ProducerFencedError, Errors.ProducerFencedError), + (Errors.TransactionalIdAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError), + (Errors.UnsupportedForMessageFormatError, + Errors.UnsupportedForMessageFormatError), ]) - def test_fatal_partition_error(self, broker, client, error): + def test_fatal_partition_error(self, broker, client, error, expected): tm = _make_manager(client) handler, tp = self._enqueue_offset_commit(tm) @@ -889,7 +911,7 @@ def test_fatal_partition_error(self, broker, client, error): _poll_for_future(client, future) assert tm._current_state == TransactionState.FATAL_ERROR - assert isinstance(tm.last_error, error) + assert isinstance(tm.last_error, expected) assert handler._result.failed def test_group_authz_failed_is_abortable(self, broker, client):