Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 1 addition & 27 deletions test/admin/test_admin_cluster_features.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from kafka.admin import KafkaAdminClient, UpdateFeatureType
from kafka.admin import UpdateFeatureType
from kafka.errors import (
ClusterAuthorizationFailedError,
FeatureUpdateFailedError,
Expand All @@ -9,32 +9,6 @@
from kafka.protocol.admin import UpdateFeaturesRequest, UpdateFeaturesResponse
from kafka.protocol.metadata import ApiVersionsRequest, ApiVersionsResponse

from test.mock_broker import MockBroker


# ---------------------------------------------------------------------------
# fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def broker(request):
broker_version = getattr(request, 'param', (4, 2))
return MockBroker(broker_version=broker_version)


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()


# ---------------------------------------------------------------------------
# describe_features
Expand Down
33 changes: 0 additions & 33 deletions test/admin/test_admin_cluster_log_dirs.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,11 @@
import pytest

import kafka.errors as Errors
from kafka.admin import KafkaAdminClient
from kafka.protocol.admin import (
AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse,
)
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import TopicPartitionReplica

from test.mock_broker import MockBroker


@pytest.fixture
def broker(request):
broker_version = getattr(request, 'param', (4, 2))
return MockBroker(broker_version=broker_version)


@pytest.fixture
def multi_broker(broker):
Broker = MetadataResponse.MetadataResponseBroker
broker.set_metadata(brokers=[
Broker(node_id=0, host=broker.host, port=broker.port, rack=None),
Broker(node_id=1, host=broker.host, port=broker.port, rack=None),
])
return broker


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()


def _alter_response(results):
"""results: iterable of (topic, partition, error_code)."""
Expand Down
22 changes: 0 additions & 22 deletions test/admin/test_admin_cluster_quorum.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,8 @@
import pytest

import kafka.errors as Errors
from kafka.admin import KafkaAdminClient
from kafka.protocol.admin import DescribeQuorumRequest, DescribeQuorumResponse

from test.mock_broker import MockBroker


@pytest.fixture
def broker(request):
broker_version = getattr(request, 'param', (4, 2))
return MockBroker(broker_version=broker_version)


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()


def _quorum_response(
*,
Expand Down
7 changes: 0 additions & 7 deletions test/admin/test_admin_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@

from kafka.admin import KafkaAdminClient

from test.mock_broker import MockBroker


@pytest.fixture
def broker():
return MockBroker(broker_version=(4, 2))


@pytest.fixture
def admin(broker):
Expand Down
25 changes: 1 addition & 24 deletions test/admin/test_admin_configs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest

from kafka.admin import (
AlterConfigOp, ConfigResource, ConfigResourceType, KafkaAdminClient)
AlterConfigOp, ConfigResource, ConfigResourceType)
from kafka.errors import ClusterAuthorizationFailedError, InvalidConfigurationError
from kafka.protocol.admin import (
AlterConfigsRequest, AlterConfigsResponse,
Expand All @@ -10,9 +10,6 @@
ListConfigResourcesRequest, ListConfigResourcesResponse,
)

from test.mock_broker import MockBroker


# ConfigResourceType values (wire)
_TOPIC = ConfigResourceType.TOPIC.value

Expand All @@ -39,26 +36,6 @@ def test_config_resource():
# ---------------------------------------------------------------------------


@pytest.fixture
def broker(request):
# parametrize tests with indirect=True
broker_version = getattr(request, 'param', (4, 2))
return MockBroker(broker_version=broker_version)


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()


def _describe_configs_response(resource_type, resource_name, configs):
"""configs: iterable of (name, value, source, read_only)."""
Result = DescribeConfigsResponse.DescribeConfigsResult
Expand Down
23 changes: 0 additions & 23 deletions test/admin/test_admin_groups.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest

from kafka.admin import (
KafkaAdminClient,
GroupState, GroupType, MemberToRemove,
OffsetTimestamp,
)
Expand All @@ -25,28 +24,6 @@
from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID
from kafka.structs import OffsetAndMetadata, TopicPartition

from test.mock_broker import MockBroker


@pytest.fixture
def broker(request):
# parametrize tests with indirect=True
broker_version = getattr(request, 'param', (4, 2))
return MockBroker(broker_version=broker_version)


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()


# ---------------------------------------------------------------------------
# alter_group_offsets
Expand Down
34 changes: 3 additions & 31 deletions test/admin/test_admin_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from kafka.admin import KafkaAdminClient, OffsetSpec
from kafka.admin import OffsetSpec
from kafka.errors import (
UnknownTopicOrPartitionError,
IncompatibleBrokerVersion,
Expand All @@ -16,28 +16,6 @@
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import TopicPartition, OffsetAndTimestamp

from test.mock_broker import MockBroker


@pytest.fixture
def broker(request):
# parametrize tests with indirect=True
broker_version = getattr(request, 'param', (4, 2))
return MockBroker(broker_version=broker_version)


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()


# ---------------------------------------------------------------------------
# alter_partition_reassignments
Expand Down Expand Up @@ -489,7 +467,7 @@ def handler(api_key, api_version, correlation_id, request_bytes):
assert captured['request'].topics[0].partitions[0].timestamp == 1700000000
assert result[TopicPartition('topic-a', 0)].offset == 42

def test_groups_partitions_by_leader(self, broker):
def test_groups_partitions_by_leader(self, broker, admin):
# Two partitions, two different leaders => two requests.
Broker = MetadataResponse.MetadataResponseBroker
Topic = MetadataResponse.MetadataResponseTopic
Expand All @@ -510,6 +488,7 @@ def test_groups_partitions_by_leader(self, broker):
Broker(node_id=1, host=broker.host, port=broker.port, rack=None),
],
)
admin._manager.bootstrap()

captured = []

Expand All @@ -523,13 +502,6 @@ def handler(api_key, api_version, correlation_id, request_bytes):

broker.respond_fn(ListOffsetsRequest, handler)
broker.respond_fn(ListOffsetsRequest, handler)

# cant use fixture because it bootstraps eagerly
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
admin.list_partition_offsets({
TopicPartition('topic-a', 0): OffsetSpec.LATEST,
Expand Down
25 changes: 1 addition & 24 deletions test/admin/test_admin_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@
import pytest

from kafka.admin import (
KafkaAdminClient, ScramMechanism,
UserScramCredentialDeletion, UserScramCredentialUpsertion,
ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion,
)
from kafka.errors import IllegalArgumentError
from kafka.protocol.admin import (
AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse,
)

from test.mock_broker import MockBroker


class TestScramMechanism:

Expand Down Expand Up @@ -87,26 +84,6 @@ def test_unknown_mechanism_rejected(self):
# ---------------------------------------------------------------------------


@pytest.fixture
def broker(request):
# parametrize tests with indirect=True
broker_version = getattr(request, 'param', (4, 2))
return MockBroker(broker_version=broker_version)


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()


class TestAlterUserScramCredentialsMockBroker:
def test_all_success_returns_none_values(self, broker, admin):
Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult
Expand Down
Loading
Loading