Skip to content

APIE-844 - Confluent CLI support for kstreams#3260

Open
tmalik (tmalikconfluent) wants to merge 13 commits intomainfrom
APIE-844
Open

APIE-844 - Confluent CLI support for kstreams#3260
tmalik (tmalikconfluent) wants to merge 13 commits intomainfrom
APIE-844

Conversation

@tmalikconfluent
Copy link
Copy Markdown
Contributor

@tmalikconfluent tmalik (tmalikconfluent) commented Feb 13, 2026

Release Notes

Breaking Changes

  • PLACEHOLDER

New Features

  • Added a Type column to confluent kafka consumer group [ list | describe ] command to display the consumer group protocol type (e.g., CLASSIC, CONSUMER).
  • Added the confluent kafka consumer stream-group command group with support for managing
    Kafka Streams groups (KIP-1071), including the following subcommands: member, member-assignment, member-target-assignment, member-task-partitions, member-target-assignment-task-partitions, and subtopology

Bug Fixes

  • PLACEHOLDER

Checklist

  • I have successfully built and used a custom CLI binary, without linter issues from this PR.
  • I have clearly specified in the What section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
  • I have verified this PR in Confluent Cloud pre-prod or production environment, if applicable.
  • I have verified this PR in Confluent Platform on-premises environment, if applicable.
  • I have attached manual CLI verification results or screenshots in the Test & Review section below.
  • I have added appropriate CLI integration or unit tests for any new or updated commands and functionality.
  • I confirm that this PR introduces no breaking changes or backward compatibility issues.
  • I have indicated the potential customer impact if something goes wrong in the Blast Radius section below.
  • I have put checkmarks below confirming that the feature associated with this PR is enabled in:
    • Confluent Cloud prod
    • Confluent Cloud stag
    • Confluent Platform
    • Check this box if the feature is enabled for certain organizations only

What

This PR adds Confluent CLI support for Kafka Streams groups (KIP-1071). It introduces
two changes:

  1. Consumer group protocol type display: The existing confluent kafka consumer group [ list | describe ] commands now include a Type field showing the group protocol type (CLASSIC, CONSUMER, etc.), bringing the CLI to parity with the Java Admin API and Apache Kafka command-line tools.

  2. New stream-group command tree: A new set of subcommands under confluent kafka consumer stream-group that allow users to inspect Kafka Streams groups, their members, task assignments (active/standby/warmup), subtopologies, and task partition details. These commands use the internal Kafka REST v3 streams-groups API endpoints.

Blast Radius

Low to none blast radius:

  • Confluent Cloud customers using confluent kafka consumer group [ list | describe ] command will see an additional Type column in output. This is additive and non-breaking; the column is omitted when the API does not return a type value.
  • The new confluent kafka consumer stream-group commands is purely additive and should not break any existing workflows.

References

JIRA APIE-851 CLI implementation for share groups

Test & Review

APIE-851 kstreams for AK 4.2 - CLI Testing

@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copilot AI review requested due to automatic review settings March 11, 2026 18:41
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings March 11, 2026 18:41
Copilot AI review requested due to automatic review settings March 11, 2026 20:53
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings March 11, 2026 20:53
Copilot AI review requested due to automatic review settings March 13, 2026 19:22
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings March 13, 2026 19:22
Copilot AI review requested due to automatic review settings March 13, 2026 20:42
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings March 13, 2026 20:42
Copilot AI review requested due to automatic review settings March 13, 2026 21:43
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings March 13, 2026 21:43
Copilot AI review requested due to automatic review settings March 13, 2026 22:45
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings March 13, 2026 22:45
Copilot AI review requested due to automatic review settings March 26, 2026 21:25
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings March 26, 2026 21:25
Copilot AI review requested due to automatic review settings April 1, 2026 17:37
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings April 1, 2026 17:37
Copilot AI review requested due to automatic review settings April 1, 2026 17:44
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings April 1, 2026 17:44
Copilot AI review requested due to automatic review settings April 1, 2026 18:07
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings April 1, 2026 18:07
Copilot AI review requested due to automatic review settings April 1, 2026 21:17
@tmalikconfluent tmalik (tmalikconfluent) review requested due to automatic review settings April 1, 2026 21:17
@cqin-confluent Cynthia Qin (cqin-confluent) marked this pull request as ready for review April 8, 2026 16:50
@cqin-confluent Cynthia Qin (cqin-confluent) requested a review from a team as a code owner April 8, 2026 16:50
Copilot AI review requested due to automatic review settings April 8, 2026 16:50
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds Confluent CLI support for Kafka Streams groups (KIP-1071) by introducing a new kafka streams-group command tree backed by internal Kafka REST v3 endpoints, and by augmenting consumer group output with the protocol type.

