Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from kafka import errors as Errors
from kafka.future import Future
from kafka.util import Timer


class FutureProduceResult(Future):
Expand Down Expand Up @@ -57,11 +58,38 @@ def _produce_success(self, result):
serialized_value_size, serialized_header_size)
self.success(metadata)

def rebind(self, new_produce_future, new_batch_index):
    """Attach this record future to a different produce future and batch slot.

    Invoked from the sender thread when an oversized batch is split due to
    MESSAGE_TOO_LARGE: each record keeps its original FutureRecordMetadata,
    but that future must now track the smaller replacement batch. Any user
    thread blocked in get() on the superseded produce future's latch is
    woken so it can re-wait on the new one. The superseded produce future
    is never completed, so the callbacks registered against it in __init__
    never fire.
    """
    superseded = self._produce_future
    self._produce_future = new_produce_future
    # Only the relative offset (first element) changes; the rest of the
    # metadata args carry over unchanged.
    _old_index, timestamp_ms, checksum, key_size, value_size, header_size = self.args
    self.args = (new_batch_index, timestamp_ms, checksum,
                 key_size, value_size, header_size)
    new_produce_future.add_callback(self._produce_success)
    new_produce_future.add_errback(self.failure)
    # Release anyone parked on the superseded future so get() re-waits
    # on the replacement.
    superseded._latch.set()

def get(self, timeout=None):
    """Wait for up to timeout seconds for the future to complete.

    Loops because rebind() may wake this thread from the old
    produce_future's latch before the record is actually done. A batch
    may be split multiple times, so each rebind wakes us and we re-wait
    on the (possibly new) _produce_future.

    Arguments:
        timeout (float, optional): maximum seconds to wait; None waits
            forever.

    Returns:
        The record metadata set by the produce success callback.

    Raises:
        KafkaTimeoutError: if the future is not done within timeout.
        The stored exception: if the produce ultimately failed.
    """
    # Fix: the stale pre-loop implementation (single wait on the raw
    # timeout) was left fused in front of the loop; it defeated the
    # rebind re-wait behavior and is removed. Additionally, the loop
    # previously exited on timer expiry and fell through to return
    # self.value (None) instead of raising a timeout error.
    timer = Timer(timeout * 1000 if timeout is not None else None)
    while not self.is_done:
        if timer.expired or not self._produce_future.wait(timer.timeout_secs):
            raise Errors.KafkaTimeoutError(
                "Timeout after waiting for %s secs." % (timeout,))
    if self.failed():
        raise self.exception  # pylint: disable-msg=raising-bad-type
    return self.value
Expand Down
4 changes: 3 additions & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,9 @@ def max_usable_produce_magic(cls, api_version):
else:
return 0

def _estimate_size_in_bytes(self, key, value, headers=[]):
def _estimate_size_in_bytes(self, key, value, headers=None):
if headers is None:
headers = []
magic = self.max_usable_produce_magic(self.config['api_version'])
if magic == 2:
return DefaultRecordBatchBuilder.estimate_size_in_bytes(
Expand Down
2 changes: 2 additions & 0 deletions kafka/producer/producer_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, tp, records, now=None):
self.records = records
self.topic_partition = tp
self.produce_future = FutureProduceResult(tp)
self._record_futures = []
self._retry = False
self._final_state = None

Expand Down Expand Up @@ -66,6 +67,7 @@ def try_append(self, timestamp_ms, key, value, headers, now=None):
len(key) if key is not None else -1,
len(value) if value is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
self._record_futures.append(future)
return future

def abort(self, exception):
Expand Down
84 changes: 83 additions & 1 deletion kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import kafka.errors as Errors
from kafka.producer.producer_batch import ProducerBatch
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
from kafka.structs import TopicPartition


Expand Down Expand Up @@ -206,6 +206,88 @@ def expired_batches(self, now=None):
break
return expired_batches

