diff --git a/kafka/cluster.py b/kafka/cluster.py index 3f5e4ccf2..97eb5f237 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -522,8 +522,11 @@ def add_listener(self, listener): self._listeners.add(listener) def remove_listener(self, listener): - """Remove a previously added listener callback""" - self._listeners.remove(listener) + """Remove a previously added listener callback.""" + try: + self._listeners.remove(listener) + except KeyError: + pass def add_coordinator(self, response, key_type, key): """Update with metadata for a group or txn coordinator diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1b5321c97..2b35b240f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -907,17 +907,11 @@ def _close_heartbeat_thread(self, timeout_ms=None): pass self._heartbeat_thread = None - def __del__(self): - try: - self._close_heartbeat_thread() - except (TypeError, AttributeError): - pass - def close(self, timeout_ms=None): """Close the coordinator, leave the current group, and reset local generation / member_id""" - self._close_heartbeat_thread(timeout_ms=timeout_ms) if self.config['api_version'] >= (0, 9): + self._close_heartbeat_thread(timeout_ms=timeout_ms) self.maybe_leave_group(timeout_ms=timeout_ms) def is_dynamic_member(self): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 6592988c9..a7a1a6fbb 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -143,14 +143,6 @@ def __init__(self, client, subscription, **configs): self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) - def __del__(self): - if hasattr(self, '_cluster') and self._cluster: - try: - self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) - except TypeError: - pass - super().__del__() - def protocol_type(self): return ConsumerProtocolType @@ -485,6 +477,7 @@ def close(self, autocommit=True, timeout_ms=None): try: if autocommit: self._maybe_auto_commit_offsets_sync(timeout_ms=timeout_ms) + self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) finally: super().close(timeout_ms=timeout_ms)