Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._configs import (
ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType)
from kafka.admin._topics import NewTopic, NewPartitions
from kafka.admin._topics import NewTopic
from kafka.admin._partitions import NewPartitions
from kafka.admin._users import (
ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion)

Expand Down
439 changes: 439 additions & 0 deletions kafka/admin/_partitions.py

Large diffs are not rendered by default.

165 changes: 0 additions & 165 deletions kafka/admin/_records.py

This file was deleted.

75 changes: 1 addition & 74 deletions kafka/admin/_topics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Topic management mixin for KafkaAdminClient.

Also defines NewTopic and NewPartitions data classes.
Also defines NewTopic data class.
"""

from __future__ import annotations
Expand Down Expand Up @@ -244,60 +244,6 @@ def response_errors(r):
result['topics'] = result.pop('responses')
return result

@staticmethod
def _process_create_partitions_input(topic_partitions):
_Topic = CreatePartitionsRequest.CreatePartitionsTopic
_Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment
topics = []
for topic, count in topic_partitions.items():
if isinstance(count, int):
topics.append(_Topic(name=topic, count=count))
elif isinstance(count, dict):
topics.append(
_Topic(
name=topic,
count=count['count'],
assignments=[_Assignment(broker_ids=broker_ids)
for broker_ids in count['assignments']]))
else:
topics.append(
_Topic(
name=topic,
count=count.total_count,
assignments=[_Assignment(broker_ids=broker_ids)
for broker_ids in count.new_assignments]))
return topics

def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True):
"""Create additional partitions for an existing topic.

Arguments:
topic_partitions: A dict of topic name strings to total partition count (int),
or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}}
if manual assignment is desired.
dict of {topic_name: NewPartition} is deprecated.

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be
created before the broker returns.
validate_only (bool, optional): If True, don't actually create new partitions.
Default: False
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.

Returns:
Appropriate version of CreatePartitionsResponse class.
"""
timeout_ms = self._validate_timeout(timeout_ms)
request = CreatePartitionsRequest(
topics=self._process_create_partitions_input(topic_partitions),
timeout_ms=timeout_ms,
validate_only=validate_only)

def response_errors(r):
for result in r.results:
yield Errors.for_code(result.error_code)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors)


class NewTopic:
"""DEPRECATED: A class for new topic creation.
Expand All @@ -320,22 +266,3 @@ def __init__(self, name, num_partitions=-1, replication_factor=-1,
self.replication_factor = replication_factor
self.replica_assignments = replica_assignments or {}
self.topic_configs = topic_configs or {}


class NewPartitions:
"""DEPRECATED: A class for new partition creation on existing topics.

Note that the length of new_assignments, if specified, must be the
difference between the new total number of partitions and the existing
number of partitions.

Arguments:
total_count (int): the total number of partitions that should exist
on the topic
new_assignments ([[int]]): an array of arrays of replica assignments
for new partitions. If not set, broker assigns replicas per an
internal algorithm.
"""
def __init__(self, total_count, new_assignments=None):
self.total_count = total_count
self.new_assignments = new_assignments
4 changes: 2 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from kafka.admin._cluster import ClusterAdminMixin
from kafka.admin._configs import ConfigAdminMixin
from kafka.admin._groups import GroupAdminMixin
from kafka.admin._records import RecordAdminMixin
from kafka.admin._partitions import PartitionAdminMixin
from kafka.admin._topics import TopicAdminMixin
from kafka.admin._users import UserAdminMixin

Expand All @@ -29,7 +29,7 @@ class KafkaAdminClient(
ClusterAdminMixin,
ConfigAdminMixin,
GroupAdminMixin,
RecordAdminMixin,
PartitionAdminMixin,
TopicAdminMixin,
UserAdminMixin,
):
Expand Down
10 changes: 3 additions & 7 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .cluster import ClusterSubCommand
from .configs import ConfigsSubCommand
from .consumer_groups import ConsumerGroupsSubCommand
from .partitions import PartitionsSubCommand
from .topics import TopicsSubCommand
from .users import UsersSubCommand
from ..common import add_common_cli_args
Expand Down Expand Up @@ -49,7 +50,8 @@ def run_cli(args=None):
parser = main_parser()
subparsers = parser.add_subparsers(help='subcommands')
for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand,
TopicsSubCommand, ConsumerGroupsSubCommand, UsersSubCommand]:
TopicsSubCommand, PartitionsSubCommand,
ConsumerGroupsSubCommand, UsersSubCommand]:
cmd.add_subparser(subparsers)
config = parser.parse_args(args)

Expand Down Expand Up @@ -100,7 +102,6 @@ def run_cli(args=None):

# Commands TODO:
# [configs]
# alter
# IncrementalAlterConfigs (not supported yet)

# [consumer-groups]
Expand All @@ -123,13 +124,8 @@ def run_cli(args=None):
# abort (not supported yet)

# [topics]
# describe-partitions (DescribeTopicPartitions - not supported yet)
# list-offsets (not supported yet)
# delete-offsets (OffsetDelete - not supported yet)
# alter-reassignments (AlterPartitionReassignments - not supported yet)
# list-reassignments (ListPartitionReassignments - not supported yet)
# create-partitions
# elect-leaders

# [cluster]
# describe-features (DescribeFeatures - not supported yet)
Expand Down
26 changes: 26 additions & 0 deletions kafka/cli/admin/partitions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import sys

from .alter_reassignments import AlterPartitionReassignments
from .create import CreatePartitions
from .delete_records import DeleteRecords
from .describe import DescribeTopicPartitions
from .elect_leaders import ElectLeaders
from .list_reassignments import ListPartitionReassignments


class PartitionsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('partitions', help='Manage Kafka Partitions')
commands = parser.add_subparsers()
for cmd in [
CreatePartitions,
DeleteRecords,
ElectLeaders,
AlterPartitionReassignments,
ListPartitionReassignments,
DescribeTopicPartitions,
]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
39 changes: 39 additions & 0 deletions kafka/cli/admin/partitions/alter_reassignments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from kafka.structs import TopicPartition


class AlterPartitionReassignments:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser(
'alter-reassignments',
help='Alter replica assignments for partitions')
parser.add_argument(
'-r', '--reassign', type=str, action='append',
dest='reassignments', default=[], required=True,
help='TOPIC:PARTITION=BROKER_ID[,BROKER_ID...] to set a new '
'replica set, or TOPIC:PARTITION=cancel to cancel an '
'in-progress reassignment for that partition. Repeatable.')
parser.add_argument(
'--timeout-ms', type=int, default=None,
help='Request timeout in milliseconds')
parser.add_argument(
'--no-raise-errors', dest='raise_errors', action='store_false',
help='Do not raise on partition-level errors; return the response instead')
parser.set_defaults(command=cls.command, raise_errors=True)

@classmethod
def command(cls, client, args):
reassignments = {}
for spec in args.reassignments:
tp_str, replicas_str = spec.rsplit('=', 1)
topic, partition = tp_str.rsplit(':', 1)
tp = TopicPartition(topic, int(partition))
if replicas_str.lower() == 'cancel':
reassignments[tp] = None
else:
reassignments[tp] = [int(b) for b in replicas_str.split(',') if b]
return client.alter_partition_reassignments(
reassignments,
timeout_ms=args.timeout_ms,
raise_errors=args.raise_errors)
Loading
Loading