def split_and_reenqueue(self, batch, now=None):
    """Split an oversized batch into smaller batches and reenqueue them.

    When a produce request fails with MESSAGE_TOO_LARGE, this method splits
    the batch into two sub-batches (by record count) and enqueues them at
    the front of the partition's deque. The original FutureRecordMetadata
    objects are rebound to the new batches' futures.

    If the new batches are still too large, they will be split again on the
    next MESSAGE_TOO_LARGE response.

    Only supported for message_version >= 2 (DefaultRecordBatch).

    Arguments:
        batch (ProducerBatch): The oversized batch to split.
        now (float, optional): monotonic timestamp to stamp on the new
            batches' last_attempt; defaults to time.monotonic().

    Returns:
        int: The number of new batches created.
    """
    now = time.monotonic() if now is None else now
    tp = batch.topic_partition

    # Read all records from the closed batch.  The batch's builder has
    # already been closed, so we re-parse its serialized buffer back into
    # individual records.
    records_list = []
    for record_batch in MemoryRecords(batch.records.buffer()):
        for record in record_batch:
            records_list.append(record)

    # Split records into two halves by count; the first half gets the
    # extra record when the count is odd.
    mid = (len(records_list) + 1) // 2
    groups = [records_list[:mid], records_list[mid:]]

    new_batches = []
    # future_index walks batch._record_futures in the same order the
    # records were originally appended, so record i pairs with future i.
    future_index = 0
    for group in groups:
        if not group:
            continue
        builder = MemoryRecordsBuilder(
            self.config['message_version'],
            self.config['compression_attrs'],
            self.config['batch_size'],
        )
        current_batch = ProducerBatch(tp, builder, now=now)
        # Preserve the original creation time so delivery-timeout
        # accounting still measures from the record's first enqueue.
        current_batch.created = batch.created

        for record in group:
            metadata = builder.append(record.timestamp, record.key, record.value, record.headers)
            if metadata is None:
                # Record doesn't fit (extremely unlikely for split batches).
                # Finalize this batch and start a new one.
                new_batches.append(current_batch)
                builder = MemoryRecordsBuilder(
                    self.config['message_version'],
                    self.config['compression_attrs'],
                    self.config['batch_size'],
                )
                current_batch = ProducerBatch(tp, builder, now=now)
                current_batch.created = batch.created
                # NOTE(review): if a single serialized record exceeds
                # batch_size this second append can also return None, and
                # metadata.offset below would raise AttributeError — TODO
                # confirm callers guarantee records fit an empty batch.
                metadata = builder.append(record.timestamp, record.key, record.value, record.headers)

            # Rebind original future to new batch.  This must happen
            # before the new batch is enqueued/sent so waiters observe
            # the replacement future.
            if future_index < len(batch._record_futures):
                original_future = batch._record_futures[future_index]
                original_future.rebind(current_batch.produce_future, metadata.offset)
                current_batch._record_futures.append(original_future)
            future_index += 1

        new_batches.append(current_batch)

    # Enqueue in reverse order so first batch is at front of deque,
    # preserving the original record ordering for the partition.
    with self._tp_lock(tp):
        dq = self._batches[tp]
        for new_batch in reversed(new_batches):
            # Carry over the retry count; stamp last_attempt so backoff
            # applies before the resend.
            new_batch.attempts = batch.attempts
            new_batch.last_attempt = now
            dq.appendleft(new_batch)
            self._incomplete.add(new_batch)

    log.info("Split oversized batch for %s into %d new batches (%d total records)",
             tp, len(new_batches), future_index)
    return len(new_batches)

def reenqueue(self, batch, now=None):
"""
Re-enqueue the given record batch in the accumulator. In Sender._complete_batch method, we check
Expand Down
21 changes: 20 additions & 1 deletion kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,15 @@ def _complete_batch(self, batch, partition_response):
error = None

if error is not None:
if self._can_retry(batch, error):
if self._can_split(batch, error):
log.warning("%s: Got %s on topic-partition %s with %d records, splitting batch and retrying",
str(self), error.__name__, batch.topic_partition, batch.record_count)
self._accumulator.split_and_reenqueue(batch)
self._maybe_remove_from_inflight_batches(batch)
self._accumulator.deallocate(batch)
if self._sensors:
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
elif self._can_retry(batch, error):
# retry
log.warning("%s: Got error produce response on topic-partition %s, retrying (%s attempts left): %s%s",
str(self), batch.topic_partition,
Expand Down Expand Up @@ -566,6 +574,17 @@ def _can_retry(self, batch, error):
batch.final_state is None and
getattr(error, 'retriable', False))

def _can_split(self, batch, error):
    """Return True if an oversized batch may be split and retried.

    Splitting is allowed only when the broker rejected the batch as too
    large, the batch holds more than one record (a single record cannot
    be split), the batch has not reached a final state, and its delivery
    timeout has not elapsed.
    """
    splittable_errors = (Errors.MessageSizeTooLargeError,
                         Errors.RecordListTooLargeError)
    if error not in splittable_errors:
        return False
    if batch.record_count <= 1:
        return False
    if batch.final_state is not None:
        return False
    return not batch.has_reached_delivery_timeout(self._accumulator.delivery_timeout_ms)

def _create_produce_requests(self, collated):
"""
Transfer the record batches into a list of produce requests on a
Expand Down
4 changes: 3 additions & 1 deletion kafka/record/memory_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,13 @@ def skip(self, offsets_to_skip):
# Exposed for testing compacted records
self._next_offset += offsets_to_skip

def append(self, timestamp, key, value, headers=[]):
def append(self, timestamp, key, value, headers=None):
""" Append a message to the buffer.

Returns: RecordMetadata or None if unable to append
"""
if headers is None:
headers = []
if self._closed:
return None

Expand Down
5 changes: 5 additions & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def timeout_ms(self):
else:
return int(remaining * 1000)

@property
def timeout_secs(self):
    """Remaining timeout in seconds (float), or None when no deadline is set."""
    remaining_ms = self.timeout_ms
    if remaining_ms is None:
        return None
    return remaining_ms / 1000

@property
def elapsed_ms(self):
return int(1000 * (time.monotonic() - self._start_at))
Expand Down
Loading
Loading