4 changes: 4 additions & 0 deletions kafka/net/compat.py
@@ -67,6 +67,10 @@ def await_ready(self, node_id, timeout_ms=30000):
         conn = self._manager._conns.get(node_id)
         if conn is not None and not conn.init_future.is_done:
             self._manager.poll(timeout_ms=timeout_ms, future=conn.init_future)
+        # Connection may be initialized but paused (e.g. max_in_flight reached).
+        # Poll briefly to drain in-flight responses and unpause.
+        if conn is not None and conn.connected and conn.paused:
+            self._manager.poll(timeout_ms=min(timeout_ms, self._manager.config['request_timeout_ms']))
         if not self.is_ready(node_id):
             raise Errors.KafkaConnectionError('Node %s not ready after %s ms' % (node_id, timeout_ms))
         return True
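Note: a node can be fully connected yet report not-ready because it paused on max_in_flight; only processing a response frees an in-flight slot and unpauses it, which is why await_ready must poll a second time. A standalone sketch of that drain-then-unpause pattern, using hypothetical simplified names rather than kafka-python's real classes:

    class Conn:
        """Hypothetical connection with kafka-style in-flight pausing."""
        def __init__(self, max_in_flight=1):
            self.in_flight = ['req-1']   # one request already on the wire
            self.max_in_flight = max_in_flight
            self.paused = True           # the limit was hit, so we paused

        def poll(self):
            # Processing a response frees an in-flight slot, the only
            # event that can unpause the connection.
            if self.in_flight:
                self.in_flight.pop()
            if self.paused and len(self.in_flight) < self.max_in_flight:
                self.paused = False

    conn = Conn()
    conn.poll()              # what the new await_ready lines trigger
    assert not conn.paused   # node is ready for the next request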
10 changes: 7 additions & 3 deletions kafka/net/connection.py
@@ -112,7 +112,7 @@ def send_request(self, request, request_timeout_ms=None):
             self._request_buffer.append((request, future, timeout_at))
             return future
         elif self.paused:
-            return future.failure(Errors.NodeNotReadyError('Node paused: %s') % self.paused)
+            return future.failure(Errors.NodeNotReadyError(f'Node paused: {self.paused}'))
         elif not self.connected:
             return future.failure(Errors.KafkaConnectionError('Node not connected'))
         else:
@@ -145,10 +145,14 @@ def _send_request(self, request, future=None, timeout_at=None):
         else:
             future.success(None)
 
+        # Write the current request's bytes before checking max_in_flight.
+        # Otherwise with max_in_flight=1, the first request would be added to
+        # in_flight_requests (len==1), trip the >= check, pause, and never be
+        # written to the transport — hanging forever.
+        if not self.paused:
+            self.transport.write(self.parser.send_bytes())
         if len(self.in_flight_requests) >= self.config['max_in_flight_requests_per_connection']:
             self.pause('max_in_flight')
-        elif not self.paused:
-            self.transport.write(self.parser.send_bytes())
         return future
 
     def send_buffered(self):
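The reordering in _send_request is the core deadlock fix. A runnable miniature of the old versus new control flow, using plain variables instead of the real connection object:

    # With max_in_flight=1, the first request fills in_flight_requests
    # before the write decision is made.
    in_flight = ['request-1']
    max_in_flight = 1
    paused = wrote = False

    # Old order: check-then-write. len(in_flight) == 1 trips the >= check,
    # the connection pauses, and the elif branch never writes the bytes.
    if len(in_flight) >= max_in_flight:
        paused = True
    elif not paused:
        wrote = True
    assert paused and not wrote   # request-1 would hang forever

    # New order: write-then-check. The bytes go out first; the connection
    # then pauses until a response frees an in-flight slot.
    paused = wrote = False
    if not paused:
        wrote = True
    if len(in_flight) >= max_in_flight:
        paused = True
    assert paused and wrote       # request-1 is on the wire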
10 changes: 3 additions & 7 deletions kafka/producer/kafka.py
@@ -527,13 +527,9 @@ def __init__(self, **configs):
             if self.config['retries'] == 0:
                 raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")
 
-            if 'max_in_flight_requests_per_connection' not in user_provided_configs:
-                log.info("%s: Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.", str(self))
-                self.config['max_in_flight_requests_per_connection'] = 1
-            elif self.config['max_in_flight_requests_per_connection'] != 1:
-                raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order"
-                                                     " to use the idempotent producer."
-                                                     " Otherwise we cannot guarantee idempotence.")
+            if self.config['max_in_flight_requests_per_connection'] > 5:
+                raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to at most 5"
+                                                     " to use the idempotent producer.")
 
             if 'acks' not in user_provided_configs:
                 log.info("%s: Overriding the default 'acks' config to 'all' since idempotence is enabled", str(self))
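This relaxes the idempotent producer from a hard max_in_flight=1 to the Java client's limit of 5, which matches the broker retaining producer state for only the five most recent batches per partition. A hedged usage sketch, assuming the package exposes the standard KafkaProducer constructor; the bootstrap address is a placeholder:

    from kafka import KafkaProducer

    # Up to 5 pipelined requests per broker connection are now allowed when
    # idempotence is enabled; larger values still raise KafkaConfigurationError.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',   # placeholder
        enable_idempotence=True,
        max_in_flight_requests_per_connection=5,
    )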
7 changes: 6 additions & 1 deletion kafka/producer/producer_batch.py
@@ -47,9 +47,14 @@ def producer_id(self):
     def producer_epoch(self):
         return self.records.producer_epoch if self.records else None
 
+    @property
+    def base_sequence(self):
+        return self.records.base_sequence if self.records else None
+
     @property
     def has_sequence(self):
-        return self.records.has_sequence if self.records else False
+        base_seq = self.base_sequence
+        return base_seq is not None and base_seq != -1
 
     def try_append(self, timestamp_ms, key, value, headers, now=None):
         metadata = self.records.append(timestamp_ms, key, value, headers)
13 changes: 13 additions & 0 deletions kafka/producer/record_accumulator.py
@@ -228,6 +228,15 @@ def split_and_reenqueue(self, batch, now=None):
         now = time.monotonic() if now is None else now
         tp = batch.topic_partition
 
+        # Roll back the partition's sequence counter to the failed batch's
+        # base sequence. The failed batch was never committed by the broker,
+        # so its sequence range is free to be reused by the split batches.
+        # They will get fresh sequences assigned during drain.
+        if self._transaction_manager:
+            base_sequence = batch.records.base_sequence
+            if base_sequence is not None and base_sequence != -1:
+                self._transaction_manager.set_sequence_number(tp, base_sequence)
+
         # Read all records from the closed batch
         records_list = []
         for record_batch in MemoryRecords(batch.records.buffer()):
@@ -463,6 +472,10 @@ def drain_batches_for_one_node(self, cluster, node_id, max_size, now=None):
                         sequence_number,
                         self._transaction_manager.is_transactional()
                     )
+                    # Increment sequence now so subsequent in-flight batches
+                    # for the same partition get the correct next sequence.
+                    self._transaction_manager.increment_sequence_number(
+                        batch.topic_partition, batch.records.next_offset())
                 batch.records.close()
                 size += batch.records.size_in_bytes()
                 ready.append(batch)
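Taken together, the rollback in split_and_reenqueue and the drain-time increment keep the per-partition sequence counter gap-free. A worked trace with hypothetical numbers:

    # Counter starts at 100. A 10-record batch drains with base_sequence=100,
    # advancing the counter to 110; the batch then fails with MESSAGE_TOO_LARGE
    # and is split into two 5-record batches.
    seq = 110
    failed_base = 100

    seq = failed_base         # split_and_reenqueue: set_sequence_number(tp, 100)

    bases = []
    for record_count in [5, 5]:
        bases.append(seq)     # assigned via set_producer_state at drain time
        seq += record_count   # increment_sequence_number(tp, next_offset)

    assert bases == [100, 105] and seq == 110   # no gap, no reuse of committed sequences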
13 changes: 8 additions & 5 deletions kafka/producer/sender.py
@@ -505,6 +505,14 @@ def _complete_batch(self, batch, partition_response):
         error = partition_response.error
         if error is Errors.NoError:
             error = None
+        elif error is Errors.DuplicateSequenceNumberError:
+            # With max_in_flight > 1 and retries, a retried batch may arrive
+            # after the broker already committed the original. The broker
+            # returns DUPLICATE_SEQUENCE_NUMBER, which means the records were
+            # already written successfully. Treat as success.
+            log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s — records already committed, treating as success",
+                      str(self), batch.topic_partition)
+            error = None
 
         if error is not None:
             if self._can_split(batch, error):
@@ -555,11 +563,6 @@ def _complete_batch(self, batch, partition_response):
             self._maybe_remove_from_inflight_batches(batch)
             self._accumulator.deallocate(batch)
 
-        if self._transaction_manager and self._transaction_manager.producer_id_and_epoch.match(batch):
-            self._transaction_manager.increment_sequence_number(batch.topic_partition, batch.record_count)
-            log.debug("%s: Incremented sequence number for topic-partition %s to %s", str(self), batch.topic_partition,
-                      self._transaction_manager.sequence_number(batch.topic_partition))
-
         # Unmute the completed partition.
         if self.config['guarantee_message_order']:
             self._accumulator.muted.remove(batch.topic_partition)
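The deleted block is why the increment moved into drain (see record_accumulator.py above): with pipelining, a second batch drains before the first completes, so incrementing at completion time would hand both batches the same base sequence. A minimal sketch of the race, with simplified bookkeeping:

    # Hypothetical counter shared by drain and completion.
    seq = 0

    def drain(record_count):
        # New behavior: assign the base sequence and advance immediately.
        global seq
        base = seq
        seq += record_count
        return base

    assert drain(10) == 0    # batch A drains
    assert drain(10) == 10   # batch B drains before A completes: still correct
    # Under increment-at-completion, batch B would also have gotten base 0.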
4 changes: 4 additions & 0 deletions kafka/producer/transaction_manager.py
@@ -319,6 +319,10 @@ def increment_sequence_number(self, tp, increment):
         else:
             self._sequence_numbers[tp] += increment
 
+    def set_sequence_number(self, tp, sequence):
+        with self._lock:
+            self._sequence_numbers[tp] = sequence
+
     def reset_sequence_for_partition(self, tp):
         with self._lock:
             self._sequence_numbers.pop(tp, None)
4 changes: 4 additions & 0 deletions kafka/record/default_records.py
@@ -527,6 +527,10 @@ def producer_id(self):
     def producer_epoch(self):
         return self._producer_epoch
 
+    @property
+    def base_sequence(self):
+        return self._base_sequence
+
     def _get_attributes(self, include_compression_type=True):
         attrs = 0
         if include_compression_type:
13 changes: 12 additions & 1 deletion kafka/record/memory_records.py
@@ -122,7 +122,8 @@ def __next__(self):
 class MemoryRecordsBuilder:
 
     __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed",
-                 "_magic", "_bytes_written", "_producer_id", "_producer_epoch")
+                 "_magic", "_bytes_written", "_producer_id", "_producer_epoch",
+                 "_base_sequence")
 
     def __init__(self, magic, compression_type, batch_size, offset=0,
                  transactional=False, producer_id=-1, producer_epoch=-1, base_sequence=-1):
@@ -140,12 +141,15 @@ def __init__(self, magic, compression_type, batch_size, offset=0,
                 batch_size=batch_size)
             self._producer_id = producer_id
             self._producer_epoch = producer_epoch
+            self._base_sequence = base_sequence
         else:
             assert not transactional and producer_id == -1, "Idempotent messages are not supported for magic %s" % (magic,)
             self._builder = LegacyRecordBatchBuilder(
                 magic=magic, compression_type=compression_type,
                 batch_size=batch_size)
             self._producer_id = None
             self._producer_epoch = None
+            self._base_sequence = None
         self._batch_size = batch_size
         self._buffer = None
 
@@ -186,6 +190,8 @@ def set_producer_state(self, producer_id, producer_epoch, base_sequence, is_transactional):
             raise IllegalStateError("Trying to set producer state of an already closed batch. This indicates a bug on the client.")
         self._builder.set_producer_state(producer_id, producer_epoch, base_sequence, is_transactional)
         self._producer_id = producer_id
+        self._producer_epoch = producer_epoch
+        self._base_sequence = base_sequence
 
     @property
     def producer_id(self):
@@ -195,6 +201,10 @@ def producer_id(self):
     def producer_epoch(self):
         return self._producer_epoch
 
+    @property
+    def base_sequence(self):
+        return self._base_sequence
+
     def records(self):
         assert self._closed
         return MemoryRecords(self._buffer)
@@ -211,6 +221,7 @@ def close(self):
         if self._magic == 2:
             self._producer_id = self._builder.producer_id
             self._producer_epoch = self._builder.producer_epoch
+            self._base_sequence = self._builder.base_sequence
         self._builder = None
         self._closed = True
64 changes: 64 additions & 0 deletions test/integration/test_producer_integration.py
@@ -144,6 +144,70 @@ def test_transactional_producer_messages(kafka_producer_factory, kafka_consumer_factory):
     assert messages == {b'msg3', b'msg4'}
 
 
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Idempotent producer requires broker >=0.11")
+@pytest.mark.parametrize("max_in_flight", [1, 2, 5])
+def test_idempotent_producer_max_in_flight(kafka_producer_factory, kafka_consumer_factory, max_in_flight):
+    """Test idempotent producer with max_in_flight_requests_per_connection 1-5."""
+    producer = kafka_producer_factory(
+        enable_idempotence=True,
+        max_in_flight_requests_per_connection=max_in_flight,
+    )
+    topic = random_string(5)
+    messages = 100
+    futures = []
+    for i in range(messages):
+        futures.append(producer.send(topic, value=('msg %d' % i).encode()))
+    ret = [f.get(timeout=30) for f in futures]
+    assert len(ret) == messages
+
+    # Verify ordering: offsets should be monotonically increasing per partition
+    partition_offsets = {}
+    for metadata in ret:
+        offsets = partition_offsets.setdefault(metadata.partition, [])
+        offsets.append(metadata.offset)
+    for offsets in partition_offsets.values():
+        assert offsets == sorted(offsets), "Offsets should be monotonically increasing"
+
+    # Verify all messages are readable
+    consumer = kafka_consumer_factory(
+        topics=(),
+        group_id=None,
+        consumer_timeout_ms=30000,
+        auto_offset_reset='earliest',
+        value_deserializer=bytes.decode,
+    )
+    consumer.subscribe([topic])
+    received = set()
+    for _ in range(messages):
+        try:
+            received.add(next(consumer).value)
+        except StopIteration:
+            break
+    assert received == set('msg %d' % i for i in range(messages))
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Idempotent producer requires broker >=0.11")
+def test_idempotent_producer_high_throughput(kafka_producer_factory):
+    """Test idempotent producer with max_in_flight=5 handles many concurrent batches."""
+    producer = kafka_producer_factory(
+        enable_idempotence=True,
+        max_in_flight_requests_per_connection=5,
+        batch_size=1024,  # Small batches to create more in-flight requests
+        linger_ms=5,
+    )
+    topic = random_string(5)
+    messages = 500
+    futures = []
+    for i in range(messages):
+        futures.append(producer.send(topic, value=('msg %d' % i).encode(), partition=0))
+    ret = [f.get(timeout=30) for f in futures]
+    assert len(ret) == messages
+
+    # All offsets should be unique and sequential for partition 0
+    offsets = [r.offset for r in ret]
+    assert offsets == list(range(messages))
+
+
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Idempotent producer requires broker >=0.11")
 def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_client_factory):
     # Setting leader_epoch only supported in 2.1+