From 498a38103b81296f1b45d16caeede3851c48826f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 10 Apr 2026 15:31:26 -0700 Subject: [PATCH] Only ensure_valid_topic_name new topics on send --- kafka/producer/kafka.py | 3 +-- kafka/producer/sender.py | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 3b530d58d..0bd665f6f 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -20,7 +20,7 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder from kafka.serializer import Serializer from kafka.structs import TopicPartition -from kafka.util import Timer, ensure_valid_topic_name +from kafka.util import Timer log = logging.getLogger(__name__) @@ -810,7 +810,6 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest assert value is not None or self.config['api_version'] >= (0, 8, 1), ( 'Null messages require kafka >= 0.8.1') assert not (value is None and key is None), 'Need at least one: key or value' - ensure_valid_topic_name(topic) key_bytes = value_bytes = None timer = Timer(self.config['max_block_ms'], "Failed to assign partition for message in max_block_ms.") try: diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index ef71294fe..1230b41f8 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -11,6 +11,7 @@ from kafka.producer.transaction_manager import ProducerIdAndEpoch from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest from kafka.structs import TopicPartition +from kafka.util import ensure_valid_topic_name from kafka.version import __version__ log = logging.getLogger(__name__) @@ -354,6 +355,7 @@ def add_topic(self, topic): # is ok where self._metadata._topics should never # remove topics for a producer instance, only add them. if topic not in self._metadata._topics: + ensure_valid_topic_name(topic) self._topics_to_add.add(topic) self.wakeup()