Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,9 @@ def __init__(self, **configs):
transaction_manager=self._transaction_manager,
message_version=message_version,
**self.config)
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
guarantee_message_order = False
if self.config['enable_idempotence'] or self.config['max_in_flight_requests_per_connection'] == 1:
guarantee_message_order = True
self._sender = Sender(client, self._metadata,
self._accumulator,
metrics=self._metrics,
Expand Down
28 changes: 24 additions & 4 deletions test/producer/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,18 +744,38 @@ def test_idempotent_default_max_in_flight(self):
assert p.config['max_in_flight_requests_per_connection'] == 5
p.close(timeout=0)

def test_guarantee_message_order_only_when_max_in_flight_1(self):
"""guarantee_message_order is True only when max_in_flight == 1."""
def test_idempotent_producer_forces_guarantee_message_order(self):
    """Enabling idempotence must turn on guarantee_message_order no matter
    what max_in_flight_requests_per_connection is set to. Otherwise a
    transient retryable error (e.g. NotLeader) re-enqueues the failed batch
    via appendleft, concurrently-failed batches come back reversed, and the
    broker rejects the out-of-sequence retries with
    OutOfOrderSequenceNumber. This mirrors the Java producer's behavior.
    """
    for in_flight in range(1, 6):
        producer = KafkaProducer(
            enable_idempotence=True,
            max_in_flight_requests_per_connection=in_flight,
            api_version=(0, 11),
        )
        order_forced = producer._sender.config['guarantee_message_order']
        assert order_forced is True, (
            'idempotence should force guarantee_message_order=True (max_in_flight=%d)'
            % in_flight)
        producer.close(timeout=0)

def test_non_idempotent_guarantee_message_order_only_when_max_in_flight_1(self):
"""For non-idempotent producers, guarantee_message_order is only True
when max_in_flight == 1 (the original Java behavior)."""
p1 = KafkaProducer(
enable_idempotence=True,
enable_idempotence=False,
max_in_flight_requests_per_connection=1,
api_version=(0, 11),
)
assert p1._sender.config['guarantee_message_order'] is True
p1.close(timeout=0)

p5 = KafkaProducer(
enable_idempotence=True,
enable_idempotence=False,
max_in_flight_requests_per_connection=5,
api_version=(0, 11),
)
Expand Down
176 changes: 175 additions & 1 deletion test/producer/test_transaction_manager_mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest

import kafka.errors as Errors
from kafka import KafkaProducer
from kafka.net.compat import KafkaNetClient
from kafka.producer.transaction_manager import (
AddOffsetsToTxnHandler,
Expand All @@ -26,14 +27,17 @@
TransactionState,
TxnOffsetCommitHandler,
)
from kafka.protocol.metadata import FindCoordinatorResponse
from kafka.protocol.metadata import FindCoordinatorResponse, MetadataResponse
from kafka.protocol.producer import (
AddOffsetsToTxnResponse,
AddPartitionsToTxnResponse,
EndTxnResponse,
InitProducerIdResponse,
ProduceRequest,
ProduceResponse,
TxnOffsetCommitResponse,
)
from kafka.record import MemoryRecords
from kafka.structs import OffsetAndMetadata, TopicPartition

from test.mock_broker import MockBroker
Expand Down Expand Up @@ -1033,3 +1037,173 @@ def test_partial_retriable_retries_only_failed(self, broker, client):
assert tp_retry in tm._pending_txn_offset_commits
# Result not yet done--the retry has to complete first.
assert not result.is_done


# ---------------------------------------------------------------------------
# Idempotent producer ordering on retry
# ---------------------------------------------------------------------------


def _decode_produce_base_sequences(request_bytes, api_version):
    """Decode a ProduceRequest and return [(topic, partition, base_sequence)] in order."""
    decoded = ProduceRequest.decode(request_bytes, version=api_version, header=True)
    sequences = []
    for topic in decoded.topic_data:
        for part in topic.partition_data:
            # Only the first batch's base_sequence is needed per partition.
            first_batch = MemoryRecords(bytes(part.records)).next_batch()
            sequences.append((topic.name, part.index, first_batch.base_sequence))
    return sequences


class TestIdempotentProducerOrderingMockBroker:
    """Regression test: idempotent producer must mute the partition after a
    retryable error so retried batches are not reordered against later batches.

    Without partition muting, when the first ProduceRequest gets a transient
    NotLeaderForPartition error, the sender re-enqueues that batch via
    deque.appendleft. Meanwhile, additional batches (queued by send() while
    the first was in flight) drain and go out with sequences ahead of the
    retried base_sequence. The broker rejects them with
    OutOfOrderSequenceNumberError. With muting, no further batch for the
    partition drains until the in-flight batch's response is processed, so
    the retry is sent before the next batch.
    """

    # Single-partition topic used by every request/response in this class.
    _TOPIC = 'tx-order'

    def _make_metadata_topic(self, version):
        """Build a MetadataResponse topic entry for _TOPIC with exactly one
        partition (index 0) led by broker 0, so all sends target one leader."""
        Topic = MetadataResponse.MetadataResponseTopic
        Partition = Topic.MetadataResponsePartition
        return Topic(version=version, error_code=0, name=self._TOPIC,
                     is_internal=False,
                     partitions=[
                         Partition(version=version, error_code=0,
                                   partition_index=0, leader_id=0,
                                   leader_epoch=0,
                                   replica_nodes=[0], isr_nodes=[0],
                                   offline_replicas=[]),
                     ])

    def _produce_response(self, version, error_code, base_offset):
        """Build a single-topic, single-partition ProduceResponse for _TOPIC
        partition 0 carrying the given error_code and base_offset."""
        Topic = ProduceResponse.TopicProduceResponse
        Partition = Topic.PartitionProduceResponse
        return ProduceResponse(
            throttle_time_ms=0,
            responses=[
                Topic(name=self._TOPIC, partition_responses=[
                    Partition(index=0, error_code=error_code,
                              base_offset=base_offset, log_append_time_ms=-1,
                              log_start_offset=0, record_errors=[],
                              error_message=None, current_leader=None),
                ]),
            ],
        )

    def test_retry_does_not_reorder_against_later_batches(self):
        """First ProduceRequest fails with NotLeader; assert that the very
        next ProduceRequest the broker sees is the retry (same base_sequence),
        not a later batch that drained while the first was in flight."""
        broker = MockBroker(broker_version=(2, 5))
        broker.set_metadata(topics=[self._make_metadata_topic(version=8)])
        # Producer's auto-version negotiation will land on (2, 5).
        broker.respond(InitProducerIdResponse, InitProducerIdResponse(
            throttle_time_ms=0, error_code=0,
            producer_id=42, producer_epoch=0,
        ))

        # Capture each ProduceRequest's first-partition base_sequence as it
        # arrives. First call: hold the response (via an awaitable Future)
        # until enough subsequent requests have arrived to demonstrate the
        # in-flight window contains multiple batches; then return NotLeader
        # (transient retryable). All subsequent calls: success.
        #
        # The hold is what surfaces the bug — without partition muting, the
        # sender drains additional batches while the first is still in flight,
        # and the first batch's retry (reenqueued via appendleft) is
        # sequenced *after* them.
        from kafka.future import Future
        received_sequences = []  # base_sequence of each ProduceRequest, in arrival order
        call_count = [0]  # one-element list so the closure can mutate it
        release_first = Future()

        async def _held_notleader_response(api_version):
            # Awaiting a kafka.future.Future yields until success/failure is
            # set. While we're parked here, the broker's IO loop is free to
            # process other queued ProduceRequests (each write() schedules its
            # own _process_requests).
            await release_first
            return self._produce_response(
                version=api_version,
                error_code=Errors.NotLeaderForPartitionError.errno,
                base_offset=-1)

        def produce_response(api_key, api_version, correlation_id, request_bytes):
            # Record the arriving batch's base_sequence before deciding how
            # to respond; the final assertions inspect this ordering.
            seqs = _decode_produce_base_sequences(request_bytes, api_version)
            assert seqs, 'ProduceRequest had no partition data'
            received_sequences.append(seqs[0][2])
            call_count[0] += 1
            if call_count[0] == 1:
                # Return a coroutine; handle_request will await it.
                return _held_notleader_response(api_version)
            # Once a couple more requests have arrived, release the held one
            # so the producer sees the NotLeader and reenqueues. With muting,
            # call_count won't reach 3 (only one batch in flight at a time)
            # and we'd hit the safety release in the test body below.
            if call_count[0] >= 3 and not release_first.is_done:
                release_first.success(None)
            return self._produce_response(
                version=api_version, error_code=0, base_offset=0)

        # Register a respond_fn that handles every ProduceRequest the test
        # sends. The MockBroker pops one queue entry per request, so we need
        # one respond_fn per expected request. We don't know ahead of time
        # how many will be sent, so register a generous batch.
        for _ in range(64):
            broker.respond_fn(ProduceRequest, produce_response)

        producer = KafkaProducer(
            kafka_client=broker.client_factory(),
            bootstrap_servers=['%s:%d' % (broker.host, broker.port)],
            api_version=(2, 5),
            enable_idempotence=True,
            max_in_flight_requests_per_connection=5,
            batch_size=64,  # tiny so multiple batches form quickly
            linger_ms=5,
            retry_backoff_ms=10,
            request_timeout_ms=5000,
        )
        # Safety release: if muting *is* working, call_count never reaches 3
        # (only one ProduceRequest in flight at a time), so the held NotLeader
        # response would never fire on its own. Time-bound release after a
        # short delay to keep the test fast in either case.
        import threading
        threading.Timer(0.1, lambda: (
            release_first.success(None) if not release_first.is_done else None
        )).start()
        try:
            futures = [
                producer.send(self._TOPIC, value=('msg-%02d' % i).encode(),
                              partition=0)
                for i in range(20)
            ]
            # Block until every send is acknowledged (retries included).
            for f in futures:
                f.get(timeout=10)
        finally:
            producer.close(timeout=2)

        # The first ProduceRequest had base_sequence 0 and was rejected with
        # NotLeader. With partition muting (the fix), the second
        # ProduceRequest the broker sees must be the *retry* of that batch
        # — same base_sequence. Without muting, a later batch with a higher
        # base_sequence would have drained while the first was in flight,
        # arriving here ahead of the retry.
        assert len(received_sequences) >= 2, (
            'expected at least 2 ProduceRequests, got %r' % received_sequences)
        assert received_sequences[0] == 0, (
            'first ProduceRequest should carry base_sequence 0; got %r'
            % received_sequences)
        assert received_sequences[1] == 0, (
            'second ProduceRequest must be the retry of base_sequence 0; '
            'got %r — partition was not muted, later batch drained ahead of '
            'retry' % received_sequences)
Loading