From fc70d772fa831576e8818c454da62d24f259f1d1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 10 Apr 2026 19:38:57 -0700 Subject: [PATCH 1/2] KIP-360 (pt1): Add transaction manager state and helper methods --- kafka/producer/transaction_manager.py | 136 +++++++++++++++++++++++++- 1 file changed, 133 insertions(+), 3 deletions(-) diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index dcb50a4f5..59d887dc3 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -52,13 +52,21 @@ class TransactionState(IntEnum): ABORTING_TRANSACTION = 5 ABORTABLE_ERROR = 6 FATAL_ERROR = 7 + # KIP-360: intermediate state entered when a recoverable sequence-related + # error is encountered. The producer sends an InitProducerIdRequest v3+ + # with its current producer_id/epoch to bump the epoch, then transitions + # back to READY on success. Records in the accumulator will be sent under + # the bumped epoch with fresh sequence numbers. In-flight batches at the + # moment of the bump are lost (their futures fail). (TODO re KAFKA-5793) + BUMPING_PRODUCER_EPOCH = 8 @classmethod def is_transition_valid(cls, source, target): if target == cls.INITIALIZING: - return source == cls.UNINITIALIZED + return source in (cls.UNINITIALIZED, cls.BUMPING_PRODUCER_EPOCH) elif target == cls.READY: - return source in (cls.INITIALIZING, cls.COMMITTING_TRANSACTION, cls.ABORTING_TRANSACTION) + return source in (cls.INITIALIZING, cls.COMMITTING_TRANSACTION, + cls.ABORTING_TRANSACTION, cls.BUMPING_PRODUCER_EPOCH) elif target == cls.IN_TRANSACTION: return source == cls.READY elif target == cls.COMMITTING_TRANSACTION: @@ -66,7 +74,16 @@ def is_transition_valid(cls, source, target): elif target == cls.ABORTING_TRANSACTION: return source in (cls.IN_TRANSACTION, cls.ABORTABLE_ERROR) elif target == cls.ABORTABLE_ERROR: - return source in (cls.IN_TRANSACTION, cls.COMMITTING_TRANSACTION, cls.ABORTABLE_ERROR) + return source in (cls.IN_TRANSACTION, cls.COMMITTING_TRANSACTION, + cls.ABORTABLE_ERROR, cls.BUMPING_PRODUCER_EPOCH) + elif target == cls.BUMPING_PRODUCER_EPOCH: + # A recoverable sequence-related error can arrive at any point in + # the producer's lifetime; the bump is a unilateral recovery + # action. Disallow only from UNINITIALIZED (no producer_id yet + # to bump) and the terminal error states. + return source in (cls.READY, cls.IN_TRANSACTION, + cls.COMMITTING_TRANSACTION, cls.ABORTING_TRANSACTION, + cls.ABORTABLE_ERROR) elif target == cls.UNINITIALIZED: # Disallow transitions to UNITIALIZED return False @@ -246,6 +263,119 @@ def has_error(self): TransactionState.ABORTABLE_ERROR, TransactionState.FATAL_ERROR) + def is_bumping_epoch(self): + with self._lock: + return self._current_state == TransactionState.BUMPING_PRODUCER_EPOCH + + # KIP-360 error classification + # + # Errors whose correct recovery is to bump the producer epoch via + # InitProducerIdRequest v3+. On brokers that do not support the bump + # (api_version < 2.5) these degrade to FATAL for transactional producers + # and NEEDS_PRODUCER_ID_RESET for non-transactional idempotent producers, + # matching the pre-KIP-360 behavior. + _NEEDS_EPOCH_BUMP_ERRORS = frozenset({ + Errors.OutOfOrderSequenceNumberError, + Errors.UnknownProducerIdError, + Errors.InvalidProducerEpochError, + }) + + # Errors that are always fatal regardless of broker version: auth + # failures, fencing, or structural state corruption where no recovery + # is possible without operator action. + _FATAL_ERRORS = frozenset({ + Errors.ClusterAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError, + Errors.ProducerFencedError, + Errors.InvalidTxnStateError, + }) + + # Classification outcomes returned by classify_batch_error(). + ERROR_CLASS_RETRIABLE = 'RETRIABLE' + ERROR_CLASS_ABORTABLE = 'ABORTABLE' + ERROR_CLASS_FATAL = 'FATAL' + ERROR_CLASS_NEEDS_EPOCH_BUMP = 'NEEDS_EPOCH_BUMP' + ERROR_CLASS_NEEDS_PRODUCER_ID_RESET = 'NEEDS_PRODUCER_ID_RESET' + + def _supports_epoch_bump(self): + """Return True if the broker supports InitProducerIdRequest v3+ (KIP-360). + + KIP-360 landed in Kafka 2.5. On older brokers we fall back to the + pre-KIP-360 recovery: reset producer id for idempotent producers, + fatal state for transactional producers. + """ + return self._api_version >= (2, 5) + + def classify_batch_error(self, error, batch, log_start_offset=-1): + """Categorize a batch-completion error into a recovery outcome. + + Used by the Sender to decide what to do with a failed batch. This + method does not mutate any state — it is a pure classification + helper. The caller is responsible for dispatching to the + appropriate recovery path. + + Arguments: + error (type or BaseException): The error class or instance. + batch (ProducerBatch): The batch that failed. + log_start_offset (int): log_start_offset from the broker's + PartitionProduceResponse, or -1 if unknown / client-side + failure. Used for KAFKA-5793 retention detection. + + Returns one of: + ERROR_CLASS_RETRIABLE — caller should retry the batch + ERROR_CLASS_ABORTABLE — transactional producer only; + abort the transaction + ERROR_CLASS_FATAL — unrecoverable; transition to + fatal error and fail the batch + ERROR_CLASS_NEEDS_EPOCH_BUMP — recoverable via KIP-360 epoch + bump (only when broker supports + InitProducerIdRequest v3+) + ERROR_CLASS_NEEDS_PRODUCER_ID_RESET — non-transactional pre-KIP-360 + fallback: reset the + producer id entirely + + Note: this classification is for transactional/idempotent producers + only. Non-idempotent producers don't call this; the Sender uses + simpler retry/fail logic for them. + """ + error_type = error if isinstance(error, type) else type(error) + + if error_type in self._FATAL_ERRORS: + return self.ERROR_CLASS_FATAL + + # KAFKA-5793: a retention-based UnknownProducerIdError is recoverable + # by resetting the partition's sequence (not a full epoch bump). The + # Sender checks this condition separately before consulting this + # classifier, but we mirror the logic here so the classifier alone + # gives the correct answer for callers that pass log_start_offset. + if error_type is Errors.UnknownProducerIdError and log_start_offset is not None and log_start_offset >= 0: + last_acked = self.last_acked_offset(batch.topic_partition) + if log_start_offset > last_acked: + return self.ERROR_CLASS_RETRIABLE + + if error_type in self._NEEDS_EPOCH_BUMP_ERRORS: + if self._supports_epoch_bump(): + return self.ERROR_CLASS_NEEDS_EPOCH_BUMP + # Pre-KIP-360 brokers: fall back to the older (lossier) recovery. + if self.is_transactional(): + return self.ERROR_CLASS_FATAL + return self.ERROR_CLASS_NEEDS_PRODUCER_ID_RESET + + # Retriable errors (broker-retriable or client connection errors) + # become ABORTABLE for transactional producers only if they're + # non-retriable AND we're in a transaction. The Sender's existing + # can_retry/can_split logic handles the actual retry decision; this + # classifier is only consulted for the FAIL branch. + if getattr(error_type, 'retriable', False): + return self.ERROR_CLASS_RETRIABLE + + # Non-retriable, not in the bump or fatal sets: transactional + # producers should abort the current transaction; non-transactional + # idempotent producers just fail the batch without any state reset. + if self.is_transactional(): + return self.ERROR_CLASS_ABORTABLE + return self.ERROR_CLASS_FATAL + def is_aborting(self): with self._lock: return self._current_state == TransactionState.ABORTING_TRANSACTION From 9a0628206fc86772acd83a7d609aa3abd926c284 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 10 Apr 2026 20:47:13 -0700 Subject: [PATCH 2/2] test/producer/test_transaction_manager.py --- test/producer/test_transaction_manager.py | 193 ++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 test/producer/test_transaction_manager.py diff --git a/test/producer/test_transaction_manager.py b/test/producer/test_transaction_manager.py new file mode 100644 index 000000000..f22ba721e --- /dev/null +++ b/test/producer/test_transaction_manager.py @@ -0,0 +1,193 @@ +# pylint: skip-file + +import pytest + +import kafka.errors as Errors +from kafka.cluster import ClusterMetadata +from kafka.producer.transaction_manager import ( + TransactionManager, + TransactionState, +) +from kafka.structs import TopicPartition + + +class _FakeBatch: + """Minimal ProducerBatch stand-in for classifier tests. + + The classifier only reads `topic_partition`; nothing else. + """ + def __init__(self, topic='foo', partition=0): + self.topic_partition = TopicPartition(topic, partition) + + +def _make_manager(transactional_id=None, api_version=(2, 5)): + return TransactionManager( + transactional_id=transactional_id, + transaction_timeout_ms=60000, + retry_backoff_ms=100, + api_version=api_version, + metadata=ClusterMetadata(), + ) + + +class TestBumpingProducerEpochState: + def test_state_value(self): + assert TransactionState.BUMPING_PRODUCER_EPOCH == 8 + + @pytest.mark.parametrize("source", [ + TransactionState.READY, + TransactionState.IN_TRANSACTION, + TransactionState.COMMITTING_TRANSACTION, + TransactionState.ABORTING_TRANSACTION, + TransactionState.ABORTABLE_ERROR, + ]) + def test_valid_entry_states(self, source): + assert TransactionState.is_transition_valid( + source, TransactionState.BUMPING_PRODUCER_EPOCH + ) + + @pytest.mark.parametrize("source", [ + TransactionState.UNINITIALIZED, + TransactionState.INITIALIZING, + TransactionState.FATAL_ERROR, + ]) + def test_invalid_entry_states(self, source): + assert not TransactionState.is_transition_valid( + source, TransactionState.BUMPING_PRODUCER_EPOCH + ) + + @pytest.mark.parametrize("target", [ + TransactionState.READY, + TransactionState.INITIALIZING, + TransactionState.ABORTABLE_ERROR, + TransactionState.FATAL_ERROR, + ]) + def test_valid_exit_targets(self, target): + assert TransactionState.is_transition_valid( + TransactionState.BUMPING_PRODUCER_EPOCH, target + ) + + def test_cannot_transition_bumping_to_in_transaction_directly(self): + # Must go through READY first + assert not TransactionState.is_transition_valid( + TransactionState.BUMPING_PRODUCER_EPOCH, TransactionState.IN_TRANSACTION + ) + + def test_is_bumping_epoch_helper(self): + tm = _make_manager() + assert not tm.is_bumping_epoch() + tm._current_state = TransactionState.BUMPING_PRODUCER_EPOCH + assert tm.is_bumping_epoch() + tm._current_state = TransactionState.READY + assert not tm.is_bumping_epoch() + + +class TestClassifyBatchError: + def test_fatal_errors_always_fatal(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + for err in ( + Errors.ClusterAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError, + Errors.ProducerFencedError, + Errors.InvalidTxnStateError, + ): + assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_FATAL + + def test_epoch_bump_errors_on_modern_broker(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + for err in ( + Errors.OutOfOrderSequenceNumberError, + Errors.UnknownProducerIdError, + Errors.InvalidProducerEpochError, + ): + assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP + + def test_epoch_bump_errors_on_old_broker_idempotent(self): + tm = _make_manager(transactional_id=None, api_version=(2, 0)) + batch = _FakeBatch() + for err in ( + Errors.OutOfOrderSequenceNumberError, + Errors.UnknownProducerIdError, + Errors.InvalidProducerEpochError, + ): + assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_NEEDS_PRODUCER_ID_RESET + + def test_epoch_bump_errors_on_old_broker_transactional(self): + tm = _make_manager(transactional_id='txn-id', api_version=(2, 0)) + batch = _FakeBatch() + for err in ( + Errors.OutOfOrderSequenceNumberError, + Errors.UnknownProducerIdError, + Errors.InvalidProducerEpochError, + ): + assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_FATAL + + def test_retention_based_unknown_producer_id_is_retriable(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + # Simulate successful writes: last_acked_offset = 9 + tm.update_last_acked_offset(batch.topic_partition, 0, 10) + # Broker reports log_start_offset strictly past our last_acked_offset + # -> records aged out, safe to retry (with sequence reset) + result = tm.classify_batch_error( + Errors.UnknownProducerIdError, batch, log_start_offset=100 + ) + assert result == TransactionManager.ERROR_CLASS_RETRIABLE + + def test_real_data_loss_unknown_producer_id_still_needs_bump(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + tm.update_last_acked_offset(batch.topic_partition, 0, 100) # last_acked = 99 + # log_start_offset within our acked range -> real loss, not retention + result = tm.classify_batch_error( + Errors.UnknownProducerIdError, batch, log_start_offset=50 + ) + assert result == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP + + def test_unknown_producer_id_without_log_start_offset_needs_bump(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + tm.update_last_acked_offset(batch.topic_partition, 0, 5) + # Old broker (log_start_offset = -1) -> no retention escape hatch + result = tm.classify_batch_error( + Errors.UnknownProducerIdError, batch, log_start_offset=-1 + ) + assert result == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP + + def test_retriable_client_error(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + assert tm.classify_batch_error(Errors.KafkaConnectionError, batch) == TransactionManager.ERROR_CLASS_RETRIABLE + + def test_retriable_broker_error(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + assert tm.classify_batch_error(Errors.NotLeaderForPartitionError, batch) == TransactionManager.ERROR_CLASS_RETRIABLE + + def test_non_retriable_non_fatal_idempotent(self): + tm = _make_manager(transactional_id=None, api_version=(2, 5)) + batch = _FakeBatch() + # InvalidTopicError is non-retriable, not in FATAL_ERRORS, not in epoch-bump set + assert tm.classify_batch_error(Errors.InvalidTopicError, batch) == TransactionManager.ERROR_CLASS_FATAL + + def test_non_retriable_non_fatal_transactional(self): + tm = _make_manager(transactional_id='txn-id', api_version=(2, 5)) + batch = _FakeBatch() + # Transactional producers can abort the current transaction and retry + assert tm.classify_batch_error(Errors.InvalidTopicError, batch) == TransactionManager.ERROR_CLASS_ABORTABLE + + def test_classifier_accepts_exception_instance(self): + tm = _make_manager(api_version=(2, 5)) + batch = _FakeBatch() + # Passing an instance rather than a class should work identically + exc = Errors.OutOfOrderSequenceNumberError("some message") + assert tm.classify_batch_error(exc, batch) == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP + + def test_supports_epoch_bump_version_gate(self): + assert not _make_manager(api_version=(0, 11))._supports_epoch_bump() + assert not _make_manager(api_version=(2, 0))._supports_epoch_bump() + assert not _make_manager(api_version=(2, 4))._supports_epoch_bump() + assert _make_manager(api_version=(2, 5))._supports_epoch_bump() + assert _make_manager(api_version=(3, 0))._supports_epoch_bump()