Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 12 additions & 40 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from kafka import errors as Errors
from kafka.metrics.measurable import AnonMeasurable
from kafka.metrics.stats import Avg, Max, Rate
from kafka.producer.transaction_manager import ProducerIdAndEpoch, TransactionManager
from kafka.protocol.producer import InitProducerIdRequest, ProduceRequest, ProduceResponse
from kafka.producer.transaction_manager import TransactionManager
from kafka.protocol.producer import ProduceRequest, ProduceResponse
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name
from kafka.version import __version__
Expand Down Expand Up @@ -147,10 +147,15 @@ def run_once(self):

if self._transaction_manager:
try:
if not self._transaction_manager.is_transactional():
# this is an idempotent producer, so make sure we have a producer id
self._maybe_wait_for_producer_id()
elif self._transaction_manager.has_in_flight_transactional_request() or self._maybe_send_transactional_request():
if (not self._transaction_manager.is_transactional()
and not self._transaction_manager.has_producer_id()):
# Idempotent producer: ensure an InitProducerIdHandler is
# enqueued. Dispatch happens below via the same handler-queue
# path used for transactional requests; the produce gate
# below blocks new sends until the response arrives.
self._transaction_manager.init_producer_id()

if self._transaction_manager.has_in_flight_transactional_request() or self._maybe_send_pending_request():
# as long as there are outstanding transactional requests, we simply wait for them to return
self._client.poll(timeout_ms=self.config['retry_backoff_ms'])
return
Expand Down Expand Up @@ -284,7 +289,7 @@ def _send_producer_data(self, now=None):
self._failed_produce, batches, node_id))
return poll_timeout_ms

def _maybe_send_transactional_request(self):
def _maybe_send_pending_request(self):
if self._transaction_manager.is_completing() and self._accumulator.has_incomplete:
if self._transaction_manager.is_aborting():
self._accumulator.abort_undrained_batches(Errors.KafkaError("Failing batch since transaction was aborted"))
Expand Down Expand Up @@ -365,39 +370,6 @@ def add_topic(self, topic):
self._topics_to_add.add(topic)
self.wakeup()

def _maybe_wait_for_producer_id(self):
while not self._transaction_manager.has_producer_id():
try:
node_id = self._client.least_loaded_node()
if node_id is None or not self._client.await_ready(node_id):
log.debug("%s, Could not find an available broker to send InitProducerIdRequest to." +
" Will back off and try again.", str(self))
time.sleep(self._client.least_loaded_node_refresh_ms() / 1000)
return
version = self._client.api_version(InitProducerIdRequest, max_version=1)
request = InitProducerIdRequest[version](
transactional_id=self.config['transactional_id'],
transaction_timeout_ms=self.config['transaction_timeout_ms'],
)
response = self._client.send_and_receive(node_id, request)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
self._transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(response.producer_id, response.producer_epoch))
break
elif getattr(error_type, 'retriable', False):
log.debug("%s: Retriable error from InitProducerId response: %s", str(self), error_type.__name__)
if getattr(error_type, 'invalid_metadata', False):
self._metadata.request_update()
else:
self._transaction_manager.transition_to_fatal_error(error_type())
break
except Errors.KafkaConnectionError:
log.debug("%s: Broker %s disconnected while awaiting InitProducerId response", str(self), node_id)
except Errors.RequestTimedOutError:
log.debug("%s: InitProducerId request to node %s timed out", str(self), node_id)
log.debug("%s: Retry InitProducerIdRequest in %sms.", str(self), self.config['retry_backoff_ms'])
time.sleep(self.config['retry_backoff_ms'] / 1000)

def _failed_produce(self, batches, node_id, error):
log.error("%s: Error sending produce request to node %d: %s", str(self), node_id, error) # trace
for batch in batches:
Expand Down
28 changes: 28 additions & 0 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,26 @@ def initialize_transactions(self):
self._enqueue_request(handler)
return handler.result

def init_producer_id(self):
"""Idempotent (non-transactional) producer: enqueue an InitProducerIdHandler.

Drives UNINITIALIZED -> INITIALIZING; the handler completes the
transition to READY on success. No-op outside UNINITIALIZED so
repeated calls from the sender's run loop are safe.
"""
with self._lock:
if self.is_transactional():
raise Errors.IllegalStateError(
"init_producer_id is for idempotent (non-transactional) producers;"
" use initialize_transactions for transactional producers")
if self._current_state != TransactionState.UNINITIALIZED:
return
self._transition_to(TransactionState.INITIALIZING)
self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH))
self._sequence_numbers.clear()
handler = InitProducerIdHandler(self, 0)
self._enqueue_request(handler)

