diff --git a/databricks-builder-app/server/services/databricks_tools.py b/databricks-builder-app/server/services/databricks_tools.py index 11a83e4d..ef7ad83a 100644 --- a/databricks-builder-app/server/services/databricks_tools.py +++ b/databricks-builder-app/server/services/databricks_tools.py @@ -9,6 +9,7 @@ """ import asyncio +import inspect import json import logging import threading @@ -289,15 +290,30 @@ async def wrapper(args: dict[str, Any]) -> dict[str, Any]: else: parsed_args[key] = value - # FastMCP tools are sync - run in thread pool with heartbeat + # FastMCP tools may be sync OR async — the mcp-server patches @mcp.tool + # to convert sync functions to async (via _wrap_sync_in_thread). If we + # just call ctx.run(fn, **args) on an async function we get back a + # coroutine object instead of the result. Detect and dispatch. print(f'[MCP TOOL] Running {name} in thread pool with heartbeat...', file=sys.stderr, flush=True) # Copy context to propagate Databricks auth contextvars to the thread ctx = copy_context() - def run_in_context(): - """Run the tool function within the copied context.""" - return ctx.run(fn, **parsed_args) + if inspect.iscoroutinefunction(fn): + def run_in_context(): + """Run the async tool function in a fresh event loop inside the thread.""" + def runner(): + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + return new_loop.run_until_complete(fn(**parsed_args)) + finally: + new_loop.close() + return ctx.run(runner) + else: + def run_in_context(): + """Run the sync tool function within the copied context.""" + return ctx.run(fn, **parsed_args) # Run tool in executor so we can poll for completion with heartbeat # Use executor.submit() to get a concurrent.futures.Future (thread-safe)