From f95425dab65ad4ba31c387635492b92a492fe69d Mon Sep 17 00:00:00 2001 From: Radu Mihai Gheorghe Date: Wed, 18 Mar 2026 13:56:57 +0200 Subject: [PATCH 1/4] feat: add a2a tool --- pyproject.toml | 2 +- src/uipath_langchain/agent/tools/__init__.py | 2 + .../agent/tools/a2a/__init__.py | 7 + .../agent/tools/a2a/a2a_tool.py | 252 ++++++++++++++++++ uv.lock | 2 +- 5 files changed, 263 insertions(+), 2 deletions(-) create mode 100644 src/uipath_langchain/agent/tools/a2a/__init__.py create mode 100644 src/uipath_langchain/agent/tools/a2a/a2a_tool.py diff --git a/pyproject.toml b/pyproject.toml index fc5ca1893..16f5ae19a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-langchain" -version = "0.8.26" +version = "0.8.27" description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath_langchain/agent/tools/__init__.py b/src/uipath_langchain/agent/tools/__init__.py index d06e191b2..e42030329 100644 --- a/src/uipath_langchain/agent/tools/__init__.py +++ b/src/uipath_langchain/agent/tools/__init__.py @@ -1,5 +1,6 @@ """Tool creation and management for LowCode agents.""" +from .a2a import create_a2a_agent_tools from .context_tool import create_context_tool from .escalation_tool import create_escalation_tool from .extraction_tool import create_ixp_extraction_tool @@ -13,6 +14,7 @@ from .tool_node import ToolWrapperMixin, UiPathToolNode, create_tool_node __all__ = [ + "create_a2a_agent_tools", "create_tools_from_resources", "create_tool_node", "create_context_tool", diff --git a/src/uipath_langchain/agent/tools/a2a/__init__.py b/src/uipath_langchain/agent/tools/a2a/__init__.py new file mode 100644 index 000000000..b94e123fc --- /dev/null +++ b/src/uipath_langchain/agent/tools/a2a/__init__.py @@ -0,0 +1,7 @@ +"""A2A (Agent-to-Agent) tools.""" + +from .a2a_tool import create_a2a_agent_tools + +__all__ = [ + "create_a2a_agent_tools", +] diff --git a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py new file mode 100644 index 000000000..e0c8f50c9 --- /dev/null +++ b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py @@ -0,0 +1,252 @@ +"""A2A singleton tool — one tool per remote agent with closure-managed context. + +Each tool maintains its own task_id/context_id in a closure for conversation +continuity. Authentication uses the UiPath SDK Bearer token, resolved lazily +on first call. +""" + +from __future__ import annotations + +import asyncio +import json +from logging import getLogger +from uuid import uuid4 + +import httpx +from a2a.client import Client +from a2a.types import ( + AgentCard, + Message, + Part, + Role, + Task, + TaskArtifactUpdateEvent, + TaskState, + TextPart, +) +from langchain_core.tools import BaseTool +from uipath.agent.models.agent import AgentA2aResourceConfig + +from uipath_langchain.agent.tools.base_uipath_structured_tool import ( + BaseUiPathStructuredTool, +) +from uipath_langchain.agent.tools.utils import sanitize_tool_name + +logger = getLogger(__name__) + + +def _extract_text(obj: Task | Message) -> str: + """Extract text content from a Task or Message response.""" + parts: list[Part] = [] + + if isinstance(obj, Message): + parts = obj.parts or [] + elif isinstance(obj, Task): + if obj.status and obj.status.state == TaskState.input_required: + if obj.status.message: + parts = obj.status.message.parts or [] + else: + if obj.artifacts: + for artifact in obj.artifacts: + parts.extend(artifact.parts or []) + if not parts and obj.status and obj.status.message: + parts = obj.status.message.parts or [] + if not parts and obj.history: + for msg in reversed(obj.history): + if msg.role == Role.agent: + parts = msg.parts or [] + break + + texts = [] + for part in parts: + if isinstance(part.root, TextPart): + texts.append(part.root.text) + return "\n".join(texts) if texts else "" + + +def _format_response(text: str, state: str) -> str: + """Build a structured tool response the LLM can act on.""" + return json.dumps({"agent_response": text, "task_state": state}) + + +def _build_description(card: AgentCard) -> str: + """Build a tool description from an agent card.""" + parts = [] + if card.description: + parts.append(card.description) + if card.skills: + for skill in card.skills: + skill_desc = skill.name or "" + if skill.description: + skill_desc += f": {skill.description}" + if skill_desc: + parts.append(f"Skill: {skill_desc}") + return " | ".join(parts) if parts else f"Remote A2A agent at {card.url}" + + +def fetch_agent_card( + agent_card_url: str, headers: dict[str, str] | None = None +) -> AgentCard: + """Fetch the agent card from a remote A2A endpoint (synchronous).""" + response = httpx.get(agent_card_url, headers=headers or {}, timeout=30) + response.raise_for_status() + return AgentCard(**response.json()) + + +def _resolve_a2a_url(config: AgentA2aResourceConfig) -> str: + """Resolve the A2A endpoint URL from config. + + Uses a2aUrl if available, otherwise derives from agentCardUrl + by stripping the .well-known path. + """ + a2a_url = getattr(config, "a2a_url", None) + if a2a_url: + return a2a_url + return config.agent_card_url.replace("/.well-known/agent-card.json", "") + + +async def create_a2a_agent_tools( + resources: list[AgentA2aResourceConfig], +) -> list[BaseTool]: + """Create A2A tools from a list of A2A resource configurations. + + Each enabled A2A resource becomes a single tool representing the remote agent. + + Args: + resources: List of A2A resource configurations from agent.json. + + Returns: + List of BaseTool instances, one per enabled A2A resource. + """ + tools: list[BaseTool] = [] + + for resource in resources: + if resource.is_enabled is False: + logger.info("Skipping disabled A2A resource '%s'", resource.name) + continue + if resource.is_active is False: + logger.info("Skipping inactive A2A resource '%s'", resource.name) + continue + + logger.info("Creating A2A tool for resource '%s'", resource.name) + tool = _create_a2a_singleton_tool(resource) + tools.append(tool) + + return tools + + +def _create_a2a_singleton_tool(config: AgentA2aResourceConfig) -> BaseTool: + """Create a single LangChain tool for A2A communication. + + All connection and conversation state is managed in a closure. + The httpx client and A2A protocol client are created lazily on first call. + + Args: + config: A2A resource configuration from agent.json. + + Returns: + A BaseTool that sends messages to the remote A2A agent. + """ + if config.cached_agent_card: + agent_card = AgentCard(**config.cached_agent_card) + else: + agent_card = fetch_agent_card(config.agent_card_url) + + raw_name = agent_card.name or config.name + tool_name = sanitize_tool_name(raw_name) + tool_description = _build_description(agent_card) + a2a_url = _resolve_a2a_url(config) + + _lock = asyncio.Lock() + _client: Client | None = None + _http_client: httpx.AsyncClient | None = None + _task_id: str | None = None + _context_id: str | None = None + + async def _ensure_client() -> Client: + nonlocal _client, _http_client + if _client is None: + async with _lock: + if _client is None: + from a2a.client import ClientConfig, ClientFactory + from uipath.platform import UiPath + + sdk = UiPath() + _http_client = httpx.AsyncClient( + timeout=120, + headers={ + "Authorization": f"Bearer {sdk._config.secret}" + }, + ) + _client = await ClientFactory.connect( + a2a_url, + client_config=ClientConfig( + httpx_client=_http_client, + streaming=False, + ), + ) + return _client # type: ignore[return-value] + + async def _send(*, message: str) -> str: + nonlocal _task_id, _context_id + client = await _ensure_client() + + if _task_id or _context_id: + logger.info( + "A2A continue task=%s context=%s to %s", + _task_id, + _context_id, + a2a_url, + ) + else: + logger.info("A2A new message to %s", a2a_url) + + a2a_message = Message( + role=Role.user, + parts=[Part(root=TextPart(text=message))], + message_id=str(uuid4()), + task_id=_task_id, + context_id=_context_id, + ) + + try: + text = "" + state = "unknown" + + async for event in client.send_message(a2a_message): + if isinstance(event, Message): + text = _extract_text(event) + _context_id = event.context_id + state = "completed" + break + else: + task, update = event + _task_id = task.id + _context_id = task.context_id + state = ( + task.status.state.value if task.status else "unknown" + ) + if update is None: + text = _extract_text(task) + break + elif isinstance(update, TaskArtifactUpdateEvent): + for part in update.artifact.parts or []: + if isinstance(part.root, TextPart): + text += part.root.text + + return _format_response(text or "No response received.", state) + + except Exception as e: + logger.exception("A2A request to %s failed", a2a_url) + return _format_response(f"Error: {e}", "error") + + return BaseUiPathStructuredTool( + name=tool_name, + description=tool_description, + coroutine=_send, + metadata={ + "tool_type": "a2a", + "display_name": raw_name, + "slug": config.slug, + }, + ) diff --git a/uv.lock b/uv.lock index 0e6d82c19..d4be2e6a6 100644 --- a/uv.lock +++ b/uv.lock @@ -3333,7 +3333,7 @@ wheels = [ [[package]] name = "uipath-langchain" -version = "0.8.26" +version = "0.8.27" source = { editable = "." } dependencies = [ { name = "httpx" }, From 77258f59cac5235d6c7e5525e9572da2d09d100f Mon Sep 17 00:00:00 2001 From: Radu Mihai Gheorghe Date: Wed, 18 Mar 2026 18:02:02 +0200 Subject: [PATCH 2/4] fix: add a2a sdk --- .vscode/settings.json | 4 ---- pyproject.toml | 1 + .../agent/tools/a2a/a2a_tool.py | 8 ++------ uv.lock | 19 +++++++++++++++++++ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 24a9e3428..6f30b8006 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,10 +16,6 @@ "source.organizeImports": "explicit" } }, - "workbench.colorCustomizations": { - "titleBar.activeBackground": "#0099cc", - "titleBar.inactiveBackground": "#0099cc" - }, "python.testing.pytestArgs": [ "tests" ], diff --git a/pyproject.toml b/pyproject.toml index 16f5ae19a..f5c8b5da5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "jsonpath-ng>=1.7.0", "mcp==1.26.0", "langchain-mcp-adapters==0.2.1", + "a2a-sdk>=0.3.0", ] classifiers = [ diff --git a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py index e0c8f50c9..bcf27b49a 100644 --- a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py +++ b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py @@ -174,9 +174,7 @@ async def _ensure_client() -> Client: sdk = UiPath() _http_client = httpx.AsyncClient( timeout=120, - headers={ - "Authorization": f"Bearer {sdk._config.secret}" - }, + headers={"Authorization": f"Bearer {sdk._config.secret}"}, ) _client = await ClientFactory.connect( a2a_url, @@ -223,9 +221,7 @@ async def _send(*, message: str) -> str: task, update = event _task_id = task.id _context_id = task.context_id - state = ( - task.status.state.value if task.status else "unknown" - ) + state = task.status.state.value if task.status else "unknown" if update is None: text = _extract_text(task) break diff --git a/uv.lock b/uv.lock index d4be2e6a6..5b265d74d 100644 --- a/uv.lock +++ b/uv.lock @@ -7,6 +7,23 @@ resolution-markers = [ "python_full_version < '3.13'", ] +[[package]] +name = "a2a-sdk" +version = "0.3.25" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core", version = "2.25.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.14'" }, + { name = "google-api-core", version = "2.29.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.14'" }, + { name = "httpx" }, + { name = "httpx-sse" }, + { name = "protobuf" }, + { name = "pydantic" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/55/83/3c99b276d09656cce039464509f05bf385e5600d6dc046a131bbcf686930/a2a_sdk-0.3.25.tar.gz", hash = "sha256:afda85bab8d6af0c5d15e82f326c94190f6be8a901ce562d045a338b7127242f", size = 270638, upload-time = "2026-03-10T13:08:46.417Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/f9/6a62520b7ecb945188a6e1192275f4732ff9341cd4629bc975a6c146aeab/a2a_sdk-0.3.25-py3-none-any.whl", hash = "sha256:2fce38faea82eb0b6f9f9c2bcf761b0d78612c80ef0e599b50d566db1b2654b5", size = 149609, upload-time = "2026-03-10T13:08:44.7Z" }, +] + [[package]] name = "aiohappyeyeballs" version = "2.6.1" @@ -3336,6 +3353,7 @@ name = "uipath-langchain" version = "0.8.27" source = { editable = "." } dependencies = [ + { name = "a2a-sdk" }, { name = "httpx" }, { name = "jsonpath-ng" }, { name = "jsonschema-pydantic-converter" }, @@ -3383,6 +3401,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "a2a-sdk", specifier = ">=0.3.0" }, { name = "boto3-stubs", marker = "extra == 'bedrock'", specifier = ">=1.41.4" }, { name = "google-generativeai", marker = "extra == 'vertex'", specifier = ">=0.8.0" }, { name = "httpx", specifier = ">=0.27.0" }, From 705cd73d1a8821486a0ae9f928a6996d4ca6385b Mon Sep 17 00:00:00 2001 From: Radu Mihai Gheorghe Date: Wed, 18 Mar 2026 18:13:32 +0200 Subject: [PATCH 3/4] fix: add a2a input model --- src/uipath_langchain/agent/tools/a2a/a2a_tool.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py index bcf27b49a..7ee6258b7 100644 --- a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py +++ b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py @@ -25,6 +25,7 @@ TextPart, ) from langchain_core.tools import BaseTool +from pydantic import BaseModel, Field from uipath.agent.models.agent import AgentA2aResourceConfig from uipath_langchain.agent.tools.base_uipath_structured_tool import ( @@ -35,6 +36,12 @@ logger = getLogger(__name__) +class A2aToolInput(BaseModel): + """Input schema for A2A agent tool.""" + + message: str = Field(description="The message to send to the remote agent.") + + def _extract_text(obj: Task | Message) -> str: """Extract text content from a Task or Message response.""" parts: list[Part] = [] @@ -240,6 +247,7 @@ async def _send(*, message: str) -> str: name=tool_name, description=tool_description, coroutine=_send, + args_schema=A2aToolInput, metadata={ "tool_type": "a2a", "display_name": raw_name, From 7ea2c910bda78e8de7742ecc6cf8f3c4bbd71284 Mon Sep 17 00:00:00 2001 From: Radu Mihai Gheorghe Date: Fri, 20 Mar 2026 14:50:33 +0200 Subject: [PATCH 4/4] feat: add llm passthrough option for a2a tool --- .../agent/tools/a2a/a2a_tool.py | 315 ++++++++++++++---- 1 file changed, 253 insertions(+), 62 deletions(-) diff --git a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py index 7ee6258b7..1a33193c6 100644 --- a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py +++ b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py @@ -1,8 +1,11 @@ -"""A2A singleton tool — one tool per remote agent with closure-managed context. +"""A2A singleton tool — one tool per remote agent. -Each tool maintains its own task_id/context_id in a closure for conversation -continuity. Authentication uses the UiPath SDK Bearer token, resolved lazily -on first call. +Each tool maintains conversation context (task_id/context_id) across calls. +The persistence strategy is controlled by the advancedPersistance flag: +- True: deterministic persistence via graph state (tools_storage) +- False/absent: LLM passthrough (IDs in tool schema and response) + +Authentication uses the UiPath SDK Bearer token, resolved lazily on first call. """ from __future__ import annotations @@ -10,6 +13,7 @@ import asyncio import json from logging import getLogger +from typing import Any from uuid import uuid4 import httpx @@ -24,24 +28,53 @@ TaskState, TextPart, ) +from langchain_core.messages import ToolCall, ToolMessage from langchain_core.tools import BaseTool +from langgraph.types import Command from pydantic import BaseModel, Field from uipath.agent.models.agent import AgentA2aResourceConfig +from uipath_langchain.agent.react.types import AgentGraphState from uipath_langchain.agent.tools.base_uipath_structured_tool import ( BaseUiPathStructuredTool, ) +from uipath_langchain.agent.tools.tool_node import ( + ToolWrapperMixin, + ToolWrapperReturnType, +) from uipath_langchain.agent.tools.utils import sanitize_tool_name logger = getLogger(__name__) class A2aToolInput(BaseModel): - """Input schema for A2A agent tool.""" + """Input schema for A2A agent tool with graph state persistence.""" message: str = Field(description="The message to send to the remote agent.") +class A2aToolPassthroughInput(BaseModel): + """Input schema for A2A agent tool with LLM passthrough.""" + + message: str = Field(description="The message to send to the remote agent.") + task_id: str | None = Field( + default=None, + description="Task ID from a previous call to this tool. " + "Pass it to continue an existing conversation.", + ) + context_id: str | None = Field( + default=None, + description="Context ID from a previous call to this tool. " + "Pass it to continue an existing conversation.", + ) + + + +class A2aStructuredToolWithWrapper(BaseUiPathStructuredTool, ToolWrapperMixin): + pass + + + def _extract_text(obj: Task | Message) -> str: """Extract text content from a Task or Message response.""" parts: list[Part] = [] @@ -71,9 +104,20 @@ def _extract_text(obj: Task | Message) -> str: return "\n".join(texts) if texts else "" -def _format_response(text: str, state: str) -> str: +def _format_response( + text: str, + state: str, + *, + task_id: str | None = None, + context_id: str | None = None, + include_ids: bool = False, +) -> str: """Build a structured tool response the LLM can act on.""" - return json.dumps({"agent_response": text, "task_state": state}) + result: dict[str, Any] = {"agent_response": text, "task_state": state} + if include_ids: + result["task_id"] = task_id + result["context_id"] = context_id + return json.dumps(result) def _build_description(card: AgentCard) -> str: @@ -118,6 +162,7 @@ async def create_a2a_agent_tools( """Create A2A tools from a list of A2A resource configurations. Each enabled A2A resource becomes a single tool representing the remote agent. + The persistence strategy is determined by the advancedPersistance flag on each resource. Args: resources: List of A2A resource configurations from agent.json. @@ -135,18 +180,93 @@ async def create_a2a_agent_tools( logger.info("Skipping inactive A2A resource '%s'", resource.name) continue - logger.info("Creating A2A tool for resource '%s'", resource.name) + logger.info( + "Creating A2A tool for resource '%s' (advancedPersistance=%s)", + resource.name, + resource.advanced_persistance, + ) tool = _create_a2a_singleton_tool(resource) tools.append(tool) return tools + +async def _send_a2a_message( + client: Client, + a2a_url: str, + *, + message: str, + task_id: str | None, + context_id: str | None, +) -> tuple[str, str, str | None, str | None]: + """Send a message to a remote A2A agent and return the response. + + Args: + client: The A2A protocol client. + a2a_url: The remote agent URL (for logging). + message: The user message text. + task_id: Prior task ID for conversation continuity. + context_id: Prior context ID for conversation continuity. + + Returns: + Tuple of (response_text, task_state, new_task_id, new_context_id). + """ + if task_id or context_id: + logger.info( + "A2A continue task=%s context=%s to %s", task_id, context_id, a2a_url + ) + else: + logger.info("A2A new message to %s", a2a_url) + + a2a_message = Message( + role=Role.user, + parts=[Part(root=TextPart(text=message))], + message_id=str(uuid4()), + task_id=task_id, + context_id=context_id, + ) + + try: + text = "" + state = "unknown" + new_task_id = task_id + new_context_id = context_id + + async for event in client.send_message(a2a_message): + if isinstance(event, Message): + text = _extract_text(event) + new_context_id = event.context_id + state = "completed" + break + else: + task, update = event + new_task_id = task.id + new_context_id = task.context_id + state = task.status.state.value if task.status else "unknown" + if update is None: + text = _extract_text(task) + break + elif isinstance(update, TaskArtifactUpdateEvent): + for part in update.artifact.parts or []: + if isinstance(part.root, TextPart): + text += part.root.text + + return (text or "No response received.", state, new_task_id, new_context_id) + + except Exception as e: + logger.exception("A2A request to %s failed", a2a_url) + return (f"Error: {e}", "error", task_id, context_id) + + + + def _create_a2a_singleton_tool(config: AgentA2aResourceConfig) -> BaseTool: """Create a single LangChain tool for A2A communication. - All connection and conversation state is managed in a closure. - The httpx client and A2A protocol client are created lazily on first call. + Branches on config.advanced_persistance: + - True: graph state via tools_storage + - False: LLM passthrough with task_id/context_id in schema Args: config: A2A resource configuration from agent.json. @@ -167,8 +287,6 @@ def _create_a2a_singleton_tool(config: AgentA2aResourceConfig) -> BaseTool: _lock = asyncio.Lock() _client: Client | None = None _http_client: httpx.AsyncClient | None = None - _task_id: str | None = None - _context_id: str | None = None async def _ensure_client() -> Client: nonlocal _client, _http_client @@ -192,65 +310,138 @@ async def _ensure_client() -> Client: ) return _client # type: ignore[return-value] + metadata = { + "tool_type": "a2a", + "display_name": raw_name, + "slug": config.slug, + } + + if config.advanced_persistance: + return _create_graph_state_tool( + tool_name, tool_description, a2a_url, _ensure_client, metadata + ) + else: + return _create_passthrough_tool( + tool_name, tool_description, a2a_url, _ensure_client, metadata + ) + + + + +def _create_graph_state_tool( + tool_name: str, + tool_description: str, + a2a_url: str, + ensure_client: Any, + metadata: dict[str, Any], +) -> BaseTool: + """Create an A2A tool that persists task_id/context_id in graph state.""" + async def _send(*, message: str) -> str: - nonlocal _task_id, _context_id - client = await _ensure_client() - - if _task_id or _context_id: - logger.info( - "A2A continue task=%s context=%s to %s", - _task_id, - _context_id, - a2a_url, - ) - else: - logger.info("A2A new message to %s", a2a_url) - - a2a_message = Message( - role=Role.user, - parts=[Part(root=TextPart(text=message))], - message_id=str(uuid4()), - task_id=_task_id, - context_id=_context_id, + client = await ensure_client() + text, state, _, _ = await _send_a2a_message( + client, a2a_url, message=message, task_id=None, context_id=None + ) + return _format_response(text, state) + + async def _a2a_wrapper( + tool: BaseTool, + call: ToolCall, + state: AgentGraphState, + ) -> ToolWrapperReturnType: + prior = state.inner_state.tools_storage.get(tool.name) or {} + task_id = prior.get("task_id") + context_id = prior.get("context_id") + + logger.info( + "A2A wrapper read from tools_storage: task_id=%s context_id=%s", + task_id, + context_id, ) - try: - text = "" - state = "unknown" + client = await ensure_client() + text, task_state, new_task_id, new_context_id = await _send_a2a_message( + client, + a2a_url, + message=call["args"]["message"], + task_id=task_id, + context_id=context_id, + ) - async for event in client.send_message(a2a_message): - if isinstance(event, Message): - text = _extract_text(event) - _context_id = event.context_id - state = "completed" - break - else: - task, update = event - _task_id = task.id - _context_id = task.context_id - state = task.status.state.value if task.status else "unknown" - if update is None: - text = _extract_text(task) - break - elif isinstance(update, TaskArtifactUpdateEvent): - for part in update.artifact.parts or []: - if isinstance(part.root, TextPart): - text += part.root.text + logger.info( + "A2A wrapper writing to tools_storage: task_id=%s context_id=%s", + new_task_id, + new_context_id, + ) - return _format_response(text or "No response received.", state) + return Command( + update={ + "messages": [ + ToolMessage( + content=_format_response(text, task_state), + name=call["name"], + tool_call_id=call["id"], + ) + ], + "inner_state": { + "tools_storage": { + tool.name: { + "task_id": new_task_id, + "context_id": new_context_id, + } + } + }, + } + ) - except Exception as e: - logger.exception("A2A request to %s failed", a2a_url) - return _format_response(f"Error: {e}", "error") + tool = A2aStructuredToolWithWrapper( + name=tool_name, + description=tool_description, + coroutine=_send, + args_schema=A2aToolInput, + metadata=metadata, + ) + tool.set_tool_wrappers(awrapper=_a2a_wrapper) + return tool + + + + +def _create_passthrough_tool( + tool_name: str, + tool_description: str, + a2a_url: str, + ensure_client: Any, + metadata: dict[str, Any], +) -> BaseTool: + """Create an A2A tool where the LLM manages task_id/context_id passthrough.""" + + async def _send( + *, + message: str, + task_id: str | None = None, + context_id: str | None = None, + ) -> str: + client = await ensure_client() + text, state, new_task_id, new_context_id = await _send_a2a_message( + client, + a2a_url, + message=message, + task_id=task_id, + context_id=context_id, + ) + return _format_response( + text, + state, + task_id=new_task_id, + context_id=new_context_id, + include_ids=True, + ) return BaseUiPathStructuredTool( name=tool_name, description=tool_description, coroutine=_send, - args_schema=A2aToolInput, - metadata={ - "tool_type": "a2a", - "display_name": raw_name, - "slug": config.slug, - }, + args_schema=A2aToolPassthroughInput, + metadata=metadata, )