diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go index 90a2bb3..c01de69 100644 --- a/internal/gitclone/manager.go +++ b/internal/gitclone/manager.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -85,6 +86,19 @@ type Repository struct { refCheckValid bool fetchSem chan struct{} credentialProvider CredentialProvider + lastAccessed atomic.Int64 +} + +func (r *Repository) TouchAccessed() { + r.lastAccessed.Store(time.Now().UnixNano()) +} + +func (r *Repository) LastAccessed() time.Time { + ns := r.lastAccessed.Load() + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) } type Manager struct { @@ -199,6 +213,7 @@ func (m *Manager) GetOrCreate(_ context.Context, upstreamURL string) (*Repositor fetchSem: make(chan struct{}, 1), credentialProvider: m.credentialProvider, } + repo.lastAccessed.Store(time.Now().UnixNano()) headFile := filepath.Join(clonePath, "HEAD") if _, err := os.Stat(headFile); err == nil { diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index 123b08c..7de09b4 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -22,6 +22,11 @@ type Config struct { SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"` } +// ErrJobIdle is a sentinel error that a periodic job can return to signal that +// it should not be re-armed. The scheduler treats this as a graceful stop, not +// as a job failure. +var ErrJobIdle = errors.New("job idle") + type queueJob struct { id string queue string @@ -182,6 +187,12 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati submit := func() { q.Submit(queue, id, func(ctx context.Context) error { err := run(ctx) + // If the job signals idle, stop re-arming so the periodic chain + // ends. The caller is responsible for re-scheduling when activity + // resumes. + if errors.Is(err, ErrJobIdle) { + return err + } if q.store != nil { if storeErr := q.store.SetLastRun(key, time.Now()); storeErr != nil { logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr) @@ -291,10 +302,14 @@ func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job que elapsed := time.Since(start) status := "success" - if err != nil { + switch { + case errors.Is(err, ErrJobIdle): + status = "idle" + logger.InfoContext(ctx, "Periodic job idled out", "job", job, "elapsed", elapsed) + case err != nil: status = "error" logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed) - } else { + default: logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed) } jobAttrs = append(jobAttrs, attribute.String("status", status)) diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 75516b4..19ae5d7 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -263,6 +263,34 @@ func TestJobSchedulerPeriodicJob(t *testing.T) { }, "periodic job should execute multiple times") } +func TestJobSchedulerPeriodicJobStopsOnIdle(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) + + var executions atomic.Int32 + + scheduler.SubmitPeriodicJob("queue1", "idle-job", 50*time.Millisecond, func(_ context.Context) error { + n := executions.Add(1) + if n >= 2 { + return jobscheduler.ErrJobIdle + } + return nil + }) + + // Wait for the idle return to take effect. + eventually(t, time.Second, func() bool { return executions.Load() >= 2 }, + "periodic job should fire at least twice before idling out") + + after := executions.Load() + // Wait several intervals; no further executions should occur. + time.Sleep(300 * time.Millisecond) + assert.Equal(t, after, executions.Load(), + "periodic job should not re-arm after returning ErrJobIdle") +} + func TestJobSchedulerPeriodicJobWithError(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx) diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 261b3d3..b2a9d50 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -49,6 +49,7 @@ type Config struct { RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"` ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"` BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"` + IdleTimeout time.Duration `hcl:"idle-timeout,optional" help:"Stop periodic jobs for repos with no client requests for this duration. 0 disables." default:"72h"` } type Strategy struct { @@ -66,6 +67,7 @@ type Strategy struct { snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo + idledRepos sync.Map // keyed by upstream URL, set when periodic jobs idle out metrics *gitMetrics repoCounts *RepoCounts ready atomic.Bool @@ -197,6 +199,38 @@ func (s *Strategy) Ready() bool { return s.ready.Load() } +func (s *Strategy) isRepoIdle(repo *gitclone.Repository) bool { + if s.config.IdleTimeout <= 0 { + return false + } + return time.Since(repo.LastAccessed()) > s.config.IdleTimeout +} + +func (s *Strategy) withIdleGuard(repo *gitclone.Repository, fn func(ctx context.Context) error) func(ctx context.Context) error { + return func(ctx context.Context) error { + if s.isRepoIdle(repo) { + s.idledRepos.Store(repo.UpstreamURL(), true) + logging.FromContext(ctx).InfoContext(ctx, "Stopping periodic job for idle repo", "upstream", repo.UpstreamURL()) + return jobscheduler.ErrJobIdle + } + return fn(ctx) + } +} + +func (s *Strategy) touchAndReschedule(repo *gitclone.Repository) { + repo.TouchAccessed() + if _, wasIdle := s.idledRepos.LoadAndDelete(repo.UpstreamURL()); wasIdle { + logging.FromContext(s.ctx).InfoContext(s.ctx, "Re-scheduling periodic jobs for previously idle repo", + "upstream", repo.UpstreamURL()) + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } + if s.config.RepackInterval > 0 { + s.scheduleRepackJobs(repo) + } + } +} + // SetMetadataStore enables the per-repo clone histogram and schedules its // daily reaper. Called by config.Load after the metadata backend is built. func (s *Strategy) SetMetadataStore(store *metadatadb.Store) { @@ -332,6 +366,7 @@ func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host http.Error(w, "Internal server error", http.StatusInternalServerError) return } + s.touchAndReschedule(repo) // Increment after GetOrCreate so unvalidated URLs can't bloat the keyspace. if isClone, cerr := RequestIsClone(pathValue, r); cerr != nil { diff --git a/internal/strategy/git/refs.go b/internal/strategy/git/refs.go index e8335a7..a13778e 100644 --- a/internal/strategy/git/refs.go +++ b/internal/strategy/git/refs.go @@ -72,6 +72,7 @@ func (s *Strategy) handleEnsureRefs(w http.ResponseWriter, r *http.Request, host http.Error(w, "internal server error", http.StatusInternalServerError) return } + s.touchAndReschedule(repo) if repo.State() != gitclone.StateReady { if err := s.ensureCloneReady(ctx, repo); err != nil { diff --git a/internal/strategy/git/repack.go b/internal/strategy/git/repack.go index 51b8efc..3a6ab29 100644 --- a/internal/strategy/git/repack.go +++ b/internal/strategy/git/repack.go @@ -10,7 +10,7 @@ import ( ) func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) { - s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, func(ctx context.Context) error { + s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, s.withIdleGuard(repo, func(ctx context.Context) error { start := time.Now() err := repo.Repack(ctx) status := "success" @@ -19,5 +19,5 @@ func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) { } s.metrics.recordOperation(ctx, "repack", status, time.Since(start)) return errors.Wrap(err, "repack") - }) + })) } diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 926436c..e7ab5d8 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -168,19 +168,19 @@ func (s *Strategy) generateAndUploadMirrorSnapshot(ctx context.Context, repo *gi } func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) { - s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error { + s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, s.withIdleGuard(repo, func(ctx context.Context) error { return s.generateAndUploadSnapshot(ctx, repo) - }) - s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error { + })) + s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, s.withIdleGuard(repo, func(ctx context.Context) error { return s.generateAndUploadLFSSnapshot(ctx, repo) - }) + })) mirrorInterval := s.config.MirrorSnapshotInterval if mirrorInterval == 0 { mirrorInterval = s.config.SnapshotInterval } - s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "mirror-snapshot-periodic", mirrorInterval, func(ctx context.Context) error { + s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "mirror-snapshot-periodic", mirrorInterval, s.withIdleGuard(repo, func(ctx context.Context) error { return s.generateAndUploadMirrorSnapshot(ctx, repo) - }) + })) } func (s *Strategy) snapshotMutexFor(upstreamURL string) *sync.Mutex { @@ -202,6 +202,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, http.Error(w, "Internal server error", http.StatusInternalServerError) return } + s.touchAndReschedule(repo) cacheKey := snapshotCacheKey(upstreamURL) @@ -346,6 +347,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h http.Error(w, "Internal server error", http.StatusInternalServerError) return } + s.touchAndReschedule(repo) if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil { logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr) http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) @@ -832,13 +834,16 @@ func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Reque // restore + on-demand generation. Kick off a background mirror warm so // the periodic LFS snapshot job can fire once the mirror is ready. logger.InfoContext(ctx, "LFS snapshot cache miss, triggering background warm", "upstream", upstreamURL) - if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil && repo.State() != gitclone.StateReady { - s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error { - if err := s.startClone(ctx, repo); err != nil { - logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err) - } - return nil - }) + if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil { + s.touchAndReschedule(repo) + if repo.State() != gitclone.StateReady { + s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error { + if err := s.startClone(ctx, repo); err != nil { + logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err) + } + return nil + }) + } } http.Error(w, "LFS snapshot not found", http.StatusNotFound) }