Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._configs import (
ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType)
from kafka.admin._topics import NewTopic
from kafka.admin._groups import MemberToRemove
from kafka.admin._partitions import NewPartitions
from kafka.admin._topics import NewTopic
from kafka.admin._users import (
ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion)

Expand All @@ -13,5 +14,6 @@
'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType',
'ResourceType', 'ResourcePattern', 'ResourcePatternFilter',
'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'MemberToRemove', # NewTopic + NewPartitions are deprecated and not included in __all__
'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion',
]
368 changes: 329 additions & 39 deletions kafka/admin/_groups.py

Large diffs are not rendered by default.

22 changes: 12 additions & 10 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .acls import ACLsSubCommand
from .cluster import ClusterSubCommand
from .configs import ConfigsSubCommand
from .consumer_groups import ConsumerGroupsSubCommand
from .groups import GroupsSubCommand
from .partitions import PartitionsSubCommand
from .topics import TopicsSubCommand
from .users import UsersSubCommand
Expand Down Expand Up @@ -51,7 +51,7 @@ def run_cli(args=None):
subparsers = parser.add_subparsers(help='subcommands')
for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand,
TopicsSubCommand, PartitionsSubCommand,
ConsumerGroupsSubCommand, UsersSubCommand]:
GroupsSubCommand, UsersSubCommand]:
cmd.add_subparser(subparsers)
config = parser.parse_args(args)

Expand Down Expand Up @@ -104,17 +104,10 @@ def run_cli(args=None):
# [configs]
# IncrementalAlterConfigs (not supported yet)

# [consumer-groups]
# remove-members (not supported yet)
# delete-offsets (not supported yet)
# alter-offsets (not supported yet)

# [client-quotas]
# describe (DescribeClientQuotas - not supported yet)
# alter (AlterClientQuotas - not supported yet)

# DescribeQuorum (not supported yet)

# [producers]
# describe (DescribeProducers - not supported yet)

Expand All @@ -125,11 +118,20 @@ def run_cli(args=None):

# [topics]
# list-offsets (not supported yet)
# delete-offsets (OffsetDelete - not supported yet)

# [cluster]
# describe-features (DescribeFeatures - not supported yet)
# update-features (UpdateFeatures - not supported yet)
# version
# api-versions
# alter-log-dirs (AlterReplicaLogDirs - not supported yet)
# DescribeQuorum (not supported yet)
# UnregisterBroker
# AddRaftVoter
# RemoveRaftVoter

# [tokens] *DelegationTokenRequest
# create
# describe
# renew
# expire
17 changes: 0 additions & 17 deletions kafka/cli/admin/consumer_groups/__init__.py

This file was deleted.

22 changes: 22 additions & 0 deletions kafka/cli/admin/groups/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import sys

from .alter_offsets import AlterGroupOffsets
from .delete import DeleteGroups
from .delete_offsets import DeleteGroupOffsets
from .describe import DescribeGroups
from .list import ListGroups
from .list_offsets import ListGroupOffsets
from .remove_members import RemoveGroupMembers


class GroupsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('groups', help='Manage Kafka Groups')
commands = parser.add_subparsers()
for cmd in [ListGroups, DescribeGroups, DeleteGroups,
ListGroupOffsets, AlterGroupOffsets, DeleteGroupOffsets,
RemoveGroupMembers]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
32 changes: 32 additions & 0 deletions kafka/cli/admin/groups/alter_offsets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from kafka.structs import OffsetAndMetadata, TopicPartition


class AlterGroupOffsets:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser(
'alter-offsets',
help='Alter committed offsets for a consumer group')
parser.add_argument('-g', '--group-id', type=str, required=True)
parser.add_argument(
'-o', '--offset', type=str, action='append',
dest='offsets', default=[], required=True,
help='TOPIC:PARTITION:OFFSET triple (repeatable).')
parser.add_argument(
'--group-coordinator-id', type=int, default=None,
help='Send directly to this broker id, skipping coordinator lookup')
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
offsets = {}
for spec in args.offsets:
topic, partition, offset = spec.rsplit(':', 2)
offsets[TopicPartition(topic, int(partition))] = \
OffsetAndMetadata(int(offset), '', None)
result = client.alter_group_offsets(
args.group_id, offsets,
group_coordinator_id=args.group_coordinator_id)
return {'%s:%d' % (tp.topic, tp.partition): err.__name__
for tp, err in result.items()}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class DeleteConsumerGroups:
class DeleteGroups:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('delete', help='Delete Consumer Groups')
parser = subparsers.add_parser('delete', help='Delete Groups')
parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True)
parser.set_defaults(command=lambda cli, args: cli.delete_consumer_groups(args.groups))
parser.set_defaults(command=lambda cli, args: cli.delete_groups(args.groups))
31 changes: 31 additions & 0 deletions kafka/cli/admin/groups/delete_offsets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from kafka.structs import TopicPartition


