# Patch summary (kafka/future.py, kafka/producer/future.py)
#
# * Future, FutureProduceResult and FutureRecordMetadata declare __slots__.
# * succeeded(), failed() and __await__ compare `self.exception` against
#   None instead of testing its truthiness, so all three agree.
# * success(), failure(), add_callback() and add_errback() use explicit
#   lock.acquire()/lock.release() and invoke callbacks inline after the
#   lock has been released; the _call_backs() helper is removed.
# * New Future._add_cb_eb(cb, eb) registers a callback and an errback under
#   one lock acquisition. chain(), FutureRecordMetadata.__init__ and
#   FutureRecordMetadata.rebind() use it.
# * The 'Future is already complete' asserts in success() and failure() are
#   left in place: without them a second completion would overwrite the
#   result and run every registered callback again.
#
# Review notes
# * NOTE(review): error_on_callbacks stays a class attribute while Future
#   now has __slots__ and no instance __dict__, so assigning
#   `some_future.error_on_callbacks = ...` on an instance raises
#   AttributeError. Confirm no caller does that.
# * NOTE(review): the hunks show only part of FutureProduceResult and
#   FutureRecordMetadata. Confirm that neither class assigns an attribute
#   outside its declared slots in the code not shown here.
# * The acquire()/release() pairs are not wrapped in try/finally. Only
#   attribute stores and list.append() run between them.
# * The blob ids on the `index` lines were not regenerated after the
#   hunks were edited.

diff --git a/kafka/future.py b/kafka/future.py
index 9f3a0d43a..9524ee5df 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -6,6 +6,7 @@
 
 
 class Future:
+    __slots__ = ('is_done', 'value', 'exception', '_callbacks', '_errbacks', '_lock')
     error_on_callbacks = False # and errbacks
 
     def __init__(self):
@@ -17,10 +18,10 @@ def __init__(self):
         self._lock = threading.Lock()
 
     def succeeded(self):
-        return self.is_done and not bool(self.exception)
+        return self.is_done and self.exception is None
 
     def failed(self):
-        return self.is_done and bool(self.exception)
+        return self.is_done and self.exception is not None
 
     def retriable(self):
         try:
@@ -29,47 +30,120 @@ def retriable(self):
             return False
 
     def success(self, value):
         assert not self.is_done, 'Future is already complete'
-        with self._lock:
-            self.value = value
-            self.is_done = True
-        if self._callbacks:
-            self._call_backs('callback', self._callbacks, self.value)
+        # Hot path: called once per produced record via the sender thread's
+        # batch-completion callback chain. Kept tight: explicit acquire/
+        # release (cheaper than `with`), callbacks snapshot under the lock,
+        # dispatch outside the lock, inlined callback loop (avoids an extra
+        # Python frame per completion).
+        lock = self._lock
+        lock.acquire()
+        self.value = value
+        self.is_done = True
+        callbacks = self._callbacks
+        lock.release()
+        if callbacks:
+            error_on_callbacks = self.error_on_callbacks
+            for f in callbacks:
+                try:
+                    f(value)
+                except Exception as e:
+                    log.exception('Error processing callback')
+                    if error_on_callbacks:
+                        raise e
         return self
 
     def failure(self, e):
         assert not self.is_done, 'Future is already complete'
         exception = e if type(e) is not type else e()
         assert isinstance(exception, BaseException), (
             'future failed without an exception')
-        with self._lock:
-            self.exception = exception
-            self.is_done = True
-        self._call_backs('errback', self._errbacks, self.exception)
+        lock = self._lock
+        lock.acquire()
+        self.exception = exception
+        self.is_done = True
+        errbacks = self._errbacks
+        lock.release()
+        if errbacks:
+            error_on_callbacks = self.error_on_callbacks
+            for f in errbacks:
+                try:
+                    f(exception)
+                except Exception as err:
+                    log.exception('Error processing errback')
+                    if error_on_callbacks:
+                        raise err
         return self
 
     def add_callback(self, f, *args, **kwargs):
         if args or kwargs:
             f = functools.partial(f, *args, **kwargs)
