APIE-844 - Confluent CLI support for kstreams#3260
APIE-844 - Confluent CLI support for kstreams#3260tmalik (tmalikconfluent) wants to merge 13 commits intomainfrom
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
This PR adds Confluent CLI support for Kafka Streams groups (KIP-1071) by introducing a new kafka streams-group command tree backed by internal Kafka REST v3 endpoints, and by augmenting consumer group output with the protocol type.
Changes:
- Add a new
confluent kafka streams-groupcommand group (list/describe, members, assignments, task partitions, and subtopologies) using internal Kafka REST v3 streams-groups endpoints. - Add a
Typecolumn/field toconfluent kafka consumer group [list|describe]output. - Extend the test Kafka REST router and integration fixtures to cover the new endpoints/commands.
Reviewed changes
Copilot reviewed 70 out of 71 changed files in this pull request and generated 23 comments.
Show a summary per file
| File | Description |
|---|---|
| test/test-server/kafka_rest_router.go | Adds mock Kafka REST v3 routes/handlers for streams-groups endpoints used by integration tests. |
| test/kafka_test.go | Adds integration test coverage for the new kafka streams-group command tree. |
| test/fixtures/output/kafka/streams-group/task-partitions-describe.golden | Golden output for member task partition “describe”. |
| test/fixtures/output/kafka/streams-group/target-task-partitions-describe.golden | Golden output for member target-assignment task partition “describe”. |
| test/fixtures/output/kafka/streams-group/target-assignment-task-list.golden | Golden output for target assignment task “list”. |
| test/fixtures/output/kafka/streams-group/target-assignment-describe.golden | Golden output for target assignment “describe”. |
| test/fixtures/output/kafka/streams-group/subtopology/list-help.golden | Help golden for streams-group subtopology list. |
| test/fixtures/output/kafka/streams-group/subtopology/help.golden | Help golden for streams-group subtopology. |
| test/fixtures/output/kafka/streams-group/subtopology/describe-help.golden | Help golden for streams-group subtopology describe. |
| test/fixtures/output/kafka/streams-group/subtopology-list.golden | Golden output for subtopology “list” (human). |
| test/fixtures/output/kafka/streams-group/subtopology-list-json.golden | Golden output for subtopology “list” (json). |
| test/fixtures/output/kafka/streams-group/subtopology-describe.golden | Golden output for subtopology “describe”. |
| test/fixtures/output/kafka/streams-group/member/list-help.golden | Help golden for streams-group member list. |
| test/fixtures/output/kafka/streams-group/member/help.golden | Help golden for streams-group member. |
| test/fixtures/output/kafka/streams-group/member/describe-help.golden | Help golden for streams-group member describe. |
| test/fixtures/output/kafka/streams-group/member-task-partitions/help.golden | Help golden for streams-group member-task-partitions. |
| test/fixtures/output/kafka/streams-group/member-task-partitions/describe-help.golden | Help golden for streams-group member-task-partitions describe. |
| test/fixtures/output/kafka/streams-group/member-target-assignment/list-help.golden | Help golden for streams-group member-target-assignment list. |
| test/fixtures/output/kafka/streams-group/member-target-assignment/help.golden | Help golden for streams-group member-target-assignment. |
| test/fixtures/output/kafka/streams-group/member-target-assignment/describe-help.golden | Help golden for streams-group member-target-assignment describe. |
| test/fixtures/output/kafka/streams-group/member-target-assignment-task-partitions/help.golden | Help golden for streams-group member-target-assignment-task-partitions. |
| test/fixtures/output/kafka/streams-group/member-target-assignment-task-partitions/describe-help.golden | Help golden for streams-group member-target-assignment-task-partitions describe. |
| test/fixtures/output/kafka/streams-group/member-list.golden | Golden output for member “list” (human). |
| test/fixtures/output/kafka/streams-group/member-list-json.golden | Golden output for member “list” (json). |
| test/fixtures/output/kafka/streams-group/member-describe.golden | Golden output for member “describe”. |
| test/fixtures/output/kafka/streams-group/member-assignment/list-help.golden | Help golden for streams-group member-assignment list. |
| test/fixtures/output/kafka/streams-group/member-assignment/help.golden | Help golden for streams-group member-assignment. |
| test/fixtures/output/kafka/streams-group/member-assignment/describe-help.golden | Help golden for streams-group member-assignment describe. |
| test/fixtures/output/kafka/streams-group/list.golden | Golden output for streams-group “list” (human). |
| test/fixtures/output/kafka/streams-group/list-json.golden | Golden output for streams-group “list” (json). |
| test/fixtures/output/kafka/streams-group/list-help.golden | Help golden for streams-group “list”. |
| test/fixtures/output/kafka/streams-group/help.golden | Help golden for streams-group root. |
| test/fixtures/output/kafka/streams-group/describe.golden | Golden output for streams-group “describe” (human). |
| test/fixtures/output/kafka/streams-group/describe-json.golden | Golden output for streams-group “describe” (json). |
| test/fixtures/output/kafka/streams-group/describe-help.golden | Help golden for streams-group “describe”. |
| test/fixtures/output/kafka/streams-group/describe-dne.golden | Golden output for “describe” on a missing streams group. |
| test/fixtures/output/kafka/streams-group/assignment-task-list.golden | Golden output for member assignment task “list”. |
| test/fixtures/output/kafka/streams-group/assignment-describe.golden | Golden output for member assignment “describe”. |
| test/fixtures/output/kafka/help.golden | Adds streams-group to top-level kafka help output. |
| test/fixtures/output/kafka/consumer/group/list.golden | Updates consumer group list output to include Type column. |
| test/fixtures/output/kafka/consumer/group/list-onprem.golden | Updates on-prem consumer group list output to include Type column. |
| test/fixtures/output/iam/certificate-authority/use.golden | Adds/updates a golden output fixture (unrelated to streams-group feature). |
| pkg/cmd/prerunner.go | Extends KafkaREST initialization to include an internal Kafka REST client. |
| pkg/cmd/kafka_rest.go | Adds CloudClientInternal to KafkaREST wrapper struct. |
| pkg/cmd/flags.go | Adds autocompletion for streams-group IDs via internal Kafka REST client. |
| pkg/ccloudv2/kafkarest.go | Introduces KafkaRestClientInternal wrapper and streams-groups API methods + envelope unwrapping helper. |
| internal/kafka/command.go | Registers the new streams-group command under kafka. |
| internal/kafka/command_kafka_stream_group.go | Adds the streams-group root command and common arg completion hook. |
| internal/kafka/command_kafka_stream_group_subtopology.go | Adds streams-group subtopology command group. |
| internal/kafka/command_kafka_stream_group_subtopology_list.go | Implements streams-group subtopology listing. |
| internal/kafka/command_kafka_stream_group_subtopology_describe.go | Implements streams-group subtopology describe. |
| internal/kafka/command_kafka_stream_group_member.go | Adds streams-group member command group and output struct. |
| internal/kafka/command_kafka_stream_group_member_task_partitions.go | Adds streams-group member-task-partitions command group. |
| internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go | Implements member task partitions describe. |
| internal/kafka/command_kafka_stream_group_member_target_task_partitions.go | Adds streams-group member-target-assignment-task-partitions command group. |
| internal/kafka/command_kafka_stream_group_member_target_task_partitions_describe.go | Implements member target-assignment task partitions describe. |
| internal/kafka/command_kafka_stream_group_member_target_assignment.go | Adds streams-group member-target-assignment command group. |
| internal/kafka/command_kafka_stream_group_member_target_assignment_list.go | Implements member target-assignment task listing. |
| internal/kafka/command_kafka_stream_group_member_target_assignment_describe.go | Implements member target-assignment describe. |
| internal/kafka/command_kafka_stream_group_member_list.go | Implements streams-group member list. |
| internal/kafka/command_kafka_stream_group_member_describe.go | Implements streams-group member describe. |
| internal/kafka/command_kafka_stream_group_member_assignment.go | Adds streams-group member-assignment command group + output struct. |
| internal/kafka/command_kafka_stream_group_member_assignment_task_list.go | Implements member assignment task listing. |
| internal/kafka/command_kafka_stream_group_member_assignment_describe.go | Implements member assignment describe. |
| internal/kafka/command_kafka_stream_group_list.go | Implements streams-group list. |
| internal/kafka/command_kafka_stream_group_describe.go | Implements streams-group describe. |
| internal/kafka/command_consumer_group.go | Adds protocol type field to consumer group output schema. |
| internal/kafka/command_consumer_group_list.go | Populates protocol type in consumer group list. |
| internal/kafka/command_consumer_group_describe.go | Populates protocol type in consumer group describe. |
| go.mod | Adds internal ccloud SDK dependency for streams-groups endpoints. |
| go.sum | Adds checksums for the new internal dependency. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func unwrapDataEnvelope(httpResp *http.Response, target interface{}) error { | ||
| if httpResp == nil || httpResp.Body == nil { | ||
| return nil | ||
| } | ||
| body, err := io.ReadAll(httpResp.Body) |
There was a problem hiding this comment.
unwrapDataEnvelope reads httpResp.Body but never closes it. If the underlying SDK doesn’t fully drain/close the body, this can leak connections; it also consumes the body so subsequent error-parsing/logging can’t re-read it. Consider defer httpResp.Body.Close() (or closing in the caller) and, if the body must remain readable, re-wrapping it with a new io.NopCloser(bytes.NewReader(body)) after ReadAll.
| return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) | ||
| } | ||
|
|
||
| func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMemberSubtopologies(groupId string) (kafkarestv3Internal.StreamsGroupSubtopologyDataList, error) { |
There was a problem hiding this comment.
The method name ListKafkaStreamsGroupMemberSubtopologies is misleading: it lists group subtopologies (it calls ListKafkaStreamsGroupSubtopologies) and doesn’t take a member ID. Renaming it to ListKafkaStreamsGroupSubtopologies (and updating call sites) would make the API clearer.
| func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMemberSubtopologies(groupId string) (kafkarestv3Internal.StreamsGroupSubtopologyDataList, error) { | |
| func (c *KafkaRestClientInternal) ListKafkaStreamsGroupSubtopologies(groupId string) (kafkarestv3Internal.StreamsGroupSubtopologyDataList, error) { |
| func (c *KafkaRestClientInternal) GetKafkaStreamGroupSubtopology(groupId, topology string) (kafkarestv3Internal.StreamsGroupSubtopologyData, error) { | ||
| res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupSubtopology(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, topology).Execute() |
There was a problem hiding this comment.
Parameter name topology is ambiguous here; this method fetches a specific subtopology. Renaming the parameter to subtopologyId (or similar) would improve readability and reduce confusion at call sites.
| func (c *KafkaRestClientInternal) GetKafkaStreamGroupSubtopology(groupId, topology string) (kafkarestv3Internal.StreamsGroupSubtopologyData, error) { | |
| res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupSubtopology(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, topology).Execute() | |
| func (c *KafkaRestClientInternal) GetKafkaStreamGroupSubtopology(groupId, subtopologyId string) (kafkarestv3Internal.StreamsGroupSubtopologyData, error) { | |
| res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupSubtopology(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, subtopologyId).Execute() |
| cmd := &cobra.Command{ | ||
| Use: "describe <group>", | ||
| Short: "Describe stream group", | ||
| Args: cobra.ExactArgs(1), | ||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), |
There was a problem hiding this comment.
This command hits internal Kafka REST endpoints via CloudClientInternal, but unlike the corresponding list commands it has no RequireNonAPIKeyCloudLogin run requirement. If API-key login isn’t supported for these endpoints, this should add the same run requirement to prevent confusing runtime failures.
| cmd := &cobra.Command{ | ||
| Use: "describe <member>", | ||
| Short: "Describe stream group member", | ||
| Args: cobra.ExactArgs(1), | ||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), | ||
| RunE: c.streamGroupMemberDescribe, |
There was a problem hiding this comment.
ValidArgsFunction here uses validStreamGroupArgs, which autocompletes stream group IDs. For describe <member>, shell completion should suggest member IDs (likely based on --group). Consider adding a dedicated valid-args function that reads the --group flag and calls ListKafkaStreamsGroupMembers to populate member suggestions.
| func (c *streamGroupCommand) newStreamGroupListCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "list", | ||
| Short: "List kafka stream groups.", |
There was a problem hiding this comment.
The Short help text uses lowercase “kafka” here, but other commands use “Kafka”. Consider changing to “List Kafka stream groups.” (and updating the golden help fixtures) for consistency.
| Short: "List kafka stream groups.", | |
| Short: "List Kafka stream groups.", |
| topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return topics.Data, nil |
There was a problem hiding this comment.
Variable name topics is misleading here since this function is listing streams groups, not topics. Renaming it to something like groupsResp/streamsGroups would improve readability.
| topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup() | |
| if err != nil { | |
| return nil, err | |
| } | |
| return topics.Data, nil | |
| groupsResp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup() | |
| if err != nil { | |
| return nil, err | |
| } | |
| return groupsResp.Data, nil |
| func (c *streamGroupCommand) newStreamGroupMemberCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "member", | ||
| Short: "Manage Kafka stream groups members.", |
There was a problem hiding this comment.
Help text is grammatically incorrect: “Manage Kafka stream groups members.” should be “Manage Kafka stream group members.” (or similar). Fixing this string will also require updating the related golden help fixture.
| Short: "Manage Kafka stream groups members.", | |
| Short: "Manage Kafka stream group members.", |
| cmd.AddCommand(newShareGroupCommand(prerunner)) | ||
| cmd.AddCommand(newStreamGroupCommand(prerunner)) | ||
| cmd.AddCommand(newTopicCommand(cfg, prerunner)) |
There was a problem hiding this comment.
PR description/release notes mention the new commands under confluent kafka consumer stream-group, but the implementation adds streams-group directly under confluent kafka (sibling of consumer, topic, etc.). Please align the docs/release notes (or the command placement) so users can find the feature.
| IsSimple bool `human:"Simple" serialized:"is_simple"` | ||
| PartitionAssignor string `human:"Partition Assignor" serialized:"partition_assignor"` | ||
| State string `human:"State" serialized:"state"` | ||
| ProtocolType string `human:"Type,omitempty" serialized:"type,omitempty"` | ||
| } |
There was a problem hiding this comment.
omitempty in the human tag is only honored for single-object tables, not list headers/columns (see pkg/output/table.go list header generation). This means the “Type” column will always appear in consumer group list even when empty (as reflected in the updated golden output). If the intent is to omit the column when the API doesn’t return types, consider conditionally filtering the list columns (e.g., list.Filter(...)) when all ProtocolType values are empty.
Release Notes
Breaking Changes
New Features
Typecolumn toconfluent kafka consumer group [ list | describe ]command to display the consumer group protocol type (e.g., CLASSIC, CONSUMER).confluent kafka consumer stream-groupcommand group with support for managingKafka Streams groups (KIP-1071), including the following subcommands:
member,member-assignment,member-target-assignment,member-task-partitions,member-target-assignment-task-partitions, andsubtopologyBug Fixes
Checklist
Whatsection below whether this PR applies to Confluent Cloud, Confluent Platform, or both.Test & Reviewsection below.Blast Radiussection below.What
This PR adds Confluent CLI support for Kafka Streams groups (KIP-1071). It introduces
two changes:
Consumer group protocol type display: The existing
confluent kafka consumer group [ list | describe ]commands now include a Type field showing the group protocol type (CLASSIC, CONSUMER, etc.), bringing the CLI to parity with the Java Admin API and Apache Kafka command-line tools.New stream-group command tree: A new set of subcommands under
confluent kafka consumer stream-groupthat allow users to inspect Kafka Streams groups, their members, task assignments (active/standby/warmup), subtopologies, and task partition details. These commands use the internal Kafka REST v3 streams-groups API endpoints.Blast Radius
Low to none blast radius:
confluent kafka consumer group [ list | describe ]command will see an additional Type column in output. This is additive and non-breaking; the column is omitted when the API does not return a type value.References
JIRA APIE-851 CLI implementation for share groups
Test & Review
APIE-851 kstreams for AK 4.2 - CLI Testing