diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 5b728b9ed..78ff5a352 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -8,8 +8,8 @@ from kafka import errors as Errors from kafka.metrics.measurable import AnonMeasurable from kafka.metrics.stats import Avg, Max, Rate -from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager -from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest, ProduceResponse +from kafka.producer.transaction_manager import TransactionManager +from kafka.protocol.producer import ProduceRequest, ProduceResponse from kafka.structs import TopicPartition from kafka.util import ensure_valid_topic_name from kafka.version import __version__ @@ -147,10 +147,15 @@ def run_once(self): if self._transaction_manager: try: - if not self._transaction_manager.is_transactional(): - # this is an idempotent producer, so make sure we have a producer id - self._maybe_wait_for_producer_id() - elif self._transaction_manager.has_in_flight_transactional_request() or self._maybe_send_transactional_request(): + if (not self._transaction_manager.is_transactional() + and not self._transaction_manager.has_producer_id()): + # Idempotent producer: ensure an InitProducerIdHandler is + # enqueued. Dispatch happens below via the same handler-queue + # path used for transactional requests; the produce gate + # below blocks new sends until the response arrives. + self._transaction_manager.init_producer_id() + + if self._transaction_manager.has_in_flight_transactional_request() or self._maybe_send_pending_request(): # as long as there are outstanding transactional requests, we simply wait for them to return self._client.poll(timeout_ms=self.config['retry_backoff_ms']) return @@ -284,7 +289,7 @@ def _send_producer_data(self, now=None): self._failed_produce, batches, node_id)) return poll_timeout_ms - def _maybe_send_transactional_request(self): + def _maybe_send_pending_request(self): if self._transaction_manager.is_completing() and self._accumulator.has_incomplete: if self._transaction_manager.is_aborting(): self._accumulator.abort_undrained_batches(Errors.KafkaError("Failing batch since transaction was aborted")) @@ -365,39 +370,6 @@ def add_topic(self, topic): self._topics_to_add.add(topic) self.wakeup() - def _maybe_wait_for_producer_id(self): - while not self._transaction_manager.has_producer_id(): - try: - node_id = self._client.least_loaded_node() - if node_id is None or not self._client.await_ready(node_id): - log.debug("%s, Could not find an available broker to send InitProducerIdRequest to." + - " Will back off and try again.", str(self)) - time.sleep(self._client.least_loaded_node_refresh_ms() / 1000) - return - version = self._client.api_version(InitProducerIdRequest, max_version=1) - request = InitProducerIdRequest[version]( - transactional_id=self.config['transactional_id'], - transaction_timeout_ms=self.config['transaction_timeout_ms'], - ) - response = self._client.send_and_receive(node_id, request) - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - self._transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(response.producer_id, response.producer_epoch)) - break - elif getattr(error_type, 'retriable', False): - log.debug("%s: Retriable error from InitProducerId response: %s", str(self), error_type.__name__) - if getattr(error_type, 'invalid_metadata', False): - self._metadata.request_update() - else: - self._transaction_manager.transition_to_fatal_error(error_type()) - break - except Errors.KafkaConnectionError: - log.debug("%s: Broker %s disconnected while awaiting InitProducerId response", str(self), node_id) - except Errors.RequestTimedOutError: - log.debug("%s: InitProducerId request to node %s timed out", str(self), node_id) - log.debug("%s: Retry InitProducerIdRequest in %sms.", str(self), self.config['retry_backoff_ms']) - time.sleep(self.config['retry_backoff_ms'] / 1000) - def _failed_produce(self, batches, node_id, error): log.error("%s: Error sending produce request to node %d: %s", str(self), node_id, error) # trace for batch in batches: diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 96f045254..2625d2057 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -159,6 +159,26 @@ def initialize_transactions(self): self._enqueue_request(handler) return handler.result + def init_producer_id(self): + """Idempotent (non-transactional) producer: enqueue an InitProducerIdHandler. + + Drives UNINITIALIZED -> INITIALIZING; the handler completes the + transition to READY on success. No-op outside UNINITIALIZED so + repeated calls from the sender's run loop are safe. + """ + with self._lock: + if self.is_transactional(): + raise Errors.IllegalStateError( + "init_producer_id is for idempotent (non-transactional) producers;" + " use initialize_transactions for transactional producers") + if self._current_state != TransactionState.UNINITIALIZED: + return + self._transition_to(TransactionState.INITIALIZING) + self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH)) + self._sequence_numbers.clear() + handler = InitProducerIdHandler(self, 0) + self._enqueue_request(handler) + def begin_transaction(self): with self._lock: self._ensure_transactional() @@ -895,6 +915,14 @@ def __init__(self, transaction_manager, transaction_timeout_ms, is_epoch_bump=Fa def priority(self): return Priority.INIT_PRODUCER_ID + @property + def coordinator_type(self): + # Idempotent (non-transactional) producers don't have a transaction + # coordinator -- InitProducerIdRequest can be sent to any broker. + if self.transaction_manager.transactional_id is None: + return None + return 'transaction' + def handle_response(self, response): error_type = Errors.for_code(response.error_code) diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index cb0bb9533..53b4046c6 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -288,10 +288,6 @@ def test_failed_produce(sender, mocker): ]) -def test_maybe_wait_for_producer_id(): - pass - - def test_run_once(): pass @@ -1159,9 +1155,6 @@ def test_sender_loop_gates_on_bumping_state(self, sender, accumulator, mocker): tm._current_state = _TS.BUMPING_PRODUCER_EPOCH mocker.patch.object(sender, '_send_producer_data') mocker.patch.object(sender._client, 'poll') - # is_transactional() is False (no transactional_id), so the sender - # runs _maybe_wait_for_producer_id() -- mock that to a no-op - mocker.patch.object(sender, '_maybe_wait_for_producer_id') sender.run_once() diff --git a/test/producer/test_transaction_manager_mock_broker.py b/test/producer/test_transaction_manager_mock_broker.py index 459a5dd15..aa9ef8d65 100644 --- a/test/producer/test_transaction_manager_mock_broker.py +++ b/test/producer/test_transaction_manager_mock_broker.py @@ -292,6 +292,60 @@ def test_bump_invalid_epoch_falls_back_to_fresh_init(self, broker, client): assert len(fallbacks) == 1 assert fallbacks[0]._is_epoch_bump is False + def test_idempotent_producer_init_then_bump(self, broker, client): + """Idempotent (non-transactional) producer drives the same state + machine as transactional: UNINITIALIZED -> INITIALIZING -> READY + on init, then READY -> BUMPING_PRODUCER_EPOCH -> READY on a KIP-360 + epoch bump. Reproduces the failure mode from + test_idempotent_producer_high_throughput where the idempotent + producer was never driven out of UNINITIALIZED, causing a bump from + an OUT_OF_ORDER_SEQUENCE_NUMBER produce response to raise an + invalid-state-transition KafkaError. + """ + tm = TransactionManager( + transactional_id=None, + api_version=_API_VERSION, + metadata=client.cluster, + ) + assert tm._current_state == TransactionState.UNINITIALIZED + assert not tm.has_producer_id() + + # Step 1: enqueue the initial InitProducerIdHandler. + tm.init_producer_id() + assert tm._current_state == TransactionState.INITIALIZING + + broker.respond(InitProducerIdResponse, InitProducerIdResponse( + throttle_time_ms=0, + error_code=0, + producer_id=_PRODUCER_ID, + producer_epoch=_PRODUCER_EPOCH, + )) + _, future = _dispatch_next(client, tm) + _poll_for_future(client, future) + + # Init complete: producer_id set, state advanced to READY. + assert tm._current_state == TransactionState.READY + assert tm.producer_id_and_epoch.producer_id == _PRODUCER_ID + assert tm.producer_id_and_epoch.epoch == _PRODUCER_EPOCH + + # Step 2: simulate the OUT_OF_ORDER_SEQUENCE_NUMBER recovery path. + tm.bump_producer_id_and_epoch() + assert tm.is_bumping_epoch() + + broker.respond(InitProducerIdResponse, InitProducerIdResponse( + throttle_time_ms=0, + error_code=0, + producer_id=_PRODUCER_ID, + producer_epoch=_PRODUCER_EPOCH + 1, + )) + _, future = _dispatch_next(client, tm) + _poll_for_future(client, future) + + # Bump complete: same producer_id, epoch incremented, READY again. + assert tm._current_state == TransactionState.READY + assert tm.producer_id_and_epoch.producer_id == _PRODUCER_ID + assert tm.producer_id_and_epoch.epoch == _PRODUCER_EPOCH + 1 + # --------------------------------------------------------------------------- # AddPartitionsToTxnHandler