Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions packages/plugins/mcp/src/sdk/binding-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { makeInMemoryScopedKv, scopeKv, type Kv, type ToolId, type ScopedKv } fr
import { McpToolBinding } from "./types";
import type { McpStoredSourceData } from "./types";
import { McpOAuthSession } from "./oauth";
import { McpStoredSourceSchema } from "./stored-source";

// ---------------------------------------------------------------------------
// OAuth session TTL — pending sessions are cleaned up after this many ms
Expand All @@ -31,20 +32,22 @@ const decodeOAuthSession = Schema.decodeUnknownSync(Schema.parseJson(StoredOAuth
// Stored source — combines meta + config into one entry
// ---------------------------------------------------------------------------

export interface McpStoredSource {
readonly namespace: string;
readonly name: string;
readonly config: McpStoredSourceData;
}
export type McpStoredSource = McpStoredSourceSchema;

const encodeSource = Schema.encodeSync(Schema.parseJson(McpStoredSourceSchema));
const decodeSource = Schema.decodeUnknownSync(Schema.parseJson(McpStoredSourceSchema));

// ---------------------------------------------------------------------------
// Stored binding schema
//
// Note: old rows also carried a `sourceData` field inlining the full source
// config on every binding. Effect Schema strips unknown fields on decode, so
// those legacy rows still load; the next write drops the extra field.
// ---------------------------------------------------------------------------

const StoredBindingEntry = Schema.Struct({
namespace: Schema.String,
binding: McpToolBinding,
sourceData: Schema.Unknown,
});

const encodeBindingEntry = Schema.encodeSync(Schema.parseJson(StoredBindingEntry));
Expand All @@ -57,14 +60,13 @@ const decodeBindingEntry = Schema.decodeUnknownSync(Schema.parseJson(StoredBindi
export interface McpBindingStore {
readonly get: (toolId: ToolId) => Effect.Effect<{
binding: McpToolBinding;
sourceData: McpStoredSourceData;
namespace: string;
} | null>;

readonly put: (
toolId: ToolId,
namespace: string,
binding: McpToolBinding,
sourceData: McpStoredSourceData,
) => Effect.Effect<void>;

readonly remove: (toolId: ToolId) => Effect.Effect<void>;
Expand Down Expand Up @@ -105,12 +107,12 @@ const makeStore = (
const entry = decodeBindingEntry(raw);
return {
binding: entry.binding as McpToolBinding,
sourceData: entry.sourceData as McpStoredSourceData,
namespace: entry.namespace,
};
}),

put: (toolId, namespace, binding, sourceData) =>
bindings.set([{ key: toolId, value: encodeBindingEntry({ namespace, binding, sourceData }) }]),
put: (toolId, namespace, binding) =>
bindings.set([{ key: toolId, value: encodeBindingEntry({ namespace, binding }) }]),

remove: (toolId) => bindings.delete([toolId]).pipe(Effect.asVoid),

Expand Down Expand Up @@ -139,31 +141,28 @@ const makeStore = (

// ---- Sources (meta + config combined) ----

putSource: (source) => sources.set([{ key: source.namespace, value: JSON.stringify(source) }]),
putSource: (source) => sources.set([{ key: source.namespace, value: encodeSource(source) }]),

removeSource: (namespace) => sources.delete([namespace]).pipe(Effect.asVoid),

listSources: () =>
Effect.gen(function* () {
const entries = yield* sources.list();
return entries.map((e) => JSON.parse(e.value) as McpStoredSource);
return entries.map((e) => decodeSource(e.value));
}),

getSource: (namespace) =>
Effect.gen(function* () {
const raw = yield* sources.get(namespace);
if (!raw) return null;
// @effect-diagnostics-next-line preferSchemaOverJson:off
return JSON.parse(raw) as McpStoredSource;
return decodeSource(raw);
}),

getSourceConfig: (namespace) =>
Effect.gen(function* () {
const raw = yield* sources.get(namespace);
if (!raw) return null;
// @effect-diagnostics-next-line preferSchemaOverJson:off
const source = JSON.parse(raw) as McpStoredSource;
return source.config;
return decodeSource(raw).config;
}),

// ---- Pending OAuth sessions (short-lived, between startOAuth and completeOAuth) ----
Expand Down
11 changes: 10 additions & 1 deletion packages/plugins/mcp/src/sdk/invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,16 @@ export const makeMcpInvoker = (opts: {
});
}

const { binding, sourceData } = entry;
const sourceData = yield* opts.bindingStore.getSourceConfig(entry.namespace);
if (!sourceData) {
return yield* new ToolInvocationError({
toolId,
message: `No MCP source config found for namespace "${entry.namespace}"`,
cause: undefined,
});
}

const { binding } = entry;
const cacheKey = connectionCacheKey(sourceData);

// Build the connector and register it for the cache lookup
Expand Down
25 changes: 7 additions & 18 deletions packages/plugins/mcp/src/sdk/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,6 @@ export const mcpPlugin = (options?: {
ToolId.make(joinToolPath(sourceId, e.toolId)),
sourceId,
toBinding(e),
sd,
),
{ discard: true },
);
Expand Down Expand Up @@ -540,7 +539,6 @@ export const mcpPlugin = (options?: {
ToolId.make(joinToolPath(namespace, e.toolId)),
namespace,
toBinding(e),
sd,
),
{ discard: true },
);
Expand Down Expand Up @@ -597,7 +595,6 @@ export const mcpPlugin = (options?: {
ToolId.make(joinToolPath(namespace, e.toolId)),
namespace,
toBinding(e),
sd,
),
{ discard: true },
);
Expand Down Expand Up @@ -703,10 +700,13 @@ export const mcpPlugin = (options?: {

const updateSource = (namespace: string, input: McpUpdateSourceInput) =>
Effect.gen(function* () {
const existingConfig = yield* bindingStore.getSourceConfig(namespace);
if (!existingConfig || existingConfig.transport !== "remote") return;
const existing = yield* bindingStore.getSource(namespace);
if (!existing || existing.config.transport !== "remote") return;

const remote = existingConfig as Extract<McpStoredSourceData, { transport: "remote" }>;
const remote = existing.config as Extract<
McpStoredSourceData,
{ transport: "remote" }
>;
const updatedConfig: McpStoredSourceData = {
...remote,
...(input.endpoint !== undefined ? { endpoint: input.endpoint } : {}),
Expand All @@ -715,22 +715,11 @@ export const mcpPlugin = (options?: {
...(input.queryParams !== undefined ? { queryParams: input.queryParams } : {}),
};

const sources = yield* bindingStore.listSources();
const existingMeta = sources.find((s) => s.namespace === namespace);

yield* bindingStore.putSource({
namespace,
name: existingMeta?.name ?? namespace,
name: existing.name,
config: updatedConfig,
});

const toolIds = yield* bindingStore.listByNamespace(namespace);
for (const toolId of toolIds) {
const entry = yield* bindingStore.get(toolId);
if (entry) {
yield* bindingStore.put(toolId, namespace, entry.binding, updatedConfig);
}
}
});

const getSource = (namespace: string) => bindingStore.getSource(namespace);
Expand Down
Loading