Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 133 additions & 3 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,38 @@ class TransactionState(IntEnum):
ABORTING_TRANSACTION = 5
ABORTABLE_ERROR = 6
FATAL_ERROR = 7
# KIP-360: intermediate state entered when a recoverable sequence-related
# error is encountered. The producer sends an InitProducerIdRequest v3+
# with its current producer_id/epoch to bump the epoch, then transitions
# back to READY on success. Records in the accumulator will be sent under
# the bumped epoch with fresh sequence numbers. In-flight batches at the
# moment of the bump are lost (their futures fail). (TODO re KAFKA-5793)
BUMPING_PRODUCER_EPOCH = 8

@classmethod
def is_transition_valid(cls, source, target):
if target == cls.INITIALIZING:
return source == cls.UNINITIALIZED
return source in (cls.UNINITIALIZED, cls.BUMPING_PRODUCER_EPOCH)
elif target == cls.READY:
return source in (cls.INITIALIZING, cls.COMMITTING_TRANSACTION, cls.ABORTING_TRANSACTION)
return source in (cls.INITIALIZING, cls.COMMITTING_TRANSACTION,
cls.ABORTING_TRANSACTION, cls.BUMPING_PRODUCER_EPOCH)
elif target == cls.IN_TRANSACTION:
return source == cls.READY
elif target == cls.COMMITTING_TRANSACTION:
return source == cls.IN_TRANSACTION
elif target == cls.ABORTING_TRANSACTION:
return source in (cls.IN_TRANSACTION, cls.ABORTABLE_ERROR)
elif target == cls.ABORTABLE_ERROR:
return source in (cls.IN_TRANSACTION, cls.COMMITTING_TRANSACTION, cls.ABORTABLE_ERROR)
return source in (cls.IN_TRANSACTION, cls.COMMITTING_TRANSACTION,
cls.ABORTABLE_ERROR, cls.BUMPING_PRODUCER_EPOCH)
elif target == cls.BUMPING_PRODUCER_EPOCH:
# A recoverable sequence-related error can arrive at any point in
# the producer's lifetime; the bump is a unilateral recovery
# action. Disallow only from UNINITIALIZED (no producer_id yet
# to bump) and the terminal error states.
return source in (cls.READY, cls.IN_TRANSACTION,
cls.COMMITTING_TRANSACTION, cls.ABORTING_TRANSACTION,
cls.ABORTABLE_ERROR)
elif target == cls.UNINITIALIZED:
# Disallow transitions to UNITIALIZED
return False
Expand Down Expand Up @@ -246,6 +263,119 @@ def has_error(self):
TransactionState.ABORTABLE_ERROR,
TransactionState.FATAL_ERROR)

def is_bumping_epoch(self):
with self._lock:
return self._current_state == TransactionState.BUMPING_PRODUCER_EPOCH

# KIP-360 error classification
#
# Errors whose correct recovery is to bump the producer epoch via
# InitProducerIdRequest v3+. On brokers that do not support the bump
# (api_version < 2.5) these degrade to FATAL for transactional producers
# and NEEDS_PRODUCER_ID_RESET for non-transactional idempotent producers,
# matching the pre-KIP-360 behavior.
_NEEDS_EPOCH_BUMP_ERRORS = frozenset({
Errors.OutOfOrderSequenceNumberError,
Errors.UnknownProducerIdError,
Errors.InvalidProducerEpochError,
})

# Errors that are always fatal regardless of broker version: auth
# failures, fencing, or structural state corruption where no recovery
# is possible without operator action.
_FATAL_ERRORS = frozenset({
Errors.ClusterAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError,
Errors.ProducerFencedError,
Errors.InvalidTxnStateError,
})

# Classification outcomes returned by classify_batch_error().
ERROR_CLASS_RETRIABLE = 'RETRIABLE'
ERROR_CLASS_ABORTABLE = 'ABORTABLE'
ERROR_CLASS_FATAL = 'FATAL'
ERROR_CLASS_NEEDS_EPOCH_BUMP = 'NEEDS_EPOCH_BUMP'
ERROR_CLASS_NEEDS_PRODUCER_ID_RESET = 'NEEDS_PRODUCER_ID_RESET'

def _supports_epoch_bump(self):
"""Return True if the broker supports InitProducerIdRequest v3+ (KIP-360).

KIP-360 landed in Kafka 2.5. On older brokers we fall back to the
pre-KIP-360 recovery: reset producer id for idempotent producers,
fatal state for transactional producers.
"""
return self._api_version >= (2, 5)