Changes:

  • Add a new confluent kafka streams-group command group (list/describe, members, assignments, task partitions, and subtopologies) using internal Kafka REST v3 streams-groups endpoints.
  • Add a Type column/field to confluent kafka consumer group [list|describe] output.
  • Extend the test Kafka REST router and integration fixtures to cover the new endpoints/commands.

Reviewed changes

Copilot reviewed 70 out of 71 changed files in this pull request and generated 23 comments.

Show a summary per file
File Description
test/test-server/kafka_rest_router.go Adds mock Kafka REST v3 routes/handlers for streams-groups endpoints used by integration tests.
test/kafka_test.go Adds integration test coverage for the new kafka streams-group command tree.
test/fixtures/output/kafka/streams-group/task-partitions-describe.golden Golden output for member task partition “describe”.
test/fixtures/output/kafka/streams-group/target-task-partitions-describe.golden Golden output for member target-assignment task partition “describe”.
test/fixtures/output/kafka/streams-group/target-assignment-task-list.golden Golden output for target assignment task “list”.
test/fixtures/output/kafka/streams-group/target-assignment-describe.golden Golden output for target assignment “describe”.
test/fixtures/output/kafka/streams-group/subtopology/list-help.golden Help golden for streams-group subtopology list.
test/fixtures/output/kafka/streams-group/subtopology/help.golden Help golden for streams-group subtopology.
test/fixtures/output/kafka/streams-group/subtopology/describe-help.golden Help golden for streams-group subtopology describe.
test/fixtures/output/kafka/streams-group/subtopology-list.golden Golden output for subtopology “list” (human).
test/fixtures/output/kafka/streams-group/subtopology-list-json.golden Golden output for subtopology “list” (json).
test/fixtures/output/kafka/streams-group/subtopology-describe.golden Golden output for subtopology “describe”.
test/fixtures/output/kafka/streams-group/member/list-help.golden Help golden for streams-group member list.
test/fixtures/output/kafka/streams-group/member/help.golden Help golden for streams-group member.
test/fixtures/output/kafka/streams-group/member/describe-help.golden Help golden for streams-group member describe.
test/fixtures/output/kafka/streams-group/member-task-partitions/help.golden Help golden for streams-group member-task-partitions.
test/fixtures/output/kafka/streams-group/member-task-partitions/describe-help.golden Help golden for streams-group member-task-partitions describe.
test/fixtures/output/kafka/streams-group/member-target-assignment/list-help.golden Help golden for streams-group member-target-assignment list.
test/fixtures/output/kafka/streams-group/member-target-assignment/help.golden Help golden for streams-group member-target-assignment.
test/fixtures/output/kafka/streams-group/member-target-assignment/describe-help.golden Help golden for streams-group member-target-assignment describe.
test/fixtures/output/kafka/streams-group/member-target-assignment-task-partitions/help.golden Help golden for streams-group member-target-assignment-task-partitions.
test/fixtures/output/kafka/streams-group/member-target-assignment-task-partitions/describe-help.golden Help golden for streams-group member-target-assignment-task-partitions describe.
test/fixtures/output/kafka/streams-group/member-list.golden Golden output for member “list” (human).
test/fixtures/output/kafka/streams-group/member-list-json.golden Golden output for member “list” (json).
test/fixtures/output/kafka/streams-group/member-describe.golden Golden output for member “describe”.
test/fixtures/output/kafka/streams-group/member-assignment/list-help.golden Help golden for streams-group member-assignment list.
test/fixtures/output/kafka/streams-group/member-assignment/help.golden Help golden for streams-group member-assignment.
test/fixtures/output/kafka/streams-group/member-assignment/describe-help.golden Help golden for streams-group member-assignment describe.
test/fixtures/output/kafka/streams-group/list.golden Golden output for streams-group “list” (human).
test/fixtures/output/kafka/streams-group/list-json.golden Golden output for streams-group “list” (json).
test/fixtures/output/kafka/streams-group/list-help.golden Help golden for streams-group “list”.
test/fixtures/output/kafka/streams-group/help.golden Help golden for streams-group root.
test/fixtures/output/kafka/streams-group/describe.golden Golden output for streams-group “describe” (human).
test/fixtures/output/kafka/streams-group/describe-json.golden Golden output for streams-group “describe” (json).
test/fixtures/output/kafka/streams-group/describe-help.golden Help golden for streams-group “describe”.
test/fixtures/output/kafka/streams-group/describe-dne.golden Golden output for “describe” on a missing streams group.
test/fixtures/output/kafka/streams-group/assignment-task-list.golden Golden output for member assignment task “list”.
test/fixtures/output/kafka/streams-group/assignment-describe.golden Golden output for member assignment “describe”.
test/fixtures/output/kafka/help.golden Adds streams-group to top-level kafka help output.
test/fixtures/output/kafka/consumer/group/list.golden Updates consumer group list output to include Type column.
test/fixtures/output/kafka/consumer/group/list-onprem.golden Updates on-prem consumer group list output to include Type column.
test/fixtures/output/iam/certificate-authority/use.golden Adds/updates a golden output fixture (unrelated to streams-group feature).
pkg/cmd/prerunner.go Extends KafkaREST initialization to include an internal Kafka REST client.
pkg/cmd/kafka_rest.go Adds CloudClientInternal to KafkaREST wrapper struct.
pkg/cmd/flags.go Adds autocompletion for streams-group IDs via internal Kafka REST client.
pkg/ccloudv2/kafkarest.go Introduces KafkaRestClientInternal wrapper and streams-groups API methods + envelope unwrapping helper.
internal/kafka/command.go Registers the new streams-group command under kafka.
internal/kafka/command_kafka_stream_group.go Adds the streams-group root command and common arg completion hook.
internal/kafka/command_kafka_stream_group_subtopology.go Adds streams-group subtopology command group.
internal/kafka/command_kafka_stream_group_subtopology_list.go Implements streams-group subtopology listing.
internal/kafka/command_kafka_stream_group_subtopology_describe.go Implements streams-group subtopology describe.
internal/kafka/command_kafka_stream_group_member.go Adds streams-group member command group and output struct.
internal/kafka/command_kafka_stream_group_member_task_partitions.go Adds streams-group member-task-partitions command group.
internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go Implements member task partitions describe.
internal/kafka/command_kafka_stream_group_member_target_task_partitions.go Adds streams-group member-target-assignment-task-partitions command group.
internal/kafka/command_kafka_stream_group_member_target_task_partitions_describe.go Implements member target-assignment task partitions describe.
internal/kafka/command_kafka_stream_group_member_target_assignment.go Adds streams-group member-target-assignment command group.
internal/kafka/command_kafka_stream_group_member_target_assignment_list.go Implements member target-assignment task listing.
internal/kafka/command_kafka_stream_group_member_target_assignment_describe.go Implements member target-assignment describe.
internal/kafka/command_kafka_stream_group_member_list.go Implements streams-group member list.
internal/kafka/command_kafka_stream_group_member_describe.go Implements streams-group member describe.
internal/kafka/command_kafka_stream_group_member_assignment.go Adds streams-group member-assignment command group + output struct.
internal/kafka/command_kafka_stream_group_member_assignment_task_list.go Implements member assignment task listing.
internal/kafka/command_kafka_stream_group_member_assignment_describe.go Implements member assignment describe.
internal/kafka/command_kafka_stream_group_list.go Implements streams-group list.
internal/kafka/command_kafka_stream_group_describe.go Implements streams-group describe.
internal/kafka/command_consumer_group.go Adds protocol type field to consumer group output schema.
internal/kafka/command_consumer_group_list.go Populates protocol type in consumer group list.
internal/kafka/command_consumer_group_describe.go Populates protocol type in consumer group describe.
go.mod Adds internal ccloud SDK dependency for streams-groups endpoints.
go.sum Adds checksums for the new internal dependency.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +72 to +76
func unwrapDataEnvelope(httpResp *http.Response, target interface{}) error {
if httpResp == nil || httpResp.Body == nil {
return nil
}
body, err := io.ReadAll(httpResp.Body)
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unwrapDataEnvelope reads httpResp.Body but never closes it. If the underlying SDK doesn’t fully drain/close the body, this can leak connections; it also consumes the body so subsequent error-parsing/logging can’t re-read it. Consider defer httpResp.Body.Close() (or closing in the caller) and, if the body must remain readable, re-wrapping it with a new io.NopCloser(bytes.NewReader(body)) after ReadAll.

Copilot uses AI. Check for mistakes.
return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp)
}

