Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition
from kafka.util import Timer, ensure_valid_topic_name
from kafka.util import Timer


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -810,7 +810,6 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
'Null messages require kafka >= 0.8.1')
assert not (value is None and key is None), 'Need at least one: key or value'
ensure_valid_topic_name(topic)
key_bytes = value_bytes = None
timer = Timer(self.config['max_block_ms'], "Failed to assign partition for message in max_block_ms.")
try:
Expand Down
2 changes: 2 additions & 0 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from kafka.producer.transaction_manager import ProducerIdAndEpoch
from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name
from kafka.version import __version__

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -354,6 +355,7 @@ def add_topic(self, topic):
# is ok where self._metadata._topics should never
# remove topics for a producer instance, only add them.
if topic not in self._metadata._topics:
ensure_valid_topic_name(topic)
self._topics_to_add.add(topic)
self.wakeup()

Expand Down
Loading