Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kafka/protocol/api_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(cls, name, bases, attrs, **kw):


class ApiMessage(DataContainer, metaclass=ApiMessageData, init=False):
__slots__ = ('_header')
__slots__ = ('_header', '_min_version', '_max_version')

def __init_subclass__(cls, **kw):
super().__init_subclass__(**kw)
Expand All @@ -89,6 +89,8 @@ def __new__(cls, *args, **kwargs):

def __init__(self, *args, **kwargs):
self._header = None
self._min_version = kwargs.pop('min_version', None)
self._max_version = kwargs.pop('max_version', None)
super().__init__(*args, **kwargs)

@classproperty
Expand Down
41 changes: 25 additions & 16 deletions kafka/protocol/broker_version_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import namedtuple
import inspect
import logging
import functools

Expand Down Expand Up @@ -54,42 +55,50 @@ def _clean_broker_version(self, broker_version):
str(broker_version), str_version)
return broker_version

def api_version(self, operation, max_version=None):
def api_version(self, operation, min_version=0, max_version=float('inf')):
"""Find the latest version of the protocol operation supported by both
this library and the broker.

This resolves to the lesser of either the latest api version this
library supports, or the max version supported by the broker.
This resolves to the lesser of the latest api version this
library supports, the max version supported by the broker,
or optionally a max_version provided by the caller, or specified on
the request instance.

Arguments:
operation: A list of protocol operation versions from kafka.protocol.
operation: A protocol request class or instance from kafka.protocol.

Keyword Arguments:
max_version (int, optional): Provide an alternate maximum api version
min_version (int, optional): Provide an alternate minimum api version.
max_version (int, optional): Provide an alternate maximum api version.
to reflect limitations in user code.

Returns:
int: The highest api version number compatible between client and broker.

Raises: IncompatibleBrokerVersion if no matching version is found
"""
# Cap max_version at the largest available version in operation list
max_version = min(operation.max_version, max_version if max_version is not None else float('inf'))
assert min_version <= max_version
# if _max_version is a data descriptor, operation is a protocol class so no request min/max
if inspect.isdatadescriptor(operation._max_version):
request_max = float('inf')
request_min = 0
else:
request_max = operation._max_version if operation._max_version is not None else float('inf')
request_min = operation._min_version if operation._min_version is not None else 0
max_version = min(max_version, operation.max_version, request_max)
min_version = max(min_version, operation.min_version, request_min)
broker_api_versions = self.api_versions
api_key = operation.API_KEY
if broker_api_versions is None or api_key not in broker_api_versions:
raise Errors.IncompatibleBrokerVersion(
"Kafka broker does not support the '{}' Kafka protocol."
.format(operation.name))
f"Kafka broker does not support the '{operation.name}' Kafka protocol.")
broker_min_version, broker_max_version = broker_api_versions[api_key]
version = min(max_version, broker_max_version)
if version < broker_min_version:
# max library version is less than min broker version. Currently,
# no Kafka versions specify a min msg version. Maybe in the future?
if min_version > broker_max_version or max_version < broker_min_version:
raise Errors.IncompatibleBrokerVersion(
"No version of the '{}' Kafka protocol is supported by both the client and broker."
.format(operation.name))
return version
f"No version of the '{operation.name}' Kafka protocol is supported by both" \
f" the client [{min_version}-{max_version}]" \
f" and broker [{broker_min_version}-{broker_max_version}].")
return min(max_version, broker_max_version)

