Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Co-Authored-By: (agent model name) <email>
- Do not add mutable runtime behavior globals or test-only singleton mutation APIs (`set*ForTests`, `reset*ForTests`, observer globals for core behavior).
- Prefer small consumer-owned service interfaces over broad deps bags or service locators.
- Do not leak third-party SDK types across chat subsystem boundaries when a small local interface will do; keep vendor SDKs inside infrastructure modules.
- Service modules must depend on small injected ports for Slack behavior, not import Slack infrastructure directly.
- `runtime/` orchestrates turns and turn-scoped formatting, `services/` do domain work (reply policy, delivery planning, channel intent, attachment validation), `state/` persists by concern, `ingress/` only normalizes/routes.
- **Feature-based colocation**: group files by domain feature, not by technical role. Within a module, create subdirectories for each feature domain (e.g., `tools/slack/`, `tools/web/`, `tools/sandbox/`, `tools/skill/`). Shared contracts and cross-cutting utilities live at the module root. Only extract to a shared location when 2+ features need the same code.
- Do not use barrel `index.ts` re-exports inside feature subdirectories — import directly from the source file. A module-root `index.ts` is acceptable as a composition root that wires features together.
Expand Down
12 changes: 12 additions & 0 deletions packages/junior/.dependency-cruiser.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ export default {
path: "^src/chat/runtime/",
},
},
{
name: "no-chat-services-to-slack",
comment:
"Service modules must depend on small injected ports, not Slack infrastructure.",
severity: "error",
from: {
path: "^src/chat/services/",
},
to: {
path: "^src/chat/slack/",
},
},
{
name: "no-chat-state-to-runtime",
comment: "State modules must not depend on runtime orchestration.",
Expand Down
5 changes: 2 additions & 3 deletions packages/junior/src/chat/app/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ export function createJuniorRuntimeServices(
completeText: overrides.visionContext?.completeText ?? completeText,
listThreadReplies:
overrides.visionContext?.listThreadReplies ?? listThreadReplies,
downloadPrivateSlackFile:
overrides.visionContext?.downloadPrivateSlackFile ??
downloadPrivateSlackFile,
downloadFile:
overrides.visionContext?.downloadFile ?? downloadPrivateSlackFile,
});

