diff --git a/_docs/lfs-demos.md b/_docs/lfs-demos.md index 7ed2d4d..8ee84df 100644 --- a/_docs/lfs-demos.md +++ b/_docs/lfs-demos.md @@ -214,7 +214,7 @@ make e72-browser-demo # local with port-forward make e72-browser-demo-k8s # in-cluster deployment ``` -Features drag-and-drop upload, real-time progress, SHA-256 verification, presigned URL download, and an inline video player for MP4 content. +Features drag-and-drop upload, real-time progress, SHA-256 verification, proxy-verified download, and an inline video player for MP4 content. --- diff --git a/_docs/lfs-helm.md b/_docs/lfs-helm.md index 150bc50..b9d91f6 100644 --- a/_docs/lfs-helm.md +++ b/_docs/lfs-helm.md @@ -63,9 +63,12 @@ lfsProxy: repository: ghcr.io/kafscale/kafscale-lfs-proxy tag: latest - # S3 backend + # S3 backend. + # bucket and region are REQUIRED; the proxy fails to start if either is empty. + # The bucket name `kafscale-lfs` is permanently blocklisted at startup + # (security fix / PR #139). Use your own name. s3: - bucket: kafscale + bucket: my-bucket region: us-east-1 endpoint: "" # Custom endpoint for MinIO pathStyle: false # Set true for MinIO diff --git a/_docs/lfs-proxy.md b/_docs/lfs-proxy.md index 500366d..421aeb5 100644 --- a/_docs/lfs-proxy.md +++ b/_docs/lfs-proxy.md @@ -40,20 +40,21 @@ Producer ──▶ LFS Proxy ──▶ S3 (blob) Kafka (pointer envelope) │ ▼ -Consumer ◀── LFS SDK ──▶ S3 (fetch blob) +Consumer ◀── LFS SDK ──▶ S3 (direct fetch + local checksum) +Consumer ◀── HTTP client ──▶ LFS Proxy ──▶ S3 (verified stream) ``` 1. **Write path (Kafka protocol)**: The proxy intercepts Produce requests. Records tagged with an `LFS_BLOB` header are rewritten: the payload is uploaded to S3 and the Kafka record is replaced with a JSON envelope containing the S3 key, checksum, and content type. -2. **Write path (HTTP API)**: Clients can also upload files via the REST API (`POST /v1/topics/{topic}/records`). The proxy uploads the file to S3 and publishes the envelope to Kafka in one operation. +2. **Write path (HTTP API)**: Clients can also upload files via the REST API (`POST /lfs/produce` or the multipart upload session endpoints under `/lfs/uploads/...`). The proxy uploads the file to S3 and publishes the envelope to Kafka in one operation. See the OpenAPI spec at `cmd/proxy/openapi.yaml` for full schema. -3. **Read path**: Consumer SDKs (Go, Java, Python, JS) detect LFS envelopes and transparently fetch the original object from S3. +3. **Read path**: Consumer SDKs (Go, Java, Python, JS) detect LFS envelopes and can fetch the object directly from S3 while validating the envelope checksum locally. Clients that want the proxy to enforce the trust boundary call `POST /lfs/download`; the proxy verifies the envelope-recorded SHA-256 against the bytes returned from S3 **before** delivering them to the client (see [Trust model and integrity verification](#trust-model-and-integrity-verification) below). ## Key features - **Transparent Kafka proxy** — existing producers work without code changes by adding an `LFS_BLOB` header - **HTTP upload API** — RESTful endpoint for browser and SDK uploads with OpenAPI spec -- **Checksum verification** — SHA-256, CRC-32, or MD5 integrity checks on upload and download +- **Checksum verification** — upload checksum support plus server-side SHA-256 verification for proxy-streamed downloads - **TLS and SASL** — full TLS support for HTTP endpoints and SASL/SCRAM for Kafka backend - **Prometheus metrics** — upload/download counters, latencies, S3 operation histograms - **CORS support** — configurable cross-origin headers for browser-based uploads @@ -66,39 +67,92 @@ Consumer ◀── LFS SDK ──▶ S3 (fetch blob) S3 objects are stored under a deterministic key: ``` -lfs/{namespace}/{topic}/{partition}/{offset}-{uuid}.bin +{namespace}/{topic}/lfs/{yyyy}/{mm}/{dd}/obj-{uuid} ``` ### Envelope format ```json { - "lfs_version": 1, - "s3_bucket": "kafscale", - "s3_key": "lfs/default/demo-topic/0/42-abc123.bin", + "kfs_lfs": 1, + "bucket": "my-bucket", + "key": "default/demo-topic/lfs/2026/02/05/obj-abc123", + "size": 10485760, + "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", "content_type": "application/octet-stream", - "content_length": 10485760, - "checksum_algo": "sha256", - "checksum": "e3b0c44298fc1c14..." + "created_at": "2026-02-05T10:30:00Z", + "proxy_id": "lfs-proxy-0" } ``` +## Trust model and integrity verification + +> **Kafka is the authority. S3 is untrusted storage.** + +The envelope lives in Kafka and carries the authoritative SHA-256 checksum recorded at upload time. The proxy treats the S3 object as untrusted on the download path: it reads the bytes into temporary storage, verifies their SHA-256 against the envelope-supplied checksum, and **only then** streams the verified bytes to the client (`200 OK` with `Content-Length` set to the verified size). On mismatch — or if S3 returns more bytes than the envelope declares — the proxy returns `502` with `code: integrity_failure` and **no payload bytes ever reach the client**. + +This design holds across HTTP/1.1, HTTP/2, every HTTP client library (Go, Java, Python `requests`, JavaScript `fetch`, `curl --output`), and every HTTP intermediary (nginx-ingress, ALB, CDN). No framing tricks, no trailers, no connection-abort signalling. + +### Stream-mode download request + +```bash +curl -X POST http://localhost:8080/lfs/download \ + -H "X-API-Key: $KAFSCALE_LFS_PROXY_HTTP_API_KEY" \ + -H "Content-Type: application/json" \ + -d '{ + "bucket": "my-bucket", + "key": "default/demo-topic/lfs/2026/02/05/obj-abc123", + "mode": "stream", + "integrity": { + "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + "checksum_alg": "sha256", + "size": 10485760 + } + }' \ + -o downloaded-blob.bin +``` + +Both `integrity.sha256` AND `integrity.size` are **required** on stream-mode requests — the size enables a hard cap on the S3 read so a compromised bucket cannot exhaust proxy temporary storage. Clients should copy both values from the Kafka envelope. + +### Presign-mode download (off by default) + +`mode: presign` returns a time-limited URL the client uses to fetch the object directly from S3. The proxy does **no** integrity verification on this path — the client is responsible for hashing the downloaded bytes against the `integrity` block echoed in the response. Disabled by default; enable per-deployment by setting `KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true`. + +### Error codes on `/lfs/download` + +| HTTP | `code` | Meaning | +|---|---|---| +| 400 | `missing_integrity` | `integrity.sha256` was not supplied | +| 400 | `invalid_integrity` | `integrity.sha256` is not a 64-character hex digest, or `integrity.size` is negative | +| 400 | `missing_integrity_size` | stream mode requires `integrity.size` | +| 400 | `payload_too_large` | `integrity.size` exceeds `KAFSCALE_LFS_PROXY_MAX_BLOB_SIZE` or cannot be verified safely | +| 400 | `presign_disabled` | presign mode requested but operator did not opt in | +| 400 | `unsupported_checksum_alg` | only `sha256` is accepted | +| 500 | `temp_storage_failed` | temporary verification storage is unavailable or full | +| 502 | `integrity_failure` | SHA-256 mismatch or S3 returned more bytes than declared | +| 502 | `s3_get_failed` | S3 read failed | + ## Configuration -The LFS proxy is configured via environment variables or CLI flags: +The LFS proxy is configured via environment variables. All variables are prefixed `KAFSCALE_LFS_PROXY_`. | Variable | Default | Description | |---|---|---| -| `LFS_S3_BUCKET` | `kafscale` | S3 bucket for blob storage | -| `LFS_S3_REGION` | `us-east-1` | S3 region | -| `LFS_S3_ENDPOINT` | — | Custom S3 endpoint (for MinIO) | -| `LFS_S3_PATH_STYLE` | `false` | Use path-style S3 addressing | -| `LFS_KAFKA_BROKERS` | `localhost:9092` | Kafka bootstrap servers | -| `LFS_HTTP_ADDR` | `:8080` | HTTP API listen address | -| `LFS_METRICS_ADDR` | `:9095` | Prometheus metrics listen address | -| `LFS_CHECKSUM_ALGO` | `sha256` | Checksum algorithm (`sha256`, `crc32`, `md5`) | -| `LFS_MAX_UPLOAD_SIZE` | `0` (unlimited) | Maximum upload size in bytes | -| `LFS_CORS_ORIGINS` | `*` | Allowed CORS origins | +| `KAFSCALE_LFS_PROXY_S3_BUCKET` | **required** | S3 bucket for blob storage. The bucket name `kafscale-lfs` is permanently blocklisted at startup (security fix / PR #139). Use your own name. | +| `KAFSCALE_LFS_PROXY_S3_REGION` | **required** | S3 region | +| `KAFSCALE_LFS_PROXY_S3_ENDPOINT` | — | Custom S3 endpoint (for MinIO or non-AWS S3) | +| `KAFSCALE_LFS_PROXY_S3_FORCE_PATH_STYLE` | auto | Use path-style S3 addressing (defaults true when endpoint is set) | +| `KAFSCALE_LFS_PROXY_S3_ACCESS_KEY` | — | S3 access key (or use IAM role / instance profile) | +| `KAFSCALE_LFS_PROXY_S3_SECRET_KEY` | — | S3 secret key | +| `KAFSCALE_LFS_PROXY_S3_SESSION_TOKEN` | — | S3 session token (for STS) | +| `KAFSCALE_LFS_PROXY_S3_PUBLIC_ENDPOINT` | — | Endpoint advertised in presigned URLs (for split-network deployments) | +| `KAFSCALE_LFS_PROXY_S3_ENSURE_BUCKET` | `false` | Create the bucket on startup if it doesn't exist | +| `KAFSCALE_LFS_PROXY_MAX_BLOB_SIZE` | `5368709120` (5 GiB) | Upper bound on per-object size for both uploads and downloads. Download requests with `integrity.size` larger than this are rejected with `payload_too_large`. | +| `KAFSCALE_LFS_PROXY_CHUNK_SIZE` | `5242880` (5 MiB) | Multipart upload chunk size | +| `KAFSCALE_LFS_PROXY_CHECKSUM_ALGO` | `sha256` | Checksum algorithm (only `sha256` is currently honored by the integrity-verification download path) | +| `KAFSCALE_LFS_PROXY_HTTP_API_KEY` | — | If set, required as `X-API-Key:` or `Authorization: Bearer ...` on HTTP requests | +| `KAFSCALE_LFS_PROXY_PRESIGN_ENABLED` | `false` | Opt-in to presigned-URL download mode | +| `KAFSCALE_LFS_PROXY_ID` | hostname | Proxy instance identifier (in ops-tracker events) | ## Quick start @@ -106,8 +160,10 @@ The LFS proxy is configured via environment variables or CLI flags: # Start MinIO + broker + LFS proxy locally make lfs-demo -# Upload a file via HTTP -curl -X POST http://localhost:8080/v1/topics/demo-topic/records \ +# Upload a file via the HTTP API +curl -X POST http://localhost:8080/lfs/produce \ + -H "X-API-Key: $KAFSCALE_LFS_PROXY_HTTP_API_KEY" \ + -F "topic=demo-topic" \ -F "file=@large-file.bin" # Consume the envelope diff --git a/_docs/lfs-sdks.md b/_docs/lfs-sdks.md index d7f64d3..6cf12fc 100644 --- a/_docs/lfs-sdks.md +++ b/_docs/lfs-sdks.md @@ -27,7 +27,7 @@ limitations under the License. # LFS Client SDKs -KafScale provides LFS client SDKs in four languages. Each SDK handles envelope encoding/decoding, HTTP upload to the LFS proxy, and transparent S3 object resolution. +KafScale provides LFS client SDKs in four languages. Each SDK handles envelope encoding/decoding, HTTP upload to the LFS proxy, and S3 object resolution with checksum validation. ## Go (built-in) @@ -36,21 +36,24 @@ The Go SDK lives in `pkg/lfs/` and is used internally by the LFS proxy, console, ```go import "github.com/KafScale/platform/pkg/lfs" -// Produce a large file -producer := lfs.NewProducer(lfs.ProducerConfig{ - ProxyAddr: "http://localhost:8080", - Topic: "demo-topic", -}) -err := producer.Upload(ctx, "large-file.bin", fileReader) +// Produce a large file through the LFS proxy HTTP API. +producer := lfs.NewProducer("http://localhost:8080") +result, err := producer.Produce(ctx, "demo-topic", "large-file.bin", fileReader) -// Consume and resolve -consumer := lfs.NewConsumer(lfs.ConsumerConfig{ - S3Bucket: "kafscale", - S3Endpoint: "http://localhost:9000", +// Resolve a consumed envelope directly from S3 with local checksum validation. +s3Client, err := lfs.NewS3Client(ctx, lfs.S3Config{ + Bucket: "my-bucket", + Region: "us-east-1", + Endpoint: "http://localhost:9000", // optional, for MinIO + ForcePathStyle: true, // optional, for MinIO }) -reader, err := consumer.Resolve(ctx, envelope) +consumer := lfs.NewConsumer(s3Client) +envelopeBytes, _ := lfs.EncodeEnvelope(result.Envelope) +_, payload, err := consumer.Unwrap(ctx, envelopeBytes) ``` +> **Trust model:** Kafka is authoritative; S3 is untrusted storage. Current SDK resolvers fetch directly from S3 and validate the envelope checksum locally. HTTP clients that use `POST /lfs/download` must pass `integrity.sha256` AND `integrity.size` from the envelope so the proxy can verify before streaming. See the [LFS Proxy doc](/lfs-proxy/#trust-model-and-integrity-verification) for the full design. + ## Java Maven-based SDK with retry/backoff and configurable HTTP timeouts.