PlateerLab/mantis


Mantis

Python · License: MIT

Workflow execution engine.

Feed it a workflow graph (nodes + edges) and get ordered, observable execution. Mantis replaces tangled spaghetti executors with a clean four-phase pipeline: PARSE → PLAN → EXECUTE → FINALIZE.

from mantis import WorkflowRuntime

runtime = WorkflowRuntime(model=my_llm, tools=[calculate, get_weather])

# Execute a workflow graph
async for event in runtime.execute(workflow_data, input_data):
    print(event)

# Or collect the final result
result = await runtime.execute_collect(workflow_data, input_data)

Installation

pip install mantis              # core only (httpx)
pip install mantis[search]      # + graph-tool-call (>=0.15)
pip install mantis[sandbox]     # + Docker sandbox
pip install mantis[state]       # + PostgreSQL checkpointing
pip install mantis[all]         # everything

How It Works

 ┌─ Caller (xgen, FastAPI, script) ────────────────────────────────────┐
 │                                                                      │
 │  workflow_data ─── {nodes: [...], edges: [...]}                      │
 │  input_data ────── data for startnode                                │
 │  model ─────────── LLMProvider (for agent nodes)                    │
 │  tools ─────────── [@tool functions] (for agent nodes)              │
 │  hooks ─────────── [TraceHook, ApprovalHook, ...]                   │
 │  providers ─────── search / sandbox / workflows / state             │
 │                                                                      │
 └──────────────────────────┬───────────────────────────────────────────┘
                            │  runtime.execute(workflow_data, input_data)
                            ▼
╔═══════════════════════════════════════════════════════════════════════════╗
║                         WorkflowRuntime                                  ║
╠═══════════════════════════════════════════════════════════════════════════╣
║                                                                           ║
║   ┌─ PARSE ──────────────────────────────────────────────────────────┐  ║
║   │  JSON → NodeInfo + EdgeInfo                                       │  ║
║   │  disabled/disconnected removal · port dependency · runner mapping │  ║
║   └──────────────────────────┬───────────────────────────────────────┘  ║
║                              ▼                                           ║
║   ┌─ PLAN ───────────────────────────────────────────────────────────┐  ║
║   │  Kahn's algorithm topological sort · cycle detection              │  ║
║   └──────────────────────────┬───────────────────────────────────────┘  ║
║                              ▼                                           ║
║   ┌─ EXECUTE ────────────────────────────────────────────────────────┐  ║
║   │                                                                    │  ║
║   │  for node in execution_order:                                      │  ║
║   │      excluded? → skip                                              │  ║
║   │      PortStore.wire() → inputs                                     │  ║
║   │      Hook.before_node()                                            │  ║
║   │      NodeRunner.run() → result       ┌─────────────────────┐      │  ║
║   │      Hook.after_node()               │  NodeRunner 6 types │      │  ║
║   │      PortStore.store()               │  Default  (compute) │      │  ║
║   │      RouteResolver.evaluate()        │  Agent    (LLM)     │      │  ║
║   │                                      │  Router   (N-way)   │      │  ║
║   │      yield event                     │  Provider (config)  │      │  ║
║   │                                      │  IO       (in/out)  │      │  ║
║   │                                      │  Bypass   (pass)    │      │  ║
║   │                                      └─────────────────────┘      │  ║
║   └──────────────────────────┬───────────────────────────────────────┘  ║
║                              ▼                                           ║
║   ┌─ FINALIZE ───────────────────────────────────────────────────────┐  ║
║   │  collect endnode results · cleanup streams · Hook.on_complete()   │  ║
║   └──────────────────────────────────────────────────────────────────┘  ║
║                                                                           ║
║   ┌─ Internal Modules ───────────────────────────────────────────────┐  ║
║   │  PortStore ────── port-based data wiring between nodes            │  ║
║   │  StreamManager ── generator fan-out · BufferedFactory · replay    │  ║
║   │  RouteResolver ── router branching · subgraph DFS exclusion       │  ║
║   │  ToolRegistry ─── @tool · file · dir · runtime creation · bridge  │  ║
║   │  ToolGenerator ── LLM code gen → Docker verify → register         │  ║
║   │  ToolTester ──── schema → smoke → pytest 3-level verification     │  ║
║   │  WorkflowGenerator ── LLM → WorkflowDef auto-design              │  ║
║   └──────────────────────────────────────────────────────────────────┘  ║
║                                                                           ║
╠═══════════════════════════════════════════════════════════════════════════╣
║   HOOKS (applied per node)                                                ║
║   before_node → [execute] → after_node → on_complete                     ║
║   Approval · Trace · custom                                               ║
╠═══════════════════════════════════════════════════════════════════════════╣
║   ADAPTERS                                                                ║
║   event_to_sse · langchain_adapter · xgen_adapter · bridge                ║
╚═══════════════════════════════════════════════════════════════════════════╝
                            │
                            ▼
                    Event Stream / Result
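The PLAN phase's topological sort (Kahn's algorithm with cycle detection, shown in the diagram above) can be sketched as follows, using plain node ids and (src, dst) edge tuples as stand-ins for the real NodeInfo/EdgeInfo models:

```python
from collections import deque

def plan(nodes, edges):
    """Topological sort via Kahn's algorithm, with cycle detection.

    `nodes` is a list of node ids; `edges` is a list of (src, dst) pairs.
    Simplified stand-in for the real NodeInfo/EdgeInfo structures.
    """
    indegree = {n: 0 for n in nodes}
    successors = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    # Start from all source nodes (no incoming edges).
    queue = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)

    # Any node never reaching indegree 0 is on a cycle.
    if len(order) != len(nodes):
        raise ValueError("workflow graph contains a cycle")
    return order
```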

The main loop contains no if/elif branching on node types: all node-specific logic is dispatched through NodeRunner handlers.
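That dispatch pattern can be sketched as a plain mapping from node kind to runner. This is a hypothetical sketch (the `kind` keys and minimal runner classes are illustrative, not the real API):

```python
class DefaultRunner:
    """Runs a node's own execute() with its wired inputs."""
    def run(self, node, inputs):
        return node["execute"](**inputs)

class BypassRunner:
    """Passes inputs through untouched (bypass=True nodes)."""
    def run(self, node, inputs):
        return inputs

# Built once during PARSE; EXECUTE only does a dict lookup -- no if/elif.
RUNNERS = {
    "compute": DefaultRunner(),
    "bypass": BypassRunner(),
}

def run_node(node, inputs):
    return RUNNERS[node["kind"]].run(node, inputs)
```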

Node Runners

| Runner | Handles | What it does |
|---|---|---|
| DefaultRunner | compute, transform | node.execute(**inputs) → store result |
| AgentRunner | LLM agents | PipelineExecutor loop with ToolRegistry, search, sandbox. Streams tokens + tool calls. |
| RouterRunner | conditional branch | Extract routing key → RouteResolver excludes unselected subgraphs via DFS |
| ProviderRunner | model, MCP, config | Create config object → downstream nodes receive it via port wiring |
| IORunner | start, end | Start: inject input_data. End: collect results, stream generators. |
| BypassRunner | bypass=True | Pass input through without execution |
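The hook sequence from the diagram (before_node → after_node → on_complete) suggests hooks are plain objects with those methods. A minimal tracing hook sketch, with assumed async signatures (the real ExecutionHook base class may differ):

```python
class TraceHook:
    """Records node lifecycle events. Method names follow the sequence
    described above; signatures here are assumptions."""

    def __init__(self):
        self.events = []

    async def before_node(self, node_id, inputs):
        self.events.append(("before", node_id))

    async def after_node(self, node_id, result):
        self.events.append(("after", node_id))

    async def on_complete(self, results):
        self.events.append(("complete", None))
```

Per the caller diagram, an instance would be passed via `hooks=[TraceHook()]` when constructing WorkflowRuntime.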

Agent Nodes

When an agent node has no node_class, AgentRunner creates a PipelineExecutor with all connected providers:

AgentRunner._run_mantis_agent()
  → PipelineExecutor(model, ToolRegistry, search, sandbox, workflows, state)
    → RESOLVE: LLM call (with tool search filtering)
    → ACT: tool execution (with create_tool + Docker verify if sandbox)
    → OBSERVE: checkpoint
    → loop until done
  → intermediate events (tool_call, tool_result) propagate to workflow stream
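The RESOLVE → ACT → OBSERVE cycle above can be sketched as a plain loop. This is a hedged, simplified sketch: `llm` and the tool-call message shapes are assumptions, and the real PipelineExecutor phases are pluggable and considerably richer:

```python
async def agent_loop(llm, tools, prompt, max_iters=10):
    """Minimal RESOLVE -> ACT -> OBSERVE loop.

    `llm(messages)` is assumed to return a dict with "content" and an
    optional "tool_calls" list of {"name", "args"} dicts.
    """
    messages = [{"role": "user", "content": prompt}]
    for _ in range(max_iters):
        reply = await llm(messages)                 # RESOLVE: one LLM call
        calls = reply.get("tool_calls") or []
        if not calls:
            return reply["content"]                 # no tool calls left: done
        for call in calls:                          # ACT: execute each tool
            result = await tools[call["name"]](**call["args"])
            messages.append({"role": "tool",
                             "name": call["name"],
                             "content": result})
        # OBSERVE: a checkpoint of `messages` would be saved here
    raise RuntimeError("agent did not finish within max_iters")
```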

Tool Creation & Verification

When sandbox is provided, agent nodes can create tools at runtime:

create_tool → LLM generates @tool code
  → Docker sandbox: syntax check
    → ToolTester: smoke test + pytest
      → pass → ToolRegistry registration → available next iteration

3-level verification:

| Level | Method | Sandbox |
|---|---|---|
| 1 | validate_schema() | No |
| 2 | smoke_test() | Optional |
| 3 | run_pytest() | Required |
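The escalating checks can be sketched as a single function that returns the highest level passed. Everything here is a stand-in: the level-1 check uses ast.parse in place of the real validate_schema(), and the sandbox methods are assumed names from the table above:

```python
import ast

def validate_schema(code: str) -> bool:
    """Level 1 stand-in: does the source parse and define a function?"""
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return False
    return any(isinstance(n, ast.FunctionDef) for n in ast.walk(tree))

def verify_tool(code: str, sandbox=None) -> int:
    """Return the highest verification level passed (0-3)."""
    if not validate_schema(code):
        return 0
    if sandbox is None:
        return 1                     # levels 2-3 require a sandbox
    if not sandbox.smoke_test(code):
        return 1
    if not sandbox.run_pytest(code):
        return 2
    return 3
```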

Tool Search (graph-tool-call)

from mantis.search import GraphToolManager, GraphToolConfig

manager = GraphToolManager(GraphToolConfig(search_mode="enhanced"))

tools = await manager.aretrieve("cancel order", top_k=5)
plan = manager.plan_workflow("Process refund for order #123")
compressed = manager.compress_result(huge_response, max_chars=4000)

PipelineExecutor (Agent Mode)

For standalone agent execution without a workflow graph:

from mantis import PipelineExecutor, tool
from mantis.providers import ModelClient

@tool(name="calc", description="Calculate", parameters={"expr": {"type": "string"}})
async def calc(expr: str) -> dict:
    return {"result": eval(expr)}  # demo only: never eval untrusted input

executor = PipelineExecutor(
    model=ModelClient(model="gpt-4o-mini", api_key="sk-..."),
    tools=[calc],
)
result = await executor.run("What is 42 * 17?")

Events

async for event in runtime.execute(workflow_data, input_data):
    match event["type"]:
        case "workflow_start":   ...  # node count, edge count
        case "node_start":       ...  # node_id, function_id, name
        case "node_complete":    ...  # node_id, result_keys
        case "node_skip":        ...  # node_id, reason (excluded/bypass)
        case "route_decision":   ...  # selected_port
        case "stream_chunk":     ...  # node_id, chunk
        case "agent_tool_call":  ...  # node_id, tool_name, args
        case "agent_tool_result":...  # node_id, tool_name, result
        case "workflow_complete": ...  # final results
        case "workflow_error":   ...  # error detail

Package Structure

mantis/
├── runtime/                    # WorkflowRuntime (core engine)
│   ├── runtime.py              # entry point: PARSE → PLAN → EXECUTE → FINALIZE
│   ├── parse.py                # workflow JSON → NodeInfo/EdgeInfo
│   ├── plan.py                 # topological sort (Kahn's algorithm)
│   ├── execute.py              # main loop + event emission
│   ├── port_store.py           # port-based data wiring
│   ├── stream_manager.py       # generator fan-out / BufferedFactory
│   ├── route_resolver.py       # router branching + DFS exclusion
│   ├── runners/                # NodeRunner handlers (6 types)
│   └── hooks/                  # ExecutionHook (trace, approval)
│
├── executor/                   # PipelineExecutor (agent mode)
│   ├── pipeline.py             # PREPARE → RESOLVE → ACT → OBSERVE loop
│   ├── flow.py                 # FlowState for deterministic flows
│   └── phases/                 # pluggable phase implementations
│
├── tools/                      # @tool decorator, ToolRegistry, bridge
├── search/                     # GraphToolManager (graph-tool-call)
├── sandbox/                    # DockerSandbox
├── generate/                   # ToolGenerator (LLM → code → verify → register)
├── testing/                    # ToolTester (schema → smoke → pytest)
├── workflow/                   # WorkflowDef, Store, Generator
├── adapters/                   # SSE, xgen, LangChain adapters
├── models/                     # NodeInfo, PortInfo, EdgeInfo
└── exceptions.py               # MantisError hierarchy

License

MIT

About

AI agent execution engine — plug in a model, get a working agent.
