Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ class KafkaProducer:
defaults to be suitable. If the values are set to something
incompatible with the idempotent producer, a KafkaConfigurationError
will be raised.

On Kafka 2.5+ brokers, the idempotent producer automatically
recovers from transient producer-state errors (OutOfOrderSequence,
UnknownProducerId, InvalidProducerEpoch) by bumping its producer
epoch via InitProducerIdRequest v3+ (KIP-360). On older brokers,
these errors remain fatal for transactional producers and reset
the producer id for non-transactional idempotent producers.
Batches that are in-flight at the moment of a bump will have
their futures fail--their records are lost. Records still in
the accumulator (not yet drained) are produced under the bumped
epoch on the next drain.
delivery_timeout_ms (float): An upper bound on the time to report success
or failure after producer.send() returns. This limits the total time
that a record will be delayed prior to sending, the time to await
Expand Down
64 changes: 34 additions & 30 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kafka import errors as Errors
from kafka.metrics.measurable import AnonMeasurable
from kafka.metrics.stats import Avg, Max, Rate
from kafka.producer.transaction_manager import ProducerIdAndEpoch
from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager
from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest, ProduceResponse
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name
Expand Down Expand Up @@ -155,9 +155,13 @@ def run_once(self):
self._client.poll(timeout_ms=self.config['retry_backoff_ms'])
return

# do not continue sending if the transaction manager is in a failed state or if there
# is no producer id (for the idempotent case).
if self._transaction_manager.has_fatal_error() or not self._transaction_manager.has_producer_id():
# do not continue sending if the transaction manager is in a failed state, if there
# is no producer id (for the idempotent case), or if we're currently bumping the
# producer epoch (KIP-360) -- the InitProducerIdRequest has to complete before we
# can safely send any new produce requests under the new epoch.
if (self._transaction_manager.has_fatal_error()
or not self._transaction_manager.has_producer_id()
or self._transaction_manager.is_bumping_epoch()):
last_error = self._transaction_manager.last_error
if last_error is not None:
self._maybe_abort_batches(last_error)
Expand Down Expand Up @@ -547,36 +551,36 @@ def _dispatch_error(self, batch, exception, partition_response):
self._maybe_remove_from_inflight_batches(batch)
self._record_retries(batch)
else:
# FAIL: transaction state transitions then batch finalization.
# FAIL: dispatch transaction state transition via the
# classifier (KIP-360), then finalize the batch.
if self._transaction_manager:
if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \
not self._transaction_manager.is_transactional() and \
self._transaction_manager.has_producer_id(batch.producer_id):
base_offset = partition_response.base_offset if partition_response is not None else -1
log.error("%s: The broker received an out of order sequence number for topic-partition %s"
" at offset %s. This indicates data loss on the broker, and should be investigated.",
str(self), batch.topic_partition, base_offset)
# Reset the producer state since we have hit an irrecoverable
# exception and cannot make any guarantees about previously
# committed messages. This discards the producer id and all
# partition sequence numbers.
classification = self._transaction_manager.classify_batch_error(
exception, batch, log_start_offset=log_start_offset)

if classification == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP:
# KIP-360 (Kafka 2.5+): bump the producer epoch and
# continue. The accumulator's unsent records will be
# drained under the new epoch. In-flight batches at
# this moment are lost; their futures (including this
# one) fail.
self._transaction_manager.bump_producer_id_and_epoch()
elif classification == TransactionManager.ERROR_CLASS_NEEDS_PRODUCER_ID_RESET:
# Pre-KIP-360 fallback (non-transactional idempotent
# producer on < 2.5 broker).
if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \
self._transaction_manager.has_producer_id(batch.producer_id):
base_offset = partition_response.base_offset if partition_response is not None else -1
log.error("%s: The broker received an out of order sequence number for topic-partition %s"
" at offset %s. This indicates data loss on the broker, and should be investigated.",
str(self), batch.topic_partition, base_offset)
self._transaction_manager.reset_producer_id()
elif isinstance(exception, Errors.UnknownProducerIdError):
# The broker has no state for this producer. It will accept a
# write with sequence 0, so reset our sequence state for this
# partition. All in-flight requests to this partition will
# also fail with UnknownProducerId, so the sequence will
# remain at 0. Note: if the broker supports epoch bumping,
# KIP-360 will later reset all sequence numbers after
# calling InitProducerId.
self._transaction_manager.reset_sequence_for_partition(batch.topic_partition)
elif isinstance(exception, (Errors.ClusterAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError,
Errors.ProducerFencedError,
Errors.InvalidTxnStateError)):
elif classification == TransactionManager.ERROR_CLASS_FATAL:
self._transaction_manager.transition_to_fatal_error(exception)
elif self._transaction_manager.is_transactional():
elif classification == TransactionManager.ERROR_CLASS_ABORTABLE:
self._transaction_manager.transition_to_abortable_error(exception)
# ERROR_CLASS_RETRIABLE at this point means we couldn't
# retry (e.g. delivery-timeout hit or retries exhausted);
# just fail the batch without any state transition.

