Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 100 additions & 38 deletions kafka/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


class Future:
__slots__ = ('is_done', 'value', 'exception', '_callbacks', '_errbacks', '_lock')
error_on_callbacks = False # and errbacks

def __init__(self):
Expand All @@ -17,10 +18,10 @@ def __init__(self):
self._lock = threading.Lock()

def succeeded(self):
    """Report whether the future completed without an exception.

    The check is an identity comparison against ``None`` rather than a
    truthiness test, so an exception object that evaluates falsy (for
    example one defining ``__bool__`` or ``__len__``) still counts as a
    failure.

    Returns:
        A truthy value only when ``is_done`` is set and ``exception`` is
        ``None``.
    """
    # NOTE(review): relies on ``exception`` being None until failure()
    # assigns it -- the initializer is not visible here; confirm.
    return self.is_done and self.exception is None

def failed(self):
    """Report whether the future completed with an exception.

    The check is an identity comparison against ``None`` rather than a
    truthiness test, so an exception object that evaluates falsy is still
    reported as a failure.

    Returns:
        A truthy value only when ``is_done`` is set and ``exception`` is
        not ``None``.
    """
    # NOTE(review): relies on ``exception`` being None until failure()
    # assigns it -- the initializer is not visible here; confirm.
    return self.is_done and self.exception is not None

def retriable(self):
try:
Expand All @@ -29,47 +30,118 @@ def retriable(self):
return False

def success(self, value):
    """Complete the future with ``value`` and run the registered callbacks.

    Each registered callback is invoked exactly once with ``value``.

    Arguments:
        value: The result to store on the future and pass to callbacks.

    Returns:
        This future, to allow call chaining.

    Raises:
        AssertionError: If the future is already complete.
        Exception: Whatever a callback raised, but only when
            ``error_on_callbacks`` is set; otherwise the error is logged
            and the remaining callbacks still run.
    """
    assert not self.is_done, 'Future is already complete'
    # Hot path: called once per produced record via the sender thread's
    # batch-completion callback chain. Kept tight: explicit acquire/
    # release (cheaper than `with`), callbacks snapshot under the lock,
    # dispatch outside the lock, inlined callback loop (avoids an extra
    # Python frame per completion).
    lock = self._lock
    lock.acquire()
    self.value = value
    self.is_done = True
    # Registration only appends while is_done is false, so this list is
    # no longer mutated once the flag above has been set.
    callbacks = self._callbacks
    lock.release()
    if callbacks:
        error_on_callbacks = self.error_on_callbacks
        for f in callbacks:
            try:
                f(value)
            except Exception as e:
                log.exception('Error processing callback')
                if error_on_callbacks:
                    raise e
    return self

def failure(self, e):
    """Complete the future with an error and run the registered errbacks.

    Each registered errback is invoked exactly once with the exception
    instance.

    Arguments:
        e: An exception instance, or an exception class which is
            instantiated with no arguments.

    Returns:
        This future, to allow call chaining.

    Raises:
        AssertionError: If the future is already complete, or if ``e``
            does not resolve to a ``BaseException`` instance.
        Exception: Whatever an errback raised, but only when
            ``error_on_callbacks`` is set; otherwise the error is logged
            and the remaining errbacks still run.
    """
    assert not self.is_done, 'Future is already complete'
    # isinstance() also recognises classes built with a custom metaclass,
    # which an exact ``type(e) is type`` comparison would miss.
    exception = e() if isinstance(e, type) else e
    assert isinstance(exception, BaseException), (
        'future failed without an exception')
    lock = self._lock
    lock.acquire()
    self.exception = exception
    self.is_done = True
    # Registration only appends while is_done is false, so this list is
    # no longer mutated once the flag above has been set.
    errbacks = self._errbacks
    lock.release()
    if errbacks:
        error_on_callbacks = self.error_on_callbacks
        for f in errbacks:
            try:
                f(exception)
            except Exception as err:
                log.exception('Error processing errback')
                if error_on_callbacks:
                    raise err
    return self

def add_callback(self, f, *args, **kwargs):
    """Register ``f`` to be called with the future's value on success.

    If the future is still pending, ``f`` is queued. If it has already
    succeeded, ``f`` is called immediately in the calling thread. If it
    has already failed, ``f`` is neither called nor stored.

    Arguments:
        f: Callable invoked as ``f(*args, value, **kwargs)``.
        *args: Positional arguments bound in front of the value.
        **kwargs: Keyword arguments bound to ``f``.

    Returns:
        This future, to allow call chaining.

    Raises:
        Exception: Whatever ``f`` raised during an immediate call, but
            only when ``error_on_callbacks`` is set; otherwise the error
            is just logged.
    """
    if args or kwargs:
        f = functools.partial(f, *args, **kwargs)
    lock = self._lock
    lock.acquire()
    if not self.is_done:
        self._callbacks.append(f)
        lock.release()
        return self
    # Already complete: release before running user code so the callback
    # never executes while the lock is held.
    lock.release()
    if self.exception is None:
        try:
            f(self.value)
        except Exception as e:
            log.exception('Error processing callback')
            if self.error_on_callbacks:
                raise e
    return self