def __str__(self):
return '<BrokerVersionData %s>' % '.'.join(map(str, self.broker_version))
Expand Down
189 changes: 189 additions & 0 deletions test/protocol/test_broker_version_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Tests for BrokerVersionData — version resolution, construction, and edge cases."""

import pytest

from kafka.errors import IncompatibleBrokerVersion, KafkaConfigurationError, UnrecognizedBrokerVersion
from kafka.protocol.broker_version_data import BrokerVersionData, BROKER_API_VERSIONS
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.admin import CreateTopicsRequest, CreatePartitionsRequest
from kafka.protocol.consumer import FetchRequest
from kafka.protocol.producer import ProduceRequest


# ---------------------------------------------------------------------------
# Construction
# ---------------------------------------------------------------------------


class TestBrokerVersionDataConstruction:
def test_from_broker_version_tuple(self):
bvd = BrokerVersionData((2, 8))
assert bvd.broker_version == (2, 8)
assert bvd.api_versions is not None
assert isinstance(bvd.api_versions, dict)

def test_from_api_versions_dict(self):
api_versions = {0: (0, 9), 1: (0, 11), 3: (0, 9)}
bvd = BrokerVersionData(api_versions=api_versions)
assert bvd.api_versions == api_versions
assert bvd.broker_version is not None # inferred

def test_no_args_raises(self):
with pytest.raises(ValueError):
BrokerVersionData()

def test_invalid_string_raises(self):
with pytest.raises((ValueError, KafkaConfigurationError)):
BrokerVersionData(broker_version="not-a-tuple")

def test_invalid_type_raises(self):
with pytest.raises(KafkaConfigurationError):
BrokerVersionData(broker_version=[2, 8])

def test_unknown_version_falls_back(self):
# A version that doesn't exist exactly should fall back to the
# nearest known version that is <=.
bvd = BrokerVersionData((4, 99))
assert bvd.broker_version <= (4, 99)
assert bvd.api_versions is not None

def test_unrecognized_ancient_version_raises(self):
with pytest.raises(UnrecognizedBrokerVersion):
BrokerVersionData((0, 1))

def test_string_version_accepted_with_warning(self):
# Legacy string format still works
bvd = BrokerVersionData('2.8.2')
assert bvd.broker_version == (2, 8)

def test_patched_version_normalized(self):
# (0, 10) is ambiguous; should normalize to (0, 10, 0)
bvd = BrokerVersionData((0, 10))
assert bvd.broker_version == (0, 10, 0)

def test_all_known_versions_construct(self):
for version in BROKER_API_VERSIONS:
bvd = BrokerVersionData(version)
assert bvd.broker_version == version


# ---------------------------------------------------------------------------
# api_version resolution
# ---------------------------------------------------------------------------


class TestApiVersionResolution:
@pytest.fixture
def bvd(self):
return BrokerVersionData((4, 2))

def test_basic_resolution(self, bvd):
version = bvd.api_version(MetadataRequest)
assert isinstance(version, int)
assert MetadataRequest.min_version <= version <= MetadataRequest.max_version

def test_max_version_cap(self, bvd):
# Caller-provided cap
version = bvd.api_version(MetadataRequest, max_version=3)
assert version <= 3

def test_min_version_floor(self, bvd):
version = bvd.api_version(MetadataRequest, min_version=5)
assert version >= 5

def test_min_exceeds_broker_max_raises(self):
# Use an old broker that only supports low versions
bvd = BrokerVersionData((0, 9, 0, 1))
# FetchRequest on 0.9 supports some versions, but min_version=999 can't be satisfied
with pytest.raises(IncompatibleBrokerVersion):
bvd.api_version(FetchRequest, min_version=999)

def test_max_below_broker_min_raises(self, bvd):
# ProduceRequest on 4.2 has some min version; cap below it
broker_min = bvd.api_versions[ProduceRequest.API_KEY][0]
if broker_min > 0:
with pytest.raises(IncompatibleBrokerVersion):
bvd.api_version(ProduceRequest, max_version=broker_min - 1)

def test_unsupported_api_raises(self):
# Construct a BrokerVersionData with a minimal api_versions dict
# that doesn't include CreateTopicsRequest
bvd = BrokerVersionData(api_versions={0: (0, 9)}) # only ProduceRequest
with pytest.raises(IncompatibleBrokerVersion, match="does not support"):
bvd.api_version(CreateTopicsRequest)

def test_min_max_inversion_asserts(self, bvd):
with pytest.raises(AssertionError):
bvd.api_version(MetadataRequest, min_version=10, max_version=3)

def test_request_instance_max_version(self, bvd):
"""Per-instance _max_version on a request caps the resolved version."""
# MetadataRequest on (4,2) broker supports v0-v13; cap to v3
request = MetadataRequest(topics=None, max_version=3)
version = bvd.api_version(request)
assert version <= 3

def test_request_instance_min_version(self, bvd):
"""Per-instance _min_version on a request sets a floor."""
request = MetadataRequest(topics=None, min_version=5)
version = bvd.api_version(request)
assert version >= 5

def test_request_instance_min_max_version(self, bvd):
"""Per-instance min and max narrow the range."""
request = MetadataRequest(topics=None, min_version=3, max_version=5)
version = bvd.api_version(request)
assert 3 <= version <= 5

def test_request_instance_version_no_override(self, bvd):
"""Request without instance min/max uses full range."""
request = MetadataRequest(topics=None)
version = bvd.api_version(request)
assert version == min(MetadataRequest.max_version,
bvd.api_versions[MetadataRequest.API_KEY][1])

def test_caller_max_and_instance_max_both_applied(self, bvd):
"""The effective max is min(caller_max, instance_max, class_max, broker_max)."""
# MetadataRequest on (4,2) supports v0-v13
request = MetadataRequest(topics=None, max_version=8)
# Caller provides a tighter cap than the instance
version = bvd.api_version(request, max_version=5)
assert version <= 5

# Instance provides a tighter cap than the caller
request2 = MetadataRequest(topics=None, max_version=3)
version2 = bvd.api_version(request2, max_version=8)
assert version2 <= 3

def test_class_vs_instance(self, bvd):
"""Passing the class (not an instance) should work without instance min/max."""
version_from_class = bvd.api_version(MetadataRequest)
version_from_instance = bvd.api_version(MetadataRequest(topics=None))
assert version_from_class == version_from_instance


# ---------------------------------------------------------------------------
# Comparison and string representation
# ---------------------------------------------------------------------------


class TestBrokerVersionDataComparison:
def test_equality(self):
a = BrokerVersionData((2, 8))
b = BrokerVersionData((2, 8))
assert a == b

def test_inequality(self):
a = BrokerVersionData((2, 8))
b = BrokerVersionData((3, 0))
assert a != b

def test_ordering(self):
a = BrokerVersionData((2, 8))
b = BrokerVersionData((3, 0))
assert a < b
assert b > a

def test_str(self):
bvd = BrokerVersionData((2, 8))
assert '2.8' in str(bvd)
Loading