-        with self._lock:
-            if not self.is_done:
-                self._callbacks.append(f)
-            elif self.succeeded():
-                self._lock.release()
-                self._call_backs('callback', [f], self.value)
-                self._lock.acquire()
+        lock = self._lock
+        lock.acquire()
+        if not self.is_done:
+            self._callbacks.append(f)
+            lock.release()
+            return self
+        lock.release()
+        if self.exception is None:
+            try:
+                f(self.value)
+            except Exception as e:
+                log.exception('Error processing callback')
+                if self.error_on_callbacks:
+                    raise e
         return self
 
     def add_errback(self, f, *args, **kwargs):
         if args or kwargs:
             f = functools.partial(f, *args, **kwargs)
-        with self._lock:
-            if not self.is_done:
-                self._errbacks.append(f)
-            elif self.failed():
-                self._lock.release()
-                self._call_backs('errback', [f], self.exception)
-                self._lock.acquire()
+        lock = self._lock
+        lock.acquire()
+        if not self.is_done:
+            self._errbacks.append(f)
+            lock.release()
+            return self
+        lock.release()
+        if self.exception is not None:
+            try:
+                f(self.exception)
+            except Exception as e:
+                log.exception('Error processing errback')
+                if self.error_on_callbacks:
+                    raise e
+        return self
+
+    def _add_cb_eb(self, cb, eb):
+        """Register a (callback, errback) pair under a single lock acquire.
+
+        Fast path for call sites that always register both a plain callback
+        and errback with no ``*args``/``**kwargs``. Used on the producer hot
+        path (``FutureRecordMetadata`` -> ``FutureProduceResult``) to halve
+        the per-record lock-acquire count vs. calling ``add_callback()`` +
+        ``add_errback()`` separately.
+        """
+        lock = self._lock
+        lock.acquire()
+        if not self.is_done:
+            self._callbacks.append(cb)
+            self._errbacks.append(eb)
+            lock.release()
+            return self
+        lock.release()
+        if self.exception is None:
+            try:
+                cb(self.value)
+            except Exception as e:
+                log.exception('Error processing callback')
+                if self.error_on_callbacks:
+                    raise e
+        else:
+            try:
+                eb(self.exception)
+            except Exception as e:
+                log.exception('Error processing errback')
+                if self.error_on_callbacks:
+                    raise e
         return self
 
     def add_both(self, f, *args, **kwargs):
@@ -78,8 +152,7 @@ def add_both(self, f, *args, **kwargs):
         return self
 
     def chain(self, future):
-        self.add_callback(future.success)
-        self.add_errback(future.failure)
+        self._add_cb_eb(future.success, future.failure)
         return self
 
     def __await__(self):
@@ -88,12 +161,3 @@ def __await__(self):
-        if self.exception:
+        if self.exception is not None:
             raise self.exception
         return self.value
-
-    def _call_backs(self, back_type, backs, value):
-        for f in backs:
-            try:
-                f(value)
-            except Exception as e:
-                log.exception('Error processing %s', back_type)
-                if self.error_on_callbacks:
-                    raise e
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index a97acbad5..dfc6b0c33 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -7,6 +7,8 @@
 
 
 class FutureProduceResult(Future):
+    __slots__ = ('topic_partition', '_latch')
+
     def __init__(self, topic_partition):
         super().__init__()
         self.topic_partition = topic_partition
@@ -28,13 +30,13 @@ def wait(self, timeout=None):
 
 
 class FutureRecordMetadata(Future):
+    __slots__ = ('_produce_future', 'args')
     def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
         super().__init__()
         self._produce_future = produce_future
         # packing args as a tuple is a minor speed optimization
         self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
-        produce_future.add_callback(self._produce_success)
-        produce_future.add_errback(self.failure)
+        produce_future._add_cb_eb(self._produce_success, self.failure)
 
     def _produce_success(self, result):
         offset, produce_timestamp_ms, record_exceptions_fn = result
@@ -72,8 +74,7 @@ def rebind(self, new_produce_future, new_batch_index):
         self._produce_future = new_produce_future
         _, timestamp_ms, checksum, sk, sv, sh = self.args
         self.args = (new_batch_index, timestamp_ms, checksum, sk, sv, sh)
-        new_produce_future.add_callback(self._produce_success)
-        new_produce_future.add_errback(self.failure)
+        new_produce_future._add_cb_eb(self._produce_success, self.failure)
         # Wake any thread blocked in get() so it re-waits on the new future.
         # The old produce_future is never completed, so its stale callbacks
         # (registered in __init__) will never fire.