diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 1080f26cb..fd55483dd 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -554,7 +554,9 @@ def __init__(self, **configs): transaction_manager=self._transaction_manager, message_version=message_version, **self.config) - guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) + guarantee_message_order = False + if self.config['enable_idempotence'] or self.config['max_in_flight_requests_per_connection'] == 1: + guarantee_message_order = True self._sender = Sender(client, self._metadata, self._accumulator, metrics=self._metrics, diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index 53b4046c6..13e01af07 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -744,10 +744,30 @@ def test_idempotent_default_max_in_flight(self): assert p.config['max_in_flight_requests_per_connection'] == 5 p.close(timeout=0) - def test_guarantee_message_order_only_when_max_in_flight_1(self): - """guarantee_message_order is True only when max_in_flight == 1.""" + def test_idempotent_producer_forces_guarantee_message_order(self): + """guarantee_message_order is forced True when idempotence is enabled, + regardless of max_in_flight. Without partition muting, a transient + retryable error (e.g. NotLeader) triggers reenqueue via appendleft + which reverses concurrently-failed batches; the retried sends arrive + out of sequence and the broker rejects with OutOfOrderSequenceNumber. + Java's producer enforces this for the same reason. + """ + for max_in_flight in (1, 2, 3, 4, 5): + p = KafkaProducer( + enable_idempotence=True, + max_in_flight_requests_per_connection=max_in_flight, + api_version=(0, 11), + ) + assert p._sender.config['guarantee_message_order'] is True, ( + 'idempotence should force guarantee_message_order=True (max_in_flight=%d)' + % max_in_flight) + p.close(timeout=0) + + def test_non_idempotent_guarantee_message_order_only_when_max_in_flight_1(self): + """For non-idempotent producers, guarantee_message_order is only True + when max_in_flight == 1 (the original Java behavior).""" p1 = KafkaProducer( - enable_idempotence=True, + enable_idempotence=False, max_in_flight_requests_per_connection=1, api_version=(0, 11), ) @@ -755,7 +775,7 @@ def test_guarantee_message_order_only_when_max_in_flight_1(self): p1.close(timeout=0) p5 = KafkaProducer( - enable_idempotence=True, + enable_idempotence=False, max_in_flight_requests_per_connection=5, api_version=(0, 11), ) diff --git a/test/producer/test_transaction_manager_mock_broker.py b/test/producer/test_transaction_manager_mock_broker.py index aa9ef8d65..b01391dbb 100644 --- a/test/producer/test_transaction_manager_mock_broker.py +++ b/test/producer/test_transaction_manager_mock_broker.py @@ -14,6 +14,7 @@ import pytest import kafka.errors as Errors +from kafka import KafkaProducer from kafka.net.compat import KafkaNetClient from kafka.producer.transaction_manager import ( AddOffsetsToTxnHandler, @@ -26,14 +27,17 @@ TransactionState, TxnOffsetCommitHandler, ) -from kafka.protocol.metadata import FindCoordinatorResponse +from kafka.protocol.metadata import FindCoordinatorResponse, MetadataResponse from kafka.protocol.producer import ( AddOffsetsToTxnResponse, AddPartitionsToTxnResponse, EndTxnResponse, InitProducerIdResponse, + ProduceRequest, + ProduceResponse, TxnOffsetCommitResponse, ) +from kafka.record import MemoryRecords from kafka.structs import OffsetAndMetadata, TopicPartition from test.mock_broker import MockBroker @@ -1033,3 +1037,173 @@ def test_partial_retriable_retries_only_failed(self, broker, client): assert tp_retry in tm._pending_txn_offset_commits # Result not yet done--the retry has to complete first. assert not result.is_done + + +# --------------------------------------------------------------------------- +# Idempotent producer ordering on retry +# --------------------------------------------------------------------------- + + +def _decode_produce_base_sequences(request_bytes, api_version): + """Decode a ProduceRequest and return [(topic, partition, base_sequence)] in order.""" + request = ProduceRequest.decode(request_bytes, version=api_version, header=True) + out = [] + for topic_data in request.topic_data: + for partition_data in topic_data.partition_data: + records = MemoryRecords(bytes(partition_data.records)) + batch = records.next_batch() + out.append((topic_data.name, partition_data.index, batch.base_sequence)) + return out + + +class TestIdempotentProducerOrderingMockBroker: + """Regression test: idempotent producer must mute the partition after a + retryable error so retried batches are not reordered against later batches. + + Without partition muting, when the first ProduceRequest gets a transient + NotLeaderForPartition error, the sender re-enqueues that batch via + deque.appendleft. Meanwhile, additional batches (queued by send() while + the first was in flight) drain and go out with sequences ahead of the + retried base_sequence. The broker rejects them with + OutOfOrderSequenceNumberError. With muting, no further batch for the + partition drains until the in-flight batch's response is processed, so + the retry is sent before the next batch. + """ + + _TOPIC = 'tx-order' + + def _make_metadata_topic(self, version): + Topic = MetadataResponse.MetadataResponseTopic + Partition = Topic.MetadataResponsePartition + return Topic(version=version, error_code=0, name=self._TOPIC, + is_internal=False, + partitions=[ + Partition(version=version, error_code=0, + partition_index=0, leader_id=0, + leader_epoch=0, + replica_nodes=[0], isr_nodes=[0], + offline_replicas=[]), + ]) + + def _produce_response(self, version, error_code, base_offset): + Topic = ProduceResponse.TopicProduceResponse + Partition = Topic.PartitionProduceResponse + return ProduceResponse( + throttle_time_ms=0, + responses=[ + Topic(name=self._TOPIC, partition_responses=[ + Partition(index=0, error_code=error_code, + base_offset=base_offset, log_append_time_ms=-1, + log_start_offset=0, record_errors=[], + error_message=None, current_leader=None), + ]), + ], + ) + + def test_retry_does_not_reorder_against_later_batches(self): + """First ProduceRequest fails with NotLeader; assert that the very + next ProduceRequest the broker sees is the retry (same base_sequence), + not a later batch that drained while the first was in flight.""" + broker = MockBroker(broker_version=(2, 5)) + broker.set_metadata(topics=[self._make_metadata_topic(version=8)]) + # Producer's auto-version negotiation will land on (2, 5). + broker.respond(InitProducerIdResponse, InitProducerIdResponse( + throttle_time_ms=0, error_code=0, + producer_id=42, producer_epoch=0, + )) + + # Capture each ProduceRequest's first-partition base_sequence as it + # arrives. First call: hold the response (via an awaitable Future) + # until enough subsequent requests have arrived to demonstrate the + # in-flight window contains multiple batches; then return NotLeader + # (transient retryable). All subsequent calls: success. + # + # The hold is what surfaces the bug — without partition muting, the + # sender drains additional batches while the first is still in flight, + # and the first batch's retry (reenqueued via appendleft) is + # sequenced *after* them. + from kafka.future import Future + received_sequences = [] + call_count = [0] + release_first = Future() + + async def _held_notleader_response(api_version): + # Awaiting a kafka.future.Future yields until success/failure is + # set. While we're parked here, the broker's IO loop is free to + # process other queued ProduceRequests (each write() schedules its + # own _process_requests). + await release_first + return self._produce_response( + version=api_version, + error_code=Errors.NotLeaderForPartitionError.errno, + base_offset=-1) + + def produce_response(api_key, api_version, correlation_id, request_bytes): + seqs = _decode_produce_base_sequences(request_bytes, api_version) + assert seqs, 'ProduceRequest had no partition data' + received_sequences.append(seqs[0][2]) + call_count[0] += 1 + if call_count[0] == 1: + # Return a coroutine; handle_request will await it. + return _held_notleader_response(api_version) + # Once a couple more requests have arrived, release the held one + # so the producer sees the NotLeader and reenqueues. With muting, + # call_count won't reach 3 (only one batch in flight at a time) + # and we'd hit the safety release in the test body below. + if call_count[0] >= 3 and not release_first.is_done: + release_first.success(None) + return self._produce_response( + version=api_version, error_code=0, base_offset=0) + + # Register a respond_fn that handles every ProduceRequest the test + # sends. The MockBroker pops one queue entry per request, so we need + # one respond_fn per expected request. We don't know ahead of time + # how many will be sent, so register a generous batch. + for _ in range(64): + broker.respond_fn(ProduceRequest, produce_response) + + producer = KafkaProducer( + kafka_client=broker.client_factory(), + bootstrap_servers=['%s:%d' % (broker.host, broker.port)], + api_version=(2, 5), + enable_idempotence=True, + max_in_flight_requests_per_connection=5, + batch_size=64, # tiny so multiple batches form quickly + linger_ms=5, + retry_backoff_ms=10, + request_timeout_ms=5000, + ) + # Safety release: if muting *is* working, call_count never reaches 3 + # (only one ProduceRequest in flight at a time), so the held NotLeader + # response would never fire on its own. Time-bound release after a + # short delay to keep the test fast in either case. + import threading + threading.Timer(0.1, lambda: ( + release_first.success(None) if not release_first.is_done else None + )).start() + try: + futures = [ + producer.send(self._TOPIC, value=('msg-%02d' % i).encode(), + partition=0) + for i in range(20) + ] + for f in futures: + f.get(timeout=10) + finally: + producer.close(timeout=2) + + # The first ProduceRequest had base_sequence 0 and was rejected with + # NotLeader. With partition muting (the fix), the second + # ProduceRequest the broker sees must be the *retry* of that batch + # — same base_sequence. Without muting, a later batch with a higher + # base_sequence would have drained while the first was in flight, + # arriving here ahead of the retry. + assert len(received_sequences) >= 2, ( + 'expected at least 2 ProduceRequests, got %r' % received_sequences) + assert received_sequences[0] == 0, ( + 'first ProduceRequest should carry base_sequence 0; got %r' + % received_sequences) + assert received_sequences[1] == 0, ( + 'second ProduceRequest must be the retry of base_sequence 0; ' + 'got %r — partition was not muted, later batch drained ahead of ' + 'retry' % received_sequences)