Merged
128 changes: 59 additions & 69 deletions kafka/producer/kafka.py
@@ -810,64 +810,52 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
         assert value is not None or self.config['api_version'] >= (0, 8, 1), (
             'Null messages require kafka >= 0.8.1')
         assert not (value is None and key is None), 'Need at least one: key or value'
-        key_bytes = value_bytes = None
-        timer = Timer(self.config['max_block_ms'], "Failed to assign partition for message in max_block_ms.")
-        try:
-            assigned_partition = None
-            while assigned_partition is None and not timer.expired:
-                self._wait_on_metadata(topic, timer.timeout_ms)
-
-                key_bytes = self._serialize(
-                    self.config['key_serializer'],
-                    topic, key)
-                value_bytes = self._serialize(
-                    self.config['value_serializer'],
-                    topic, value)
-                assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
-                assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
-
-                assigned_partition = self._partition(topic, partition, key, value,
-                                                     key_bytes, value_bytes)
-            if assigned_partition is None:
-                raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % timer.elapsed_ms / 1000)
-            else:
-                partition = assigned_partition
-
-            if headers is None:
-                headers = []
-            assert isinstance(headers, list)
-            assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers)
-
-            message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
-            self._ensure_valid_record_size(message_size)
-
-            tp = TopicPartition(topic, partition)
-            log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", str(self), key, value, headers, tp)
-
-            if self._transaction_manager and self._transaction_manager.is_transactional():
-                self._transaction_manager.maybe_add_partition_to_transaction(tp)
-
-            result = self._accumulator.append(tp, timestamp_ms,
-                                              key_bytes, value_bytes, headers)
-            future, batch_is_full, new_batch_created = result
-            if batch_is_full or new_batch_created:
-                log.debug("%s: Waking up the sender since %s is either full or"
-                          " getting a new batch", str(self), tp)
-                self._sender.wakeup()
-
-            return future
-        # handling exceptions and record the errors;
-        # for API exceptions return them in the future,
-        # for other exceptions raise directly
-        except Errors.BrokerResponseError as e:
-            log.error("%s: Exception occurred during message send: %s", str(self), e)
-            return FutureRecordMetadata(
-                FutureProduceResult(TopicPartition(topic, partition)),
-                -1, None, None,
-                len(key_bytes) if key_bytes is not None else -1,
-                len(value_bytes) if value_bytes is not None else -1,
-                sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
-            ).failure(e)
+        key_bytes = self._serialize(
+            self.config['key_serializer'],
+            topic, key)
+        value_bytes = self._serialize(
+            self.config['value_serializer'],
+            topic, value)
+        assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
+        assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
+
+        if self._metadata.partitions_for_topic(topic) is None:
+            try:
+                self._wait_on_metadata(topic, self.config['max_block_ms'])
+            except Errors.BrokerResponseError as e:
+                log.error("%s: Exception occurred waiting for metadata during message send: %s", str(self), e)
+                return FutureRecordMetadata(
+                    FutureProduceResult(TopicPartition(topic, partition)),
+                    -1, None, None,
+                    len(key_bytes) if key_bytes is not None else -1,
+                    len(value_bytes) if value_bytes is not None else -1,
+                    sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
+                ).failure(e)
+
+        partition = self._partition(topic, partition, key, value, key_bytes, value_bytes)
+        assert partition is not None, f'Partitioner did not assign a partition for topic {topic}!'
+
+        if headers is None:
+            headers = []
+        assert isinstance(headers, list)
+        assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers)
+
+        message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
+        self._ensure_valid_record_size(message_size)
+
+        tp = TopicPartition(topic, partition)
+        log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", str(self), key, value, headers, tp)
+
+        if self._transaction_manager and self._transaction_manager.is_transactional():
+            self._transaction_manager.maybe_add_partition_to_transaction(tp)
+
+        result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, headers)
+        future, batch_is_full, new_batch_created = result
+        if batch_is_full or new_batch_created:
+            log.debug("%s: Waking up the sender since %s is either full or"
+                      " getting a new batch", str(self), tp)
+            self._sender.wakeup()
+        return future

     def flush(self, timeout=None):
         """
@@ -919,19 +907,16 @@ def _wait_on_metadata(self, topic, max_wait_ms):
         Raises:
             KafkaTimeoutError: if partitions for topic were not obtained before
                 specified max_wait timeout
-            TopicAuthorizationFailedError: if not authorized to access topic
+            Non-retriable errors that cause metadata refresh to fail
         """
-        # add topic to metadata topic list if it is not there already.
+        partitions = self._metadata.partitions_for_topic(topic)
+        if partitions is not None:
+            return partitions
         self._sender.add_topic(topic)
-        timer = Timer(max_wait_ms, "Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,))
-        metadata_event = None
-        while True:
-            partitions = self._metadata.partitions_for_topic(topic)
-            if partitions is not None:
-                return partitions
-            timer.maybe_raise()
-            if not metadata_event:
-                metadata_event = threading.Event()
+
+        timer = Timer(max_wait_ms)
+        metadata_event = threading.Event()
+        while not timer.expired:
             log.debug("%s: Requesting metadata update for topic %s", str(self), topic)
             metadata_event.clear()
             future = self._metadata.request_update()
@@ -947,6 +932,11 @@ def _wait_on_metadata(self, topic, max_wait_ms):
                 raise Errors.TopicAuthorizationFailedError(set([topic]))
             else:
                 log.debug("%s: _wait_on_metadata woke after %s secs.", str(self), timer.elapsed_ms / 1000)
+            partitions = self._metadata.partitions_for_topic(topic)
+            if partitions is not None:
+                return partitions
+        else:
+            raise Errors.KafkaTimeoutError("Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,))

     def _serialize(self, f, topic, data):
         if not f:
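
The rewritten _wait_on_metadata() swaps the unconditional while True loop for a deadline-driven one: return the cached partitions if present, otherwise request a metadata update, wait on an event for the remaining timeout, and re-check until the Timer expires, at which point the while/else raises KafkaTimeoutError. A standalone sketch of that wait pattern, using a simplified stand-in for the producer's internal Timer helper (an assumption, not the exact class):

import threading
import time

class Timer:
    # Simplified stand-in; only the properties the loop relies on are modeled.
    def __init__(self, timeout_ms):
        self._start = time.monotonic()
        self._deadline = self._start + timeout_ms / 1000.0

    @property
    def expired(self):
        return time.monotonic() >= self._deadline

    @property
    def timeout_ms(self):
        return max(0.0, (self._deadline - time.monotonic()) * 1000.0)

    @property
    def elapsed_ms(self):
        return (time.monotonic() - self._start) * 1000.0

def wait_for_metadata(check, request_update, max_wait_ms):
    # check() returns partitions or None; request_update(event) triggers a
    # refresh and sets the event once the refresh completes.
    timer = Timer(max_wait_ms)
    metadata_event = threading.Event()
    while not timer.expired:
        metadata_event.clear()
        request_update(metadata_event)
        metadata_event.wait(timer.timeout_ms / 1000.0)
        partitions = check()
        if partitions is not None:
            return partitions
    else:
        # No break above, so the else clause fires when the deadline passes.
        raise TimeoutError("Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,))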