def add_errback(self, f, *args, **kwargs):
    """Register ``f`` to be called with the future's exception on failure.

    If the future is still pending, ``f`` is queued. If it has already
    failed, ``f`` is called immediately in the calling thread. If it has
    already succeeded, ``f`` is neither called nor stored.

    Arguments:
        f: Callable invoked as ``f(*args, exception, **kwargs)``.
        *args: Positional arguments bound in front of the exception.
        **kwargs: Keyword arguments bound to ``f``.

    Returns:
        This future, to allow call chaining.

    Raises:
        Exception: Whatever ``f`` raised during an immediate call, but
            only when ``error_on_callbacks`` is set; otherwise the error
            is just logged.
    """
    if args or kwargs:
        f = functools.partial(f, *args, **kwargs)
    lock = self._lock
    lock.acquire()
    if not self.is_done:
        self._errbacks.append(f)
        lock.release()
        return self
    # Already complete: release before running user code so the errback
    # never executes while the lock is held.
    lock.release()
    if self.exception is not None:
        try:
            f(self.exception)
        except Exception as e:
            log.exception('Error processing errback')
            if self.error_on_callbacks:
                raise e
    return self

def _add_cb_eb(self, cb, eb):
    """Register a (callback, errback) pair under a single lock acquire.

    Fast path for call sites that always register both a plain callback
    and errback with no ``*args``/``**kwargs``. Used on the producer hot
    path (``FutureRecordMetadata`` -> ``FutureProduceResult``) to halve
    the per-record lock-acquire count vs. calling ``add_callback()`` +
    ``add_errback()`` separately.

    If the future is already complete, exactly one of the pair is called
    immediately in the calling thread: ``cb`` with the value when no
    exception is stored, otherwise ``eb`` with the exception.

    Arguments:
        cb: Callable taking the future's value.
        eb: Callable taking the future's exception.

    Returns:
        This future, to allow call chaining.

    Raises:
        Exception: Whatever the immediately-invoked handler raised, but
            only when ``error_on_callbacks`` is set; otherwise the error
            is just logged.
    """
    lock = self._lock
    lock.acquire()
    if not self.is_done:
        self._callbacks.append(cb)
        self._errbacks.append(eb)
        lock.release()
        return self
    # Already complete: release before running user code so the handler
    # never executes while the lock is held.
    lock.release()
    if self.exception is None:
        try:
            cb(self.value)
        except Exception as e:
            log.exception('Error processing callback')
            if self.error_on_callbacks:
                raise e
    else:
        try:
            eb(self.exception)
        except Exception as e:
            log.exception('Error processing errback')
            if self.error_on_callbacks:
                raise e
    return self

def add_both(self, f, *args, **kwargs):
Expand All @@ -78,8 +150,7 @@ def add_both(self, f, *args, **kwargs):
return self

def chain(self, future):
    """Forward this future's outcome to ``future``.

    ``future.success`` is registered as the callback and
    ``future.failure`` as the errback, in a single registration call, so
    ``future`` is completed at most once by this future.

    Arguments:
        future: The future to complete when this one completes.

    Returns:
        This future, to allow call chaining.
    """
    self._add_cb_eb(future.success, future.failure)
    return self

def __await__(self):
Expand All @@ -88,12 +159,3 @@ def __await__(self):
if self.exception:
raise self.exception
return self.value

def _call_backs(self, back_type, backs, value):
    """Invoke every callable in ``backs`` with ``value``.

    Arguments:
        back_type: Label used in the log message when a callable raises
            (callers pass ``'callback'`` or ``'errback'``).
        backs: Iterable of single-argument callables.
        value: The argument handed to each callable.

    Raises:
        Exception: Whatever a callable raised, but only when
            ``error_on_callbacks`` is set; otherwise the error is logged
            and the remaining callables still run.
    """
    for f in backs:
        try:
            f(value)
        except Exception as e:
            log.exception('Error processing %s', back_type)
            if self.error_on_callbacks:
                raise e
9 changes: 5 additions & 4 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@


class FutureProduceResult(Future):
__slots__ = ('topic_partition', '_latch')

def __init__(self, topic_partition):
super().__init__()
self.topic_partition = topic_partition
Expand All @@ -28,13 +30,13 @@ def wait(self, timeout=None):


class FutureRecordMetadata(Future):
__slots__ = ('_produce_future', 'args')
def __init__(self, produce_future, batch_index, timestamp_ms, checksum,
             serialized_key_size, serialized_value_size,
             serialized_header_size):
    """Create a per-record future tied to ``produce_future``.

    Arguments:
        produce_future: The future this record's outcome is derived from.
        batch_index: Stored as the first element of ``args``.
        timestamp_ms: Stored in ``args``.
        checksum: Stored in ``args``.
        serialized_key_size: Stored in ``args``.
        serialized_value_size: Stored in ``args``.
        serialized_header_size: Stored in ``args``.
    """
    super().__init__()
    self._produce_future = produce_future
    # packing args as a tuple is a minor speed optimization
    self.args = (batch_index, timestamp_ms, checksum, serialized_key_size,
                 serialized_value_size, serialized_header_size)
    # Register the success/failure handlers exactly once; registering them
    # a second time through add_callback()/add_errback() would make this
    # future complete twice when produce_future resolves.
    produce_future._add_cb_eb(self._produce_success, self.failure)

def _produce_success(self, result):
offset, produce_timestamp_ms, record_exceptions_fn = result
Expand Down Expand Up @@ -72,8 +74,7 @@ def rebind(self, new_produce_future, new_batch_index):
self._produce_future = new_produce_future
_, timestamp_ms, checksum, sk, sv, sh = self.args
self.args = (new_batch_index, timestamp_ms, checksum, sk, sv, sh)
new_produce_future.add_callback(self._produce_success)
new_produce_future.add_errback(self.failure)
new_produce_future._add_cb_eb(self._produce_success, self.failure)
# Wake any thread blocked in get() so it re-waits on the new future.
# The old produce_future is never completed, so its stale callbacks
# (registered in __init__) will never fire.
Expand Down
Loading