From 766b175ffbb9c1ff3ca121ef679c13f621636648 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 10 Apr 2026 15:33:09 -0700 Subject: [PATCH 1/2] Sender: refactor _wait_on_metadata for fast path --- kafka/producer/kafka.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0bd665f6f..952c1cd6a 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -919,19 +919,16 @@ def _wait_on_metadata(self, topic, max_wait_ms): Raises: KafkaTimeoutError: if partitions for topic were not obtained before specified max_wait timeout + TopicAuthorizationFailedError: if not authorized to access topic + Non-retriable errors that cause metadata refresh to fail """ - # add topic to metadata topic list if it is not there already. + partitions = self._metadata.partitions_for_topic(topic) + if partitions is not None: + return partitions self._sender.add_topic(topic) - timer = Timer(max_wait_ms, "Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,)) - metadata_event = None - while True: - partitions = self._metadata.partitions_for_topic(topic) - if partitions is not None: - return partitions - timer.maybe_raise() - if not metadata_event: - metadata_event = threading.Event() - + timer = Timer(max_wait_ms) + metadata_event = threading.Event() + while not timer.expired: log.debug("%s: Requesting metadata update for topic %s", str(self), topic) metadata_event.clear() future = self._metadata.request_update() @@ -947,6 +944,11 @@ def _wait_on_metadata(self, topic, max_wait_ms): raise Errors.TopicAuthorizationFailedError(set([topic])) else: log.debug("%s: _wait_on_metadata woke after %s secs.", str(self), timer.elapsed_ms / 1000) + partitions = self._metadata.partitions_for_topic(topic) + if partitions is not None: + return partitions + else: + raise Errors.KafkaTimeoutError("Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,)) def _serialize(self, f, topic, data): if not f: From 220b61cfd26c41b5fa904f3a51d7cd2f60318b10 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 10 Apr 2026 15:34:17 -0700 Subject: [PATCH 2/2] Refactor producer.send: Only _wait_on_metadata when needed; reduce scope of try/except block --- kafka/producer/kafka.py | 104 ++++++++++++++++++---------------------- 1 file changed, 46 insertions(+), 58 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 952c1cd6a..3e654648b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -810,64 +810,52 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest assert value is not None or self.config['api_version'] >= (0, 8, 1), ( 'Null messages require kafka >= 0.8.1') assert not (value is None and key is None), 'Need at least one: key or value' - key_bytes = value_bytes = None - timer = Timer(self.config['max_block_ms'], "Failed to assign partition for message in max_block_ms.") - try: - assigned_partition = None - while assigned_partition is None and not timer.expired: - self._wait_on_metadata(topic, timer.timeout_ms) - - key_bytes = self._serialize( - self.config['key_serializer'], - topic, key) - value_bytes = self._serialize( - self.config['value_serializer'], - topic, value) - assert type(key_bytes) in (bytes, bytearray, memoryview, type(None)) - assert type(value_bytes) in (bytes, bytearray, memoryview, type(None)) - - assigned_partition = self._partition(topic, partition, key, value, - key_bytes, value_bytes) - if assigned_partition is None: - raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % timer.elapsed_ms / 1000) - else: - partition = assigned_partition - - if headers is None: - headers = [] - assert isinstance(headers, list) - assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers) - - message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers) - self._ensure_valid_record_size(message_size) - - tp = TopicPartition(topic, partition) - log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", str(self), key, value, headers, tp) - - if self._transaction_manager and self._transaction_manager.is_transactional(): - self._transaction_manager.maybe_add_partition_to_transaction(tp) - - result = self._accumulator.append(tp, timestamp_ms, - key_bytes, value_bytes, headers) - future, batch_is_full, new_batch_created = result - if batch_is_full or new_batch_created: - log.debug("%s: Waking up the sender since %s is either full or" - " getting a new batch", str(self), tp) - self._sender.wakeup() - - return future - # handling exceptions and record the errors; - # for API exceptions return them in the future, - # for other exceptions raise directly - except Errors.BrokerResponseError as e: - log.error("%s: Exception occurred during message send: %s", str(self), e) - return FutureRecordMetadata( - FutureProduceResult(TopicPartition(topic, partition)), - -1, None, None, - len(key_bytes) if key_bytes is not None else -1, - len(value_bytes) if value_bytes is not None else -1, - sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1, - ).failure(e) + key_bytes = self._serialize( + self.config['key_serializer'], + topic, key) + value_bytes = self._serialize( + self.config['value_serializer'], + topic, value) + assert type(key_bytes) in (bytes, bytearray, memoryview, type(None)) + assert type(value_bytes) in (bytes, bytearray, memoryview, type(None)) + + if self._metadata.partitions_for_topic(topic) is None: + try: + self._wait_on_metadata(topic, self.config['max_block_ms']) + except Errors.BrokerResponseError as e: + log.error("%s: Exception occurred waiting for metadata during message send: %s", str(self), e) + return FutureRecordMetadata( + FutureProduceResult(TopicPartition(topic, partition)), + -1, None, None, + len(key_bytes) if key_bytes is not None else -1, + len(value_bytes) if value_bytes is not None else -1, + sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1, + ).failure(e) + + partition = self._partition(topic, partition, key, value, key_bytes, value_bytes) + assert partition is not None, f'Partitioner did not assign a partition for topic {topic}!' + + if headers is None: + headers = [] + assert isinstance(headers, list) + assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers) + + message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers) + self._ensure_valid_record_size(message_size) + + tp = TopicPartition(topic, partition) + log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", str(self), key, value, headers, tp) + + if self._transaction_manager and self._transaction_manager.is_transactional(): + self._transaction_manager.maybe_add_partition_to_transaction(tp) + + result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, headers) + future, batch_is_full, new_batch_created = result + if batch_is_full or new_batch_created: + log.debug("%s: Waking up the sender since %s is either full or" + " getting a new batch", str(self), tp) + self._sender.wakeup() + return future def flush(self, timeout=None): """