Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions internal/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@ import (
"time"

"github.com/alecthomas/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/block/cachew/client"
"github.com/block/cachew/internal/cache"
)

//nolint:gochecknoglobals // OTel tracer instances are package-scoped by convention
var tracer = otel.Tracer("github.com/block/cachew/internal/snapshot")

// Create archives a directory using tar with zstd compression, then uploads to the cache.
//
// The archive preserves all file permissions, ownership, and symlinks.
Expand All @@ -34,7 +41,24 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st
// Each entry in includePaths is archived relative to baseDir and must exist.
// Exclude patterns use tar's --exclude syntax.
// threads controls zstd parallelism; 0 uses all available CPU cores.
func CreatePaths(ctx context.Context, remote cache.Cache, key cache.Key, baseDir, archiveName string, includePaths []string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) error {
func CreatePaths(ctx context.Context, remote cache.Cache, key cache.Key, baseDir, archiveName string, includePaths []string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) (returnErr error) {
ctx, span := tracer.Start(ctx, "cachew.snapshot.create",
trace.WithAttributes(
attribute.String("cache.key", key.String()),
attribute.String("snapshot.archive_name", archiveName),
attribute.Int("snapshot.include_paths", len(includePaths)),
attribute.Int("snapshot.exclude_patterns", len(excludePatterns)),
attribute.Int("snapshot.threads", threads),
),
)
defer func() {
if returnErr != nil {
span.RecordError(returnErr)
span.SetStatus(codes.Error, returnErr.Error())
}
span.End()
}()

headers := make(http.Header)
headers.Set("Content-Type", "application/zstd")
headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", archiveName+".tar.zst"))
Expand Down Expand Up @@ -71,7 +95,21 @@ func StreamTo(ctx context.Context, w io.Writer, directory string, excludePattern
// all file permissions, ownership, and symlinks.
// The operation is fully streaming - no temporary files are created.
// threads controls zstd parallelism; 0 uses all available CPU cores.
func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string, threads int) error {
func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string, threads int) (returnErr error) {
ctx, span := tracer.Start(ctx, "cachew.snapshot.restore",
trace.WithAttributes(
attribute.String("cache.key", key.String()),
attribute.Int("snapshot.threads", threads),
),
)
defer func() {
if returnErr != nil {
span.RecordError(returnErr)
span.SetStatus(codes.Error, returnErr.Error())
}
span.End()
}()

rc, _, err := remote.Open(ctx, key)
if err != nil {
return errors.Wrap(err, "failed to open object")
Expand Down
9 changes: 9 additions & 0 deletions internal/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/alecthomas/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/trace"
)

Expand Down Expand Up @@ -41,6 +42,14 @@ func New(ctx context.Context, cfg Config) (stop func(), err error) {
provider := trace.NewTracerProvider(trace.WithBatcher(exporter))
otel.SetTracerProvider(provider)

// Register a W3C trace context + baggage propagator so trace IDs
// flow across HTTP boundaries (otelhttp/otelconnect use the global
// propagator). Without this, every service starts a new trace.
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))

return func() {
_ = provider.Shutdown(context.Background()) //nolint:errcheck // shutdown errors are not actionable
}, nil
Expand Down