diff --git a/test/admin/test_admin_cluster_features.py b/test/admin/test_admin_cluster_features.py index 4e184871c..ee378c893 100644 --- a/test/admin/test_admin_cluster_features.py +++ b/test/admin/test_admin_cluster_features.py @@ -1,6 +1,6 @@ import pytest -from kafka.admin import KafkaAdminClient, UpdateFeatureType +from kafka.admin import UpdateFeatureType from kafka.errors import ( ClusterAuthorizationFailedError, FeatureUpdateFailedError, @@ -9,32 +9,6 @@ from kafka.protocol.admin import UpdateFeaturesRequest, UpdateFeaturesResponse from kafka.protocol.metadata import ApiVersionsRequest, ApiVersionsResponse -from test.mock_broker import MockBroker - - -# --------------------------------------------------------------------------- -# fixtures -# --------------------------------------------------------------------------- - - -@pytest.fixture -def broker(request): - broker_version = getattr(request, 'param', (4, 2)) - return MockBroker(broker_version=broker_version) - - -@pytest.fixture -def admin(broker): - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) - try: - yield admin - finally: - admin.close() - # --------------------------------------------------------------------------- # describe_features diff --git a/test/admin/test_admin_cluster_log_dirs.py b/test/admin/test_admin_cluster_log_dirs.py index 8c2c88f0c..9e628dc0f 100644 --- a/test/admin/test_admin_cluster_log_dirs.py +++ b/test/admin/test_admin_cluster_log_dirs.py @@ -1,44 +1,11 @@ import pytest import kafka.errors as Errors -from kafka.admin import KafkaAdminClient from kafka.protocol.admin import ( AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse, ) -from kafka.protocol.metadata import MetadataResponse from kafka.structs import TopicPartitionReplica -from test.mock_broker import MockBroker - - -@pytest.fixture -def broker(request): - broker_version = getattr(request, 'param', (4, 2)) - return MockBroker(broker_version=broker_version) - - -@pytest.fixture -def multi_broker(broker): - Broker = MetadataResponse.MetadataResponseBroker - broker.set_metadata(brokers=[ - Broker(node_id=0, host=broker.host, port=broker.port, rack=None), - Broker(node_id=1, host=broker.host, port=broker.port, rack=None), - ]) - return broker - - -@pytest.fixture -def admin(broker): - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) - try: - yield admin - finally: - admin.close() - def _alter_response(results): """results: iterable of (topic, partition, error_code).""" diff --git a/test/admin/test_admin_cluster_quorum.py b/test/admin/test_admin_cluster_quorum.py index d9c113cde..01a34a2a1 100644 --- a/test/admin/test_admin_cluster_quorum.py +++ b/test/admin/test_admin_cluster_quorum.py @@ -1,30 +1,8 @@ import pytest import kafka.errors as Errors -from kafka.admin import KafkaAdminClient from kafka.protocol.admin import DescribeQuorumRequest, DescribeQuorumResponse -from test.mock_broker import MockBroker - - -@pytest.fixture -def broker(request): - broker_version = getattr(request, 'param', (4, 2)) - return MockBroker(broker_version=broker_version) - - -@pytest.fixture -def admin(broker): - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) - try: - yield admin - finally: - admin.close() - def _quorum_response( *, diff --git a/test/admin/test_admin_concurrent.py b/test/admin/test_admin_concurrent.py index deba175a2..377a5b230 100644 --- a/test/admin/test_admin_concurrent.py +++ b/test/admin/test_admin_concurrent.py @@ -11,13 +11,6 @@ from kafka.admin import KafkaAdminClient -from test.mock_broker import MockBroker - - -@pytest.fixture -def broker(): - return MockBroker(broker_version=(4, 2)) - @pytest.fixture def admin(broker): diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index 39a88b58f..750548b7d 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -1,7 +1,7 @@ import pytest from kafka.admin import ( - AlterConfigOp, ConfigResource, ConfigResourceType, KafkaAdminClient) + AlterConfigOp, ConfigResource, ConfigResourceType) from kafka.errors import ClusterAuthorizationFailedError, InvalidConfigurationError from kafka.protocol.admin import ( AlterConfigsRequest, AlterConfigsResponse, @@ -10,9 +10,6 @@ ListConfigResourcesRequest, ListConfigResourcesResponse, ) -from test.mock_broker import MockBroker - - # ConfigResourceType values (wire) _TOPIC = ConfigResourceType.TOPIC.value @@ -39,26 +36,6 @@ def test_config_resource(): # --------------------------------------------------------------------------- -@pytest.fixture -def broker(request): - # parametrize tests with indirect=True - broker_version = getattr(request, 'param', (4, 2)) - return MockBroker(broker_version=broker_version) - - -@pytest.fixture -def admin(broker): - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) - try: - yield admin - finally: - admin.close() - - def _describe_configs_response(resource_type, resource_name, configs): """configs: iterable of (name, value, source, read_only).""" Result = DescribeConfigsResponse.DescribeConfigsResult diff --git a/test/admin/test_admin_groups.py b/test/admin/test_admin_groups.py index 9857f82ba..1c3152bff 100644 --- a/test/admin/test_admin_groups.py +++ b/test/admin/test_admin_groups.py @@ -1,7 +1,6 @@ import pytest from kafka.admin import ( - KafkaAdminClient, GroupState, GroupType, MemberToRemove, OffsetTimestamp, ) @@ -25,28 +24,6 @@ from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID from kafka.structs import OffsetAndMetadata, TopicPartition -from test.mock_broker import MockBroker - - -@pytest.fixture -def broker(request): - # parametrize tests with indirect=True - broker_version = getattr(request, 'param', (4, 2)) - return MockBroker(broker_version=broker_version) - - -@pytest.fixture -def admin(broker): - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) - try: - yield admin - finally: - admin.close() - # --------------------------------------------------------------------------- # alter_group_offsets diff --git a/test/admin/test_admin_partitions.py b/test/admin/test_admin_partitions.py index 95e70df6a..b67d6aec1 100644 --- a/test/admin/test_admin_partitions.py +++ b/test/admin/test_admin_partitions.py @@ -2,7 +2,7 @@ import pytest -from kafka.admin import KafkaAdminClient, OffsetSpec +from kafka.admin import OffsetSpec from kafka.errors import ( UnknownTopicOrPartitionError, IncompatibleBrokerVersion, @@ -16,28 +16,6 @@ from kafka.protocol.metadata import MetadataResponse from kafka.structs import TopicPartition, OffsetAndTimestamp -from test.mock_broker import MockBroker - - -@pytest.fixture -def broker(request): - # parametrize tests with indirect=True - broker_version = getattr(request, 'param', (4, 2)) - return MockBroker(broker_version=broker_version) - - -@pytest.fixture -def admin(broker): - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) - try: - yield admin - finally: - admin.close() - # --------------------------------------------------------------------------- # alter_partition_reassignments @@ -489,7 +467,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert captured['request'].topics[0].partitions[0].timestamp == 1700000000 assert result[TopicPartition('topic-a', 0)].offset == 42 - def test_groups_partitions_by_leader(self, broker): + def test_groups_partitions_by_leader(self, broker, admin): # Two partitions, two different leaders => two requests. Broker = MetadataResponse.MetadataResponseBroker Topic = MetadataResponse.MetadataResponseTopic @@ -510,6 +488,7 @@ def test_groups_partitions_by_leader(self, broker): Broker(node_id=1, host=broker.host, port=broker.port, rack=None), ], ) + admin._manager.bootstrap() captured = [] @@ -523,13 +502,6 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListOffsetsRequest, handler) broker.respond_fn(ListOffsetsRequest, handler) - - # cant use fixture because it bootstraps eagerly - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) try: admin.list_partition_offsets({ TopicPartition('topic-a', 0): OffsetSpec.LATEST, diff --git a/test/admin/test_admin_users.py b/test/admin/test_admin_users.py index f316f3c2b..f1715ab3c 100644 --- a/test/admin/test_admin_users.py +++ b/test/admin/test_admin_users.py @@ -3,8 +3,7 @@ import pytest from kafka.admin import ( - KafkaAdminClient, ScramMechanism, - UserScramCredentialDeletion, UserScramCredentialUpsertion, + ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion, ) from kafka.errors import IllegalArgumentError from kafka.protocol.admin import ( @@ -12,8 +11,6 @@ DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, ) -from test.mock_broker import MockBroker - class TestScramMechanism: @@ -87,26 +84,6 @@ def test_unknown_mechanism_rejected(self): # --------------------------------------------------------------------------- -@pytest.fixture -def broker(request): - # parametrize tests with indirect=True - broker_version = getattr(request, 'param', (4, 2)) - return MockBroker(broker_version=broker_version) - - -@pytest.fixture -def admin(broker): - admin = KafkaAdminClient( - kafka_client=broker.client_factory(), - bootstrap_servers='%s:%d' % (broker.host, broker.port), - request_timeout_ms=5000, - ) - try: - yield admin - finally: - admin.close() - - class TestAlterUserScramCredentialsMockBroker: def test_all_success_returns_none_values(self, broker, admin): Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index 325cc235c..04b87521d 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -775,20 +775,7 @@ def test_seek_before_exception(client, mocker): class TestFetchOffsetsByTimes: @pytest.fixture - def net_client(self): - from kafka.net.compat import KafkaNetClient - cli = KafkaNetClient( - bootstrap_servers=['localhost:1'], - socket_connection_timeout_ms=1000, - reconnect_backoff_ms=10, - reconnect_backoff_max_ms=100, - ) - try: - yield cli - finally: - cli.close() - - def _make_fetcher(self, client): + def fetcher(self, client): subscription_state = SubscriptionState() subscription_state.subscribe(topics=['test']) tp = TopicPartition('test', 0) @@ -796,12 +783,10 @@ def _make_fetcher(self, client): subscription_state.seek(tp, 0) return Fetcher(client, subscription_state) - def test_empty_timestamps(self, net_client): - fetcher = self._make_fetcher(net_client) + def test_empty_timestamps(self, fetcher): assert fetcher.offsets_by_times({}) == {} - def test_success_no_retry(self, net_client, mocker): - fetcher = self._make_fetcher(net_client) + def test_success_no_retry(self, fetcher, mocker): tp = TopicPartition('test', 0) timestamps = {tp: 1000} expected_offset = OffsetAndTimestamp(10, 1000, -1) @@ -813,8 +798,7 @@ def test_success_no_retry(self, net_client, mocker): result = fetcher.offsets_by_times(timestamps, timeout_ms=10000) assert result == {tp: expected_offset} - def test_success_with_retry(self, net_client, mocker): - fetcher = self._make_fetcher(net_client) + def test_success_with_retry(self, fetcher, mocker): tp0 = TopicPartition('test', 0) tp1 = TopicPartition('test', 1) timestamps = {tp0: 1000, tp1: 2000} @@ -829,8 +813,7 @@ def test_success_with_retry(self, net_client, mocker): result = fetcher.offsets_by_times(timestamps, timeout_ms=10000) assert result == {tp0: offset0, tp1: offset1} - def test_timeout_raises(self, net_client, mocker): - fetcher = self._make_fetcher(net_client) + def test_timeout_raises(self, fetcher, mocker): tp = TopicPartition('test', 0) timestamps = {tp: 1000} @@ -841,8 +824,7 @@ def test_timeout_raises(self, net_client, mocker): with pytest.raises(Errors.KafkaTimeoutError): fetcher.offsets_by_times(timestamps, timeout_ms=50) - def test_non_retriable_error_raises(self, net_client, mocker): - fetcher = self._make_fetcher(net_client) + def test_non_retriable_error_raises(self, fetcher, mocker): tp = TopicPartition('test', 0) timestamps = {tp: 1000} @@ -854,8 +836,7 @@ def test_non_retriable_error_raises(self, net_client, mocker): with pytest.raises(Errors.TopicAuthorizationFailedError): fetcher.offsets_by_times(timestamps, timeout_ms=10000) - def test_retriable_invalid_metadata_triggers_refresh(self, net_client, mocker): - fetcher = self._make_fetcher(net_client) + def test_retriable_invalid_metadata_triggers_refresh(self, fetcher, mocker): tp = TopicPartition('test', 0) timestamps = {tp: 1000} expected_offset = OffsetAndTimestamp(10, 1000, -1) @@ -874,8 +855,7 @@ def test_retriable_invalid_metadata_triggers_refresh(self, net_client, mocker): assert result == {tp: expected_offset} update_metadata_mock.assert_called_once() - def test_retriable_non_metadata_error_sleeps(self, net_client, mocker): - fetcher = self._make_fetcher(net_client) + def test_retriable_non_metadata_error_sleeps(self, fetcher, mocker): tp = TopicPartition('test', 0) timestamps = {tp: 1000} expected_offset = OffsetAndTimestamp(10, 1000, -1) @@ -898,9 +878,8 @@ def test_retriable_non_metadata_error_sleeps(self, net_client, mocker): # At least one call_later was for the retry_backoff sleep assert call_later_spy.call_count >= 1 - def test_success_does_not_check_exception(self, net_client, mocker): + def test_success_does_not_check_exception(self, fetcher, mocker): """Regression: successful future should not fall through to check future.exception.""" - fetcher = self._make_fetcher(net_client) tp0 = TopicPartition('test', 0) tp1 = TopicPartition('test', 1) timestamps = {tp0: 1000, tp1: 2000} @@ -919,8 +898,7 @@ def test_success_does_not_check_exception(self, net_client, mocker): result = fetcher.offsets_by_times(timestamps, timeout_ms=10000) assert result == {tp0: offset0, tp1: offset1} - def test_no_timeout_passes_none(self, net_client, mocker): - fetcher = self._make_fetcher(net_client) + def test_no_timeout_passes_none(self, fetcher, mocker): tp = TopicPartition('test', 0) timestamps = {tp: 1000} expected_offset = OffsetAndTimestamp(10, 1000, -1)