diff --git a/kafka/protocol/broker_version_data.py b/kafka/protocol/broker_version_data.py index ab715953d..b3ca7b186 100644 --- a/kafka/protocol/broker_version_data.py +++ b/kafka/protocol/broker_version_data.py @@ -4,9 +4,9 @@ import kafka.errors as Errors from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest -from kafka.protocol.consumer import OffsetFetchRequest, FetchRequest, ListOffsetsRequest +from kafka.protocol.consumer import OffsetFetchRequest, FetchRequest, ListOffsetsRequest, JoinGroupRequest from kafka.protocol.metadata import FindCoordinatorRequest, MetadataRequest -from kafka.protocol.producer import ProduceRequest +from kafka.protocol.producer import ProduceRequest, AddPartitionsToTxnRequest log = logging.getLogger('kafka.protocol') @@ -107,40 +107,39 @@ def infer_broker_version_from_api_versions(api_versions): test_cases = [ # format (, ) # Make sure to update consumer_integration test check when adding newer versions. - # ((3, 9), FetchRequest[17]), - # ((3, 8), ProduceRequest[11]), - # ((3, 7), FetchRequest[16]), - # ((3, 6), AddPartitionsToTxnRequest[4]), - # ((3, 5), FetchRequest[15]), - # ((3, 4), StopReplicaRequest[3]), # broker-internal api... - # ((3, 3), DescribeAclsRequest[3]), - # ((3, 2), JoinGroupRequest[9]), - # ((3, 1), FetchRequest[13]), - # ((3, 0), ListOffsetsRequest[7]), - # ((2, 8), ProduceRequest[9]), - # ((2, 7), FetchRequest[12]), - # ((2, 6), ListGroupsRequest[4]), - # ((2, 5), JoinGroupRequest[7]), - ((2, 6), DescribeClientQuotasRequest, 0), - ((2, 5), DescribeAclsRequest, 2), - ((2, 4), ProduceRequest, 8), - ((2, 3), FetchRequest, 11), - ((2, 2), ListOffsetsRequest, 5), - ((2, 1), FetchRequest, 10), - ((2, 0), FetchRequest, 8), - ((1, 1), FetchRequest, 7), - ((1, 0), MetadataRequest, 5), - ((0, 11), MetadataRequest, 4), - ((0, 10, 2), OffsetFetchRequest, 2), - ((0, 10, 1), MetadataRequest, 2), + ((4, 0), ListOffsetsRequest.API_KEY, 10), + ((3, 9), FetchRequest.API_KEY, 17), + ((3, 8), ProduceRequest.API_KEY, 11), + ((3, 7), FetchRequest.API_KEY, 16), + ((3, 6), AddPartitionsToTxnRequest.API_KEY, 4), + ((3, 5), FetchRequest.API_KEY, 15), + ((3, 4), 4, 7), # StopReplicaRequest[3]), # broker-internal api... + ((3, 3), DescribeAclsRequest.API_KEY, 3), + ((3, 2), JoinGroupRequest.API_KEY, 9), + ((3, 1), FetchRequest.API_KEY, 13), + ((3, 0), ListOffsetsRequest.API_KEY, 7), + ((2, 8), ProduceRequest.API_KEY, 9), + ((2, 7), FetchRequest.API_KEY, 12), + ((2, 6), DescribeClientQuotasRequest.API_KEY, 0), + ((2, 5), DescribeAclsRequest.API_KEY, 2), + ((2, 4), ProduceRequest.API_KEY, 8), + ((2, 3), FetchRequest.API_KEY, 11), + ((2, 2), ListOffsetsRequest.API_KEY, 5), + ((2, 1), FetchRequest.API_KEY, 10), + ((2, 0), FetchRequest.API_KEY, 8), + ((1, 1), FetchRequest.API_KEY, 7), + ((1, 0), MetadataRequest.API_KEY, 5), + ((0, 11), MetadataRequest.API_KEY, 4), + ((0, 10, 2), OffsetFetchRequest.API_KEY, 2), + ((0, 10, 1), MetadataRequest.API_KEY, 2), ] # Get the best match of test cases - for broker_version, proto_struct, API_VERSION in sorted(test_cases, reverse=True): - if proto_struct.API_KEY not in api_versions: + for broker_version, api_key, version in sorted(test_cases, reverse=True): + if api_key not in api_versions: continue - min_version, max_version = api_versions[proto_struct.API_KEY] - if min_version <= API_VERSION <= max_version: + min_version, max_version = api_versions[api_key] + if min_version <= version <= max_version: return broker_version # We know that ApiVersionsResponse is only supported in 0.10+ diff --git a/test/integration/test_consumer_integration.py b/test/integration/test_consumer_integration.py index c3f358a30..9d50f7d4e 100644 --- a/test/integration/test_consumer_integration.py +++ b/test/integration/test_consumer_integration.py @@ -16,7 +16,7 @@ def test_kafka_version_infer(kafka_consumer_factory): consumer = kafka_consumer_factory(api_version=None) actual = BrokerVersionData(env_kafka_version()) - expected = min((2, 6), actual.broker_version) + expected = min((4, 0), actual.broker_version) assert consumer.config['api_version'] == expected, \ "Was expecting inferred broker version to be %s but was %s" % (expected, consumer.config['api_version'])