if self._sensors:
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
Expand Down
139 changes: 132 additions & 7 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,96 @@ def reset_producer_id(self):
"""
with self._lock:
if self.is_transactional():
raise Errors.IllegalStateError(
raise Errors.IllegalStateError(
"Cannot reset producer state for a transactional producer."
" You must either abort the ongoing transaction or"
" reinitialize the transactional producer instead")
self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH))
self._sequence_numbers.clear()
self._last_acked_offset.clear()

def bump_producer_id_and_epoch(self):
"""KIP-360: recover from a transient producer-state error by bumping
the epoch.

Transitions to BUMPING_PRODUCER_EPOCH and enqueues an
InitProducerIdRequest v3+ carrying the current producer_id/epoch.
When the broker responds with the bumped epoch, _complete_epoch_bump
transitions back to READY and the sender resumes producing under
the new epoch. Records in the accumulator that haven't been drained
yet will be stamped with the new epoch on the next drain.

TODO (KAFKA-5793 full): in-flight batches at the moment of the bump
are lost--their futures fail. Adding in-place rewrite of the
closed batch buffer (producer_id/epoch/base_sequence fields + CRC
recompute) would let us retry them under the new epoch without
losing records.

Requires broker >= 2.5 (InitProducerIdRequest v3+). On older
brokers, Sender falls back to reset_producer_id / fatal instead
via classify_batch_error.

Idempotent: if we're already in BUMPING_PRODUCER_EPOCH, this is a
no-op. This matters because with max_in_flight > 1, multiple
in-flight batches may all fail with the same epoch-bump-triggering
error in quick succession; only the first should drive the bump.
"""
with self._lock:
if self._current_state == TransactionState.BUMPING_PRODUCER_EPOCH:
return
if self._current_state == TransactionState.FATAL_ERROR:
return
if not self._supports_epoch_bump():
raise Errors.IllegalStateError(
"Cannot bump producer epoch: broker version %s does not support KIP-360 "
"(InitProducerIdRequest v3+ requires Kafka 2.5+)" % (self._api_version,))
log.warning("Bumping producer epoch for %s after recoverable error",
self.producer_id_and_epoch)
self._transition_to(TransactionState.BUMPING_PRODUCER_EPOCH)
# Drop all per-partition sequence state. The bumped epoch starts
# each partition at sequence 0. last_acked_offset is also cleared
# since it's tied to the pre-bump producer_id/epoch range.
self._sequence_numbers.clear()
self._last_acked_offset.clear()
# Transactional state: the broker aborts any in-flight
# transaction as part of processing InitProducerIdRequest v3+
# with a matching producer_id/epoch, so we clear our local
# view of which partitions are in the transaction. The user's
# ongoing begin/commit/abort coroutine (if any) will see the
# bump via the _result and can react accordingly.
self._transaction_started = False
self._partitions_in_transaction.clear()
self._new_partitions_in_transaction.clear()
self._pending_partitions_in_transaction.clear()
handler = InitProducerIdHandler(self, self.transaction_timeout_ms, is_epoch_bump=True)
self._enqueue_request(handler)

def _complete_epoch_bump(self):
"""Called from InitProducerIdHandler on successful bump response.

