Workflow execution engine.

Feed it a workflow graph (nodes + edges) and get ordered, event-streaming execution. Replaces tangled spaghetti executors with a clean 4-phase pipeline: PARSE → PLAN → EXECUTE → FINALIZE.
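The graph itself is plain data. The actual node/edge schema lives in `mantis/models/` (NodeInfo, PortInfo, EdgeInfo); the field names below are illustrative only, to show the general shape:

```python
# Illustrative shape only — the real schema is defined in mantis/models/.
workflow_data = {
    "nodes": [
        {"id": "start", "type": "startnode"},
        {"id": "agent", "type": "agent", "config": {"prompt": "You are helpful."}},
        {"id": "end", "type": "endnode"},
    ],
    "edges": [
        {"source": "start", "target": "agent"},
        {"source": "agent", "target": "end"},
    ],
}

input_data = {"message": "What's the weather in Paris?"}
```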
```python
from mantis import WorkflowRuntime

runtime = WorkflowRuntime(model=my_llm, tools=[calculate, get_weather])

# Execute a workflow graph
async for event in runtime.execute(workflow_data, input_data):
    print(event)

# Or collect the final result
result = await runtime.execute_collect(workflow_data, input_data)
```

```bash
pip install mantis            # core only (httpx)
pip install mantis[search]    # + graph-tool-call (>=0.15)
pip install mantis[sandbox]   # + Docker sandbox
pip install mantis[state]     # + PostgreSQL checkpointing
pip install mantis[all]       # everything
```

┌─ Caller (xgen, FastAPI, script) ────────────────────────────────────┐
│ │
│ workflow_data ─── {nodes: [...], edges: [...]} │
│ input_data ────── data for startnode │
│ model ─────────── LLMProvider (for agent nodes) │
│ tools ─────────── [@tool functions] (for agent nodes) │
│ hooks ─────────── [TraceHook, ApprovalHook, ...] │
│ providers ─────── search / sandbox / workflows / state │
│ │
└──────────────────────────┬───────────────────────────────────────────┘
│ runtime.execute(workflow_data, input_data)
▼
╔═══════════════════════════════════════════════════════════════════════════╗
║ WorkflowRuntime ║
╠═══════════════════════════════════════════════════════════════════════════╣
║ ║
║ ┌─ PARSE ──────────────────────────────────────────────────────────┐ ║
║ │ JSON → NodeInfo + EdgeInfo │ ║
║ │ disabled/disconnected removal · port dependency · runner mapping │ ║
║ └──────────────────────────┬───────────────────────────────────────┘ ║
║ ▼ ║
║ ┌─ PLAN ───────────────────────────────────────────────────────────┐ ║
║ │ Kahn's algorithm topological sort · cycle detection │ ║
║ └──────────────────────────┬───────────────────────────────────────┘ ║
║ ▼ ║
║ ┌─ EXECUTE ────────────────────────────────────────────────────────┐ ║
║ │ │ ║
║ │ for node in execution_order: │ ║
║ │ excluded? → skip │ ║
║ │ PortStore.wire() → inputs │ ║
║ │ Hook.before_node() │ ║
║ │ NodeRunner.run() → result ┌─────────────────────┐ │ ║
║ │ Hook.after_node() │ NodeRunner 6 types │ │ ║
║ │ PortStore.store() │ Default (compute) │ │ ║
║ │ RouteResolver.evaluate() │ Agent (LLM) │ │ ║
║ │ │ Router (N-way) │ │ ║
║ │ yield event │ Provider (config) │ │ ║
║ │ │ IO (in/out) │ │ ║
║ │ │ Bypass (pass) │ │ ║
║ │ └─────────────────────┘ │ ║
║ └──────────────────────────┬───────────────────────────────────────┘ ║
║ ▼ ║
║ ┌─ FINALIZE ───────────────────────────────────────────────────────┐ ║
║ │ collect endnode results · cleanup streams · Hook.on_complete() │ ║
║ └──────────────────────────────────────────────────────────────────┘ ║
║ ║
║ ┌─ Internal Modules ───────────────────────────────────────────────┐ ║
║ │ PortStore ────── port-based data wiring between nodes │ ║
║ │ StreamManager ── generator fan-out · BufferedFactory · replay │ ║
║ │ RouteResolver ── router branching · subgraph DFS exclusion │ ║
║ │ ToolRegistry ─── @tool · file · dir · runtime creation · bridge │ ║
║ │ ToolGenerator ── LLM code gen → Docker verify → register │ ║
║ │ ToolTester ──── schema → smoke → pytest 3-level verification │ ║
║ │ WorkflowGenerator ── LLM → WorkflowDef auto-design │ ║
║ └──────────────────────────────────────────────────────────────────┘ ║
║ ║
╠═══════════════════════════════════════════════════════════════════════════╣
║ HOOKS (applied per node) ║
║ before_node → [execute] → after_node → on_complete ║
║ Approval · Trace · custom ║
╠═══════════════════════════════════════════════════════════════════════════╣
║ ADAPTERS ║
║ event_to_sse · langchain_adapter · xgen_adapter · bridge ║
╚═══════════════════════════════════════════════════════════════════════════╝
│
▼
Event Stream / Result
The main loop has zero if/elif for node types. All node-specific logic dispatches to NodeRunner handlers.
| Runner | Handles | What it does |
|---|---|---|
| DefaultRunner | compute, transform | node.execute(**inputs) → store result |
| AgentRunner | LLM agents | PipelineExecutor loop with ToolRegistry, search, sandbox. Streams tokens + tool calls. |
| RouterRunner | conditional branch | Extract routing key → RouteResolver excludes unselected subgraphs via DFS |
| ProviderRunner | model, MCP, config | Create config object → downstream nodes receive via port wiring |
| IORunner | start, end | Start: inject input_data. End: collect results, stream generators. |
| BypassRunner | bypass=True | Pass through input without execution |
When an agent node has no node_class, AgentRunner creates a PipelineExecutor with all connected providers:
AgentRunner._run_mantis_agent()
  → PipelineExecutor(model, ToolRegistry, search, sandbox, workflows, state)
      → RESOLVE: LLM call (with tool search filtering)
      → ACT: tool execution (with create_tool + Docker verify if sandbox)
      → OBSERVE: checkpoint
      → loop until done
  → intermediate events (tool_call, tool_result) propagate to workflow stream
When sandbox is provided, agent nodes can create tools at runtime:
create_tool → LLM generates @tool code
            → Docker sandbox: syntax check
            → ToolTester: smoke test + pytest
            → pass → ToolRegistry registration → available next iteration
3-level verification:

| Level | Method | Sandbox |
|---|---|---|
| 1 | validate_schema() | No |
| 2 | smoke_test() | Optional |
| 3 | run_pytest() | Required |
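The three levels form a short-circuiting gate: each check only runs if the previous one passed, and levels 2–3 depend on sandbox availability as the table states. A hypothetical sketch of that sequencing (the real checks live in `mantis/testing/`; here they are injected as callables):

```python
# Hypothetical gate over ToolTester's three checks (validate_schema, smoke_test, run_pytest).
def verify_tool(tool, *, validate_schema, smoke_test, run_pytest, sandbox=None):
    """Return the highest verification level the tool passes (0 = schema rejected)."""
    if not validate_schema(tool):         # level 1: static check, no sandbox needed
        return 0
    if not smoke_test(tool, sandbox):     # level 2: quick invocation, sandbox optional
        return 1
    if sandbox is None:                   # level 3 requires a Docker sandbox
        return 2
    return 3 if run_pytest(tool, sandbox) else 2
```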
```python
from mantis.search import GraphToolManager, GraphToolConfig

manager = GraphToolManager(GraphToolConfig(search_mode="enhanced"))
tools = await manager.aretrieve("cancel order", top_k=5)
plan = manager.plan_workflow("Process refund for order #123")
compressed = manager.compress_result(huge_response, max_chars=4000)
```

For standalone agent execution without a workflow graph:
```python
from mantis import PipelineExecutor, tool
from mantis.providers import ModelClient

@tool(name="calc", description="Calculate", parameters={"expr": {"type": "string"}})
async def calc(expr: str) -> dict:
    return {"result": eval(expr)}  # demo only — eval is unsafe on untrusted input

executor = PipelineExecutor(
    model=ModelClient(model="gpt-4o-mini", api_key="sk-..."),
    tools=[calc],
)
result = await executor.run("What is 42 * 17?")
```

```python
async for event in runtime.execute(workflow_data, input_data):
    match event["type"]:
        case "workflow_start":    ...  # node count, edge count
        case "node_start":        ...  # node_id, function_id, name
        case "node_complete":     ...  # node_id, result_keys
        case "node_skip":         ...  # node_id, reason (excluded/bypass)
        case "route_decision":    ...  # selected_port
        case "stream_chunk":      ...  # node_id, chunk
        case "agent_tool_call":   ...  # node_id, tool_name, args
        case "agent_tool_result": ...  # node_id, tool_name, result
        case "workflow_complete": ...  # final results
        case "workflow_error":    ...  # error detail
```

mantis/
├── runtime/ # WorkflowRuntime (core engine)
│ ├── runtime.py # entry point: PARSE → PLAN → EXECUTE → FINALIZE
│ ├── parse.py # workflow JSON → NodeInfo/EdgeInfo
│ ├── plan.py # topological sort (Kahn's algorithm)
│ ├── execute.py # main loop + event emission
│ ├── port_store.py # port-based data wiring
│ ├── stream_manager.py # generator fan-out / BufferedFactory
│ ├── route_resolver.py # router branching + DFS exclusion
│ ├── runners/ # NodeRunner handlers (6 types)
│ └── hooks/ # ExecutionHook (trace, approval)
│
├── executor/ # PipelineExecutor (agent mode)
│ ├── pipeline.py # PREPARE → RESOLVE → ACT → OBSERVE loop
│ ├── flow.py # FlowState for deterministic flows
│ └── phases/ # pluggable phase implementations
│
├── tools/ # @tool decorator, ToolRegistry, bridge
├── search/ # GraphToolManager (graph-tool-call)
├── sandbox/ # DockerSandbox
├── generate/ # ToolGenerator (LLM → code → verify → register)
├── testing/ # ToolTester (schema → smoke → pytest)
├── workflow/ # WorkflowDef, Store, Generator
├── adapters/ # SSE, xgen, LangChain adapters
├── models/ # NodeInfo, PortInfo, EdgeInfo
└── exceptions.py # MantisError hierarchy
MIT