class DeleteGroupOffsets:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser(
'delete-offsets',
help='Delete committed offsets for a consumer group')
parser.add_argument('-g', '--group-id', type=str, required=True)
parser.add_argument(
'-p', '--partition', type=str, action='append',
dest='partitions', default=[], required=True,
help='TOPIC:PARTITION pair (repeatable).')
parser.add_argument(
'--group-coordinator-id', type=int, default=None,
help='Send directly to this broker id, skipping coordinator lookup')
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
partitions = []
for spec in args.partitions:
topic, partition = spec.rsplit(':', 1)
partitions.append(TopicPartition(topic, int(partition)))
result = client.delete_group_offsets(
args.group_id, partitions,
group_coordinator_id=args.group_coordinator_id)
return {'%s:%d' % (tp.topic, tp.partition): err.__name__
for tp, err in result.items()}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class DescribeConsumerGroups:
class DescribeGroups:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Describe Consumer Groups')
parser = subparsers.add_parser('describe', help='Describe Groups')
parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True)
parser.set_defaults(command=lambda cli, args: cli.describe_consumer_groups(args.groups))
parser.set_defaults(command=lambda cli, args: cli.describe_groups(args.groups))
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
class ListConsumerGroups:
class ListGroups:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('list', help='List Consumer Groups')
parser.set_defaults(command=lambda cli, _args: cli.list_consumer_groups())
parser = subparsers.add_parser('list', help='List Groups')
parser.set_defaults(command=lambda cli, _args: cli.list_groups())
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class ListConsumerGroupOffsets:
class ListGroupOffsets:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('list-offsets', help='List Offsets for Consumer Group')
parser = subparsers.add_parser('list-offsets', help='List Offsets for Group')
parser.add_argument('-g', '--group-id', type=str, required=True)
parser.set_defaults(command=lambda cli, args: cli.list_consumer_group_offsets(args.group_id))
parser.set_defaults(command=lambda cli, args: cli.list_group_offsets(args.group_id))
42 changes: 42 additions & 0 deletions kafka/cli/admin/groups/remove_members.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from kafka.admin import MemberToRemove


class RemoveGroupMembers:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser(
'remove-members',
help='Remove members from a consumer group')
parser.add_argument('-g', '--group-id', type=str, required=True)
parser.add_argument(
'-m', '--member-id', type=str, action='append',
dest='member_ids', default=[],
help='Dynamic member id to remove (repeatable).')
parser.add_argument(
'-i', '--group-instance-id', type=str, action='append',
dest='group_instance_ids', default=[],
help='Static group.instance.id to remove (repeatable; '
'requires broker LeaveGroup v3+).')
parser.add_argument(
'--reason', type=str, default=None,
help='Optional reason; sent to broker on LeaveGroup v5+ '
'(KIP-800). Applied to all members.')
parser.add_argument(
'--group-coordinator-id', type=int, default=None,
help='Send directly to this broker id, skipping coordinator lookup')
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
members = [MemberToRemove(member_id=mid, reason=args.reason)
for mid in args.member_ids]
members.extend(MemberToRemove(group_instance_id=gid, reason=args.reason)
for gid in args.group_instance_ids)
if not members:
raise ValueError(
'At least one --member-id or --group-instance-id is required.')
result = client.remove_group_members(
args.group_id, members,
group_coordinator_id=args.group_coordinator_id)
return {(m.member_id or m.group_instance_id): err.__name__ for m, err in result.items()}
6 changes: 3 additions & 3 deletions kafka/cli/admin/partitions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ def add_subparser(cls, subparsers):
commands = parser.add_subparsers()
for cmd in [
CreatePartitions,
DescribeTopicPartitions,
ListPartitionReassignments,
AlterPartitionReassignments,
DeleteRecords,
ElectLeaders,
AlterPartitionReassignments,
ListPartitionReassignments,
DescribeTopicPartitions,
]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
9 changes: 8 additions & 1 deletion kafka/cli/consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def main_parser():
parser.add_argument(
'-g', '--group', type=str, required=True,
help='consumer group')
parser.add_argument(
'-i', '--group-instance-id', type=str,
help='static group membership identifier')
parser.add_argument(
'-f', '--format', type=str, default='str',
help='output format: str|raw|full')
Expand Down Expand Up @@ -71,7 +74,11 @@ def run_cli(args=None):
logger = logging.getLogger(__name__)

kwargs = build_kwargs(config.extra_config)
consumer = KafkaConsumer(bootstrap_servers=config.bootstrap_servers, group_id=config.group, **kwargs)
consumer = KafkaConsumer(
bootstrap_servers=config.bootstrap_servers,
group_id=config.group,
group_instance_id=config.group_instance_id,
**kwargs)
consumer.subscribe(config.topics)
try:
for m in consumer:
Expand Down
4 changes: 4 additions & 0 deletions kafka/protocol/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class OffsetFetchResponse(ApiMessage): pass
class OffsetCommitRequest(ApiMessage): pass
class OffsetCommitResponse(ApiMessage): pass

class OffsetDeleteRequest(ApiMessage): pass
class OffsetDeleteResponse(ApiMessage): pass


__all__ = [
'DEFAULT_GENERATION_ID', 'UNKNOWN_MEMBER_ID',
Expand All @@ -31,4 +34,5 @@ class OffsetCommitResponse(ApiMessage): pass
'HeartbeatRequest', 'HeartbeatResponse',
'OffsetFetchRequest', 'OffsetFetchResponse',
'OffsetCommitRequest', 'OffsetCommitResponse',
'OffsetDeleteRequest', 'OffsetDeleteResponse',
]
Loading
Loading