func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMemberSubtopologies(groupId string) (kafkarestv3Internal.StreamsGroupSubtopologyDataList, error) {
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name ListKafkaStreamsGroupMemberSubtopologies is misleading: it lists group subtopologies (it calls ListKafkaStreamsGroupSubtopologies) and doesn’t take a member ID. Renaming it to ListKafkaStreamsGroupSubtopologies (and updating call sites) would make the API clearer.

Suggested change
func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMemberSubtopologies(groupId string) (kafkarestv3Internal.StreamsGroupSubtopologyDataList, error) {
func (c *KafkaRestClientInternal) ListKafkaStreamsGroupSubtopologies(groupId string) (kafkarestv3Internal.StreamsGroupSubtopologyDataList, error) {

Copilot uses AI. Check for mistakes.
Comment on lines +478 to +479
func (c *KafkaRestClientInternal) GetKafkaStreamGroupSubtopology(groupId, topology string) (kafkarestv3Internal.StreamsGroupSubtopologyData, error) {
res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupSubtopology(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, topology).Execute()
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter name topology is ambiguous here; this method fetches a specific subtopology. Renaming the parameter to subtopologyId (or similar) would improve readability and reduce confusion at call sites.

Suggested change
func (c *KafkaRestClientInternal) GetKafkaStreamGroupSubtopology(groupId, topology string) (kafkarestv3Internal.StreamsGroupSubtopologyData, error) {
res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupSubtopology(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, topology).Execute()
func (c *KafkaRestClientInternal) GetKafkaStreamGroupSubtopology(groupId, subtopologyId string) (kafkarestv3Internal.StreamsGroupSubtopologyData, error) {
res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupSubtopology(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, subtopologyId).Execute()

Copilot uses AI. Check for mistakes.
Comment on lines +11 to +15
cmd := &cobra.Command{
Use: "describe <group>",
Short: "Describe stream group",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This command hits internal Kafka REST endpoints via CloudClientInternal, but unlike the corresponding list commands it has no RequireNonAPIKeyCloudLogin run requirement. If API-key login isn’t supported for these endpoints, this should add the same run requirement to prevent confusing runtime failures.

Copilot uses AI. Check for mistakes.
Comment on lines +11 to +16
cmd := &cobra.Command{
Use: "describe <member>",
Short: "Describe stream group member",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs),
RunE: c.streamGroupMemberDescribe,
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ValidArgsFunction here uses validStreamGroupArgs, which autocompletes stream group IDs. For describe <member>, shell completion should suggest member IDs (likely based on --group). Consider adding a dedicated valid-args function that reads the --group flag and calls ListKafkaStreamsGroupMembers to populate member suggestions.

Copilot uses AI. Check for mistakes.
func (c *streamGroupCommand) newStreamGroupListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List kafka stream groups.",
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Short help text uses lowercase “kafka” here, but other commands use “Kafka”. Consider changing to “List Kafka stream groups.” (and updating the golden help fixtures) for consistency.

Suggested change
Short: "List kafka stream groups.",
Short: "List Kafka stream groups.",

Copilot uses AI. Check for mistakes.
Comment on lines +59 to +64
topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup()
if err != nil {
return nil, err
}

return topics.Data, nil
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name topics is misleading here since this function is listing streams groups, not topics. Renaming it to something like groupsResp/streamsGroups would improve readability.

Suggested change
topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup()
if err != nil {
return nil, err
}
return topics.Data, nil
groupsResp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup()
if err != nil {
return nil, err
}
return groupsResp.Data, nil

Copilot uses AI. Check for mistakes.
func (c *streamGroupCommand) newStreamGroupMemberCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "member",
Short: "Manage Kafka stream groups members.",
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Help text is grammatically incorrect: “Manage Kafka stream groups members.” should be “Manage Kafka stream group members.” (or similar). Fixing this string will also require updating the related golden help fixture.

Suggested change
Short: "Manage Kafka stream groups members.",
Short: "Manage Kafka stream group members.",

Copilot uses AI. Check for mistakes.
Comment on lines 27 to 29
cmd.AddCommand(newShareGroupCommand(prerunner))
cmd.AddCommand(newStreamGroupCommand(prerunner))
cmd.AddCommand(newTopicCommand(cfg, prerunner))
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description/release notes mention the new commands under confluent kafka consumer stream-group, but the implementation adds streams-group directly under confluent kafka (sibling of consumer, topic, etc.). Please align the docs/release notes (or the command placement) so users can find the feature.

Copilot uses AI. Check for mistakes.
Comment on lines 14 to 18
IsSimple bool `human:"Simple" serialized:"is_simple"`
PartitionAssignor string `human:"Partition Assignor" serialized:"partition_assignor"`
State string `human:"State" serialized:"state"`
ProtocolType string `human:"Type,omitempty" serialized:"type,omitempty"`
}
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omitempty in the human tag is only honored for single-object tables, not list headers/columns (see pkg/output/table.go list header generation). This means the “Type” column will always appear in consumer group list even when empty (as reflected in the updated golden output). If the intent is to omit the column when the API doesn’t return types, consider conditionally filtering the list columns (e.g., list.Filter(...)) when all ProtocolType values are empty.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants