diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 5cf5e34b9..d4551a57d 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -593,11 +593,7 @@ async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): ) log.debug("Sending ListOffsetRequest %s to broker %s", request, node_id) - try: - response = await self._manager.send(request, node_id=node_id) - except Errors.IncompatibleBrokerVersion as exc: - # TODO: push this down to connection or bvd - raise Errors.UnsupportedVersionError(exc.args[0]) from None + response = await self._manager.send(request, node_id=node_id) return self._handle_list_offsets_response(response) def _handle_list_offsets_response(self, response): diff --git a/kafka/errors.py b/kafka/errors.py index 992b0d21d..5dd8ec9f1 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -37,10 +37,6 @@ class IllegalStateError(KafkaError): pass -class IncompatibleBrokerVersion(KafkaError): - pass - - class KafkaConfigurationError(KafkaError): pass @@ -404,6 +400,10 @@ class UnsupportedVersionError(BrokerResponseError): description = 'The version of API is not supported.' +class IncompatibleBrokerVersion(UnsupportedVersionError): + """Synthetic error raised by client""" + + class TopicAlreadyExistsError(BrokerResponseError): errno = 36 message = 'TOPIC_ALREADY_EXISTS'