return {
Expand Down
4 changes: 2 additions & 2 deletions packages/junior/src/chat/runtime/reply-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import {
import { completeAuthPauseTurn } from "@/chat/runtime/auth-pause-state";
import type { PreparedTurnState } from "@/chat/runtime/turn-preparation";
import {
generateThreadTitle,
type ConversationMemoryService,
markConversationMessage,
normalizeConversationText,
upsertConversationMessage,
Expand Down Expand Up @@ -68,7 +68,7 @@ import {

export interface ReplyExecutorServices {
generateAssistantReply: typeof generateAssistantReplyImpl;
generateThreadTitle: typeof generateThreadTitle;
generateThreadTitle: ConversationMemoryService["generateThreadTitle"];
lookupSlackUser: typeof lookupSlackUser;
scheduleTurnTimeoutResume: (
request: TurnTimeoutResumeRequest,
Expand Down
138 changes: 133 additions & 5 deletions packages/junior/src/chat/runtime/turn-preparation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ import {
type ThreadArtifactsState,
} from "@/chat/state/artifacts";
import {
compactConversationIfNeeded,
buildConversationContext,
isHumanConversationMessage,
normalizeConversationText,
seedConversationBackfill,
updateConversationStats,
upsertConversationMessage,
} from "@/chat/services/conversation-memory";
import {
countPotentialImageAttachments,
hasPotentialImageAttachment,
hydrateConversationVisionContext,
isVisionEnabled,
} from "@/chat/services/vision-context";
import { getChannelConfigurationService } from "@/chat/runtime/thread-state";
import type { ChannelConfigurationService } from "@/chat/configuration/types";

const BACKFILL_MESSAGE_LIMIT = 80;

export interface PreparedTurnState {
artifacts: ThreadArtifactsState;
configuration?: Record<string, unknown>;
Expand All @@ -42,8 +42,25 @@ export interface PreparedTurnState {
}

export interface PrepareTurnStateDeps {
compactConversationIfNeeded: typeof compactConversationIfNeeded;
hydrateConversationVisionContext: typeof hydrateConversationVisionContext;
compactConversationIfNeeded: (
conversation: ThreadConversationState,
context: {
threadId?: string;
channelId?: string;
requesterId?: string;
runId?: string;
},
) => Promise<void>;
hydrateConversationVisionContext: (
conversation: ThreadConversationState,
context: {
threadId?: string;
channelId?: string;
requesterId?: string;
runId?: string;
threadTs?: string;
},
) => Promise<void>;
}

function hasPendingImageHydration(
Expand All @@ -55,6 +72,117 @@ function hasPendingImageHydration(
);
}

function createConversationMessageFromSdkMessage(
entry: Message,
): ConversationMessage | null {
const rawText = normalizeConversationText(entry.text);
if (!rawText) {
return null;
}

return {
id: entry.id,
role: entry.author.isMe ? "assistant" : "user",
text: rawText,
createdAtMs: entry.metadata.dateSent.getTime(),
author: {
userId: entry.author.userId,
userName: entry.author.userName,
fullName: entry.author.fullName,
isBot:
typeof entry.author.isBot === "boolean"
? entry.author.isBot
: undefined,
},
meta: {
slackTs: getSlackMessageTs(entry),
},
};
}

async function seedConversationBackfill(
thread: Thread,
conversation: ThreadConversationState,
currentTurn: {
messageId: string;
messageCreatedAtMs: number;
},
): Promise<void> {
if (conversation.backfill.completedAtMs) {
return;
}
if (conversation.messages.length > 0 || conversation.compactions.length > 0) {
conversation.backfill = {
completedAtMs: Date.now(),
source: "recent_messages",
};
updateConversationStats(conversation);
return;
}

const seeded: ConversationMessage[] = [];
let source: "recent_messages" | "thread_fetch" = "recent_messages";

try {
const fetchedNewestFirst: Message[] = [];
for await (const entry of thread.messages) {
fetchedNewestFirst.push(entry);
if (fetchedNewestFirst.length >= BACKFILL_MESSAGE_LIMIT) {
break;
}
}
fetchedNewestFirst.reverse();
for (const entry of fetchedNewestFirst) {
const message = createConversationMessageFromSdkMessage(entry);
if (message) {
seeded.push(message);
}
}
if (seeded.length > 0) {
source = "thread_fetch";
}
} catch {}

if (seeded.length === 0) {
try {
await thread.refresh();
} catch {}

const fromRecent = thread.recentMessages.slice(-BACKFILL_MESSAGE_LIMIT);
for (const entry of fromRecent) {
const message = createConversationMessageFromSdkMessage(entry);
if (message) {
seeded.push(message);
}
}
source = "recent_messages";
}

for (const message of seeded) {
if (
message.id !== currentTurn.messageId &&
message.createdAtMs > currentTurn.messageCreatedAtMs
) {
continue;
}
if (
message.id !== currentTurn.messageId &&
message.createdAtMs === currentTurn.messageCreatedAtMs &&
message.id > currentTurn.messageId
) {
continue;
}
upsertConversationMessage(conversation, message);
}

conversation.backfill = {
completedAtMs: Date.now(),
source,
};
updateConversationStats(conversation);
}

/** Build the turn-state preparer from injected conversation services. */
export function createPrepareTurnState(deps: PrepareTurnStateDeps) {
return async function prepareTurnState(args: {
explicitMention: boolean;
Expand Down
125 changes: 2 additions & 123 deletions packages/junior/src/chat/services/conversation-memory.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import type { Message, Thread } from "chat";
import { botConfig } from "@/chat/config";
import { completeText } from "@/chat/pi/client";
import { getSlackMessageTs } from "@/chat/slack/message";
import type { completeText } from "@/chat/pi/client";
import type {
ConversationCompaction,
ConversationMessage,
Expand All @@ -17,7 +15,6 @@ const CONTEXT_MIN_LIVE_MESSAGES = 12;
const CONTEXT_COMPACTION_BATCH_SIZE = 24;
const CONTEXT_MAX_COMPACTIONS = 16;
const CONTEXT_MAX_MESSAGE_CHARS = 3200;
const BACKFILL_MESSAGE_LIMIT = 80;

export interface ConversationMemoryDeps {
completeText: typeof completeText;
Expand Down Expand Up @@ -417,6 +414,7 @@ async function compactConversationIfNeededWithDeps(
}
}

/** Build the service that owns durable conversation memory compaction and titles. */
export function createConversationMemoryService(
deps: ConversationMemoryDeps,
): ConversationMemoryService {
Expand All @@ -428,125 +426,6 @@ export function createConversationMemoryService(
};
}

const defaultConversationMemoryService = createConversationMemoryService({
completeText,
});

export const compactConversationIfNeeded =
defaultConversationMemoryService.compactConversationIfNeeded;
export const generateThreadTitle =
defaultConversationMemoryService.generateThreadTitle;

function createConversationMessageFromSdkMessage(
entry: Message,
): ConversationMessage | null {
const rawText = normalizeConversationText(entry.text);
if (!rawText) {
return null;
}

return {
id: entry.id,
role: entry.author.isMe ? "assistant" : "user",
text: rawText,
createdAtMs: entry.metadata.dateSent.getTime(),
author: {
userId: entry.author.userId,
userName: entry.author.userName,
fullName: entry.author.fullName,
isBot:
typeof entry.author.isBot === "boolean"
? entry.author.isBot
: undefined,
},
meta: {
slackTs: getSlackMessageTs(entry),
},
};
}

export async function seedConversationBackfill(
thread: Thread,
conversation: ThreadConversationState,
currentTurn: {
messageId: string;
messageCreatedAtMs: number;
},
): Promise<void> {
if (conversation.backfill.completedAtMs) {
return;
}
if (conversation.messages.length > 0 || conversation.compactions.length > 0) {
conversation.backfill = {
completedAtMs: Date.now(),
source: "recent_messages",
};
updateConversationStats(conversation);
return;
}

const seeded: ConversationMessage[] = [];
let source: "recent_messages" | "thread_fetch" = "recent_messages";

try {
const fetchedNewestFirst: Message[] = [];
for await (const entry of thread.messages) {
fetchedNewestFirst.push(entry);
if (fetchedNewestFirst.length >= BACKFILL_MESSAGE_LIMIT) {
break;
}
}
fetchedNewestFirst.reverse();
for (const entry of fetchedNewestFirst) {
const message = createConversationMessageFromSdkMessage(entry);
if (message) {
seeded.push(message);
}
}
if (seeded.length > 0) {
source = "thread_fetch";
}
} catch {}

if (seeded.length === 0) {
try {
await thread.refresh();
} catch {}

const fromRecent = thread.recentMessages.slice(-BACKFILL_MESSAGE_LIMIT);
for (const entry of fromRecent) {
const message = createConversationMessageFromSdkMessage(entry);
if (message) {
seeded.push(message);
}
}
source = "recent_messages";
}

for (const message of seeded) {
if (
message.id !== currentTurn.messageId &&
message.createdAtMs > currentTurn.messageCreatedAtMs
) {
continue;
}
if (
message.id !== currentTurn.messageId &&
message.createdAtMs === currentTurn.messageCreatedAtMs &&
message.id > currentTurn.messageId
) {
continue;
}
upsertConversationMessage(conversation, message);
}

conversation.backfill = {
completedAtMs: Date.now(),
source,
};
updateConversationStats(conversation);
}

export function isHumanConversationMessage(
message: ConversationMessage,
): boolean {
Expand Down
Loading
Loading