Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 101 additions & 91 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import selectors
import socket
import time
import uuid

from . import ConfigResourceType

Expand Down Expand Up @@ -398,38 +399,16 @@ async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (),
.format(request, response))
return response

@staticmethod
def _convert_new_topic_request(new_topic):
"""
Build the tuple required by CreateTopicsRequest from a NewTopic object.

Arguments:
new_topic: A NewTopic instance containing name, partition count, replication factor,
replica assignments, and config entries.

Returns:
A tuple in the form:
(topic_name, num_partitions, replication_factor, [(partition_id, [replicas])...],
[(config_key, config_value)...])
"""
return (
new_topic.name,
new_topic.num_partitions,
new_topic.replication_factor,
[
(partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items()
],
[
(config_key, config_value) for config_key, config_value in new_topic.topic_configs.items()
]
)

def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True,
wait_for_metadata=False):
"""Create new topics in the cluster.

Arguments:
new_topics: A list of NewTopic objects.
new_topics: A list of topic names,
or a dict of {topic_name: {num_partitions:, replication_factor:,
assignments: {partition: [broker_ids]},
configs: {key: value}}}
List of NewTopic objects is deprecated.

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created
Expand All @@ -450,30 +429,56 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
"""
if validate_only and wait_for_metadata:
raise ValueError('validate_only and wait_for_metadata are mutually exclusive')
version = self._client.api_version(CreateTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if validate_only:
raise IncompatibleBrokerVersion(
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self._manager.broker_version))
request = CreateTopicsRequest[version](
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout_ms=timeout_ms
)
elif version <= 3:
request = CreateTopicsRequest[version](
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout_ms=timeout_ms,
validate_only=validate_only
)
if validate_only and self._manager.broker_version < (0, 10, 2):
raise IncompatibleBrokerVersion(
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self._manager.broker_version))

_Topic = CreateTopicsRequest.CreatableTopic
_Assignment = _Topic.CreatableReplicaAssignment
_Config = _Topic.CreatableTopicConfig
topics = []
if isinstance(new_topics, dict):
# {topic_name: {num_partitions:, replication_factor:, assignments: {partition: [broker_ids]}, configs: {key: value}}
for topic, data in new_topics.items():
configs = data.get('configs', {})
topics.append(_Topic(
name=topic,
num_partitions=data.get('num_partitions', -1),
replication_factor=data.get('replication_factor', -1),
assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas)
for partition_id, replicas in data.get('assignments', {}).items()],
configs=[_Config(name=config_key, value=config_value)
for config_key, config_value in data.get('configs', {}).items()]
))
elif all(isinstance(v, str) for v in new_topics):
for new_topic in new_topics:
topics.append(_Topic(name=new_topic))
else:
raise RuntimeError('Version check error: %s' % version)
# TODO convert structs to a more pythonic interface
def get_response_errors(r):
from .new_topic import NewTopic
if all(isinstance(v, NewTopic) for v in new_topics):
for new_topic in new_topics:
topics.append(_Topic(
name=new_topic.name,
num_partitions=new_topic.num_partitions,
replication_factor=new_topic.replication_factor,
assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas)
for partition_id, replicas in new_topic.replica_assignments.items()],
configs=[_Config(name=config_key, value=config_value)
for config_key, config_value in new_topic.topic_configs.items()]
))
if not topics:
raise ValueError(f"No valid topics found in new_topics: {new_topics}")

request = CreateTopicsRequest(
topics=topics,
timeout_ms=timeout_ms,
validate_only=validate_only)
def response_errors(r):
for topic in r.topics:
yield Errors.for_code(topic[1])
response = self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors)
yield Errors.for_code(topic.error_code)
response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors)
if wait_for_metadata: # implies not validate_only
self.wait_for_topics([new_topic.name for new_topic in new_topics])
return response
Expand Down Expand Up @@ -559,7 +564,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
"""Delete topics from the cluster.

Arguments:
topics ([str]): A list of topic name strings.
topics ([str]): A list of topic name strings or uuid.UUID ids.

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted
Expand All @@ -569,13 +574,17 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
Returns:
Appropriate version of DeleteTopicsResponse class.
"""
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
request = DeleteTopicsRequest[version](topic_names=topics, timeout_ms=timeout_ms)
def get_response_errors(r):
_Topic = DeleteTopicsRequest.DeleteTopicState
request = DeleteTopicsRequest(
topics=[_Topic(topic_id=topic) if isinstance(topic, uuid.UUID) else _Topic(name=topic)
for topic in topics],
timeout_ms=timeout_ms)

