-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgraph_engine.py
More file actions
120 lines (94 loc) · 4.24 KB
/
graph_engine.py
File metadata and controls
120 lines (94 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Kyber Command - LangGraph orchestration engine with MemorySaver."""
import asyncio
from pathlib import Path
import yaml
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.prebuilt import ToolNode
from agents.coder import CODER_TOOLS, create_coder_llm, create_coder_node
from agents.researcher import create_researcher_llm, researcher_node
from agents.state import KyberState, ROUTER_CODER, ROUTER_RESEARCHER
from agents.supervisor import create_supervisor_llm, supervisor_node
def load_config(config_path: str = "config.yaml") -> dict:
    """Load the application configuration from a YAML file.

    Args:
        config_path: Path to the YAML file. Defaults to ``config.yaml``.

    Returns:
        The parsed configuration. An empty YAML document yields ``{}``
        rather than ``None`` so callers can always use ``.get()``.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
    """
    path = Path(config_path)
    if not path.exists():
        raise FileNotFoundError(f"Config not found: {config_path}")
    with path.open(encoding="utf-8") as f:
        # safe_load returns None for an empty document; normalise it so the
        # declared ``dict`` return type holds.
        return yaml.safe_load(f) or {}
def _route_after_supervisor(state: KyberState) -> str:
    """Pick the node that runs after the supervisor.

    Returns the ``next_agent`` value stored in ``state``; when that value is
    missing or falsy, falls back to ``ROUTER_RESEARCHER``.
    """
    chosen = state.get("next_agent")
    if not chosen:
        return ROUTER_RESEARCHER
    return chosen
def _should_continue_coder(state: KyberState) -> str:
    """Decide whether the coder loop needs another tool round.

    Returns ``"coder_tools"`` when the most recent message is an ``AIMessage``
    carrying tool calls; otherwise returns ``END``.
    """
    history = state.get("messages", [])
    if not history:
        return END
    latest = history[-1]
    wants_tools = isinstance(latest, AIMessage) and latest.tool_calls
    return "coder_tools" if wants_tools else END
def build_graph(config: dict | None = None):
    """Build and compile the Kyber Command LangGraph.

    The main graph starts at the supervisor node, whose outgoing edge is
    chosen by ``_route_after_supervisor`` (researcher node or coder subgraph);
    both of those then go to ``END``. The coder subgraph alternates between
    the coder agent and its tool node until ``_should_continue_coder``
    returns ``END``.

    Args:
        config: Parsed configuration mapping. When ``None``, the configuration
            is read with ``load_config()``. Missing or null ``ollama`` /
            ``agents`` sections fall back to the built-in defaults below.

    Returns:
        The compiled main graph, checkpointed with an in-memory
        ``MemorySaver``.
    """
    # Only read the YAML file when the caller supplied nothing; an explicitly
    # passed empty dict means "use the defaults" and must not touch the disk.
    if config is None:
        config = load_config() or {}

    # ``or {}`` also covers sections that exist in the YAML but are null.
    ollama = config.get("ollama") or {}
    base_url = ollama.get("base_url", "http://localhost:11434")
    temperature = ollama.get("temperature", 0.7)
    default_model = ollama.get("default_model", "llama3.1:8b")
    agents_config = config.get("agents") or {}

    def _agent_settings(name: str, fallback_prompt: str) -> tuple[dict, str]:
        """Return (LLM factory kwargs, system prompt) for one agent section."""
        cfg = agents_config.get(name) or {}
        llm_kwargs = {
            "model": cfg.get("model", default_model),
            "base_url": base_url,
            "temperature": temperature,
        }
        return llm_kwargs, cfg.get("system_prompt", fallback_prompt)

    # Create LLMs
    sup_kwargs, supervisor_sys = _agent_settings(
        "supervisor", "Route to researcher or coder."
    )
    supervisor_llm = create_supervisor_llm(**sup_kwargs)

    res_kwargs, researcher_sys = _agent_settings(
        "researcher", "Answer general questions."
    )
    researcher_llm = create_researcher_llm(**res_kwargs)

    cod_kwargs, coder_sys = _agent_settings(
        "coder", "Handle code and files with care."
    )
    coder_llm = create_coder_llm(**cod_kwargs)

    # Supervisor node: bind the LLM and prompt so the graph sees a
    # single-argument callable.
    def _supervisor(s: KyberState) -> dict:
        return supervisor_node(s, supervisor_llm, supervisor_sys)

    # Researcher node
    def _researcher(s: KyberState) -> dict:
        return researcher_node(s, researcher_llm, researcher_sys)

    # Coder: agent + tools (for ReAct loop)
    coder_agent = create_coder_node(coder_llm, coder_sys)
    coder_tools = ToolNode(CODER_TOOLS)

    # Coder subgraph: agent <-> tools
    coder_builder = StateGraph(KyberState)
    coder_builder.add_node("coder_agent", coder_agent)
    coder_builder.add_node("coder_tools", coder_tools)
    coder_builder.add_edge(START, "coder_agent")
    coder_builder.add_conditional_edges("coder_agent", _should_continue_coder)
    coder_builder.add_edge("coder_tools", "coder_agent")
    coder_subgraph = coder_builder.compile()

    # Main graph: supervisor -> researcher | coder
    builder = StateGraph(KyberState)
    builder.add_node("supervisor", _supervisor)
    builder.add_node("researcher", _researcher)
    builder.add_node("coder", coder_subgraph)
    builder.add_edge(START, "supervisor")
    builder.add_conditional_edges("supervisor", _route_after_supervisor)
    builder.add_edge("researcher", END)
    builder.add_edge("coder", END)

    # Persistence: MemorySaver (in-memory for now, can switch to
    # AsyncSqliteSaver later)
    checkpointer = MemorySaver()
    return builder.compile(checkpointer=checkpointer)
# Lazily built graph shared by every get_graph() call.
_GRAPH_SINGLETON = None


def get_graph():
    """Return the shared compiled graph, building it on first use.

    The graph is built once with ``build_graph()`` and cached at module level,
    so repeated calls reuse the same graph and the same ``MemorySaver``
    checkpointer instead of starting from an empty one each time.
    """
    global _GRAPH_SINGLETON
    if _GRAPH_SINGLETON is None:
        _GRAPH_SINGLETON = build_graph()
    return _GRAPH_SINGLETON