From eb33e41f7c5f8e7b8e406f9c2c47cb8808acf55a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 29 Apr 2026 12:27:25 -0700 Subject: [PATCH] consumer: guard manager.run calls with client._lock --- kafka/consumer/fetcher.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d4551a57d..c8e44372a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -219,7 +219,8 @@ def offsets_by_times(self, timestamps, timeout_ms=None): Raises: KafkaTimeoutError if timeout_ms provided """ - offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) + with self._client._lock: + offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: if tp not in offsets: offsets[tp] = None @@ -344,7 +345,8 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None): KafkaTimeoutError if timeout_ms provided. """ timestamps = dict([(tp, timestamp) for tp in partitions]) - offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) + with self._client._lock: + offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: offsets[tp] = offsets[tp].offset return offsets