diff --git a/packages/das/src/queue/constants.ts b/packages/das/src/queue/constants.ts index 04046ea..730f994 100644 --- a/packages/das/src/queue/constants.ts +++ b/packages/das/src/queue/constants.ts @@ -5,10 +5,16 @@ export const FETCH_JOBS = { PR_FILES: "fetch-pr-files", BACKFILL_REPO: "backfill-repo", ISSUE_CLOSURE: "fetch-issue-closure", + RECONCILE_REGISTRY: "reconcile-registry", } as const; export const DEFAULT_BACKFILL_DAYS = 40; +export const REGISTRY_RECONCILE_CRON = "0 */2 * * *"; + +export const MASTER_REPOSITORIES_URL = + "https://raw.githubusercontent.com/entrius/gittensor/test/gittensor/validator/weights/master_repositories.json"; + export function prFilesJobId( repoFullName: string, prNumber: number, diff --git a/packages/das/src/queue/fetch.processor.ts b/packages/das/src/queue/fetch.processor.ts index 08e1e6d..48c32c1 100644 --- a/packages/das/src/queue/fetch.processor.ts +++ b/packages/das/src/queue/fetch.processor.ts @@ -5,6 +5,7 @@ import { IsNull, Repository } from "typeorm"; import { Job, Queue } from "bullmq"; import { Issue, PullRequest } from "../entities"; import { GitHubFetcherService } from "../webhook/github-fetcher.service"; +import { RegistryReconcilerService } from "./registry-reconciler.service"; import { FETCH_QUEUE, FETCH_JOBS, @@ -57,6 +58,7 @@ export class FetchProcessor extends WorkerHost { private readonly issueRepo: Repository, @InjectQueue(FETCH_QUEUE) private readonly fetchQueue: Queue, + private readonly registryReconciler: RegistryReconcilerService, ) { super(); } @@ -82,6 +84,10 @@ export class FetchProcessor extends WorkerHost { await this.handleIssueClosure(repoFullName, issueNumber); break; } + case FETCH_JOBS.RECONCILE_REGISTRY: { + await this.registryReconciler.reconcile(); + break; + } default: this.logger.warn(`Unknown job name: ${job.name}`); } diff --git a/packages/das/src/queue/queue.module.ts b/packages/das/src/queue/queue.module.ts index 1fcd6d9..c0b91c9 100644 --- a/packages/das/src/queue/queue.module.ts +++ b/packages/das/src/queue/queue.module.ts @@ -13,6 +13,7 @@ import { } from "../entities"; import { GitHubFetcherService } from "../webhook/github-fetcher.service"; import { FetchProcessor } from "./fetch.processor"; +import { RegistryReconcilerService } from "./registry-reconciler.service"; import { FETCH_QUEUE } from "./constants"; @Module({ @@ -38,7 +39,7 @@ import { FETCH_QUEUE } from "./constants"; Review, ]), ], - providers: [FetchProcessor, GitHubFetcherService], + providers: [FetchProcessor, GitHubFetcherService, RegistryReconcilerService], exports: [BullModule], }) export class QueueModule {} diff --git a/packages/das/src/queue/registry-reconciler.service.ts b/packages/das/src/queue/registry-reconciler.service.ts new file mode 100644 index 0000000..e1c31d8 --- /dev/null +++ b/packages/das/src/queue/registry-reconciler.service.ts @@ -0,0 +1,82 @@ +import { Injectable, Logger, OnModuleInit } from "@nestjs/common"; +import { InjectQueue } from "@nestjs/bullmq"; +import { InjectRepository } from "@nestjs/typeorm"; +import { Queue } from "bullmq"; +import { In, Repository } from "typeorm"; +import { Repo } from "../entities"; +import { + FETCH_JOBS, + FETCH_QUEUE, + MASTER_REPOSITORIES_URL, + REGISTRY_RECONCILE_CRON, +} from "./constants"; + +@Injectable() +export class RegistryReconcilerService implements OnModuleInit { + private readonly logger = new Logger(RegistryReconcilerService.name); + + constructor( + @InjectQueue(FETCH_QUEUE) private readonly fetchQueue: Queue, + @InjectRepository(Repo) private readonly repoRepo: Repository, + ) {} + + async onModuleInit(): Promise { + // BullMQ deduplicates repeatable jobs by repeat key, so restarts + // don't accumulate duplicate schedules. + await this.fetchQueue.add( + FETCH_JOBS.RECONCILE_REGISTRY, + {}, + { + repeat: { pattern: REGISTRY_RECONCILE_CRON }, + removeOnComplete: true, + removeOnFail: 50, + }, + ); + } + + async reconcile(): Promise { + const canonical = await this.fetchCanonicalRepos(); + // Fail-safe: a network blip or non-200 must never mass-deregister. + if (!canonical) return; + if (canonical.size === 0) { + this.logger.warn("Canonical repo list is empty — refusing to deregister"); + return; + } + + const registered = await this.repoRepo.find({ + select: ["repoFullName"], + where: { registered: true }, + }); + const toDeregister = registered + .map((r) => r.repoFullName) + .filter((name) => !canonical.has(name.toLowerCase())); + + // installationId stays — delisting != uninstalling; re-list via admin/repos/register. + await this.repoRepo.update( + { repoFullName: In(toDeregister) }, + { registered: false }, + ); + + this.logger.log( + `Registry reconcile: deregistered ${toDeregister.length} repo(s)` + + (toDeregister.length ? `: ${toDeregister.join(", ")}` : ""), + ); + } + + private async fetchCanonicalRepos(): Promise | null> { + try { + const res = await fetch(MASTER_REPOSITORIES_URL); + if (!res.ok) { + this.logger.error( + `Master repos fetch returned ${res.status} ${res.statusText}`, + ); + return null; + } + const json = (await res.json()) as Record; + return new Set(Object.keys(json).map((k) => k.toLowerCase())); + } catch (err) { + this.logger.error(`Master repos fetch failed: ${String(err)}`); + return null; + } + } +}