Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/code/src/main/db/migrations/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@
"breakpoints": true
}
]
}
}
6 changes: 6 additions & 0 deletions apps/code/src/main/di/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { FsService } from "../services/fs/service";
import { GitService } from "../services/git/service";
import { GitHubIntegrationService } from "../services/github-integration/service";
import { HandoffService } from "../services/handoff/service";
import { HomeService } from "../services/home/service";
import { InboxLinkService } from "../services/inbox-link/service";
import { LinearIntegrationService } from "../services/linear-integration/service";
import { LlmGatewayService } from "../services/llm-gateway/service";
Expand All @@ -69,6 +70,7 @@ import { UIService } from "../services/ui/service";
import { UpdatesService } from "../services/updates/service";
import { UsageMonitorService } from "../services/usage-monitor/service";
import { WatcherRegistryService } from "../services/watcher-registry/service";
import { WorkflowService } from "../services/workflow/service";
import { WorkspaceService } from "../services/workspace/service";
import { MAIN_TOKENS } from "./tokens";

Expand Down Expand Up @@ -153,6 +155,10 @@ container.bind(MAIN_TOKENS.TaskLinkService).to(TaskLinkService);
container.bind(MAIN_TOKENS.InboxLinkService).to(InboxLinkService);
container.bind(MAIN_TOKENS.NewTaskLinkService).to(NewTaskLinkService);
container.bind(MAIN_TOKENS.WatcherRegistryService).to(WatcherRegistryService);
// Home workflow config + snapshot are owned by PostHog now; these services are
// thin authenticated clients over the REST API (docs/workflow-architecture.md).
container.bind(MAIN_TOKENS.WorkflowService).to(WorkflowService);
container.bind(MAIN_TOKENS.HomeService).to(HomeService);
container.bind(MAIN_TOKENS.WorkspaceService).to(WorkspaceService);

container.bind(MAIN_TOKENS.SettingsStore).toConstantValue(settingsStore);
2 changes: 2 additions & 0 deletions apps/code/src/main/di/tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,6 @@ export const MAIN_TOKENS = Object.freeze({
WorkspaceService: Symbol.for("Main.WorkspaceService"),
EnrichmentService: Symbol.for("Main.EnrichmentService"),
UsageMonitorService: Symbol.for("Main.UsageMonitorService"),
WorkflowService: Symbol.for("Main.WorkflowService"),
HomeService: Symbol.for("Main.HomeService"),
});
21 changes: 21 additions & 0 deletions apps/code/src/main/services/auth/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,27 @@ export class AuthService extends TypedEventEmitter<AuthServiceEvents> {

return response;
}

/**
* Authenticated fetch against a project-scoped PostHog endpoint
* (`/api/projects/:projectId/<path>`). Throws if no project is selected.
*/
async authenticatedProjectFetch(
path: string,
init: RequestInit = {},
): Promise<Response> {
const { projectId, cloudRegion } = this.getState();
if (projectId == null) {
throw new Error("No PostHog project selected");
}
const apiHost = getCloudUrlFromRegion(cloudRegion ?? "us");
return this.authenticatedFetch(
fetch,
`${apiHost}/api/projects/${projectId}/${path}`,
init,
);
}

