diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 3e654648b..3ac578f18 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -147,6 +147,17 @@ class KafkaProducer: defaults to be suitable. If the values are set to something incompatible with the idempotent producer, a KafkaConfigurationError will be raised. + + On Kafka 2.5+ brokers, the idempotent producer automatically + recovers from transient producer-state errors (OutOfOrderSequence, + UnknownProducerId, InvalidProducerEpoch) by bumping its producer + epoch via InitProducerIdRequest v3+ (KIP-360). On older brokers, + these errors remain fatal for transactional producers and reset + the producer id for non-transactional idempotent producers. + Batches that are in-flight at the moment of a bump will have + their futures fail--their records are lost. Records still in + the accumulator (not yet drained) are produced under the bumped + epoch on the next drain. delivery_timeout_ms (float): An upper bound on the time to report success or failure after producer.send() returns. This limits the total time that a record will be delayed prior to sending, the time to await diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 345010325..5b728b9ed 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -8,7 +8,7 @@ from kafka import errors as Errors from kafka.metrics.measurable import AnonMeasurable from kafka.metrics.stats import Avg, Max, Rate -from kafka.producer.transaction_manager import ProducerIdAndEpoch +from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest, ProduceResponse from kafka.structs import TopicPartition from kafka.util import ensure_valid_topic_name @@ -155,9 +155,13 @@ def run_once(self): self._client.poll(timeout_ms=self.config['retry_backoff_ms']) return - # do not continue sending if the transaction manager is in a failed state or if there - # is no producer id (for the idempotent case). - if self._transaction_manager.has_fatal_error() or not self._transaction_manager.has_producer_id(): + # do not continue sending if the transaction manager is in a failed state, if there + # is no producer id (for the idempotent case), or if we're currently bumping the + # producer epoch (KIP-360) -- the InitProducerIdRequest has to complete before we + # can safely send any new produce requests under the new epoch. + if (self._transaction_manager.has_fatal_error() + or not self._transaction_manager.has_producer_id() + or self._transaction_manager.is_bumping_epoch()): last_error = self._transaction_manager.last_error if last_error is not None: self._maybe_abort_batches(last_error) @@ -547,36 +551,36 @@ def _dispatch_error(self, batch, exception, partition_response): self._maybe_remove_from_inflight_batches(batch) self._record_retries(batch) else: - # FAIL: transaction state transitions then batch finalization. + # FAIL: dispatch transaction state transition via the + # classifier (KIP-360), then finalize the batch. if self._transaction_manager: - if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \ - not self._transaction_manager.is_transactional() and \ - self._transaction_manager.has_producer_id(batch.producer_id): - base_offset = partition_response.base_offset if partition_response is not None else -1 - log.error("%s: The broker received an out of order sequence number for topic-partition %s" - " at offset %s. This indicates data loss on the broker, and should be investigated.", - str(self), batch.topic_partition, base_offset) - # Reset the producer state since we have hit an irrecoverable - # exception and cannot make any guarantees about previously - # committed messages. This discards the producer id and all - # partition sequence numbers. + classification = self._transaction_manager.classify_batch_error( + exception, batch, log_start_offset=log_start_offset) + + if classification == TransactionManager.ERROR_CLASS_NEEDS_EPOCH_BUMP: + # KIP-360 (Kafka 2.5+): bump the producer epoch and + # continue. The accumulator's unsent records will be + # drained under the new epoch. In-flight batches at + # this moment are lost; their futures (including this + # one) fail. + self._transaction_manager.bump_producer_id_and_epoch() + elif classification == TransactionManager.ERROR_CLASS_NEEDS_PRODUCER_ID_RESET: + # Pre-KIP-360 fallback (non-transactional idempotent + # producer on < 2.5 broker). + if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \ + self._transaction_manager.has_producer_id(batch.producer_id): + base_offset = partition_response.base_offset if partition_response is not None else -1 + log.error("%s: The broker received an out of order sequence number for topic-partition %s" + " at offset %s. This indicates data loss on the broker, and should be investigated.", + str(self), batch.topic_partition, base_offset) self._transaction_manager.reset_producer_id() - elif isinstance(exception, Errors.UnknownProducerIdError): - # The broker has no state for this producer. It will accept a - # write with sequence 0, so reset our sequence state for this - # partition. All in-flight requests to this partition will - # also fail with UnknownProducerId, so the sequence will - # remain at 0. Note: if the broker supports epoch bumping, - # KIP-360 will later reset all sequence numbers after - # calling InitProducerId. - self._transaction_manager.reset_sequence_for_partition(batch.topic_partition) - elif isinstance(exception, (Errors.ClusterAuthorizationFailedError, - Errors.TransactionalIdAuthorizationFailedError, - Errors.ProducerFencedError, - Errors.InvalidTxnStateError)): + elif classification == TransactionManager.ERROR_CLASS_FATAL: self._transaction_manager.transition_to_fatal_error(exception) - elif self._transaction_manager.is_transactional(): + elif classification == TransactionManager.ERROR_CLASS_ABORTABLE: self._transaction_manager.transition_to_abortable_error(exception) + # ERROR_CLASS_RETRIABLE at this point means we couldn't + # retry (e.g. delivery-timeout hit or retries exhausted); + # just fail the batch without any state transition. if self._sensors: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 59d887dc3..02a7c02a7 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -432,12 +432,96 @@ def reset_producer_id(self): """ with self._lock: if self.is_transactional(): - raise Errors.IllegalStateError( + raise Errors.IllegalStateError( "Cannot reset producer state for a transactional producer." " You must either abort the ongoing transaction or" " reinitialize the transactional producer instead") self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH)) self._sequence_numbers.clear() + self._last_acked_offset.clear() + + def bump_producer_id_and_epoch(self): + """KIP-360: recover from a transient producer-state error by bumping + the epoch. + + Transitions to BUMPING_PRODUCER_EPOCH and enqueues an + InitProducerIdRequest v3+ carrying the current producer_id/epoch. + When the broker responds with the bumped epoch, _complete_epoch_bump + transitions back to READY and the sender resumes producing under + the new epoch. Records in the accumulator that haven't been drained + yet will be stamped with the new epoch on the next drain. + + TODO (KAFKA-5793 full): in-flight batches at the moment of the bump + are lost--their futures fail. Adding in-place rewrite of the + closed batch buffer (producer_id/epoch/base_sequence fields + CRC + recompute) would let us retry them under the new epoch without + losing records. + + Requires broker >= 2.5 (InitProducerIdRequest v3+). On older + brokers, Sender falls back to reset_producer_id / fatal instead + via classify_batch_error. + + Idempotent: if we're already in BUMPING_PRODUCER_EPOCH, this is a + no-op. This matters because with max_in_flight > 1, multiple + in-flight batches may all fail with the same epoch-bump-triggering + error in quick succession; only the first should drive the bump. + """ + with self._lock: + if self._current_state == TransactionState.BUMPING_PRODUCER_EPOCH: + return + if self._current_state == TransactionState.FATAL_ERROR: + return + if not self._supports_epoch_bump(): + raise Errors.IllegalStateError( + "Cannot bump producer epoch: broker version %s does not support KIP-360 " + "(InitProducerIdRequest v3+ requires Kafka 2.5+)" % (self._api_version,)) + log.warning("Bumping producer epoch for %s after recoverable error", + self.producer_id_and_epoch) + self._transition_to(TransactionState.BUMPING_PRODUCER_EPOCH) + # Drop all per-partition sequence state. The bumped epoch starts + # each partition at sequence 0. last_acked_offset is also cleared + # since it's tied to the pre-bump producer_id/epoch range. + self._sequence_numbers.clear() + self._last_acked_offset.clear() + # Transactional state: the broker aborts any in-flight + # transaction as part of processing InitProducerIdRequest v3+ + # with a matching producer_id/epoch, so we clear our local + # view of which partitions are in the transaction. The user's + # ongoing begin/commit/abort coroutine (if any) will see the + # bump via the _result and can react accordingly. + self._transaction_started = False + self._partitions_in_transaction.clear() + self._new_partitions_in_transaction.clear() + self._pending_partitions_in_transaction.clear() + handler = InitProducerIdHandler(self, self.transaction_timeout_ms, is_epoch_bump=True) + self._enqueue_request(handler) + + def _complete_epoch_bump(self): + """Called from InitProducerIdHandler on successful bump response. + + Transitions BUMPING_PRODUCER_EPOCH -> READY so the sender resumes + producing under the new epoch. + """ + # Caller (handle_response) already holds _lock. + self._transition_to(TransactionState.READY) + self._last_error = None + + def _restart_epoch_bump_without_producer_id(self, transaction_timeout_ms, result): + """Called from InitProducerIdHandler when the broker rejects the bump + with INVALID_PRODUCER_EPOCH (our producer_id/epoch are stale). + + Falls back to requesting a fresh producer_id by enqueuing a new + InitProducerIdRequest without the producer_id/epoch fields. The + original TransactionalRequestResult is re-used so the caller waits + on the overall bump-then-init sequence. + """ + # Caller (handle_response) already holds _lock. + self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH)) + # Stay in BUMPING_PRODUCER_EPOCH; the follow-up init will transition + # to READY on success via the regular (non-bump) code path. + handler = InitProducerIdHandler(self, transaction_timeout_ms, is_epoch_bump=False) + handler._result = result # thread the caller's result through + self._enqueue_request(handler) def sequence_number(self, tp): with self._lock: @@ -770,16 +854,41 @@ def priority(self): class InitProducerIdHandler(TxnRequestHandler): - def __init__(self, transaction_manager, transaction_timeout_ms): + def __init__(self, transaction_manager, transaction_timeout_ms, is_epoch_bump=False): super().__init__(transaction_manager) - if transaction_manager._api_version >= (2, 0): + self._is_epoch_bump = is_epoch_bump + api_version = transaction_manager._api_version + # KIP-360 / InitProducerIdRequest v3+ (Kafka 2.5+) lets us resume + # an existing producer_id by bumping its epoch rather than allocating + # a fresh one. v3+ takes producer_id + epoch fields; on broker match, + # the broker returns (same producer_id, epoch+1). + if api_version >= (2, 5): + version = 3 + elif api_version >= (2, 4): + version = 2 + elif api_version >= (2, 0): version = 1 else: version = 0 - self.request = InitProducerIdRequest[version]( - transactional_id=self.transactional_id, - transaction_timeout_ms=transaction_timeout_ms) + + if is_epoch_bump: + assert version >= 3, "KIP-360 epoch bump requires Kafka 2.5+ broker" + producer_id = transaction_manager.producer_id_and_epoch.producer_id + producer_epoch = transaction_manager.producer_id_and_epoch.epoch + else: + producer_id = NO_PRODUCER_ID + producer_epoch = NO_PRODUCER_EPOCH + + kwargs = { + 'version': version, + 'transactional_id': self.transactional_id, + 'transaction_timeout_ms': transaction_timeout_ms, + } + if version >= 3: + kwargs['producer_id'] = producer_id + kwargs['producer_epoch'] = producer_epoch + self.request = InitProducerIdRequest(**kwargs) @property def priority(self): @@ -790,13 +899,29 @@ def handle_response(self, response): if error is Errors.NoError: self.transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(response.producer_id, response.producer_epoch)) - self.transaction_manager._transition_to(TransactionState.READY) + if self._is_epoch_bump: + self.transaction_manager._complete_epoch_bump() + else: + self.transaction_manager._transition_to(TransactionState.READY) self._result.done() elif error in (Errors.NotCoordinatorError, Errors.CoordinatorNotAvailableError): self.transaction_manager._lookup_coordinator('transaction', self.transactional_id) self.reenqueue() elif error in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError): self.reenqueue() + elif error is Errors.InvalidProducerEpochError and self._is_epoch_bump: + # KIP-360: our (producer_id, epoch) are stale--the broker no + # longer recognizes them. Fall back to allocating a fresh + # producer_id by reissuing InitProducerIdRequest without + # producer_id/epoch fields. + log.info("InitProducerId bump rejected with INVALID_PRODUCER_EPOCH; " + "falling back to a fresh producer id") + self.transaction_manager._restart_epoch_bump_without_producer_id( + self.request.transaction_timeout_ms, self._result) + elif error is Errors.ProducerFencedError: + # Another producer instance has taken over this transactional_id. + # Fatal--the application must rebuild the producer. + self.fatal_error(error()) elif error is Errors.TransactionalIdAuthorizationFailedError: self.fatal_error(error()) else: diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index d04f4bab0..918176cd9 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -266,7 +266,7 @@ def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, tra mocker.patch.object(batch, 'done') assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id error = Errors.OutOfOrderSequenceNumberError - # OutOfOrderSequenceNumber is non-retriable — hits the FAIL branch of + # OutOfOrderSequenceNumber is non-retriable -- hits the FAIL branch of # _dispatch_error, which resets the producer id for non-transactional # idempotent producers. sender._complete_batch_with_exception(batch, error) @@ -600,7 +600,7 @@ def test_rebind_old_produce_future_callbacks_safe(self): future = FutureRecordMetadata(old_pf, 0, 1000, None, 3, 5, -1) future.rebind(new_pf, 0) - # Complete the new produce_future — should resolve the record future once + # Complete the new produce_future -- should resolve the record future once new_pf.success((100, -1, None)) assert future.is_done assert future.succeeded() @@ -634,7 +634,7 @@ def get_in_thread(): time.sleep(0.05) assert t.is_alive() - # Rebind to a new produce_future — this wakes the blocked thread + # Rebind to a new produce_future -- this wakes the blocked thread new_pf = FutureProduceResult(tp) future.rebind(new_pf, 0) @@ -827,7 +827,7 @@ def test_retry_batch_keeps_sequence(self, client, transaction_manager): accumulator.reenqueue(batch) assert batch.in_retry() - # Re-drain after backoff expires — sequence should NOT change (batch is in_retry) + # Re-drain after backoff expires -- sequence should NOT change (batch is in_retry) future_time = time.monotonic() + 1 # past retry_backoff_ms batches2 = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576, now=future_time) assert len(batches2) == 1 @@ -857,18 +857,18 @@ def test_split_resets_sequence_number(self, client, transaction_manager): assert transaction_manager.sequence_number(tp) == 0 - # Drain — sequence advances to 5 + # Drain -- sequence advances to 5 batches = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) assert len(batches) == 1 batch = batches[0] assert transaction_manager.sequence_number(tp) == 5 - # Split — should roll back sequence to 0 (the failed batch's base_sequence) + # Split -- should roll back sequence to 0 (the failed batch's base_sequence) accumulator.split_and_reenqueue(batch) accumulator.deallocate(batch) assert transaction_manager.sequence_number(tp) == 0 - # Drain the split batches — each gets correct sequential sequences + # Drain the split batches -- each gets correct sequential sequences dq = list(accumulator._batches[tp]) assert len(dq) == 2 # Split into two halves @@ -944,7 +944,7 @@ def test_retention_based_unknown_producer_id_retries(self, sender, accumulator, assert transaction_manager.sequence_number(tp) == 10 batch = producer_batch() - # Broker's log_start_offset is 100 — way past our last acked + # Broker's log_start_offset is 100 -- way past our last acked sender._complete_batch(batch, _partition_response( error_cls=Errors.UnknownProducerIdError, base_offset=-1, @@ -973,14 +973,14 @@ def test_real_data_loss_unknown_producer_id_fails(self, sender, accumulator, tra batch = producer_batch() future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) - # Broker's log_start_offset is 50 — within our acked range → real data loss + # Broker's log_start_offset is 50 -- within our acked range -> real data loss sender._complete_batch(batch, _partition_response( error_cls=Errors.UnknownProducerIdError, base_offset=-1, log_start_offset=50, )) - # Batch should NOT be reenqueued — it should fail + # Batch should NOT be reenqueued -- it should fail accumulator.reenqueue.assert_not_called() assert batch.is_done assert future.failed() @@ -1015,3 +1015,155 @@ def test_unknown_producer_id_without_transaction_manager_fails(self, sender, acc )) accumulator.reenqueue.assert_not_called() assert batch.is_done + + +class TestKip360SenderIntegration: + def _make_txn_manager(self, transactional_id=None): + """Transaction manager on a KIP-360-capable broker version with a + valid producer_id already set (simulating post-InitProducerId state).""" + from kafka.producer.transaction_manager import TransactionState as _TS + tm = TransactionManager( + transactional_id=transactional_id, + transaction_timeout_ms=60000, + retry_backoff_ms=100, + api_version=(2, 5), + metadata=ClusterMetadata(), + ) + tm.set_producer_id_and_epoch(ProducerIdAndEpoch(1234, 5)) + tm._current_state = _TS.READY + return tm + + def test_out_of_order_sequence_triggers_epoch_bump(self, sender, accumulator, mocker): + """OutOfOrderSequenceNumberError on a 2.5+ broker routes through + bump_producer_id_and_epoch(), not reset_producer_id().""" + tm = self._make_txn_manager() + sender._transaction_manager = tm + mocker.patch.object(tm, 'bump_producer_id_and_epoch') + mocker.patch.object(tm, 'reset_producer_id') + batch = producer_batch() + # Match the batch's producer_id/epoch to the manager's so + # has_producer_id(batch.producer_id) returns True. + batch.records._producer_id = tm.producer_id_and_epoch.producer_id + batch.records._producer_epoch = tm.producer_id_and_epoch.epoch + + sender._complete_batch(batch, _partition_response( + error_cls=Errors.OutOfOrderSequenceNumberError)) + + tm.bump_producer_id_and_epoch.assert_called_once() + tm.reset_producer_id.assert_not_called() + assert batch.is_done + + def test_unknown_producer_id_triggers_epoch_bump(self, sender, accumulator, mocker): + """UnknownProducerIdError (non-retention case) triggers a bump on 2.5+.""" + tm = self._make_txn_manager() + sender._transaction_manager = tm + mocker.patch.object(tm, 'bump_producer_id_and_epoch') + batch = producer_batch() + batch.records._producer_id = tm.producer_id_and_epoch.producer_id + batch.records._producer_epoch = tm.producer_id_and_epoch.epoch + # last_acked_offset=49, log_start_offset=10 -> real data loss (not retention) + tm.update_last_acked_offset(batch.topic_partition, 0, 50) + + sender._complete_batch(batch, _partition_response( + error_cls=Errors.UnknownProducerIdError, + log_start_offset=10, + )) + + tm.bump_producer_id_and_epoch.assert_called_once() + + def test_retention_based_unknown_producer_id_does_not_bump(self, sender, accumulator, mocker): + """Retention-based UnknownProducerIdError still takes the + retention-reset retry path (classifier returns RETRIABLE), not bump.""" + tm = self._make_txn_manager() + sender._transaction_manager = tm + mocker.patch.object(tm, 'bump_producer_id_and_epoch') + mocker.patch.object(accumulator, 'reenqueue') + batch = producer_batch() + batch.records._producer_id = tm.producer_id_and_epoch.producer_id + batch.records._producer_epoch = tm.producer_id_and_epoch.epoch + tm.update_last_acked_offset(batch.topic_partition, 0, 5) # last_acked=4 + + sender._complete_batch(batch, _partition_response( + error_cls=Errors.UnknownProducerIdError, + log_start_offset=100, # well past last_acked=4 -> retention + )) + + tm.bump_producer_id_and_epoch.assert_not_called() + accumulator.reenqueue.assert_called_once() + + def test_old_broker_falls_back_to_reset_producer_id(self, sender, accumulator, mocker): + """On a < 2.5 broker, OutOfOrderSequenceNumberError for a + non-transactional idempotent producer still calls reset_producer_id + (pre-KIP-360 fallback).""" + from kafka.producer.transaction_manager import TransactionState as _TS + tm = TransactionManager( + transactional_id=None, + transaction_timeout_ms=60000, + retry_backoff_ms=100, + api_version=(2, 0), # pre-KIP-360 + metadata=ClusterMetadata(), + ) + tm.set_producer_id_and_epoch(ProducerIdAndEpoch(1234, 5)) + tm._current_state = _TS.READY + sender._transaction_manager = tm + mocker.patch.object(tm, 'bump_producer_id_and_epoch') + mocker.patch.object(tm, 'reset_producer_id') + batch = producer_batch() + batch.records._producer_id = tm.producer_id_and_epoch.producer_id + batch.records._producer_epoch = tm.producer_id_and_epoch.epoch + + sender._complete_batch(batch, _partition_response( + error_cls=Errors.OutOfOrderSequenceNumberError)) + + tm.bump_producer_id_and_epoch.assert_not_called() + tm.reset_producer_id.assert_called_once() + + def test_second_in_flight_error_does_not_cascade_bumps(self, sender, accumulator, mocker): + """With max_in_flight > 1, multiple in-flight batches may fail with + OutOfOrderSequenceNumberError at roughly the same time. Only the + first should drive the bump; subsequent calls must be no-ops.""" + tm = self._make_txn_manager() + sender._transaction_manager = tm + + batch_a = producer_batch() + batch_b = producer_batch(topic='bar') + for b in (batch_a, batch_b): + b.records._producer_id = tm.producer_id_and_epoch.producer_id + b.records._producer_epoch = tm.producer_id_and_epoch.epoch + + # First failure triggers the bump + sender._complete_batch(batch_a, _partition_response( + error_cls=Errors.OutOfOrderSequenceNumberError)) + assert tm.is_bumping_epoch() + # The bump has enqueued exactly one InitProducerIdHandler + from kafka.producer.transaction_manager import InitProducerIdHandler + first_init_handlers = [h for _, _, h in tm._pending_requests + if isinstance(h, InitProducerIdHandler)] + assert len(first_init_handlers) == 1 + + # Second in-flight batch fails with the same error -- should NOT + # enqueue a second InitProducerIdHandler + sender._complete_batch(batch_b, _partition_response( + error_cls=Errors.OutOfOrderSequenceNumberError)) + + second_init_handlers = [h for _, _, h in tm._pending_requests + if isinstance(h, InitProducerIdHandler)] + assert len(second_init_handlers) == 1 # still just one + + def test_sender_loop_gates_on_bumping_state(self, sender, accumulator, mocker): + """When in BUMPING_PRODUCER_EPOCH, run_once short-circuits before + sending produce data.""" + from kafka.producer.transaction_manager import TransactionState as _TS + tm = self._make_txn_manager() + sender._transaction_manager = tm + tm._current_state = _TS.BUMPING_PRODUCER_EPOCH + mocker.patch.object(sender, '_send_producer_data') + mocker.patch.object(sender._client, 'poll') + # is_transactional() is False (no transactional_id), so the sender + # runs _maybe_wait_for_producer_id() -- mock that to a no-op + mocker.patch.object(sender, '_maybe_wait_for_producer_id') + + sender.run_once() + + sender._send_producer_data.assert_not_called() + sender._client.poll.assert_called_once() diff --git a/test/producer/test_transaction_manager.py b/test/producer/test_transaction_manager.py index f22ba721e..98f0f2253 100644 --- a/test/producer/test_transaction_manager.py +++ b/test/producer/test_transaction_manager.py @@ -30,6 +30,15 @@ def _make_manager(transactional_id=None, api_version=(2, 5)): ) +def _set_producer_id(tm, producer_id=1234, epoch=5): + """Give the manager a valid producer_id/epoch without going through the + full initialize_transactions handshake.""" + from kafka.producer.transaction_manager import ProducerIdAndEpoch + tm.set_producer_id_and_epoch(ProducerIdAndEpoch(producer_id, epoch)) + # Drive to READY from UNINITIALIZED via the (fake) init path. + tm._current_state = TransactionState.READY + + class TestBumpingProducerEpochState: def test_state_value(self): assert TransactionState.BUMPING_PRODUCER_EPOCH == 8 @@ -191,3 +200,239 @@ def test_supports_epoch_bump_version_gate(self): assert not _make_manager(api_version=(2, 4))._supports_epoch_bump() assert _make_manager(api_version=(2, 5))._supports_epoch_bump() assert _make_manager(api_version=(3, 0))._supports_epoch_bump() + + +class TestBumpProducerIdAndEpoch: + def test_transitions_to_bumping_state(self): + tm = _make_manager(api_version=(2, 5)) + _set_producer_id(tm) + tp = TopicPartition('foo', 0) + # Populate some partition state so we can assert it gets cleared. + tm._sequence_numbers[tp] = 7 + tm.update_last_acked_offset(tp, 0, 5) + assert tm.sequence_number(tp) == 7 + assert tm.last_acked_offset(tp) == 4 + + tm.bump_producer_id_and_epoch() + + assert tm._current_state == TransactionState.BUMPING_PRODUCER_EPOCH + assert tm.is_bumping_epoch() + # Per-partition sequence state cleared + assert tm.sequence_number(tp) == 0 + assert tm.last_acked_offset(tp) == -1 + + def test_enqueues_init_producer_id_request_v3_with_bump_flag(self): + tm = _make_manager(api_version=(2, 5)) + _set_producer_id(tm, producer_id=42, epoch=7) + + tm.bump_producer_id_and_epoch() + + # There should be exactly one pending request -- an InitProducerIdHandler + # in epoch-bump mode carrying the current producer_id/epoch. + from kafka.producer.transaction_manager import InitProducerIdHandler + assert len(tm._pending_requests) == 1 + _, _, handler = tm._pending_requests[0] + assert isinstance(handler, InitProducerIdHandler) + assert handler._is_epoch_bump is True + # InitProducerIdRequest v3 has producer_id + producer_epoch fields + assert handler.request.producer_id == 42 + assert handler.request.producer_epoch == 7 + + def test_is_idempotent_across_concurrent_calls(self): + tm = _make_manager(api_version=(2, 5)) + _set_producer_id(tm) + + tm.bump_producer_id_and_epoch() + # Second call from a second in-flight batch failure should be a no-op + tm.bump_producer_id_and_epoch() + tm.bump_producer_id_and_epoch() + + # Only ONE InitProducerIdHandler should have been enqueued + from kafka.producer.transaction_manager import InitProducerIdHandler + init_handlers = [h for _, _, h in tm._pending_requests + if isinstance(h, InitProducerIdHandler)] + assert len(init_handlers) == 1 + + def test_does_nothing_in_fatal_state(self): + tm = _make_manager(api_version=(2, 5)) + _set_producer_id(tm) + tm._current_state = TransactionState.FATAL_ERROR + + tm.bump_producer_id_and_epoch() + + # Should not transition to BUMPING_PRODUCER_EPOCH from FATAL + assert tm._current_state == TransactionState.FATAL_ERROR + assert not tm.is_bumping_epoch() + + def test_raises_on_old_broker(self): + tm = _make_manager(api_version=(2, 0)) + _set_producer_id(tm) + with pytest.raises(Errors.IllegalStateError): + tm.bump_producer_id_and_epoch() + + def test_transactional_producer_clears_partition_state(self): + tm = _make_manager(transactional_id='txn-id', api_version=(2, 5)) + _set_producer_id(tm) + # Simulate an in-progress transaction with some partitions + tp1 = TopicPartition('foo', 0) + tp2 = TopicPartition('bar', 1) + tm._transaction_started = True + tm._partitions_in_transaction.add(tp1) + tm._new_partitions_in_transaction.add(tp2) + + tm.bump_producer_id_and_epoch() + + # All transactional partition tracking should be cleared -- the + # broker aborts the transaction as part of processing the bump. + assert tm._transaction_started is False + assert tm._partitions_in_transaction == set() + assert tm._new_partitions_in_transaction == set() + assert tm._pending_partitions_in_transaction == set() + + +class TestCompleteEpochBump: + def test_transitions_to_ready(self): + tm = _make_manager(api_version=(2, 5)) + _set_producer_id(tm) + tm.bump_producer_id_and_epoch() + assert tm.is_bumping_epoch() + + # Simulate the handler's success path + tm._complete_epoch_bump() + + assert tm._current_state == TransactionState.READY + assert tm._last_error is None + + +class TestInitProducerIdHandlerV3: + def _make_handler(self, api_version=(2, 5), transactional_id=None, is_epoch_bump=False, producer_id=42, epoch=7): + from kafka.producer.transaction_manager import InitProducerIdHandler, ProducerIdAndEpoch + tm = _make_manager(transactional_id=transactional_id, api_version=api_version) + tm.set_producer_id_and_epoch(ProducerIdAndEpoch(producer_id, epoch)) + tm._current_state = TransactionState.READY + if is_epoch_bump: + # The bump helper does the state transition and enqueue for us; + # pull the enqueued handler back out for inspection. + tm.bump_producer_id_and_epoch() + _, _, handler = tm._pending_requests[0] + else: + handler = InitProducerIdHandler(tm, transaction_timeout_ms=1000) + return tm, handler + + def _fake_init_producer_id_response(self, error_code=0, producer_id=42, producer_epoch=1): + """Construct a minimal InitProducerIdResponse for handler tests.""" + from kafka.protocol.producer import InitProducerIdResponse + # Use v3 which matches our target KIP-360 broker version. + return InitProducerIdResponse[3]( + throttle_time_ms=0, + error_code=error_code, + producer_id=producer_id, + producer_epoch=producer_epoch, + ) + + def test_v3_version_on_modern_broker(self): + _, handler = self._make_handler(api_version=(2, 5)) + assert handler.request.version == 3 + + def test_v2_version_on_2_4_broker(self): + _, handler = self._make_handler(api_version=(2, 4)) + assert handler.request.version == 2 + + def test_v1_version_on_2_0_broker(self): + _, handler = self._make_handler(api_version=(2, 0)) + assert handler.request.version == 1 + + def test_v0_version_on_old_broker(self): + _, handler = self._make_handler(api_version=(0, 11)) + assert handler.request.version == 0 + + def test_non_bump_request_has_no_producer_id(self): + _, handler = self._make_handler(api_version=(2, 5), is_epoch_bump=False) + # v3 request fields default to NO_PRODUCER_ID / NO_PRODUCER_EPOCH + assert handler.request.producer_id == -1 + assert handler.request.producer_epoch == -1 + assert handler._is_epoch_bump is False + + def test_bump_request_carries_current_producer_id_and_epoch(self): + _, handler = self._make_handler( + api_version=(2, 5), is_epoch_bump=True, producer_id=42, epoch=7) + assert handler.request.producer_id == 42 + assert handler.request.producer_epoch == 7 + assert handler._is_epoch_bump is True + + def test_successful_bump_transitions_to_ready(self): + tm, handler = self._make_handler( + api_version=(2, 5), is_epoch_bump=True, producer_id=42, epoch=7) + assert tm.is_bumping_epoch() + + # Simulate a NoError response with the bumped epoch + response = self._fake_init_producer_id_response( + error_code=0, producer_id=42, producer_epoch=8) + handler.handle_response(response) + + assert tm._current_state == TransactionState.READY + assert tm.producer_id_and_epoch.producer_id == 42 + assert tm.producer_id_and_epoch.epoch == 8 + assert handler._result.is_done + + def test_successful_non_bump_transitions_to_ready(self): + tm, handler = self._make_handler( + api_version=(2, 5), is_epoch_bump=False) + tm._current_state = TransactionState.INITIALIZING # fresh init + + response = self._fake_init_producer_id_response( + error_code=0, producer_id=100, producer_epoch=0) + handler.handle_response(response) + + assert tm._current_state == TransactionState.READY + assert tm.producer_id_and_epoch.producer_id == 100 + + def test_bump_invalid_epoch_falls_back_to_fresh_init(self): + tm, handler = self._make_handler( + api_version=(2, 5), is_epoch_bump=True, producer_id=42, epoch=7) + assert tm.is_bumping_epoch() + # Simulate the normal dequeue-then-dispatch flow: the bump handler + # has been pulled out of the queue by the sender before + # handle_response fires. + import heapq + heapq.heappop(tm._pending_requests) + + response = self._fake_init_producer_id_response( + error_code=Errors.InvalidProducerEpochError.errno, + producer_id=-1, producer_epoch=-1) + handler.handle_response(response) + + # Stay in BUMPING_PRODUCER_EPOCH with producer_id reset to NO_PRODUCER_ID + assert tm._current_state == TransactionState.BUMPING_PRODUCER_EPOCH + assert tm.producer_id_and_epoch.producer_id == -1 + assert tm.producer_id_and_epoch.epoch == -1 + # A fresh (non-bump) InitProducerIdHandler should be enqueued as the + # fallback. + from kafka.producer.transaction_manager import InitProducerIdHandler + fallback_handlers = [h for _, _, h in tm._pending_requests + if isinstance(h, InitProducerIdHandler)] + assert len(fallback_handlers) == 1 + assert fallback_handlers[0]._is_epoch_bump is False + + def test_non_bump_invalid_epoch_is_fatal(self): + tm, handler = self._make_handler(api_version=(2, 5), is_epoch_bump=False) + tm._current_state = TransactionState.INITIALIZING + + response = self._fake_init_producer_id_response( + error_code=Errors.InvalidProducerEpochError.errno, + producer_id=-1, producer_epoch=-1) + handler.handle_response(response) + + # Non-bump path treats INVALID_PRODUCER_EPOCH as fatal + assert tm._current_state == TransactionState.FATAL_ERROR + + def test_producer_fenced_is_fatal(self): + tm, handler = self._make_handler( + api_version=(2, 5), transactional_id='txn', is_epoch_bump=True) + + response = self._fake_init_producer_id_response( + error_code=Errors.ProducerFencedError.errno, + producer_id=-1, producer_epoch=-1) + handler.handle_response(response) + + assert tm._current_state == TransactionState.FATAL_ERROR