diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 1230b41f8..345010325 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -9,7 +9,7 @@ from kafka.metrics.measurable import AnonMeasurable from kafka.metrics.stats import Avg, Max, Rate from kafka.producer.transaction_manager import ProducerIdAndEpoch -from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest +from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest, ProduceResponse from kafka.structs import TopicPartition from kafka.util import ensure_valid_topic_name from kafka.version import __version__ @@ -17,9 +17,11 @@ log = logging.getLogger(__name__) -PartitionResponse = collections.namedtuple("PartitionResponse", - ["error", "base_offset", "last_offset", "log_append_time", "log_start_offset", "record_errors", "error_message", "current_leader"]) -PartitionResponse.__new__.__defaults__ = (Errors.NoError, -1, -1, -1, -1, (), None, (-1, -1)) +# Short alias for the protocol type used throughout the sender's batch- +# completion paths. Synthetic instances (acks=0, locally-expired batches) +# are constructed with just a few fields set; unset fields fall through to +# the schema defaults via DataContainer.__getattr__. +_PartitionProduceResponse = ProduceResponse.TopicProduceResponse.PartitionProduceResponse class Sender(threading.Thread): @@ -233,7 +235,7 @@ def _send_producer_data(self, now=None): error_message = "Expiring %d record(s) for %s: %s ms has passed since batch creation" % ( expired_batch.record_count, expired_batch.topic_partition, int((time.monotonic() - expired_batch.created) * 1000)) - self._fail_batch(expired_batch, PartitionResponse(error=Errors.KafkaTimeoutError, error_message=error_message)) + self._complete_batch_with_exception(expired_batch, Errors.KafkaTimeoutError(error_message)) if self._sensors: self._sensors.update_produce_request_metrics(batches_by_node) @@ -395,46 +397,25 @@ def _maybe_wait_for_producer_id(self): def _failed_produce(self, batches, node_id, error): log.error("%s: Error sending produce request to node %d: %s", str(self), node_id, error) # trace for batch in batches: - self._complete_batch(batch, PartitionResponse(error=error)) + self._complete_batch_with_exception(batch, error) def _handle_produce_response(self, node_id, send_time, batches, response): """Handle a produce response.""" # if we have a response, parse it log.debug('%s: Parsing produce response: %r', str(self), response) if response: - batches_by_partition = dict([(batch.topic_partition, batch) - for batch in batches]) - - for topic, partitions in response.responses: - for partition_info in partitions: - log_append_time = -1 - log_start_offset = -1 - record_errors = () - error_message = None - if response.API_VERSION < 2: - partition, error_code, base_offset = partition_info - elif 2 <= response.API_VERSION <= 4: - partition, error_code, base_offset, log_append_time = partition_info - elif 5 <= response.API_VERSION <= 7: - partition, error_code, base_offset, log_append_time, log_start_offset = partition_info - else: - partition, error_code, base_offset, log_append_time, log_start_offset, record_errors, error_message = partition_info - tp = TopicPartition(topic, partition) + batches_by_partition = {batch.topic_partition: batch for batch in batches} + for topic_response in response.responses: + topic = topic_response.name + for partition_response in topic_response.partition_responses: + tp = TopicPartition(topic, partition_response.index) batch = batches_by_partition[tp] - partition_response = PartitionResponse( - error=Errors.for_code(error_code), - base_offset=base_offset, - last_offset=-1, - log_append_time=log_append_time, - log_start_offset=log_start_offset, - record_errors=record_errors, - error_message=error_message, - ) self._complete_batch(batch, partition_response) else: - # this is the acks = 0 case, just complete all requests + # acks=0: no response data, synthesize a success response + synthetic = _PartitionProduceResponse(error_code=0) for batch in batches: - self._complete_batch(batch, PartitionResponse()) + self._complete_batch(batch, synthetic) def _record_exceptions_fn(self, top_level_exception, record_errors, error_message): """Returns a fn mapping batch_index to exception""" @@ -453,152 +434,204 @@ def record_exceptions_fn(batch_index): return exc(err_msg) return record_exceptions_fn - def _fail_batch(self, batch, partition_response): - if partition_response.error is Errors.TopicAuthorizationFailedError: - exception = Errors.TopicAuthorizationFailedError(batch.topic_partition.topic) - elif partition_response.error is Errors.ClusterAuthorizationFailedError: - exception = Errors.ClusterAuthorizationFailedError("The producer is not authorized to do idempotent sends") - else: - exception = partition_response.error(partition_response.error_message) - - if self._transaction_manager: - if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \ - not self._transaction_manager.is_transactional() and \ - self._transaction_manager.has_producer_id(batch.producer_id): - log.error("%s: The broker received an out of order sequence number for topic-partition %s" - " at offset %s. This indicates data loss on the broker, and should be investigated.", - str(self), batch.topic_partition, partition_response.base_offset) - - # Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees - # about the previously committed message. Note that this will discard the producer id and sequence - # numbers for all existing partitions. - self._transaction_manager.reset_producer_id() - elif isinstance(exception, Errors.UnknownProducerIdError): - # If we get an UnknownProducerId for a partition, then the broker has no state for that producer. It will - # therefore accept a write with sequence number 0. We reset the sequence number for the partition here so - # that the producer can continue after aborting the transaction. All inflight-requests to this partition - # will also fail with an UnknownProducerId error, so the sequence will remain at 0. Note that if the - # broker supports bumping the epoch, we will later reset all sequence numbers after calling InitProducerId - self._transaction_manager.reset_sequence_for_partition(batch.topic_partition) - elif isinstance(exception, (Errors.ClusterAuthorizationFailedError, - Errors.TransactionalIdAuthorizationFailedError, - Errors.ProducerFencedError, - Errors.InvalidTxnStateError)): - self._transaction_manager.transition_to_fatal_error(exception) - elif self._transaction_manager.is_transactional(): - self._transaction_manager.transition_to_abortable_error(exception) - - if self._sensors: - self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) - - record_exceptions_fn = self._record_exceptions_fn(exception, partition_response.record_errors, partition_response.error_message) - if batch.complete_exceptionally(exception, record_exceptions_fn): - self._maybe_remove_from_inflight_batches(batch) - self._accumulator.deallocate(batch) - def _complete_batch(self, batch, partition_response): - """Complete or retry the given batch of records. + """Complete or retry the given batch of records based on a broker response. + + Handles both the success path (including treating + DuplicateSequenceNumberError as success, for max_in_flight > 1 + retry arrivals) and the error path, which delegates to + _dispatch_error with a context-aware exception instance. Arguments: batch (ProducerBatch): The record batch - partition_response (PartitionResponse): Response details for partition + partition_response (PartitionProduceResponse): Protocol-layer + partition response from the broker (or a synthetic instance + for the acks=0 case). """ - # Standardize no-error to None - error = partition_response.error - if error is Errors.NoError: - error = None - elif error is Errors.DuplicateSequenceNumberError: - # With max_in_flight > 1 and retries, a retried batch may arrive - # after the broker already committed the original. The broker - # returns DUPLICATE_SEQUENCE_NUMBER, which means the records were - # already written successfully. Treat as success. - log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s — records already committed, treating as success", - str(self), batch.topic_partition) - error = None - - if error is not None: - if self._can_split(batch, error): - log.warning("%s: Got %s on topic-partition %s with %d records, splitting batch and retrying", - str(self), error.__name__, batch.topic_partition, batch.record_count) - self._accumulator.split_and_reenqueue(batch) + error_code = partition_response.error_code + if error_code != 0: + error_cls = Errors.for_code(error_code) + if error_cls is Errors.DuplicateSequenceNumberError: + # With max_in_flight > 1 and retries, a retried batch may + # arrive after the broker already committed the original. + # DUPLICATE_SEQUENCE_NUMBER means the records were already + # written successfully; treat as success. + log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s — records already committed, treating as success", + str(self), batch.topic_partition) + error_code = 0 + + if error_code == 0: + # Success path + base_offset = partition_response.base_offset + log_append_time = partition_response.log_append_time_ms + if batch.complete(base_offset, log_append_time): self._maybe_remove_from_inflight_batches(batch) self._accumulator.deallocate(batch) - if self._sensors: - self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) - elif self._is_retention_based_unknown_producer_id(batch, error, partition_response): - # KAFKA-5793: the broker's producer state aged out due to - # retention (log_start_offset > last_acked_offset), not - # actual data loss. Reset the partition sequence and retry. - log.warning("%s: UnknownProducerIdError for %s appears to be retention-based" - " (log_start_offset=%s, last_acked_offset=%s); resetting sequence and retrying", - str(self), batch.topic_partition, - partition_response.log_start_offset, - self._transaction_manager.last_acked_offset(batch.topic_partition)) - self._transaction_manager.reset_sequence_for_partition(batch.topic_partition) - self._accumulator.reenqueue(batch) - self._maybe_remove_from_inflight_batches(batch) - if self._sensors: - self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) - elif self._can_retry(batch, error): - # retry - log.warning("%s: Got error produce response on topic-partition %s, retrying (%s attempts left): %s%s", - str(self), batch.topic_partition, - self.config['retries'] - batch.attempts - 1, - error.__name__, - (". Error Message: %s" % partition_response.error_message) if partition_response.error_message else "") - - # If idempotence is enabled only retry the request if the batch matches our current producer id and epoch - if not self._transaction_manager or self._transaction_manager.producer_id_and_epoch.match(batch): - log.debug("%s: Retrying batch to topic-partition %s. Sequence number: %s", - str(self), batch.topic_partition, - self._transaction_manager.sequence_number(batch.topic_partition) if self._transaction_manager else None) - self._accumulator.reenqueue(batch) - self._maybe_remove_from_inflight_batches(batch) - if self._sensors: - self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) - else: - log.warning("%s: Attempted to retry sending a batch but the producer id/epoch changed from %s/%s to %s/%s. This batch will be dropped", - str(self), batch.producer_id, batch.producer_epoch, - self._transaction_manager.producer_id_and_epoch.producer_id, - self._transaction_manager.producer_id_and_epoch.epoch) - self._fail_batch(batch, partition_response) - else: - # tell the user the result of their request - self._fail_batch(batch, partition_response) + # Track last ack'd offset for KAFKA-5793 retention detection. + if self._transaction_manager and self._transaction_manager.producer_id_and_epoch.match(batch): + self._transaction_manager.update_last_acked_offset( + batch.topic_partition, base_offset, batch.record_count) + if self.config['guarantee_message_order']: + self._accumulator.muted.remove(batch.topic_partition) + return + + # Error path: construct the exception with context-specific wrappers + # for auth errors that carry a topic or producer-specific message. + if error_cls is Errors.TopicAuthorizationFailedError: + exception = Errors.TopicAuthorizationFailedError(batch.topic_partition.topic) + elif error_cls is Errors.ClusterAuthorizationFailedError: + exception = Errors.ClusterAuthorizationFailedError("The producer is not authorized to do idempotent sends") + else: + exception = error_cls(partition_response.error_message) + self._dispatch_error(batch, exception, partition_response) + + def _complete_batch_with_exception(self, batch, exception): + """Complete a batch following a client-side failure. - if error is Errors.UnknownTopicOrPartitionError: - log.warning("%s: Received unknown topic or partition error in produce request on partition %s." - " The topic/partition may not exist or the user may not have Describe access to it", - str(self), batch.topic_partition) + Called from _failed_produce for network errors and from + _send_producer_data for locally-expired batches. The exception is + used as-is (no reconstruction), so any dynamic message is + preserved. - if getattr(error, 'invalid_metadata', False): - self._metadata.request_update() + Arguments: + batch (ProducerBatch): The record batch + exception (Exception or type): The client-side exception or its + class (a bare class is instantiated with no message) + """ + if isinstance(exception, type): + exception = exception(None) + self._dispatch_error(batch, exception, partition_response=None) + + def _dispatch_error(self, batch, exception, partition_response): + """Apply the appropriate outcome for a failed batch. + + Single decision point for both broker-reported errors (with a + partition_response) and client-side exceptions (partition_response + is None). Handles split / retry / retention-reset / fail along with + transaction-state transitions and post-error housekeeping + (metadata refresh, partition unmuting). + """ + error_cls = type(exception) + log_start_offset = partition_response.log_start_offset if partition_response is not None else -1 + if self._can_split(batch, error_cls): + log.warning("%s: Got %s on topic-partition %s with %d records, splitting batch and retrying", + str(self), error_cls.__name__, batch.topic_partition, batch.record_count) + self._accumulator.split_and_reenqueue(batch) + self._maybe_remove_from_inflight_batches(batch) + self._accumulator.deallocate(batch) + self._record_retries(batch) + elif self._is_retention_based_unknown_producer_id(batch, error_cls, log_start_offset): + # KAFKA-5793: the broker's producer state aged out due to + # retention (log_start_offset > last_acked_offset), not + # actual data loss. Reset the partition sequence and retry. + log.warning("%s: UnknownProducerIdError for %s appears to be retention-based" + " (log_start_offset=%s, last_acked_offset=%s); resetting sequence and retrying", + str(self), batch.topic_partition, log_start_offset, + self._transaction_manager.last_acked_offset(batch.topic_partition)) + self._transaction_manager.reset_sequence_for_partition(batch.topic_partition) + self._accumulator.reenqueue(batch) + self._maybe_remove_from_inflight_batches(batch) + self._record_retries(batch) + elif self._can_retry(batch, error_cls): + error_message = exception.args[0] if exception.args and exception.args[0] is not None else None + log.warning("%s: Got error produce response on topic-partition %s, retrying (%s attempts left): %s%s", + str(self), batch.topic_partition, + self.config['retries'] - batch.attempts - 1, + error_cls.__name__, + (". Error Message: %s" % error_message) if error_message else "") + log.debug("%s: Retrying batch to topic-partition %s. Sequence number: %s", + str(self), batch.topic_partition, + self._transaction_manager.sequence_number(batch.topic_partition) if self._transaction_manager else None) + self._accumulator.reenqueue(batch) + self._maybe_remove_from_inflight_batches(batch) + self._record_retries(batch) else: - if batch.complete(partition_response.base_offset, partition_response.log_append_time): + # FAIL: transaction state transitions then batch finalization. + if self._transaction_manager: + if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \ + not self._transaction_manager.is_transactional() and \ + self._transaction_manager.has_producer_id(batch.producer_id): + base_offset = partition_response.base_offset if partition_response is not None else -1 + log.error("%s: The broker received an out of order sequence number for topic-partition %s" + " at offset %s. This indicates data loss on the broker, and should be investigated.", + str(self), batch.topic_partition, base_offset) + # Reset the producer state since we have hit an irrecoverable + # exception and cannot make any guarantees about previously + # committed messages. This discards the producer id and all + # partition sequence numbers. + self._transaction_manager.reset_producer_id() + elif isinstance(exception, Errors.UnknownProducerIdError): + # The broker has no state for this producer. It will accept a + # write with sequence 0, so reset our sequence state for this + # partition. All in-flight requests to this partition will + # also fail with UnknownProducerId, so the sequence will + # remain at 0. Note: if the broker supports epoch bumping, + # KIP-360 will later reset all sequence numbers after + # calling InitProducerId. + self._transaction_manager.reset_sequence_for_partition(batch.topic_partition) + elif isinstance(exception, (Errors.ClusterAuthorizationFailedError, + Errors.TransactionalIdAuthorizationFailedError, + Errors.ProducerFencedError, + Errors.InvalidTxnStateError)): + self._transaction_manager.transition_to_fatal_error(exception) + elif self._transaction_manager.is_transactional(): + self._transaction_manager.transition_to_abortable_error(exception) + + if self._sensors: + self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) + + if partition_response is not None: + record_errors = partition_response.record_errors + error_message = partition_response.error_message + else: + record_errors = () + error_message = exception.args[0] if exception.args and exception.args[0] is not None else None + record_exceptions_fn = self._record_exceptions_fn(exception, record_errors, error_message) + if batch.complete_exceptionally(exception, record_exceptions_fn): self._maybe_remove_from_inflight_batches(batch) self._accumulator.deallocate(batch) - # Track last ack'd offset for KAFKA-5793 retention detection. - if self._transaction_manager and self._transaction_manager.producer_id_and_epoch.match(batch): - self._transaction_manager.update_last_acked_offset( - batch.topic_partition, partition_response.base_offset, batch.record_count) - - # Unmute the completed partition. + # Post-error housekeeping (runs for all branches above) + if error_cls is Errors.UnknownTopicOrPartitionError: + log.warning("%s: Received unknown topic or partition error in produce request on partition %s." + " The topic/partition may not exist or the user may not have Describe access to it", + str(self), batch.topic_partition) + if getattr(error_cls, 'invalid_metadata', False): + self._metadata.request_update() if self.config['guarantee_message_order']: self._accumulator.muted.remove(batch.topic_partition) - def _can_retry(self, batch, error): + def _record_retries(self, batch): + if self._sensors: + self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) + + def _can_retry(self, batch, error_cls): """ - We can retry a send if the error is transient and the number of - attempts taken is fewer than the maximum allowed + We can retry a send if the error is transient, the number of + attempts taken is fewer than the maximum allowed, and — for the + idempotent producer — the batch's producer id/epoch still matches + ours. A mismatched producer id/epoch (e.g. after a reset or future + KIP-360 epoch bump) means retrying would violate idempotence. """ - return (not batch.has_reached_delivery_timeout(self._accumulator.delivery_timeout_ms) and - batch.attempts < self.config['retries'] and - batch.final_state is None and - getattr(error, 'retriable', False)) + if batch.has_reached_delivery_timeout(self._accumulator.delivery_timeout_ms): + return False + if batch.attempts >= self.config['retries']: + return False + if batch.final_state is not None: + return False + if not getattr(error_cls, 'retriable', False): + return False + if self._transaction_manager and not self._transaction_manager.producer_id_and_epoch.match(batch): + log.warning("%s: Attempted to retry sending a batch but the producer id/epoch changed from %s/%s to %s/%s." + " This batch will be dropped", + str(self), batch.producer_id, batch.producer_epoch, + self._transaction_manager.producer_id_and_epoch.producer_id, + self._transaction_manager.producer_id_and_epoch.epoch) + return False + return True - def _is_retention_based_unknown_producer_id(self, batch, error, partition_response): + def _is_retention_based_unknown_producer_id(self, batch, error_cls, log_start_offset): """Detect retention-based UnknownProducerIdError (KAFKA-5793). The broker returns UnknownProducerIdError either because the producer @@ -608,7 +641,7 @@ def _is_retention_based_unknown_producer_id(self, batch, error, partition_respon we previously wrote have been aged out — the producer can safely reset its sequence to 0 and resume. """ - if error is not Errors.UnknownProducerIdError: + if error_cls is not Errors.UnknownProducerIdError: return False if not self._transaction_manager: return False @@ -618,7 +651,6 @@ def _is_retention_based_unknown_producer_id(self, batch, error, partition_respon return False if batch.final_state is not None: return False - log_start_offset = partition_response.log_start_offset if log_start_offset is None or log_start_offset < 0: return False last_acked = self._transaction_manager.last_acked_offset(batch.topic_partition) diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index 6388763dc..afb7f90fc 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -17,7 +17,50 @@ from kafka.producer.future import FutureRecordMetadata from kafka.producer.producer_batch import ProducerBatch from kafka.producer.record_accumulator import RecordAccumulator -from kafka.producer.sender import PartitionResponse, Sender +from kafka.producer.sender import Sender +from kafka.protocol.producer import ProduceResponse + +_PartitionProduceResponse = ProduceResponse.TopicProduceResponse.PartitionProduceResponse + + +def _partition_response(error_cls=None, **kwargs): + """Test helper that constructs a PartitionProduceResponse. + + Accepts the old PartitionResponse-style kwargs (base_offset, + log_append_time, log_start_offset, error_message) and maps them to + PartitionProduceResponse fields (base_offset, log_append_time_ms, + log_start_offset, error_message). + + If error_cls is provided it must be a broker error class with an + errno; its errno is used for error_code. For client-side errors, call + sender._complete_batch_with_exception directly instead. + """ + if 'log_append_time' in kwargs: + kwargs['log_append_time_ms'] = kwargs.pop('log_append_time') + if error_cls is not None: + kwargs.setdefault('error_code', error_cls.errno) + else: + kwargs.setdefault('error_code', 0) + return _PartitionProduceResponse(**kwargs) + + +def _is_broker_error(error_cls): + """Return True if the error class is a broker error with an errno.""" + return hasattr(error_cls, 'errno') and error_cls.errno != -1 + + +def _complete(sender, batch, error_cls=None, **kwargs): + """Call the appropriate sender entry point based on the error type. + + For broker errors (those with an errno), calls _complete_batch with a + synthetic PartitionProduceResponse. For client-side errors (exceptions + raised locally like KafkaConnectionError), calls + _complete_batch_with_exception. + """ + if error_cls is not None and not _is_broker_error(error_cls): + sender._complete_batch_with_exception(batch, error_cls) + else: + sender._complete_batch(batch, _partition_response(error_cls=error_cls, **kwargs)) from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition @@ -91,7 +134,7 @@ def test_complete_batch_success(sender): assert not batch.produce_future.is_done # No error, base_offset 0 - sender._complete_batch(batch, PartitionResponse(base_offset=0, log_append_time=123)) + sender._complete_batch(batch, _partition_response(base_offset=0, log_append_time=123)) assert batch.is_done assert batch.produce_future.is_done assert batch.produce_future.succeeded() @@ -110,7 +153,7 @@ def test_complete_batch_transaction(sender, transaction_manager): assert sender._transaction_manager.sequence_number(batch.topic_partition) == batch.record_count # No error, base_offset 0 - sender._complete_batch(batch, PartitionResponse(base_offset=0)) + sender._complete_batch(batch, _partition_response(base_offset=0)) assert batch.is_done # Sequence should not change on completion (already incremented at drain) assert sender._transaction_manager.sequence_number(batch.topic_partition) == batch.record_count @@ -140,7 +183,7 @@ def test_complete_batch_error(sender, error, refresh_metadata): assert sender._client.cluster.ttl() > 0 batch = producer_batch() future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) - sender._complete_batch(batch, PartitionResponse(error=error)) + _complete(sender, batch, error_cls=error) if refresh_metadata: assert sender._client.cluster.ttl() == 0 else: @@ -172,12 +215,12 @@ def test_complete_batch_retry(sender, accumulator, mocker, error, retry): mocker.patch.object(accumulator, 'reenqueue') batch = producer_batch() future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) - sender._complete_batch(batch, PartitionResponse(error=error)) + _complete(sender, batch, error_cls=error) if retry: assert not batch.is_done accumulator.reenqueue.assert_called_with(batch) batch.attempts += 1 # normally handled by accumulator.reenqueue, but it's mocked - sender._complete_batch(batch, PartitionResponse(error=error)) + _complete(sender, batch, error_cls=error) assert batch.is_done assert future.failed() assert isinstance(future.exception, error) @@ -194,12 +237,12 @@ def test_complete_batch_producer_id_changed_no_retry(sender, accumulator, transa error = Errors.NotLeaderForPartitionError batch = producer_batch() future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) - sender._complete_batch(batch, PartitionResponse(error=error)) + _complete(sender, batch, error_cls=error) assert not batch.is_done accumulator.reenqueue.assert_called_with(batch) batch.records._producer_id = 123 # simulate different producer_id assert batch.producer_id != sender._transaction_manager.producer_id_and_epoch.producer_id - sender._complete_batch(batch, PartitionResponse(error=error)) + _complete(sender, batch, error_cls=error) assert batch.is_done assert future.failed() assert isinstance(future.exception, error) @@ -211,7 +254,7 @@ def test_fail_batch(sender, accumulator, transaction_manager, mocker): mocker.patch.object(batch, 'done') assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id error = Errors.KafkaError - sender._fail_batch(batch, PartitionResponse(error=error)) + sender._complete_batch_with_exception(batch, error) batch.done.assert_called_with(top_level_exception=error(None), record_exceptions_fn=mocker.ANY) @@ -223,7 +266,10 @@ def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, tra mocker.patch.object(batch, 'done') assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id error = Errors.OutOfOrderSequenceNumberError - sender._fail_batch(batch, PartitionResponse(base_offset=0, log_append_time=None, error=error)) + # OutOfOrderSequenceNumber is non-retriable — hits the FAIL branch of + # _dispatch_error, which resets the producer id for non-transactional + # idempotent producers. + sender._complete_batch_with_exception(batch, error) sender._transaction_manager.reset_producer_id.assert_called_once() batch.done.assert_called_with(top_level_exception=error(None), record_exceptions_fn=mocker.ANY) @@ -233,13 +279,13 @@ def test_handle_produce_response(): def test_failed_produce(sender, mocker): - mocker.patch.object(sender, '_complete_batch') + mocker.patch.object(sender, '_complete_batch_with_exception') mock_batches = ['foo', 'bar', 'fizzbuzz'] sender._failed_produce(mock_batches, 0, 'error') - sender._complete_batch.assert_has_calls([ - call('foo', PartitionResponse(error='error')), - call('bar', PartitionResponse(error='error')), - call('fizzbuzz', PartitionResponse(error='error')), + sender._complete_batch_with_exception.assert_has_calls([ + call('foo', 'error'), + call('bar', 'error'), + call('fizzbuzz', 'error'), ]) @@ -275,7 +321,7 @@ def test__record_exceptions_fn(sender): assert record_exceptions_fn(0) == Errors.KafkaError('err-0') -class SplitAndReenqueTests: +class TestSplitAndReenqueue: def multi_record_batch(self, num_records=5, topic='foo', partition=0, batch_size=100000): """Create a ProducerBatch with multiple records for split testing.""" tp = TopicPartition(topic, partition) @@ -418,7 +464,7 @@ def test_complete_batch_splits_on_message_too_large(self, sender, accumulator, m batch, futures = self.multi_record_batch(num_records=5) accumulator._incomplete.add(batch) - sender._complete_batch(batch, PartitionResponse(error=Errors.MessageSizeTooLargeError)) + sender._complete_batch(batch, _partition_response(error_cls=Errors.MessageSizeTooLargeError)) # Original batch should be deallocated (not in incomplete set) assert batch not in accumulator._incomplete.all() @@ -440,7 +486,7 @@ def test_complete_batch_splits_on_record_list_too_large(self, sender, accumulato batch, futures = self.multi_record_batch(num_records=5) accumulator._incomplete.add(batch) - sender._complete_batch(batch, PartitionResponse(error=Errors.RecordListTooLargeError)) + sender._complete_batch(batch, _partition_response(error_cls=Errors.RecordListTooLargeError)) dq = accumulator._batches[tp] assert len(dq) >= 2 @@ -453,7 +499,7 @@ def test_complete_batch_single_record_fails_normally(self, sender, accumulator): accumulator._incomplete.add(batch) sender.config['retries'] = 0 - sender._complete_batch(batch, PartitionResponse(error=Errors.MessageSizeTooLargeError)) + sender._complete_batch(batch, _partition_response(error_cls=Errors.MessageSizeTooLargeError)) assert batch.is_done assert futures[0].is_done @@ -469,7 +515,7 @@ def test_complete_batch_split_unmutes_partition(self, sender, accumulator): batch, _ = self.multi_record_batch(num_records=5, topic='foo', partition=0) accumulator._incomplete.add(batch) - sender._complete_batch(batch, PartitionResponse(error=Errors.MessageSizeTooLargeError)) + sender._complete_batch(batch, _partition_response(error_cls=Errors.MessageSizeTooLargeError)) assert tp not in accumulator.muted @@ -673,7 +719,7 @@ def test_end_to_end_split_and_complete(self, accumulator): assert future.value.partition == 0 -class IdempotentProducerMaxInFlightTests: +class TestIdempotentProducerMaxInFlight: def test_idempotent_config_allows_max_in_flight_up_to_5(self): """Idempotent producer allows max_in_flight 1-5.""" for max_in_flight in (1, 2, 3, 4, 5): @@ -792,8 +838,8 @@ def test_duplicate_sequence_number_treated_as_success(self, sender, accumulator) batch = producer_batch() accumulator._incomplete.add(batch) - sender._complete_batch(batch, PartitionResponse( - error=Errors.DuplicateSequenceNumberError, base_offset=42, log_append_time=-1)) + sender._complete_batch(batch, _partition_response( + error_cls=Errors.DuplicateSequenceNumberError, base_offset=42, log_append_time=-1)) assert batch.is_done assert batch.produce_future.succeeded() @@ -839,7 +885,11 @@ def test_split_resets_sequence_number(self, client, transaction_manager): def test_split_without_idempotence_no_sequence_reset(self, accumulator): """split_and_reenqueue works without transaction_manager (no sequence to reset).""" tp = TopicPartition('foo', 0) - batch, futures = self.multi_record_batch(num_records=4) + tp_records = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, tp_records) + for i in range(4): + batch.try_append(0, b'key-%d' % i, b'value-%d' % i, []) + batch.records.close() accumulator._incomplete.add(batch) # Should not raise even without a transaction_manager @@ -848,14 +898,14 @@ def test_split_without_idempotence_no_sequence_reset(self, accumulator): assert num_new == 2 -class TransactionManagerLastAckedOffsetTests: +class TestTransactionManagerLastAckedOffset: def test_update_last_acked_offset_on_success(self, sender, accumulator, transaction_manager): """Sender updates last_acked_offset in TransactionManager on successful completion.""" sender._transaction_manager = transaction_manager batch = producer_batch() # 1 record assert transaction_manager.last_acked_offset(batch.topic_partition) == -1 - sender._complete_batch(batch, PartitionResponse(base_offset=42, log_append_time=-1)) + sender._complete_batch(batch, _partition_response(base_offset=42, log_append_time=-1)) # last_offset = base_offset(42) + record_count(1) - 1 = 42 assert transaction_manager.last_acked_offset(batch.topic_partition) == 42 @@ -895,8 +945,8 @@ def test_retention_based_unknown_producer_id_retries(self, sender, accumulator, batch = producer_batch() # Broker's log_start_offset is 100 — way past our last acked - sender._complete_batch(batch, PartitionResponse( - error=Errors.UnknownProducerIdError, + sender._complete_batch(batch, _partition_response( + error_cls=Errors.UnknownProducerIdError, base_offset=-1, log_start_offset=100, )) @@ -924,8 +974,8 @@ def test_real_data_loss_unknown_producer_id_fails(self, sender, accumulator, tra future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) # Broker's log_start_offset is 50 — within our acked range → real data loss - sender._complete_batch(batch, PartitionResponse( - error=Errors.UnknownProducerIdError, + sender._complete_batch(batch, _partition_response( + error_cls=Errors.UnknownProducerIdError, base_offset=-1, log_start_offset=50, )) @@ -946,8 +996,8 @@ def test_unknown_producer_id_without_log_start_offset_fails(self, sender, accumu batch = producer_batch() # Old broker response: log_start_offset = -1 (unknown) - sender._complete_batch(batch, PartitionResponse( - error=Errors.UnknownProducerIdError, + sender._complete_batch(batch, _partition_response( + error_cls=Errors.UnknownProducerIdError, base_offset=-1, log_start_offset=-1, )) @@ -959,8 +1009,8 @@ def test_unknown_producer_id_without_transaction_manager_fails(self, sender, acc """UnknownProducerIdError without transaction_manager falls through to normal failure path.""" mocker.patch.object(accumulator, 'reenqueue') batch = producer_batch() - sender._complete_batch(batch, PartitionResponse( - error=Errors.UnknownProducerIdError, + sender._complete_batch(batch, _partition_response( + error_cls=Errors.UnknownProducerIdError, log_start_offset=100, )) accumulator.reenqueue.assert_not_called()