def begin_transaction(self):
with self._lock:
self._ensure_transactional()
Expand Down Expand Up @@ -895,6 +915,14 @@ def __init__(self, transaction_manager, transaction_timeout_ms, is_epoch_bump=Fa
def priority(self):
return Priority.INIT_PRODUCER_ID

@property
def coordinator_type(self):
# Idempotent (non-transactional) producers don't have a transaction
# coordinator -- InitProducerIdRequest can be sent to any broker.
if self.transaction_manager.transactional_id is None:
return None
return 'transaction'

def handle_response(self, response):
error_type = Errors.for_code(response.error_code)

Expand Down
7 changes: 0 additions & 7 deletions test/producer/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,6 @@ def test_failed_produce(sender, mocker):
])


def test_maybe_wait_for_producer_id():
pass


def test_run_once():
pass

Expand Down Expand Up @@ -1159,9 +1155,6 @@ def test_sender_loop_gates_on_bumping_state(self, sender, accumulator, mocker):
tm._current_state = _TS.BUMPING_PRODUCER_EPOCH
mocker.patch.object(sender, '_send_producer_data')
mocker.patch.object(sender._client, 'poll')
# is_transactional() is False (no transactional_id), so the sender
# runs _maybe_wait_for_producer_id() -- mock that to a no-op
mocker.patch.object(sender, '_maybe_wait_for_producer_id')

sender.run_once()

Expand Down
54 changes: 54 additions & 0 deletions test/producer/test_transaction_manager_mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,60 @@ def test_bump_invalid_epoch_falls_back_to_fresh_init(self, broker, client):
assert len(fallbacks) == 1
assert fallbacks[0]._is_epoch_bump is False

def test_idempotent_producer_init_then_bump(self, broker, client):
"""Idempotent (non-transactional) producer drives the same state
machine as transactional: UNINITIALIZED -> INITIALIZING -> READY
on init, then READY -> BUMPING_PRODUCER_EPOCH -> READY on a KIP-360
epoch bump. Reproduces the failure mode from
test_idempotent_producer_high_throughput where the idempotent
producer was never driven out of UNINITIALIZED, causing a bump from
an OUT_OF_ORDER_SEQUENCE_NUMBER produce response to raise an
invalid-state-transition KafkaError.
"""
tm = TransactionManager(
transactional_id=None,
api_version=_API_VERSION,
metadata=client.cluster,
)
assert tm._current_state == TransactionState.UNINITIALIZED
assert not tm.has_producer_id()

# Step 1: enqueue the initial InitProducerIdHandler.
tm.init_producer_id()
assert tm._current_state == TransactionState.INITIALIZING

broker.respond(InitProducerIdResponse, InitProducerIdResponse(
throttle_time_ms=0,
error_code=0,
producer_id=_PRODUCER_ID,
producer_epoch=_PRODUCER_EPOCH,
))
_, future = _dispatch_next(client, tm)
_poll_for_future(client, future)

# Init complete: producer_id set, state advanced to READY.
assert tm._current_state == TransactionState.READY
assert tm.producer_id_and_epoch.producer_id == _PRODUCER_ID
assert tm.producer_id_and_epoch.epoch == _PRODUCER_EPOCH

# Step 2: simulate the OUT_OF_ORDER_SEQUENCE_NUMBER recovery path.
tm.bump_producer_id_and_epoch()
assert tm.is_bumping_epoch()

broker.respond(InitProducerIdResponse, InitProducerIdResponse(
throttle_time_ms=0,
error_code=0,
producer_id=_PRODUCER_ID,
producer_epoch=_PRODUCER_EPOCH + 1,
))
_, future = _dispatch_next(client, tm)
_poll_for_future(client, future)

# Bump complete: same producer_id, epoch incremented, READY again.
assert tm._current_state == TransactionState.READY
assert tm.producer_id_and_epoch.producer_id == _PRODUCER_ID
assert tm.producer_id_and_epoch.epoch == _PRODUCER_EPOCH + 1


# ---------------------------------------------------------------------------
# AddPartitionsToTxnHandler
Expand Down
Loading