Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions kafka/protocol/broker_version_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

import kafka.errors as Errors
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
from kafka.protocol.consumer import OffsetFetchRequest, FetchRequest, ListOffsetsRequest
from kafka.protocol.consumer import OffsetFetchRequest, FetchRequest, ListOffsetsRequest, JoinGroupRequest
from kafka.protocol.metadata import FindCoordinatorRequest, MetadataRequest
from kafka.protocol.producer import ProduceRequest
from kafka.protocol.producer import ProduceRequest, AddPartitionsToTxnRequest

log = logging.getLogger('kafka.protocol')

Expand Down Expand Up @@ -107,40 +107,39 @@ def infer_broker_version_from_api_versions(api_versions):
test_cases = [
# format (<broker version>, <needed struct>)
# Make sure to update consumer_integration test check when adding newer versions.
# ((3, 9), FetchRequest[17]),
# ((3, 8), ProduceRequest[11]),
# ((3, 7), FetchRequest[16]),
# ((3, 6), AddPartitionsToTxnRequest[4]),
# ((3, 5), FetchRequest[15]),
# ((3, 4), StopReplicaRequest[3]), # broker-internal api...
# ((3, 3), DescribeAclsRequest[3]),
# ((3, 2), JoinGroupRequest[9]),
# ((3, 1), FetchRequest[13]),
# ((3, 0), ListOffsetsRequest[7]),
# ((2, 8), ProduceRequest[9]),
# ((2, 7), FetchRequest[12]),
# ((2, 6), ListGroupsRequest[4]),
# ((2, 5), JoinGroupRequest[7]),
((2, 6), DescribeClientQuotasRequest, 0),
((2, 5), DescribeAclsRequest, 2),
((2, 4), ProduceRequest, 8),
((2, 3), FetchRequest, 11),
((2, 2), ListOffsetsRequest, 5),
((2, 1), FetchRequest, 10),
((2, 0), FetchRequest, 8),
((1, 1), FetchRequest, 7),
((1, 0), MetadataRequest, 5),
((0, 11), MetadataRequest, 4),
((0, 10, 2), OffsetFetchRequest, 2),
((0, 10, 1), MetadataRequest, 2),
((4, 0), ListOffsetsRequest.API_KEY, 10),
((3, 9), FetchRequest.API_KEY, 17),
((3, 8), ProduceRequest.API_KEY, 11),
((3, 7), FetchRequest.API_KEY, 16),
((3, 6), AddPartitionsToTxnRequest.API_KEY, 4),
((3, 5), FetchRequest.API_KEY, 15),
((3, 4), 4, 7), # StopReplicaRequest[3]), # broker-internal api...
((3, 3), DescribeAclsRequest.API_KEY, 3),
((3, 2), JoinGroupRequest.API_KEY, 9),
((3, 1), FetchRequest.API_KEY, 13),
((3, 0), ListOffsetsRequest.API_KEY, 7),
((2, 8), ProduceRequest.API_KEY, 9),
((2, 7), FetchRequest.API_KEY, 12),
((2, 6), DescribeClientQuotasRequest.API_KEY, 0),
((2, 5), DescribeAclsRequest.API_KEY, 2),
((2, 4), ProduceRequest.API_KEY, 8),
((2, 3), FetchRequest.API_KEY, 11),
((2, 2), ListOffsetsRequest.API_KEY, 5),
((2, 1), FetchRequest.API_KEY, 10),
((2, 0), FetchRequest.API_KEY, 8),
((1, 1), FetchRequest.API_KEY, 7),
((1, 0), MetadataRequest.API_KEY, 5),
((0, 11), MetadataRequest.API_KEY, 4),
((0, 10, 2), OffsetFetchRequest.API_KEY, 2),
((0, 10, 1), MetadataRequest.API_KEY, 2),
]

# Get the best match of test cases
for broker_version, proto_struct, API_VERSION in sorted(test_cases, reverse=True):
if proto_struct.API_KEY not in api_versions:
for broker_version, api_key, version in sorted(test_cases, reverse=True):
if api_key not in api_versions:
continue
min_version, max_version = api_versions[proto_struct.API_KEY]
if min_version <= API_VERSION <= max_version:
min_version, max_version = api_versions[api_key]
if min_version <= version <= max_version:
return broker_version

# We know that ApiVersionsResponse is only supported in 0.10+
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
def test_kafka_version_infer(kafka_consumer_factory):
consumer = kafka_consumer_factory(api_version=None)
actual = BrokerVersionData(env_kafka_version())
expected = min((2, 6), actual.broker_version)
expected = min((4, 0), actual.broker_version)
assert consumer.config['api_version'] == expected, \
"Was expecting inferred broker version to be %s but was %s" % (expected, consumer.config['api_version'])

Expand Down
Loading