diff --git a/cmd/broker/main.go b/cmd/broker/main.go index c1ba108..a58aaf5 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -60,6 +61,11 @@ const ( type handler struct { apiVersions []kmsg.ApiVersionsResponseApiKey + // nextProducerID is a monotonically-increasing allocator for + // InitProducerID responses (OPS-005 #1 stub). Accessed via sync/atomic. + // Replace with a proper persistent allocator when full idempotent-producer + // dedup lands. + nextProducerID int64 store metadata.Store s3 storage.S3Client cache *cache.SegmentCache @@ -206,6 +212,20 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.ProduceRequest: return h.handleProduce(ctx, header, req.(*kmsg.ProduceRequest)) + case *kmsg.InitProducerIDRequest: + // OPS-005 #1: minimum-viable INIT_PRODUCER_ID stub. Allocates a + // monotonically-increasing producer ID with epoch 0. Does NOT track + // sequence numbers or deduplicate (that work is in scope for a later + // broker-engineering sprint). This is sufficient to unblock idempotent + // producers for development + bp-002 BDR convergence. For production + // correctness, full dedup-on-replay is still required. + pid := atomic.AddInt64(&h.nextProducerID, 1) + resp := kmsg.NewPtrInitProducerIDResponse() + resp.ErrorCode = protocol.NONE + resp.ProducerID = pid + resp.ProducerEpoch = 0 + resp.ThrottleMillis = 0 + return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.FetchRequest: return h.handleFetch(ctx, header, req.(*kmsg.FetchRequest)) case *kmsg.FindCoordinatorRequest: @@ -2711,7 +2731,15 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { {key: protocol.APIKeyOffsetCommit, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyOffsetFetch, minVersion: 5, maxVersion: 5}, {key: protocol.APIKeyDescribeGroups, minVersion: 5, maxVersion: 5}, - {key: protocol.APIKeyListGroups, minVersion: 5, maxVersion: 5}, + // OPS-005: Java admin-client negotiates LIST_GROUPS in range [0,4]. + // Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5 + // — the underlying coordinator handler is version-agnostic. + {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, + // OPS-005 #1: idempotent-producer init. Handler is a stub that + // allocates a monotonically-increasing producer ID; sequence-number + // dedup is NOT yet implemented. Sufficient to unblock franz-go and + // Java default producers; production correctness gap tracked. + {key: protocol.APIKeyInitProducerID, minVersion: 0, maxVersion: 4}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, {key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1}, @@ -2722,7 +2750,7 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { } unsupported := []int16{ 4, 5, 6, 7, - 21, 22, + 21, // 22 (InitProducerID) moved to supported — OPS-005 #1 stub handler 24, 25, 26, } diff --git a/deploy/helm/kafscale-broker-standalone/Chart.yaml b/deploy/helm/kafscale-broker-standalone/Chart.yaml new file mode 100644 index 0000000..b5dc1b5 --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: kafscale-broker-standalone +description: Minimal standalone KafScale broker (etcd + MinIO external). Smoke chart only. +type: application +version: 0.1.0 +appVersion: "v1.5.0" diff --git a/deploy/helm/kafscale-broker-standalone/templates/broker.yaml b/deploy/helm/kafscale-broker-standalone/templates/broker.yaml new file mode 100644 index 0000000..9aa0f5e --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/templates/broker.yaml @@ -0,0 +1,52 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafscale-broker + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: kafscale-broker + app.kubernetes.io/part-of: ops-foundation +spec: + replicas: 1 + selector: { matchLabels: { app.kubernetes.io/name: kafscale-broker } } + template: + metadata: + labels: + app.kubernetes.io/name: kafscale-broker + app.kubernetes.io/part-of: ops-foundation + spec: + enableServiceLinks: false + containers: + - name: broker + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + env: + - { name: KAFSCALE_BROKER_ID, value: "0" } + - { name: KAFSCALE_BROKER_ADDR, value: ":9092" } + - { name: KAFSCALE_BROKER_HOST, value: "kafscale-broker" } + - { name: KAFSCALE_BROKER_PORT, value: "9092" } + - { name: KAFSCALE_BROKER_ETCD_ENDPOINTS, value: "{{ .Values.etcdEndpoints }}" } + - { name: KAFSCALE_BROKER_DATA_DIR, value: "/tmp/data" } + - { name: KAFSCALE_BROKER_LOG_LEVEL, value: "info" } + - { name: KAFSCALE_S3_BUCKET, value: "{{ .Values.s3Bucket }}" } + - { name: KAFSCALE_S3_REGION, value: "us-east-1" } + - { name: KAFSCALE_S3_ENDPOINT, value: "{{ .Values.s3Endpoint }}" } + - { name: KAFSCALE_S3_ACCESS_KEY, value: "{{ .Values.s3AccessKey }}" } + - { name: KAFSCALE_S3_SECRET_KEY, value: "{{ .Values.s3SecretKey }}" } + - { name: KAFSCALE_S3_PATH_STYLE, value: "true" } + ports: + - { name: kafka, containerPort: 9092 } +--- +apiVersion: v1 +kind: Service +metadata: + name: kafscale-broker + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: kafscale-broker +spec: + type: {{ .Values.service.type }} + ports: + - { name: kafka, port: {{ .Values.service.port }}, targetPort: 9092 } + selector: + app.kubernetes.io/name: kafscale-broker diff --git a/deploy/helm/kafscale-broker-standalone/values.yaml b/deploy/helm/kafscale-broker-standalone/values.yaml new file mode 100644 index 0000000..635f164 --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/values.yaml @@ -0,0 +1,17 @@ +image: + repository: ghcr.io/kafscale/kafscale-broker + tag: dev + pullPolicy: IfNotPresent + +etcdEndpoints: "http://etcd:2379" +s3Endpoint: "http://minio:9000" +s3Bucket: "kafscale" +s3AccessKey: "scalytics" +s3SecretKey: "scalytics" + +service: + type: ClusterIP + port: 9092 + +labels: + app.kubernetes.io/part-of: ops-foundation diff --git a/pkg/protocol/api.go b/pkg/protocol/api.go index b237578..f4d81af 100644 --- a/pkg/protocol/api.go +++ b/pkg/protocol/api.go @@ -33,6 +33,7 @@ const ( APIKeyApiVersion int16 = 18 APIKeyCreateTopics int16 = 19 APIKeyDeleteTopics int16 = 20 + APIKeyInitProducerID int16 = 22 // OPS-005: idempotent-producer init (stub handler) APIKeyOffsetForLeaderEpoch int16 = 23 APIKeyDescribeConfigs int16 = 32 APIKeyAlterConfigs int16 = 33