diff --git a/kafka/protocol/api_message.py b/kafka/protocol/api_message.py index bea8be812..f0dbcb132 100644 --- a/kafka/protocol/api_message.py +++ b/kafka/protocol/api_message.py @@ -63,7 +63,7 @@ def __init__(cls, name, bases, attrs, **kw): class ApiMessage(DataContainer, metaclass=ApiMessageData, init=False): - __slots__ = ('_header') + __slots__ = ('_header', '_min_version', '_max_version') def __init_subclass__(cls, **kw): super().__init_subclass__(**kw) @@ -89,6 +89,8 @@ def __new__(cls, *args, **kwargs): def __init__(self, *args, **kwargs): self._header = None + self._min_version = kwargs.pop('min_version', None) + self._max_version = kwargs.pop('max_version', None) super().__init__(*args, **kwargs) @classproperty diff --git a/kafka/protocol/broker_version_data.py b/kafka/protocol/broker_version_data.py index f32cf888a..b1b863acd 100644 --- a/kafka/protocol/broker_version_data.py +++ b/kafka/protocol/broker_version_data.py @@ -1,4 +1,5 @@ from collections import namedtuple +import inspect import logging import functools @@ -54,18 +55,21 @@ def _clean_broker_version(self, broker_version): str(broker_version), str_version) return broker_version - def api_version(self, operation, max_version=None): + def api_version(self, operation, min_version=0, max_version=float('inf')): """Find the latest version of the protocol operation supported by both this library and the broker. - This resolves to the lesser of either the latest api version this - library supports, or the max version supported by the broker. + This resolves to the lesser of the latest api version this + library supports, the max version supported by the broker, + or optionally a max_version provided by the caller, or specified on + the request instance. Arguments: - operation: A list of protocol operation versions from kafka.protocol. + operation: A protocol request class or instance from kafka.protocol. Keyword Arguments: - max_version (int, optional): Provide an alternate maximum api version + min_version (int, optional): Provide an alternate minimum api version. + max_version (int, optional): Provide an alternate maximum api version. to reflect limitations in user code. Returns: @@ -73,23 +77,28 @@ def api_version(self, operation, max_version=None): Raises: IncompatibleBrokerVersion if no matching version is found """ - # Cap max_version at the largest available version in operation list - max_version = min(operation.max_version, max_version if max_version is not None else float('inf')) + assert min_version <= max_version + # if _max_version is a data descriptor, operation is a protocol class so no request min/max + if inspect.isdatadescriptor(operation._max_version): + request_max = float('inf') + request_min = 0 + else: + request_max = operation._max_version if operation._max_version is not None else float('inf') + request_min = operation._min_version if operation._min_version is not None else 0 + max_version = min(max_version, operation.max_version, request_max) + min_version = max(min_version, operation.min_version, request_min) broker_api_versions = self.api_versions api_key = operation.API_KEY if broker_api_versions is None or api_key not in broker_api_versions: raise Errors.IncompatibleBrokerVersion( - "Kafka broker does not support the '{}' Kafka protocol." - .format(operation.name)) + f"Kafka broker does not support the '{operation.name}' Kafka protocol.") broker_min_version, broker_max_version = broker_api_versions[api_key] - version = min(max_version, broker_max_version) - if version < broker_min_version: - # max library version is less than min broker version. Currently, - # no Kafka versions specify a min msg version. Maybe in the future? + if min_version > broker_max_version or max_version < broker_min_version: raise Errors.IncompatibleBrokerVersion( - "No version of the '{}' Kafka protocol is supported by both the client and broker." - .format(operation.name)) - return version + f"No version of the '{operation.name}' Kafka protocol is supported by both" \ + f" the client [{min_version}-{max_version}]" \ + f" and broker [{broker_min_version}-{broker_max_version}].") + return min(max_version, broker_max_version) def __str__(self): return '' % '.'.join(map(str, self.broker_version)) diff --git a/test/protocol/test_broker_version_data.py b/test/protocol/test_broker_version_data.py new file mode 100644 index 000000000..afcb49601 --- /dev/null +++ b/test/protocol/test_broker_version_data.py @@ -0,0 +1,189 @@ +"""Tests for BrokerVersionData — version resolution, construction, and edge cases.""" + +import pytest + +from kafka.errors import IncompatibleBrokerVersion, KafkaConfigurationError, UnrecognizedBrokerVersion +from kafka.protocol.broker_version_data import BrokerVersionData, BROKER_API_VERSIONS +from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.admin import CreateTopicsRequest, CreatePartitionsRequest +from kafka.protocol.consumer import FetchRequest +from kafka.protocol.producer import ProduceRequest + + +# --------------------------------------------------------------------------- +# Construction +# --------------------------------------------------------------------------- + + +class TestBrokerVersionDataConstruction: + def test_from_broker_version_tuple(self): + bvd = BrokerVersionData((2, 8)) + assert bvd.broker_version == (2, 8) + assert bvd.api_versions is not None + assert isinstance(bvd.api_versions, dict) + + def test_from_api_versions_dict(self): + api_versions = {0: (0, 9), 1: (0, 11), 3: (0, 9)} + bvd = BrokerVersionData(api_versions=api_versions) + assert bvd.api_versions == api_versions + assert bvd.broker_version is not None # inferred + + def test_no_args_raises(self): + with pytest.raises(ValueError): + BrokerVersionData() + + def test_invalid_string_raises(self): + with pytest.raises((ValueError, KafkaConfigurationError)): + BrokerVersionData(broker_version="not-a-tuple") + + def test_invalid_type_raises(self): + with pytest.raises(KafkaConfigurationError): + BrokerVersionData(broker_version=[2, 8]) + + def test_unknown_version_falls_back(self): + # A version that doesn't exist exactly should fall back to the + # nearest known version that is <=. + bvd = BrokerVersionData((4, 99)) + assert bvd.broker_version <= (4, 99) + assert bvd.api_versions is not None + + def test_unrecognized_ancient_version_raises(self): + with pytest.raises(UnrecognizedBrokerVersion): + BrokerVersionData((0, 1)) + + def test_string_version_accepted_with_warning(self): + # Legacy string format still works + bvd = BrokerVersionData('2.8.2') + assert bvd.broker_version == (2, 8) + + def test_patched_version_normalized(self): + # (0, 10) is ambiguous; should normalize to (0, 10, 0) + bvd = BrokerVersionData((0, 10)) + assert bvd.broker_version == (0, 10, 0) + + def test_all_known_versions_construct(self): + for version in BROKER_API_VERSIONS: + bvd = BrokerVersionData(version) + assert bvd.broker_version == version + + +# --------------------------------------------------------------------------- +# api_version resolution +# --------------------------------------------------------------------------- + + +class TestApiVersionResolution: + @pytest.fixture + def bvd(self): + return BrokerVersionData((4, 2)) + + def test_basic_resolution(self, bvd): + version = bvd.api_version(MetadataRequest) + assert isinstance(version, int) + assert MetadataRequest.min_version <= version <= MetadataRequest.max_version + + def test_max_version_cap(self, bvd): + # Caller-provided cap + version = bvd.api_version(MetadataRequest, max_version=3) + assert version <= 3 + + def test_min_version_floor(self, bvd): + version = bvd.api_version(MetadataRequest, min_version=5) + assert version >= 5 + + def test_min_exceeds_broker_max_raises(self): + # Use an old broker that only supports low versions + bvd = BrokerVersionData((0, 9, 0, 1)) + # FetchRequest on 0.9 supports some versions, but min_version=999 can't be satisfied + with pytest.raises(IncompatibleBrokerVersion): + bvd.api_version(FetchRequest, min_version=999) + + def test_max_below_broker_min_raises(self, bvd): + # ProduceRequest on 4.2 has some min version; cap below it + broker_min = bvd.api_versions[ProduceRequest.API_KEY][0] + if broker_min > 0: + with pytest.raises(IncompatibleBrokerVersion): + bvd.api_version(ProduceRequest, max_version=broker_min - 1) + + def test_unsupported_api_raises(self): + # Construct a BrokerVersionData with a minimal api_versions dict + # that doesn't include CreateTopicsRequest + bvd = BrokerVersionData(api_versions={0: (0, 9)}) # only ProduceRequest + with pytest.raises(IncompatibleBrokerVersion, match="does not support"): + bvd.api_version(CreateTopicsRequest) + + def test_min_max_inversion_asserts(self, bvd): + with pytest.raises(AssertionError): + bvd.api_version(MetadataRequest, min_version=10, max_version=3) + + def test_request_instance_max_version(self, bvd): + """Per-instance _max_version on a request caps the resolved version.""" + # MetadataRequest on (4,2) broker supports v0-v13; cap to v3 + request = MetadataRequest(topics=None, max_version=3) + version = bvd.api_version(request) + assert version <= 3 + + def test_request_instance_min_version(self, bvd): + """Per-instance _min_version on a request sets a floor.""" + request = MetadataRequest(topics=None, min_version=5) + version = bvd.api_version(request) + assert version >= 5 + + def test_request_instance_min_max_version(self, bvd): + """Per-instance min and max narrow the range.""" + request = MetadataRequest(topics=None, min_version=3, max_version=5) + version = bvd.api_version(request) + assert 3 <= version <= 5 + + def test_request_instance_version_no_override(self, bvd): + """Request without instance min/max uses full range.""" + request = MetadataRequest(topics=None) + version = bvd.api_version(request) + assert version == min(MetadataRequest.max_version, + bvd.api_versions[MetadataRequest.API_KEY][1]) + + def test_caller_max_and_instance_max_both_applied(self, bvd): + """The effective max is min(caller_max, instance_max, class_max, broker_max).""" + # MetadataRequest on (4,2) supports v0-v13 + request = MetadataRequest(topics=None, max_version=8) + # Caller provides a tighter cap than the instance + version = bvd.api_version(request, max_version=5) + assert version <= 5 + + # Instance provides a tighter cap than the caller + request2 = MetadataRequest(topics=None, max_version=3) + version2 = bvd.api_version(request2, max_version=8) + assert version2 <= 3 + + def test_class_vs_instance(self, bvd): + """Passing the class (not an instance) should work without instance min/max.""" + version_from_class = bvd.api_version(MetadataRequest) + version_from_instance = bvd.api_version(MetadataRequest(topics=None)) + assert version_from_class == version_from_instance + + +# --------------------------------------------------------------------------- +# Comparison and string representation +# --------------------------------------------------------------------------- + + +class TestBrokerVersionDataComparison: + def test_equality(self): + a = BrokerVersionData((2, 8)) + b = BrokerVersionData((2, 8)) + assert a == b + + def test_inequality(self): + a = BrokerVersionData((2, 8)) + b = BrokerVersionData((3, 0)) + assert a != b + + def test_ordering(self): + a = BrokerVersionData((2, 8)) + b = BrokerVersionData((3, 0)) + assert a < b + assert b > a + + def test_str(self): + bvd = BrokerVersionData((2, 8)) + assert '2.8' in str(bvd)