diff --git a/kafka/net/compat.py b/kafka/net/compat.py index db712fa4a..7fdf24cad 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -67,6 +67,10 @@ def await_ready(self, node_id, timeout_ms=30000): conn = self._manager._conns.get(node_id) if conn is not None and not conn.init_future.is_done: self._manager.poll(timeout_ms=timeout_ms, future=conn.init_future) + # Connection may be initialized but paused (e.g. max_in_flight reached). + # Poll briefly to drain in-flight responses and unpause. + if conn is not None and conn.connected and conn.paused: + self._manager.poll(timeout_ms=min(timeout_ms, self._manager.config['request_timeout_ms'])) if not self.is_ready(node_id): raise Errors.KafkaConnectionError('Node %s not ready after %s ms' % (node_id, timeout_ms)) return True diff --git a/kafka/net/connection.py b/kafka/net/connection.py index e21fdcc6f..f7e778237 100644 --- a/kafka/net/connection.py +++ b/kafka/net/connection.py @@ -112,7 +112,7 @@ def send_request(self, request, request_timeout_ms=None): self._request_buffer.append((request, future, timeout_at)) return future elif self.paused: - return future.failure(Errors.NodeNotReadyError('Node paused: %s') % self.paused) + return future.failure(Errors.NodeNotReadyError(f'Node paused: {self.paused}')) elif not self.connected: return future.failure(Errors.KafkaConnectionError('Node not connected')) else: @@ -145,10 +145,14 @@ def _send_request(self, request, future=None, timeout_at=None): else: future.success(None) + # Write the current request's bytes before checking max_in_flight. + # Otherwise with max_in_flight=1, the first request would be added to + # in_flight_requests (len==1), trip the >= check, pause, and never be + # written to the transport — hanging forever. + if not self.paused: + self.transport.write(self.parser.send_bytes()) if len(self.in_flight_requests) >= self.config['max_in_flight_requests_per_connection']: self.pause('max_in_flight') - elif not self.paused: - self.transport.write(self.parser.send_bytes()) return future def send_buffered(self): diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 2ea94e47c..b2d28056c 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -527,13 +527,9 @@ def __init__(self, **configs): if self.config['retries'] == 0: raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.") - if 'max_in_flight_requests_per_connection' not in user_provided_configs: - log.info("%s: Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.", str(self)) - self.config['max_in_flight_requests_per_connection'] = 1 - elif self.config['max_in_flight_requests_per_connection'] != 1: - raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order" - " to use the idempotent producer." - " Otherwise we cannot guarantee idempotence.") + if self.config['max_in_flight_requests_per_connection'] > 5: + raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to at most 5" + " to use the idempotent producer.") if 'acks' not in user_provided_configs: log.info("%s: Overriding the default 'acks' config to 'all' since idempotence is enabled", str(self)) diff --git a/kafka/producer/producer_batch.py b/kafka/producer/producer_batch.py index bb83ae2e0..a25415102 100644 --- a/kafka/producer/producer_batch.py +++ b/kafka/producer/producer_batch.py @@ -47,9 +47,14 @@ def producer_id(self): def producer_epoch(self): return self.records.producer_epoch if self.records else None + @property + def base_sequence(self): + return self.records.base_sequence if self.records else None + @property def has_sequence(self): - return self.records.has_sequence if self.records else False + base_seq = self.base_sequence + return base_seq is not None and base_seq != -1 def try_append(self, timestamp_ms, key, value, headers, now=None): metadata = self.records.append(timestamp_ms, key, value, headers) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index db6f8c114..ea8f707d3 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -228,6 +228,15 @@ def split_and_reenqueue(self, batch, now=None): now = time.monotonic() if now is None else now tp = batch.topic_partition + # Roll back the partition's sequence counter to the failed batch's + # base sequence. The failed batch was never committed by the broker, + # so its sequence range is free to be reused by the split batches. + # They will get fresh sequences assigned during drain. + if self._transaction_manager: + base_sequence = batch.records.base_sequence + if base_sequence is not None and base_sequence != -1: + self._transaction_manager.set_sequence_number(tp, base_sequence) + # Read all records from the closed batch records_list = [] for record_batch in MemoryRecords(batch.records.buffer()): @@ -463,6 +472,10 @@ def drain_batches_for_one_node(self, cluster, node_id, max_size, now=None): sequence_number, self._transaction_manager.is_transactional() ) + # Increment sequence now so subsequent in-flight batches + # for the same partition get the correct next sequence. + self._transaction_manager.increment_sequence_number( + batch.topic_partition, batch.records.next_offset()) batch.records.close() size += batch.records.size_in_bytes() ready.append(batch) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 64ae5eb0e..86aa7ae8c 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -505,6 +505,14 @@ def _complete_batch(self, batch, partition_response): error = partition_response.error if error is Errors.NoError: error = None + elif error is Errors.DuplicateSequenceNumberError: + # With max_in_flight > 1 and retries, a retried batch may arrive + # after the broker already committed the original. The broker + # returns DUPLICATE_SEQUENCE_NUMBER, which means the records were + # already written successfully. Treat as success. + log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s — records already committed, treating as success", + str(self), batch.topic_partition) + error = None if error is not None: if self._can_split(batch, error): @@ -555,11 +563,6 @@ def _complete_batch(self, batch, partition_response): self._maybe_remove_from_inflight_batches(batch) self._accumulator.deallocate(batch) - if self._transaction_manager and self._transaction_manager.producer_id_and_epoch.match(batch): - self._transaction_manager.increment_sequence_number(batch.topic_partition, batch.record_count) - log.debug("%s: Incremented sequence number for topic-partition %s to %s", str(self), batch.topic_partition, - self._transaction_manager.sequence_number(batch.topic_partition)) - # Unmute the completed partition. if self.config['guarantee_message_order']: self._accumulator.muted.remove(batch.topic_partition) diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 6661a4519..907c4c825 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -319,6 +319,10 @@ def increment_sequence_number(self, tp, increment): else: self._sequence_numbers[tp] += increment + def set_sequence_number(self, tp, sequence): + with self._lock: + self._sequence_numbers[tp] = sequence + def reset_sequence_for_partition(self, tp): with self._lock: self._sequence_numbers.pop(tp, None) diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 2df422757..507de00cf 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -527,6 +527,10 @@ def producer_id(self): def producer_epoch(self): return self._producer_epoch + @property + def base_sequence(self): + return self._base_sequence + def _get_attributes(self, include_compression_type=True): attrs = 0 if include_compression_type: diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index 3ef2c3bfc..31aa10319 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -122,7 +122,8 @@ def __next__(self): class MemoryRecordsBuilder: __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed", - "_magic", "_bytes_written", "_producer_id", "_producer_epoch") + "_magic", "_bytes_written", "_producer_id", "_producer_epoch", + "_base_sequence") def __init__(self, magic, compression_type, batch_size, offset=0, transactional=False, producer_id=-1, producer_epoch=-1, base_sequence=-1): @@ -140,12 +141,15 @@ def __init__(self, magic, compression_type, batch_size, offset=0, batch_size=batch_size) self._producer_id = producer_id self._producer_epoch = producer_epoch + self._base_sequence = base_sequence else: assert not transactional and producer_id == -1, "Idempotent messages are not supported for magic %s" % (magic,) self._builder = LegacyRecordBatchBuilder( magic=magic, compression_type=compression_type, batch_size=batch_size) self._producer_id = None + self._producer_epoch = None + self._base_sequence = None self._batch_size = batch_size self._buffer = None @@ -186,6 +190,8 @@ def set_producer_state(self, producer_id, producer_epoch, base_sequence, is_tran raise IllegalStateError("Trying to set producer state of an already closed batch. This indicates a bug on the client.") self._builder.set_producer_state(producer_id, producer_epoch, base_sequence, is_transactional) self._producer_id = producer_id + self._producer_epoch = producer_epoch + self._base_sequence = base_sequence @property def producer_id(self): @@ -195,6 +201,10 @@ def producer_id(self): def producer_epoch(self): return self._producer_epoch + @property + def base_sequence(self): + return self._base_sequence + def records(self): assert self._closed return MemoryRecords(self._buffer) @@ -211,6 +221,7 @@ def close(self): if self._magic == 2: self._producer_id = self._builder.producer_id self._producer_epoch = self._builder.producer_epoch + self._base_sequence = self._builder.base_sequence self._builder = None self._closed = True diff --git a/test/integration/test_producer_integration.py b/test/integration/test_producer_integration.py index ad42b055e..3a0dcbb73 100644 --- a/test/integration/test_producer_integration.py +++ b/test/integration/test_producer_integration.py @@ -144,6 +144,70 @@ def test_transactional_producer_messages(kafka_producer_factory, kafka_consumer_ assert messages == {b'msg3', b'msg4'} +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Idempotent producer requires broker >=0.11") +@pytest.mark.parametrize("max_in_flight", [1, 2, 5]) +def test_idempotent_producer_max_in_flight(kafka_producer_factory, kafka_consumer_factory, max_in_flight): + """Test idempotent producer with max_in_flight_requests_per_connection 1-5.""" + producer = kafka_producer_factory( + enable_idempotence=True, + max_in_flight_requests_per_connection=max_in_flight, + ) + topic = random_string(5) + messages = 100 + futures = [] + for i in range(messages): + futures.append(producer.send(topic, value=('msg %d' % i).encode())) + ret = [f.get(timeout=30) for f in futures] + assert len(ret) == messages + + # Verify ordering: offsets should be monotonically increasing per partition + partition_offsets = {} + for metadata in ret: + offsets = partition_offsets.setdefault(metadata.partition, []) + offsets.append(metadata.offset) + for offsets in partition_offsets.values(): + assert offsets == sorted(offsets), "Offsets should be monotonically increasing" + + # Verify all messages are readable + consumer = kafka_consumer_factory( + topics=(), + group_id=None, + consumer_timeout_ms=30000, + auto_offset_reset='earliest', + value_deserializer=bytes.decode, + ) + consumer.subscribe([topic]) + received = set() + for _ in range(messages): + try: + received.add(next(consumer).value) + except StopIteration: + break + assert received == set('msg %d' % i for i in range(messages)) + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Idempotent producer requires broker >=0.11") +def test_idempotent_producer_high_throughput(kafka_producer_factory): + """Test idempotent producer with max_in_flight=5 handles many concurrent batches.""" + producer = kafka_producer_factory( + enable_idempotence=True, + max_in_flight_requests_per_connection=5, + batch_size=1024, # Small batches to create more in-flight requests + linger_ms=5, + ) + topic = random_string(5) + messages = 500 + futures = [] + for i in range(messages): + futures.append(producer.send(topic, value=('msg %d' % i).encode(), partition=0)) + ret = [f.get(timeout=30) for f in futures] + assert len(ret) == messages + + # All offsets should be unique and sequential for partition 0 + offsets = [r.offset for r in ret] + assert offsets == list(range(messages)) + + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Idempotent producer requires broker >=0.11") def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_client_factory): # Setting leader_epoch only supported in 2.1+ diff --git a/test/test_sender.py b/test/test_sender.py index 372f0a0cf..7ebd421f3 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -18,7 +18,7 @@ from kafka.producer.producer_batch import ProducerBatch from kafka.producer.record_accumulator import RecordAccumulator from kafka.producer.sender import PartitionResponse, Sender -from kafka.producer.transaction_manager import TransactionManager +from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition @@ -104,9 +104,15 @@ def test_complete_batch_transaction(sender, transaction_manager): assert sender._transaction_manager.sequence_number(batch.topic_partition) == 0 assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id + # Sequence is now incremented at drain time, not completion time. + # Simulate drain-time increment. + sender._transaction_manager.increment_sequence_number(batch.topic_partition, batch.record_count) + assert sender._transaction_manager.sequence_number(batch.topic_partition) == batch.record_count + # No error, base_offset 0 sender._complete_batch(batch, PartitionResponse(base_offset=0)) assert batch.is_done + # Sequence should not change on completion (already incremented at drain) assert sender._transaction_manager.sequence_number(batch.topic_partition) == batch.record_count @@ -684,3 +690,193 @@ def test_end_to_end_split_and_complete(accumulator): assert future.value.offset == i assert future.value.topic == 'foo' assert future.value.partition == 0 + + +# ---- KAFKA-5494: Idempotent producer with max_in_flight > 1 ---- + +def test_idempotent_config_allows_max_in_flight_up_to_5(): + """Idempotent producer allows max_in_flight 1-5.""" + from kafka.producer.kafka import KafkaProducer + for max_in_flight in (1, 2, 3, 4, 5): + p = KafkaProducer( + enable_idempotence=True, + max_in_flight_requests_per_connection=max_in_flight, + api_version=(0, 11), + ) + assert p.config['max_in_flight_requests_per_connection'] == max_in_flight + p.close(timeout=0) + + +def test_idempotent_config_rejects_max_in_flight_above_5(): + """Idempotent producer rejects max_in_flight > 5.""" + from kafka.producer.kafka import KafkaProducer + with pytest.raises(Errors.KafkaConfigurationError, match="at most 5"): + KafkaProducer( + enable_idempotence=True, + max_in_flight_requests_per_connection=6, + api_version=(0, 11), + ) + + +def test_idempotent_default_max_in_flight(): + """Idempotent producer defaults to max_in_flight=5 (no longer overridden to 1).""" + from kafka.producer.kafka import KafkaProducer + p = KafkaProducer( + enable_idempotence=True, + api_version=(0, 11), + ) + assert p.config['max_in_flight_requests_per_connection'] == 5 + p.close(timeout=0) + + +def test_guarantee_message_order_only_when_max_in_flight_1(): + """guarantee_message_order is True only when max_in_flight == 1.""" + from kafka.producer.kafka import KafkaProducer + p1 = KafkaProducer( + enable_idempotence=True, + max_in_flight_requests_per_connection=1, + api_version=(0, 11), + ) + assert p1._sender.config['guarantee_message_order'] is True + p1.close(timeout=0) + + p5 = KafkaProducer( + enable_idempotence=True, + max_in_flight_requests_per_connection=5, + api_version=(0, 11), + ) + assert p5._sender.config['guarantee_message_order'] is False + p5.close(timeout=0) + + +def _setup_drain(client, transaction_manager, tp): + """Helper to set up cluster and transaction_manager for drain tests.""" + transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(1000, 0)) + client.cluster._partitions[tp] = None + client.cluster._broker_partitions = {0: [tp]} + + +def test_sequence_number_incremented_at_drain_time(client, transaction_manager): + """Sequence numbers are incremented during drain, not on completion.""" + accumulator = RecordAccumulator(transaction_manager=transaction_manager) + tp = TopicPartition('foo', 0) + _setup_drain(client, transaction_manager, tp) + + accumulator.append(tp, 0, b'key-0', b'value-0', []) + accumulator.append(tp, 0, b'key-1', b'value-1', []) + assert transaction_manager.sequence_number(tp) == 0 + + batches = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) + assert len(batches) == 1 + + # Sequence should be incremented at drain time + assert transaction_manager.sequence_number(tp) == 2 + + +def test_multiple_batches_get_different_sequences(client, transaction_manager): + """With max_in_flight > 1, successive drains assign different sequence numbers.""" + accumulator = RecordAccumulator(batch_size=50, transaction_manager=transaction_manager) + tp = TopicPartition('foo', 0) + _setup_drain(client, transaction_manager, tp) + + for i in range(10): + accumulator.append(tp, 0, b'key-%d' % i, b'value-%d' % i, []) + + # First drain: gets first batch + batches1 = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) + assert len(batches1) == 1 + seq_after_first = transaction_manager.sequence_number(tp) + assert seq_after_first > 0 + + # Second drain: gets next batch with higher sequence + batches2 = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) + assert len(batches2) == 1 + seq_after_second = transaction_manager.sequence_number(tp) + assert seq_after_second > seq_after_first + + +def test_retry_batch_keeps_sequence(client, transaction_manager): + """Retried batches keep their original sequence number (in_retry=True skips reassignment).""" + accumulator = RecordAccumulator(transaction_manager=transaction_manager) + tp = TopicPartition('foo', 0) + _setup_drain(client, transaction_manager, tp) + + accumulator.append(tp, 0, b'key', b'value', []) + + batches = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) + batch = batches[0] + seq_after_drain = transaction_manager.sequence_number(tp) + assert seq_after_drain == 1 # Incremented at drain + + # Re-enqueue for retry + accumulator.reenqueue(batch) + assert batch.in_retry() + + # Re-drain after backoff expires — sequence should NOT change (batch is in_retry) + future_time = time.monotonic() + 1 # past retry_backoff_ms + batches2 = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576, now=future_time) + assert len(batches2) == 1 + assert transaction_manager.sequence_number(tp) == seq_after_drain + + +def test_duplicate_sequence_number_treated_as_success(sender, accumulator): + """DuplicateSequenceNumberError is treated as successful completion.""" + batch = producer_batch() + accumulator._incomplete.add(batch) + + sender._complete_batch(batch, PartitionResponse( + error=Errors.DuplicateSequenceNumberError, base_offset=42, log_append_time=-1)) + + assert batch.is_done + assert batch.produce_future.succeeded() + assert batch.produce_future.value == (42, -1, None) + + +def test_split_resets_sequence_number(client, transaction_manager): + """split_and_reenqueue rolls back the sequence counter so split batches reuse the range.""" + accumulator = RecordAccumulator(transaction_manager=transaction_manager) + tp = TopicPartition('foo', 0) + _setup_drain(client, transaction_manager, tp) + + # Append a batch with multiple records + for i in range(5): + accumulator.append(tp, 0, b'key-%d' % i, b'value-%d' % i, []) + + assert transaction_manager.sequence_number(tp) == 0 + + # Drain — sequence advances to 5 + batches = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) + assert len(batches) == 1 + batch = batches[0] + assert transaction_manager.sequence_number(tp) == 5 + + # Split — should roll back sequence to 0 (the failed batch's base_sequence) + accumulator.split_and_reenqueue(batch) + accumulator.deallocate(batch) + assert transaction_manager.sequence_number(tp) == 0 + + # Drain the split batches — each gets correct sequential sequences + dq = list(accumulator._batches[tp]) + assert len(dq) == 2 # Split into two halves + + batches1 = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) + assert len(batches1) == 1 + seq_after_first = transaction_manager.sequence_number(tp) + assert seq_after_first == batches1[0].record_count # e.g., 3 + + batches2 = accumulator.drain_batches_for_one_node(client.cluster, 0, 1048576) + assert len(batches2) == 1 + seq_after_second = transaction_manager.sequence_number(tp) + assert seq_after_second == 5 # Back to where it was: 3 + 2 = 5 + + +def test_split_without_idempotence_no_sequence_reset(accumulator): + """split_and_reenqueue works without transaction_manager (no sequence to reset).""" + tp = TopicPartition('foo', 0) + batch, futures = multi_record_batch(num_records=4) + accumulator._incomplete.add(batch) + + # Should not raise even without a transaction_manager + num_new = accumulator.split_and_reenqueue(batch) + accumulator.deallocate(batch) + assert num_new == 2