def classify_batch_error(self, error, batch, log_start_offset=-1):
"""Categorize a batch-completion error into a recovery outcome.

Used by the Sender to decide what to do with a failed batch. This
method does not mutate any state — it is a pure classification
helper. The caller is responsible for dispatching to the
appropriate recovery path.

Arguments:
error (type or BaseException): The error class or instance.
batch (ProducerBatch): The batch that failed.
log_start_offset (int): log_start_offset from the broker's
PartitionProduceResponse, or -1 if unknown / client-side
failure. Used for KAFKA-5793 retention detection.

Returns one of:
ERROR_CLASS_RETRIABLE — caller should retry the batch
ERROR_CLASS_ABORTABLE — transactional producer only;
abort the transaction
ERROR_CLASS_FATAL — unrecoverable; transition to
fatal error and fail the batch
ERROR_CLASS_NEEDS_EPOCH_BUMP — recoverable via KIP-360 epoch
bump (only when broker supports
InitProducerIdRequest v3+)
ERROR_CLASS_NEEDS_PRODUCER_ID_RESET — non-transactional pre-KIP-360
fallback: reset the
producer id entirely

Note: this classification is for transactional/idempotent producers
only. Non-idempotent producers don't call this; the Sender uses
simpler retry/fail logic for them.
"""
error_type = error if isinstance(error, type) else type(error)

if error_type in self._FATAL_ERRORS:
return self.ERROR_CLASS_FATAL

# KAFKA-5793: a retention-based UnknownProducerIdError is recoverable
# by resetting the partition's sequence (not a full epoch bump). The
# Sender checks this condition separately before consulting this
# classifier, but we mirror the logic here so the classifier alone
# gives the correct answer for callers that pass log_start_offset.
if error_type is Errors.UnknownProducerIdError and log_start_offset is not None and log_start_offset >= 0:
last_acked = self.last_acked_offset(batch.topic_partition)
if log_start_offset > last_acked:
return self.ERROR_CLASS_RETRIABLE

if error_type in self._NEEDS_EPOCH_BUMP_ERRORS:
if self._supports_epoch_bump():
return self.ERROR_CLASS_NEEDS_EPOCH_BUMP
# Pre-KIP-360 brokers: fall back to the older (lossier) recovery.
if self.is_transactional():
return self.ERROR_CLASS_FATAL
return self.ERROR_CLASS_NEEDS_PRODUCER_ID_RESET

# Retriable errors (broker-retriable or client connection errors)
# become ABORTABLE for transactional producers only if they're
# non-retriable AND we're in a transaction. The Sender's existing
# can_retry/can_split logic handles the actual retry decision; this
# classifier is only consulted for the FAIL branch.
if getattr(error_type, 'retriable', False):
return self.ERROR_CLASS_RETRIABLE

# Non-retriable, not in the bump or fatal sets: transactional
# producers should abort the current transaction; non-transactional
# idempotent producers just fail the batch without any state reset.
if self.is_transactional():
return self.ERROR_CLASS_ABORTABLE
return self.ERROR_CLASS_FATAL

def is_aborting(self):
with self._lock:
return self._current_state == TransactionState.ABORTING_TRANSACTION
Expand Down
193 changes: 193 additions & 0 deletions test/producer/test_transaction_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# pylint: skip-file

import pytest

import kafka.errors as Errors
from kafka.cluster import ClusterMetadata
from kafka.producer.transaction_manager import (
TransactionManager,
TransactionState,
)
from kafka.structs import TopicPartition


class _FakeBatch:
"""Minimal ProducerBatch stand-in for classifier tests.

The classifier only reads `topic_partition`; nothing else.
"""
def __init__(self, topic='foo', partition=0):
self.topic_partition = TopicPartition(topic, partition)


def _make_manager(transactional_id=None, api_version=(2, 5)):
return TransactionManager(
transactional_id=transactional_id,
transaction_timeout_ms=60000,
retry_backoff_ms=100,
api_version=api_version,
metadata=ClusterMetadata(),
)


class TestBumpingProducerEpochState:
def test_state_value(self):
assert TransactionState.BUMPING_PRODUCER_EPOCH == 8

@pytest.mark.parametrize("source", [
TransactionState.READY,
TransactionState.IN_TRANSACTION,
TransactionState.COMMITTING_TRANSACTION,
TransactionState.ABORTING_TRANSACTION,
TransactionState.ABORTABLE_ERROR,
])
def test_valid_entry_states(self, source):
assert TransactionState.is_transition_valid(
source, TransactionState.BUMPING_PRODUCER_EPOCH
)

@pytest.mark.parametrize("source", [
TransactionState.UNINITIALIZED,
TransactionState.INITIALIZING,
TransactionState.FATAL_ERROR,
])
def test_invalid_entry_states(self, source):
assert not TransactionState.is_transition_valid(
source, TransactionState.BUMPING_PRODUCER_EPOCH
)

@pytest.mark.parametrize("target", [
TransactionState.READY,
TransactionState.INITIALIZING,
TransactionState.ABORTABLE_ERROR,
TransactionState.FATAL_ERROR,
])
def test_valid_exit_targets(self, target):
assert TransactionState.is_transition_valid(
TransactionState.BUMPING_PRODUCER_EPOCH, target
)