def response_errors(r):
for response in r.responses:
yield Errors.for_code(response[1])
return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors)
yield Errors.for_code(response.error_code)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors)

def _process_acl_operations(self, obj):
if obj.get('authorized_operations', None) is not None:
Expand Down Expand Up @@ -1067,30 +1076,14 @@ def alter_configs(self, config_resources):
# describe log dirs protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker

@staticmethod
def _convert_create_partitions_request(topic_name, new_partitions):
"""Convert a NewPartitions object into the tuple format for CreatePartitionsRequest.

Arguments:
topic_name: The name of the existing topic.
new_partitions: A NewPartitions instance with total_count and new_assignments.

Returns:
A tuple: (topic_name, (total_count, [list_of_assignments])).
"""
return (
topic_name,
(
new_partitions.total_count,
new_partitions.new_assignments
)
)

def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True):
"""Create additional partitions for an existing topic.

Arguments:
topic_partitions: A map of topic name strings to NewPartition objects.
topic_partitions: A dict of topic name strings to total partition count (int),
or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}}
if manual assignment is desired.
dict of {topic_name: NewPartition} is deprecated.

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be
Expand All @@ -1102,17 +1095,37 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
Returns:
Appropriate version of CreatePartitionsResponse class.
"""
version = self._client.api_version(CreatePartitionsRequest, max_version=1)
timeout_ms = self._validate_timeout(timeout_ms)
request = CreatePartitionsRequest[version](
topics=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
_Topic = CreatePartitionsRequest.CreatePartitionsTopic
_Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment
topics = []
for topic, count in topic_partitions.items():
if isinstance(count, int):
topics.append(_Topic(name=topic, count=count))
elif isinstance(count, dict):
topics.append(
_Topic(
name=topic,
count=count['count'],
assignments=[_Assignment(broker_ids=broker_ids)
for broker_ids in count['assignments']]))

else:
topics.append(
_Topic(
name=topic,
count=count.total_count,
assignments=[_Assignment(broker_ids=broker_ids)
for broker_ids in count.new_assignments]))
request = CreatePartitionsRequest(
topics=topics,
timeout_ms=timeout_ms,
validate_only=validate_only
)
def get_response_errors(r):
validate_only=validate_only)

def response_errors(r):
for result in r.results:
yield Errors.for_code(result[1])
return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors)
yield Errors.for_code(result.error_code)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors)

def _get_leader_for_partitions(self, partitions, timeout_ms=None):
"""Finds ID of the leader node for every given topic partition.
Expand Down Expand Up @@ -1644,22 +1657,19 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_
Returns:
Appropriate version of ElectLeadersResponse class.
"""
version = self._client.api_version(ElectLeadersRequest, max_version=1)
timeout_ms = self._validate_timeout(timeout_ms)
request = ElectLeadersRequest[version](
request = ElectLeadersRequest(
election_type=ElectionType(election_type),
topic_partitions=self._get_topic_partitions(topic_partitions),
timeout_ms=timeout_ms,
)
# TODO convert structs to a more pythonic interface
def get_response_errors(r):
timeout_ms=timeout_ms)
def response_errors(r):
if r.API_VERSION >= 1:
yield Errors.for_code(r.error_code)
for result in r.replica_election_results:
for partition in result[1]:
yield Errors.for_code(partition[1])
for partition in result.partition_result:
yield Errors.for_code(partition.error_code)
ignore_errors = (Errors.ElectionNotNeededError,)
return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors, ignore_errors)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors)

