Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions internal/gitclone/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -85,6 +86,19 @@ type Repository struct {
refCheckValid bool
fetchSem chan struct{}
credentialProvider CredentialProvider
lastAccessed atomic.Int64
}

// TouchAccessed records the current wall-clock time as this repository's most
// recent access. The timestamp is stored atomically as Unix nanoseconds.
func (r *Repository) TouchAccessed() {
	nowNanos := time.Now().UnixNano()
	r.lastAccessed.Store(nowNanos)
}

// LastAccessed returns the time of the most recently recorded access. When the
// stored value is 0 (nothing has been recorded), it returns the zero time.Time.
func (r *Repository) LastAccessed() time.Time {
	if nanos := r.lastAccessed.Load(); nanos != 0 {
		// The value is stored as nanoseconds since the Unix epoch.
		return time.Unix(0, nanos)
	}
	return time.Time{}
}

type Manager struct {
Expand Down Expand Up @@ -199,6 +213,7 @@ func (m *Manager) GetOrCreate(_ context.Context, upstreamURL string) (*Repositor
fetchSem: make(chan struct{}, 1),
credentialProvider: m.credentialProvider,
}
repo.lastAccessed.Store(time.Now().UnixNano())

headFile := filepath.Join(clonePath, "HEAD")
if _, err := os.Stat(headFile); err == nil {
Expand Down
19 changes: 17 additions & 2 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type Config struct {
SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
}

// ErrJobIdle is a sentinel error that a periodic job can return to signal that
// it should not be re-armed. The scheduler treats this as a graceful stop, not
// as a job failure.
//
// Match it with errors.Is rather than ==, which is how the scheduler itself
// detects it.
var ErrJobIdle = errors.New("job idle")

type queueJob struct {
id string
queue string
Expand Down Expand Up @@ -182,6 +187,12 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
submit := func() {
q.Submit(queue, id, func(ctx context.Context) error {
err := run(ctx)
// If the job signals idle, stop re-arming so the periodic chain
// ends. The caller is responsible for re-scheduling when activity
// resumes.
if errors.Is(err, ErrJobIdle) {
return err
}
if q.store != nil {
if storeErr := q.store.SetLastRun(key, time.Now()); storeErr != nil {
logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr)
Expand Down Expand Up @@ -291,10 +302,14 @@ func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job que

elapsed := time.Since(start)
status := "success"
if err != nil {
switch {
case errors.Is(err, ErrJobIdle):
status = "idle"
logger.InfoContext(ctx, "Periodic job idled out", "job", job, "elapsed", elapsed)
case err != nil:
status = "error"
logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed)
} else {
default:
logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed)
}
jobAttrs = append(jobAttrs, attribute.String("status", status))
Expand Down
28 changes: 28 additions & 0 deletions internal/jobscheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,34 @@ func TestJobSchedulerPeriodicJob(t *testing.T) {
}, "periodic job should execute multiple times")
}

// TestJobSchedulerPeriodicJobStopsOnIdle verifies that a periodic job is not
// executed again once it has returned jobscheduler.ErrJobIdle.
func TestJobSchedulerPeriodicJobStopsOnIdle(t *testing.T) {
	_, baseCtx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
	ctx, cancel := context.WithCancel(baseCtx)
	defer cancel()

	sched := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

	var runs atomic.Int32

	// The first run succeeds; the second and any later run report idle.
	job := func(_ context.Context) error {
		if runs.Add(1) >= 2 {
			return jobscheduler.ErrJobIdle
		}
		return nil
	}
	sched.SubmitPeriodicJob("queue1", "idle-job", 50*time.Millisecond, job)

	// Block until the run that returns ErrJobIdle has happened.
	reachedIdle := func() bool { return runs.Load() >= 2 }
	eventually(t, time.Second, reachedIdle,
		"periodic job should fire at least twice before idling out")

	// Record the count, wait several intervals, and confirm it has not moved.
	settled := runs.Load()
	time.Sleep(300 * time.Millisecond)
	assert.Equal(t, settled, runs.Load(),
		"periodic job should not re-arm after returning ErrJobIdle")
}

func TestJobSchedulerPeriodicJobWithError(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
Expand Down
35 changes: 35 additions & 0 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Config struct {
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"`
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"`
BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"`
IdleTimeout time.Duration `hcl:"idle-timeout,optional" help:"Stop periodic jobs for repos with no client requests for this duration. 0 disables." default:"72h"`
}

type Strategy struct {
Expand All @@ -66,6 +67,7 @@ type Strategy struct {
snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry
coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
idledRepos sync.Map // keyed by upstream URL, set when periodic jobs idle out
metrics *gitMetrics
repoCounts *RepoCounts
ready atomic.Bool
Expand Down Expand Up @@ -197,6 +199,38 @@ func (s *Strategy) Ready() bool {
return s.ready.Load()
}

// isRepoIdle reports whether more than the configured IdleTimeout has elapsed
// since repo's last recorded access. A non-positive IdleTimeout disables idle
// detection, in which case the result is always false and the repo is not
// consulted.
//
// NOTE(review): a repo whose LastAccessed is the zero time is treated as idle,
// because the elapsed duration is then very large. Confirm that every code
// path constructing a Repository seeds the access time.
func (s *Strategy) isRepoIdle(repo *gitclone.Repository) bool {
	timeout := s.config.IdleTimeout
	return timeout > 0 && time.Since(repo.LastAccessed()) > timeout
}

// withIdleGuard wraps fn so that each invocation first checks whether repo is
// idle. If the repo is not idle, fn runs and its result is returned unchanged.
// If it is idle, fn is skipped, the repo's upstream URL is recorded in
// idledRepos, and jobscheduler.ErrJobIdle is returned.
func (s *Strategy) withIdleGuard(repo *gitclone.Repository, fn func(ctx context.Context) error) func(ctx context.Context) error {
	guarded := func(ctx context.Context) error {
		if !s.isRepoIdle(repo) {
			return fn(ctx)
		}

		// Mark the repo so a later request can tell its jobs were stopped.
		upstream := repo.UpstreamURL()
		s.idledRepos.Store(upstream, true)
		logging.FromContext(ctx).InfoContext(ctx, "Stopping periodic job for idle repo", "upstream", upstream)
		return jobscheduler.ErrJobIdle
	}
	return guarded
}

// touchAndReschedule records an access on repo and, if the repo had an entry
// in idledRepos, removes that entry and re-submits its periodic jobs. Snapshot
// jobs are re-submitted only when SnapshotInterval is positive, and repack
// jobs only when RepackInterval is positive.
func (s *Strategy) touchAndReschedule(repo *gitclone.Repository) {
	repo.TouchAccessed()

	// LoadAndDelete checks and clears the idle mark in one atomic step, so of
	// several concurrent callers only the one that removes the entry goes on
	// to reschedule.
	upstream := repo.UpstreamURL()
	_, wasIdle := s.idledRepos.LoadAndDelete(upstream)
	if !wasIdle {
		return
	}

	logging.FromContext(s.ctx).InfoContext(s.ctx, "Re-scheduling periodic jobs for previously idle repo",
		"upstream", upstream)
	if s.config.SnapshotInterval > 0 {
		s.scheduleSnapshotJobs(repo)
	}
	if s.config.RepackInterval > 0 {
		s.scheduleRepackJobs(repo)
	}
}

// SetMetadataStore enables the per-repo clone histogram and schedules its
// daily reaper. Called by config.Load after the metadata backend is built.
func (s *Strategy) SetMetadataStore(store *metadatadb.Store) {
Expand Down Expand Up @@ -332,6 +366,7 @@ func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
s.touchAndReschedule(repo)

// Increment after GetOrCreate so unvalidated URLs can't bloat the keyspace.
if isClone, cerr := RequestIsClone(pathValue, r); cerr != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/strategy/git/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (s *Strategy) handleEnsureRefs(w http.ResponseWriter, r *http.Request, host
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
s.touchAndReschedule(repo)

if repo.State() != gitclone.StateReady {
if err := s.ensureCloneReady(ctx, repo); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/strategy/git/repack.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, func(ctx context.Context) error {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
start := time.Now()
err := repo.Repack(ctx)
status := "success"
Expand All @@ -19,5 +19,5 @@ func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) {
}
s.metrics.recordOperation(ctx, "repack", status, time.Since(start))
return errors.Wrap(err, "repack")
})
}))
}
31 changes: 18 additions & 13 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,19 @@ func (s *Strategy) generateAndUploadMirrorSnapshot(ctx context.Context, repo *gi
}

func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
return s.generateAndUploadSnapshot(ctx, repo)
})
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error {
}))
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
return s.generateAndUploadLFSSnapshot(ctx, repo)
})
}))
mirrorInterval := s.config.MirrorSnapshotInterval
if mirrorInterval == 0 {
mirrorInterval = s.config.SnapshotInterval
}
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "mirror-snapshot-periodic", mirrorInterval, func(ctx context.Context) error {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "mirror-snapshot-periodic", mirrorInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
return s.generateAndUploadMirrorSnapshot(ctx, repo)
})
}))
}

func (s *Strategy) snapshotMutexFor(upstreamURL string) *sync.Mutex {
Expand All @@ -202,6 +202,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
s.touchAndReschedule(repo)

cacheKey := snapshotCacheKey(upstreamURL)

Expand Down Expand Up @@ -346,6 +347,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
s.touchAndReschedule(repo)
if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil {
logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr)
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
Expand Down Expand Up @@ -832,13 +834,16 @@ func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Reque
// restore + on-demand generation. Kick off a background mirror warm so
// the periodic LFS snapshot job can fire once the mirror is ready.
logger.InfoContext(ctx, "LFS snapshot cache miss, triggering background warm", "upstream", upstreamURL)
if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil && repo.State() != gitclone.StateReady {
s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error {
if err := s.startClone(ctx, repo); err != nil {
logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err)
}
return nil
})
if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil {
s.touchAndReschedule(repo)
if repo.State() != gitclone.StateReady {
s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error {
if err := s.startClone(ctx, repo); err != nil {
logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err)
}
return nil
})
}
}
http.Error(w, "LFS snapshot not found", http.StatusNotFound)
}
Loading