def test_cannot_transition_bumping_to_in_transaction_directly(self):
# Must go through READY first
assert not TransactionState.is_transition_valid(
TransactionState.BUMPING_PRODUCER_EPOCH, TransactionState.IN_TRANSACTION
)

def test_is_bumping_epoch_helper(self):
tm = _make_manager()
assert not tm.is_bumping_epoch()
tm._current_state = TransactionState.BUMPING_PRODUCER_EPOCH
assert tm.is_bumping_epoch()
tm._current_state = TransactionState.READY
assert not tm.is_bumping_epoch()


class TestClassifyBatchError:
def test_fatal_errors_always_fatal(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
for err in (
Errors.ClusterAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError,
Errors.ProducerFencedError,
Errors.InvalidTxnStateError,
):
assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_FATAL

def test_epoch_bump_errors_on_modern_broker(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
for err in (
Errors.OutOfOrderSequenceNumberError,
Errors.UnknownProducerIdError,
Errors.InvalidProducerEpochError,
):
assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP

def test_epoch_bump_errors_on_old_broker_idempotent(self):
tm = _make_manager(transactional_id=None, api_version=(2, 0))
batch = _FakeBatch()
for err in (
Errors.OutOfOrderSequenceNumberError,
Errors.UnknownProducerIdError,
Errors.InvalidProducerEpochError,
):
assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_NEEDS_PRODUCER_ID_RESET

def test_epoch_bump_errors_on_old_broker_transactional(self):
tm = _make_manager(transactional_id='txn-id', api_version=(2, 0))
batch = _FakeBatch()
for err in (
Errors.OutOfOrderSequenceNumberError,
Errors.UnknownProducerIdError,
Errors.InvalidProducerEpochError,
):
assert tm.classify_batch_error(err, batch) == TransactionManager.ERROR_CLASS_FATAL

def test_retention_based_unknown_producer_id_is_retriable(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
# Simulate successful writes: last_acked_offset = 9
tm.update_last_acked_offset(batch.topic_partition, 0, 10)
# Broker reports log_start_offset strictly past our last_acked_offset
# -> records aged out, safe to retry (with sequence reset)
result = tm.classify_batch_error(
Errors.UnknownProducerIdError, batch, log_start_offset=100
)
assert result == TransactionManager.ERROR_CLASS_RETRIABLE

def test_real_data_loss_unknown_producer_id_still_needs_bump(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
tm.update_last_acked_offset(batch.topic_partition, 0, 100) # last_acked = 99
# log_start_offset within our acked range -> real loss, not retention
result = tm.classify_batch_error(
Errors.UnknownProducerIdError, batch, log_start_offset=50
)
assert result == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP

def test_unknown_producer_id_without_log_start_offset_needs_bump(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
tm.update_last_acked_offset(batch.topic_partition, 0, 5)
# Old broker (log_start_offset = -1) -> no retention escape hatch
result = tm.classify_batch_error(
Errors.UnknownProducerIdError, batch, log_start_offset=-1
)
assert result == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP

def test_retriable_client_error(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
assert tm.classify_batch_error(Errors.KafkaConnectionError, batch) == TransactionManager.ERROR_CLASS_RETRIABLE

def test_retriable_broker_error(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
assert tm.classify_batch_error(Errors.NotLeaderForPartitionError, batch) == TransactionManager.ERROR_CLASS_RETRIABLE

def test_non_retriable_non_fatal_idempotent(self):
tm = _make_manager(transactional_id=None, api_version=(2, 5))
batch = _FakeBatch()
# InvalidTopicError is non-retriable, not in FATAL_ERRORS, not in epoch-bump set
assert tm.classify_batch_error(Errors.InvalidTopicError, batch) == TransactionManager.ERROR_CLASS_FATAL

def test_non_retriable_non_fatal_transactional(self):
tm = _make_manager(transactional_id='txn-id', api_version=(2, 5))
batch = _FakeBatch()
# Transactional producers can abort the current transaction and retry
assert tm.classify_batch_error(Errors.InvalidTopicError, batch) == TransactionManager.ERROR_CLASS_ABORTABLE

def test_classifier_accepts_exception_instance(self):
tm = _make_manager(api_version=(2, 5))
batch = _FakeBatch()
# Passing an instance rather than a class should work identically
exc = Errors.OutOfOrderSequenceNumberError("some message")
assert tm.classify_batch_error(exc, batch) == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP

def test_supports_epoch_bump_version_gate(self):
assert not _make_manager(api_version=(0, 11))._supports_epoch_bump()
assert not _make_manager(api_version=(2, 0))._supports_epoch_bump()
assert not _make_manager(api_version=(2, 4))._supports_epoch_bump()
assert _make_manager(api_version=(2, 5))._supports_epoch_bump()
assert _make_manager(api_version=(3, 0))._supports_epoch_bump()
Loading