async redeemInviteCode(code: string): Promise<AuthState> {
const { apiHost } = await this.getValidAccessToken();
const response = await this.authenticatedFetch(
Expand Down
113 changes: 113 additions & 0 deletions apps/code/src/main/services/home/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import {
EMPTY_HOME_SNAPSHOT,
HomeEvent,
type HomeEvents,
type HomeSnapshot,
homeSnapshot,
} from "@shared/types/home-snapshot";
import { inject, injectable, postConstruct } from "inversify";
import { MAIN_TOKENS } from "../../di/tokens";
import { logger } from "../../utils/logger";
import { TypedEventEmitter } from "../../utils/typed-event-emitter";
import type { AuthService } from "../auth/service";

const log = logger.scope("home");

// Client poll cadence. The server worker keeps data fresh independently; this
// just pulls the latest snapshot so an open Home view stays current without a
// realtime channel (docs/home-tab.md §10 — REST + client poll for v1).
const POLL_INTERVAL_MS = 120_000;

/**
* Reads the per-user Home snapshot from PostHog. All grouping, PR polling, and
* classification happen server-side in the `evaluate-code-workstreams` Temporal
* worker; this service is a thin authenticated client + poll loop that emits
* {@link HomeEvent.SnapshotUpdated} when the snapshot changes.
*/
@injectable()
export class HomeService extends TypedEventEmitter<HomeEvents> {
private timer: ReturnType<typeof setInterval> | null = null;
private lastSerialized: string | null = null;

constructor(
@inject(MAIN_TOKENS.AuthService)
private readonly authService: AuthService,
) {
super();
}

@postConstruct()
init(): void {
this.timer = setInterval(() => {
void this.poll();
}, POLL_INTERVAL_MS);
}

dispose(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}

async getSnapshot(): Promise<HomeSnapshot> {
return (await this.fetchSnapshot()) ?? EMPTY_HOME_SNAPSHOT;
}

async refresh(): Promise<HomeSnapshot> {
// Fire-and-forget: the server kicks off an async worker eval. The fresh
// result lands via the next poll → onSnapshotUpdated; we just return the
// current snapshot so the mutation has a value.
await this.requestServerRefresh();
return this.getSnapshot();
}

private async poll(): Promise<void> {
const snapshot = await this.fetchSnapshot();
if (!snapshot) return;
const serialized = JSON.stringify(snapshot);
if (serialized === this.lastSerialized) return;
this.lastSerialized = serialized;
this.emit(HomeEvent.SnapshotUpdated, snapshot);
}

private async fetchSnapshot(): Promise<HomeSnapshot | null> {
if (this.authService.getState().projectId == null) return null;
try {
const res = await this.authService.authenticatedProjectFetch(
"code_home/",
{ method: "GET" },
);
if (!res.ok) {
log.warn("Failed to fetch home snapshot", { status: res.status });
return null;
}
const parsed = homeSnapshot.safeParse(await res.json());
if (!parsed.success) {
log.warn("Home snapshot failed schema validation", {
error: parsed.error.message,
});
return null;
}
return parsed.data;
} catch (err) {
log.warn("Error fetching home snapshot", {
error: err instanceof Error ? err.message : String(err),
});
return null;
}
}

private async requestServerRefresh(): Promise<void> {
if (this.authService.getState().projectId == null) return;
try {
await this.authService.authenticatedProjectFetch("code_home/refresh/", {
method: "POST",
});
} catch (err) {
log.warn("Error requesting home refresh", {
error: err instanceof Error ? err.message : String(err),
});
}
}
}
59 changes: 59 additions & 0 deletions apps/code/src/main/services/workflow/default-workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import type { WorkflowConfig } from "@shared/types/workflow";

// Seed config: applied on first run and on "Reset to default".
export function buildDefaultWorkflow(): WorkflowConfig {
return {
id: "default",
version: 1,
updatedAt: new Date(0).toISOString(),
bindings: {
working: [
{
id: "create_pr",
label: "Create PR",
skillId: "create-pr",
prompt:
"Open a PR for the current branch. Use the task history to write a concise description.",
},
],
in_review: [],
ci_failing: [
{
id: "fix_ci",
label: "Fix CI",
skillId: "fix-ci",
prompt:
"CI is failing on this PR. Investigate the failing checks and push a fix.",
},
],
changes_requested: [
{
id: "address_comments",
label: "Address review",
skillId: "address-comments",
prompt:
"Address the change requests on this PR — read the latest review and respond with code.",
},
],
comments_waiting: [
{
id: "address_threads",
label: "Address comments",
skillId: "address-comments",
prompt: "Address the unresolved review comments on this PR.",
},
],
ready_to_merge: [
{
id: "final_check",
label: "Final check",
skillId: "code-review",
prompt:
"Do a last-pass review of this PR. Call out anything risky before I merge.",
},
],
stale: [],
done: [],
},
};
}
84 changes: 84 additions & 0 deletions apps/code/src/main/services/workflow/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import {
type SaveInput,
type SaveResult,
saveResult,
type WorkflowConfig,
WorkflowEvent,
type WorkflowEvents,
workflowConfig,
} from "@shared/types/workflow";
import { inject, injectable } from "inversify";
import { MAIN_TOKENS } from "../../di/tokens";
import { logger } from "../../utils/logger";
import { TypedEventEmitter } from "../../utils/typed-event-emitter";
import type { AuthService } from "../auth/service";
import { buildDefaultWorkflow } from "./default-workflow";

const log = logger.scope("workflow");

/**
* Reads and writes the user's Home workflow config from PostHog
* (`/api/projects/:id/code_workflow/`). The server owns persistence, the
* monotonic `version`, optimistic concurrency, validation, and the default
* seed; this service is a thin authenticated client that emits
* {@link WorkflowEvent.Changed} on save/reset. Offline, `get()` falls back to
* the built-in default so the editor still renders.
*/
@injectable()
export class WorkflowService extends TypedEventEmitter<WorkflowEvents> {
constructor(
@inject(MAIN_TOKENS.AuthService)
private readonly authService: AuthService,
) {
super();
}

async get(): Promise<WorkflowConfig> {
const json = await this.request("GET", "code_workflow/");
const parsed = workflowConfig.safeParse(json);
if (parsed.success) return parsed.data;
// Unexpected response shape — render the default rather than breaking the
// Home config surface. (Network/auth failures throw and surface as the
// query's error state.)
return buildDefaultWorkflow();
}

async save(input: SaveInput): Promise<SaveResult> {
const json = await this.request("POST", "code_workflow/save/", {
config: input.config,
expectedVersion: input.expectedVersion,
});
const parsed = saveResult.parse(json);
if (parsed.status === "saved") {
this.emit(WorkflowEvent.Changed, parsed.config);
log.info("Workflow saved", { version: parsed.config.version });
}
return parsed;
}

async resetToDefault(): Promise<WorkflowConfig> {
const json = await this.request("POST", "code_workflow/reset/");
const config = workflowConfig.parse(json);
this.emit(WorkflowEvent.Changed, config);
log.info("Workflow reset to default", { version: config.version });
return config;
}

private async request(
method: "GET" | "POST",
path: string,
body?: unknown,
): Promise<unknown> {
const init: RequestInit = { method };
if (body !== undefined) {
init.headers = { "Content-Type": "application/json" };
init.body = JSON.stringify(body);
}
const res = await this.authService.authenticatedProjectFetch(path, init);
// 409/422 carry a structured SaveResult body the caller validates.
if (!res.ok && res.status !== 409 && res.status !== 422) {
throw new Error(`Workflow request failed: ${res.status}`);
}
return res.json();
}
}
4 changes: 4 additions & 0 deletions apps/code/src/main/trpc/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { fsRouter } from "./routers/fs";
import { gitRouter } from "./routers/git";
import { githubIntegrationRouter } from "./routers/github-integration";
import { handoffRouter } from "./routers/handoff";
import { homeRouter } from "./routers/home";
import { linearIntegrationRouter } from "./routers/linear-integration.js";
import { llmGatewayRouter } from "./routers/llm-gateway";
import { logsRouter } from "./routers/logs";
Expand All @@ -37,6 +38,7 @@ import { suspensionRouter } from "./routers/suspension.js";
import { uiRouter } from "./routers/ui";
import { updatesRouter } from "./routers/updates";
import { usageMonitorRouter } from "./routers/usage-monitor";
import { workflowRouter } from "./routers/workflow";
import { workspaceRouter } from "./routers/workspace";
import { router } from "./trpc";

Expand All @@ -61,6 +63,7 @@ export const trpcRouter = router({
git: gitRouter,
githubIntegration: githubIntegrationRouter,
handoff: handoffRouter,
home: homeRouter,
linearIntegration: linearIntegrationRouter,
llmGateway: llmGatewayRouter,
mcpApps: mcpAppsRouter,
Expand All @@ -81,6 +84,7 @@ export const trpcRouter = router({
updates: updatesRouter,
usageMonitor: usageMonitorRouter,
deepLink: deepLinkRouter,
workflow: workflowRouter,
workspace: workspaceRouter,
});

Expand Down
31 changes: 31 additions & 0 deletions apps/code/src/main/trpc/routers/home.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {
HomeEvent,
type HomeEvents,
homeSnapshot,
} from "@shared/types/home-snapshot";
import { container } from "../../di/container";
import { MAIN_TOKENS } from "../../di/tokens";
import type { HomeService } from "../../services/home/service";
import { publicProcedure, router } from "../trpc";

const getService = () => container.get<HomeService>(MAIN_TOKENS.HomeService);

function subscribe<K extends keyof HomeEvents>(event: K) {
return publicProcedure.subscription(async function* (opts) {
const service = getService();
const iterable = service.toIterable(event, { signal: opts.signal });
for await (const data of iterable) {
yield data;
}
});
}

export const homeRouter = router({
getSnapshot: publicProcedure
.output(homeSnapshot)
.query(() => getService().getSnapshot()),
refresh: publicProcedure
.output(homeSnapshot)
.mutation(() => getService().refresh()),
onSnapshotUpdated: subscribe(HomeEvent.SnapshotUpdated),
});
Loading
Loading