diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d4551a57d..c8e44372a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -219,7 +219,8 @@ def offsets_by_times(self, timestamps, timeout_ms=None): Raises: KafkaTimeoutError if timeout_ms provided """ - offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) + with self._client._lock: + offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: if tp not in offsets: offsets[tp] = None @@ -344,7 +345,8 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None): KafkaTimeoutError if timeout_ms provided. """ timestamps = dict([(tp, timestamp) for tp in partitions]) - offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) + with self._client._lock: + offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: offsets[tp] = offsets[tp].offset return offsets