Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -60,6 +61,11 @@ const (

type handler struct {
apiVersions []kmsg.ApiVersionsResponseApiKey
// nextProducerID is a monotonically-increasing allocator for
// InitProducerID responses (OPS-005 #1 stub). Accessed via sync/atomic.
// Replace with a proper persistent allocator when full idempotent-producer
// dedup lands.
nextProducerID int64
store metadata.Store
s3 storage.S3Client
cache *cache.SegmentCache
Expand Down Expand Up @@ -206,6 +212,20 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil
case *kmsg.ProduceRequest:
return h.handleProduce(ctx, header, req.(*kmsg.ProduceRequest))
case *kmsg.InitProducerIDRequest:
// OPS-005 #1: minimum-viable INIT_PRODUCER_ID stub. Allocates a
// monotonically-increasing producer ID with epoch 0. Does NOT track
// sequence numbers or deduplicate (that work is in scope for a later
// broker-engineering sprint). This is sufficient to unblock idempotent
// producers for development + bp-002 BDR convergence. For production
// correctness, full dedup-on-replay is still required.
pid := atomic.AddInt64(&h.nextProducerID, 1)
resp := kmsg.NewPtrInitProducerIDResponse()
resp.ErrorCode = protocol.NONE
resp.ProducerID = pid
resp.ProducerEpoch = 0
resp.ThrottleMillis = 0
return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil
case *kmsg.FetchRequest:
return h.handleFetch(ctx, header, req.(*kmsg.FetchRequest))
case *kmsg.FindCoordinatorRequest:
Expand Down Expand Up @@ -2711,7 +2731,15 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey {
{key: protocol.APIKeyOffsetCommit, minVersion: 3, maxVersion: 3},
{key: protocol.APIKeyOffsetFetch, minVersion: 5, maxVersion: 5},
{key: protocol.APIKeyDescribeGroups, minVersion: 5, maxVersion: 5},
{key: protocol.APIKeyListGroups, minVersion: 5, maxVersion: 5},
// OPS-005: Java admin-client negotiates LIST_GROUPS in range [0,4].
// Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5
// — the underlying coordinator handler is version-agnostic.
{key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5},
// OPS-005 #1: idempotent-producer init. Handler is a stub that
// allocates a monotonically-increasing producer ID; sequence-number
// dedup is NOT yet implemented. Sufficient to unblock franz-go and
// Java default producers; production correctness gap tracked.
{key: protocol.APIKeyInitProducerID, minVersion: 0, maxVersion: 4},
{key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3},
{key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4},
{key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1},
Expand All @@ -2722,7 +2750,7 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey {
}
unsupported := []int16{
4, 5, 6, 7,
21, 22,
21, // 22 (InitProducerID) moved to supported — OPS-005 #1 stub handler
24, 25, 26,
}

Expand Down
6 changes: 6 additions & 0 deletions deploy/helm/kafscale-broker-standalone/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apiVersion: v2
name: kafscale-broker-standalone
description: Minimal standalone KafScale broker (etcd + MinIO external). Smoke chart only.
type: application
version: 0.1.0
appVersion: "v1.5.0"
52 changes: 52 additions & 0 deletions deploy/helm/kafscale-broker-standalone/templates/broker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafscale-broker
namespace: {{ .Release.Namespace }}
labels:
app.kubernetes.io/name: kafscale-broker
app.kubernetes.io/part-of: ops-foundation
spec:
replicas: 1
selector: { matchLabels: { app.kubernetes.io/name: kafscale-broker } }
template:
metadata:
labels:
app.kubernetes.io/name: kafscale-broker
app.kubernetes.io/part-of: ops-foundation
spec:
enableServiceLinks: false
containers:
- name: broker
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
- { name: KAFSCALE_BROKER_ID, value: "0" }
- { name: KAFSCALE_BROKER_ADDR, value: ":9092" }
- { name: KAFSCALE_BROKER_HOST, value: "kafscale-broker" }
- { name: KAFSCALE_BROKER_PORT, value: "9092" }
- { name: KAFSCALE_BROKER_ETCD_ENDPOINTS, value: "{{ .Values.etcdEndpoints }}" }
- { name: KAFSCALE_BROKER_DATA_DIR, value: "/tmp/data" }
- { name: KAFSCALE_BROKER_LOG_LEVEL, value: "info" }
- { name: KAFSCALE_S3_BUCKET, value: "{{ .Values.s3Bucket }}" }
- { name: KAFSCALE_S3_REGION, value: "us-east-1" }
- { name: KAFSCALE_S3_ENDPOINT, value: "{{ .Values.s3Endpoint }}" }
- { name: KAFSCALE_S3_ACCESS_KEY, value: "{{ .Values.s3AccessKey }}" }
- { name: KAFSCALE_S3_SECRET_KEY, value: "{{ .Values.s3SecretKey }}" }
- { name: KAFSCALE_S3_PATH_STYLE, value: "true" }
ports:
- { name: kafka, containerPort: 9092 }
---
apiVersion: v1
kind: Service
metadata:
name: kafscale-broker
namespace: {{ .Release.Namespace }}
labels:
app.kubernetes.io/name: kafscale-broker
spec:
type: {{ .Values.service.type }}
ports:
- { name: kafka, port: {{ .Values.service.port }}, targetPort: 9092 }
selector:
app.kubernetes.io/name: kafscale-broker
17 changes: 17 additions & 0 deletions deploy/helm/kafscale-broker-standalone/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
image:
repository: ghcr.io/kafscale/kafscale-broker
tag: dev
pullPolicy: IfNotPresent

etcdEndpoints: "http://etcd:2379"
s3Endpoint: "http://minio:9000"
s3Bucket: "kafscale"
s3AccessKey: "scalytics"
s3SecretKey: "scalytics"

service:
type: ClusterIP
port: 9092

labels:
app.kubernetes.io/part-of: ops-foundation
1 change: 1 addition & 0 deletions pkg/protocol/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
APIKeyApiVersion int16 = 18
APIKeyCreateTopics int16 = 19
APIKeyDeleteTopics int16 = 20
APIKeyInitProducerID int16 = 22 // OPS-005: idempotent-producer init (stub handler)
APIKeyOffsetForLeaderEpoch int16 = 23
APIKeyDescribeConfigs int16 = 32
APIKeyAlterConfigs int16 = 33
Expand Down
Loading