def describe_log_dirs(self):
"""Send a DescribeLogDirsRequest request to a broker.
Expand Down
5 changes: 1 addition & 4 deletions kafka/cli/admin/topics/create.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from kafka.admin.new_topic import NewTopic


class CreateTopic:

@classmethod
Expand All @@ -13,4 +10,4 @@ def add_subparser(cls, subparsers):

@classmethod
def command(cls, client, args):
return client.create_topics([NewTopic(args.topic, args.num_partitions, args.replication_factor)])
return client.create_topics({args.topic: {'num_partitions': args.num_partitions, 'replication_factor': args.replication_factor}})
16 changes: 14 additions & 2 deletions kafka/cli/admin/topics/delete.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
import uuid


class DeleteTopic:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('delete', help='Delete Kafka Topic')
parser.add_argument('-t', '--topic', type=str, required=True)
parser.set_defaults(command=lambda cli, args: cli.delete_topics([args.topic]))
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='topic name')
parser.add_argument('--id', type=str, action='append', dest='topic_ids', help='topic UUID')
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
if not args.topics and not args.topic_ids:
raise ValueError('At least one topic or topic_id is required!')
topic_ids = [uuid.UUID(topic_id) for topic_id in args.topic_ids]
topic_names = args.topics
return client.delete_topics(topic_names + topic_ids)
10 changes: 9 additions & 1 deletion kafka/protocol/admin/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@
class CreateTopicsRequest(ApiMessage): pass
class CreateTopicsResponse(ApiMessage): pass

class DeleteTopicsRequest(ApiMessage): pass
class DeleteTopicsRequest(ApiMessage):
def encode(self, version=None, header=False, framed=False):
# convert topics => topic_names for v0-v5
version = self.API_VERSION if version is None else version
if version is not None and version <= 5:
if self.topics and not self.topic_names: # pylint: disable=E0203
self.topic_names = [topic.name for topic in self.topics]
return super().encode(version=version, header=header, framed=framed)

class DeleteTopicsResponse(ApiMessage): pass

class CreatePartitionsRequest(ApiMessage): pass
Expand Down
30 changes: 25 additions & 5 deletions test/integration/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
ACLFilter, ACLOperation, ACLPermissionType,
ResourcePattern, ResourceType, ACL,
ConfigResource, ConfigResourceType,
NewTopic,
NewPartitions, NewTopic,
)
from kafka.errors import (
BrokerResponseError, NoError, CoordinatorNotAvailableError,
Expand Down Expand Up @@ -388,12 +388,32 @@ def test_delete_records_with_errors(kafka_admin_client, topic, send_messages):
def test_create_delete_topics(kafka_admin_client):
topic_name = random_string(4)
response = kafka_admin_client.create_topics([NewTopic(topic_name, 1, 1)])
assert response.topics[0][0] == topic_name
assert response.topics[0][1] == 0 # NoError
assert response.topics[0].name == topic_name
assert response.topics[0].error_code == 0 # NoError

response = kafka_admin_client.delete_topics([topic_name])
assert response.responses[0][0] == topic_name
assert response.responses[0][1] == 0 # NoError
assert response.responses[0].name == topic_name
assert response.responses[0].error_code == 0 # NoError


@pytest.mark.skipif(env_kafka_version() < (1, 0), reason="CreatePartitions requires broker >=1.0")
def test_create_partitions(kafka_admin_client, topic):
# topic fixture creates with 4 partitions by default
topic_metadata = kafka_admin_client.describe_topics([topic])
assert len(topic_metadata) == 1
original_count = len(topic_metadata[0]['partitions'])
assert original_count == 4

# Increase to 6 partitions
new_total = 6
response = kafka_admin_client.create_partitions({topic: NewPartitions(new_total, [[0], [0]])})
for result in response.results:
assert result[0] == topic
assert result[1] == 0 # NoError

# Verify the new partition count
topic_metadata = kafka_admin_client.describe_topics([topic])
assert len(topic_metadata[0]['partitions']) == new_total


@pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2")
Expand Down
Loading