Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,20 @@ def _complete_batch(self, batch, partition_response):
self._accumulator.deallocate(batch)
if self._sensors:
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
elif self._is_retention_based_unknown_producer_id(batch, error, partition_response):
# KAFKA-5793: the broker's producer state aged out due to
# retention (log_start_offset > last_acked_offset), not
# actual data loss. Reset the partition sequence and retry.
log.warning("%s: UnknownProducerIdError for %s appears to be retention-based"
" (log_start_offset=%s, last_acked_offset=%s); resetting sequence and retrying",
str(self), batch.topic_partition,
partition_response.log_start_offset,
self._transaction_manager.last_acked_offset(batch.topic_partition))
self._transaction_manager.reset_sequence_for_partition(batch.topic_partition)
self._accumulator.reenqueue(batch)
self._maybe_remove_from_inflight_batches(batch)
if self._sensors:
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
elif self._can_retry(batch, error):
# retry
log.warning("%s: Got error produce response on topic-partition %s, retrying (%s attempts left): %s%s",
Expand Down Expand Up @@ -563,6 +577,11 @@ def _complete_batch(self, batch, partition_response):
self._maybe_remove_from_inflight_batches(batch)
self._accumulator.deallocate(batch)

# Track last ack'd offset for KAFKA-5793 retention detection.
if self._transaction_manager and self._transaction_manager.producer_id_and_epoch.match(batch):
self._transaction_manager.update_last_acked_offset(
batch.topic_partition, partition_response.base_offset, batch.record_count)

# Unmute the completed partition.
if self.config['guarantee_message_order']:
self._accumulator.muted.remove(batch.topic_partition)
Expand All @@ -577,6 +596,32 @@ def _can_retry(self, batch, error):
batch.final_state is None and
getattr(error, 'retriable', False))

def _is_retention_based_unknown_producer_id(self, batch, error, partition_response):
    """Decide whether an UnknownProducerIdError is retention-based (KAFKA-5793).

    Brokers report UnknownProducerIdError both when producer state was
    aged out by log retention and when data was genuinely lost. When the
    broker's log_start_offset has moved past the last offset we acked on
    this partition, our records were simply retained away, so resetting
    the sequence to 0 and retrying is safe. Returns True only in that
    retention case; any failed precondition means "treat as fatal".
    """
    # Must be exactly the UnknownProducerIdError class, nothing else.
    if error is not Errors.UnknownProducerIdError:
        return False
    txn_mgr = self._transaction_manager
    # Idempotence bookkeeping is required, and the batch must belong to
    # the current producer id/epoch (a bumped epoch invalidates it).
    if not txn_mgr or not txn_mgr.producer_id_and_epoch.match(batch):
        return False
    # A timed-out or already-finalized batch must not be retried.
    if batch.has_reached_delivery_timeout(self._accumulator.delivery_timeout_ms):
        return False
    if batch.final_state is not None:
        return False
    # Without a valid log_start_offset we cannot prove retention.
    start_offset = partition_response.log_start_offset
    if start_offset is None or start_offset < 0:
        return False
    # Retention case: everything we acked now lies before the log start.
    return start_offset > txn_mgr.last_acked_offset(batch.topic_partition)

def _can_split(self, batch, error):
"""
We can split and retry a batch if the error indicates the batch is too
Expand Down
25 changes: 25 additions & 0 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def __init__(self, transactional_id=None, transaction_timeout_ms=0, retry_backof
self._metadata = metadata

self._sequence_numbers = collections.defaultdict(lambda: 0)
# The offset of the last ack'd record for each partition. Used to
# distinguish retention-based UnknownProducerIdError (broker's
# log_start_offset > last_acked_offset → safe to reset and retry)
# from actual data loss. See KAFKA-5793.
self._last_acked_offset = {}

self.transactional_id = transactional_id
self.transaction_timeout_ms = transaction_timeout_ms
Expand Down Expand Up @@ -326,6 +331,26 @@ def set_sequence_number(self, tp, sequence):
def reset_sequence_for_partition(self, tp):
    """Drop all idempotence state for *tp* (sequence and last-acked offset).

    After this, the next batch for the partition starts again at
    sequence 0, matching the broker's freshly-reset producer state.
    """
    with self._lock:
        for per_partition_state in (self._sequence_numbers, self._last_acked_offset):
            per_partition_state.pop(tp, None)

def update_last_acked_offset(self, tp, base_offset, record_count):
    """Record the offset of the last successfully-produced record for *tp*.

    Invoked by the sender whenever a batch completes successfully. The
    stored value is compared against the broker's log_start_offset to
    tell a retention-based UnknownProducerIdError (retriable) apart from
    real data loss (fatal). See KAFKA-5793.
    """
    # A negative base offset means the broker did not assign offsets
    # (e.g. an errored response) — nothing to record.
    if base_offset < 0:
        return
    tail_offset = base_offset + record_count - 1
    with self._lock:
        # Only ever move forward; out-of-order completions must not
        # rewind the high-water mark.
        previous = self._last_acked_offset.get(tp, -1)
        if tail_offset > previous:
            self._last_acked_offset[tp] = tail_offset

def last_acked_offset(self, tp):
    """Return the highest acked offset recorded for *tp*, or -1 if none."""
    with self._lock:
        try:
            return self._last_acked_offset[tp]
        except KeyError:
            return -1

def next_request_handler(self, has_incomplete_batches):
with self._lock:
Expand Down
Loading
Loading