Transitions BUMPING_PRODUCER_EPOCH -> READY so the sender resumes
producing under the new epoch.
"""
# Caller (handle_response) already holds _lock.
self._transition_to(TransactionState.READY)
self._last_error = None

def _restart_epoch_bump_without_producer_id(self, transaction_timeout_ms, result):
"""Called from InitProducerIdHandler when the broker rejects the bump
with INVALID_PRODUCER_EPOCH (our producer_id/epoch are stale).

Falls back to requesting a fresh producer_id by enqueuing a new
InitProducerIdRequest without the producer_id/epoch fields. The
original TransactionalRequestResult is re-used so the caller waits
on the overall bump-then-init sequence.
"""
# Caller (handle_response) already holds _lock.
self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH))
# Stay in BUMPING_PRODUCER_EPOCH; the follow-up init will transition
# to READY on success via the regular (non-bump) code path.
handler = InitProducerIdHandler(self, transaction_timeout_ms, is_epoch_bump=False)
handler._result = result # thread the caller's result through
self._enqueue_request(handler)

def sequence_number(self, tp):
with self._lock:
Expand Down Expand Up @@ -770,16 +854,41 @@ def priority(self):


class InitProducerIdHandler(TxnRequestHandler):
def __init__(self, transaction_manager, transaction_timeout_ms):
def __init__(self, transaction_manager, transaction_timeout_ms, is_epoch_bump=False):
super().__init__(transaction_manager)

if transaction_manager._api_version >= (2, 0):
self._is_epoch_bump = is_epoch_bump
api_version = transaction_manager._api_version
# KIP-360 / InitProducerIdRequest v3+ (Kafka 2.5+) lets us resume
# an existing producer_id by bumping its epoch rather than allocating
# a fresh one. v3+ takes producer_id + epoch fields; on broker match,
# the broker returns (same producer_id, epoch+1).
if api_version >= (2, 5):
version = 3
elif api_version >= (2, 4):
version = 2
elif api_version >= (2, 0):
version = 1
else:
version = 0
self.request = InitProducerIdRequest[version](
transactional_id=self.transactional_id,
transaction_timeout_ms=transaction_timeout_ms)

if is_epoch_bump:
assert version >= 3, "KIP-360 epoch bump requires Kafka 2.5+ broker"
producer_id = transaction_manager.producer_id_and_epoch.producer_id
producer_epoch = transaction_manager.producer_id_and_epoch.epoch
else:
producer_id = NO_PRODUCER_ID
producer_epoch = NO_PRODUCER_EPOCH

kwargs = {
'version': version,
'transactional_id': self.transactional_id,
'transaction_timeout_ms': transaction_timeout_ms,
}
if version >= 3:
kwargs['producer_id'] = producer_id
kwargs['producer_epoch'] = producer_epoch
self.request = InitProducerIdRequest(**kwargs)

@property
def priority(self):
Expand All @@ -790,13 +899,29 @@ def handle_response(self, response):

if error is Errors.NoError:
self.transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(response.producer_id, response.producer_epoch))
self.transaction_manager._transition_to(TransactionState.READY)
if self._is_epoch_bump:
self.transaction_manager._complete_epoch_bump()
else:
self.transaction_manager._transition_to(TransactionState.READY)
self._result.done()
elif error in (Errors.NotCoordinatorError, Errors.CoordinatorNotAvailableError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.reenqueue()
elif error in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
self.reenqueue()
elif error is Errors.InvalidProducerEpochError and self._is_epoch_bump:
# KIP-360: our (producer_id, epoch) are stale--the broker no
# longer recognizes them. Fall back to allocating a fresh
# producer_id by reissuing InitProducerIdRequest without
# producer_id/epoch fields.
log.info("InitProducerId bump rejected with INVALID_PRODUCER_EPOCH; "
"falling back to a fresh producer id")
self.transaction_manager._restart_epoch_bump_without_producer_id(
self.request.transaction_timeout_ms, self._result)
elif error is Errors.ProducerFencedError:
# Another producer instance has taken over this transactional_id.
# Fatal--the application must rebuild the producer.
self.fatal_error(error())
elif error is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error())
else:
Expand Down
Loading
Loading