diff --git a/.flocks/plugins/agents/host-forensics-fast/agent.yaml b/.flocks/plugins/agents/host-forensics-fast/agent.yaml new file mode 100644 index 0000000..787820b --- /dev/null +++ b/.flocks/plugins/agents/host-forensics-fast/agent.yaml @@ -0,0 +1,46 @@ +name: host-forensics-fast +description: >- + Fast Linux host compromise triage subagent for first-pass investigation. Use when the user + wants a quick, concise, and accurate host security check, rapid cryptomining triage, or an + initial compromise assessment before deeper forensics. Runs a lightweight triage script, makes + a fast verdict, and only performs a few targeted follow-up commands for high-signal findings. +description_cn: >- + Linux 主机快速排查子 Agent:用于首轮研判、快速安全检查、挖矿快速排查、或在深度取证前先做简洁准确的主机状态判断。 + 默认执行轻量 triage 脚本,尽快给出结论;仅在发现高置信可疑项时补充少量定点命令,不进入重型深度取证流程。 +mode: subagent +delegatable: true +hidden: false +tags: + - security + - host-forensics + - fast-triage +color: "#F39C12" + +temperature: 0.2 + +tools: + - tool_search + - ssh_run_script + - ssh_host_cmd + - threatbook_mcp_ip_query + - threatbook_mcp_domain_query + - threatbook_mcp_hash_query + - virustotal_ip_query + - virustotal_domain_query + - virustotal_file_query + - bash + - read + +prompt_metadata: + category: security + cost: low + triggers: + - domain: security + trigger: "Linux host quick triage, first-pass compromise assessment, rapid mining check, host quick check" + use_when: + - User asks for a quick Linux host compromise check + - User wants first-pass triage before deep forensics + - User asks for concise and fast host anomaly investigation + avoid_when: + - Full-scope forensic collection is explicitly required + - The user already asked for deep evidence preservation or exhaustive investigation diff --git a/.flocks/plugins/agents/host-forensics-fast/prompt.md b/.flocks/plugins/agents/host-forensics-fast/prompt.md new file mode 100644 index 0000000..fddd8f0 --- /dev/null +++ b/.flocks/plugins/agents/host-forensics-fast/prompt.md @@ -0,0 
+1,146 @@ +# Host Forensics Fast Agent + +> **⚠️ 执行约束(必读)** +> 本 agent 必须由主 agent(Rex)**直接执行**,全程使用 `ssh_run_script` / `ssh_host_cmd` / 威胁情报工具完成步骤。 +> **严禁** 将本任务通过 `delegate_task` 委派给任何 subagent。 +> 本版本目标是 **简洁、快速、准确**:默认只做首轮排查,不进入 `deep_scan.sh` 这类重型流程。 + +## 目标 + +- 在最短路径内判断主机是否 **明显异常** +- 优先发现 **挖矿 / 后门 / 持久化 / 异常登录 / 临时目录落地** +- 输出 **可执行的快速结论**,而不是堆积冗长证据 + +## 工具说明 + +- `ssh_run_script`:一次 SSH 执行轻量批量采集脚本 +- `ssh_host_cmd`:仅对高置信可疑项补充 1-3 条定点命令 +- `threatbook_mcp_*` / `virustotal_*`:只查询高信号 IoC,不做大批量查询 + +## 脚本文件 + +| 脚本 | 路径 | 用途 | +|------|------|------| +| triage_fast.sh | `.flocks/plugins/agents/host-forensics-fast/scripts/triage_fast.sh` | 轻量快速排查,通常 10-20 秒完成 | + +--- + +## 调查流程 + +### Step 0:运行 triage_fast.sh + +``` +ssh_run_script(host=<目标IP>, script_path=".flocks/plugins/agents/host-forensics-fast/scripts/triage_fast.sh") +``` + +如果用户已经提供了同等信息的主机输出,可直接跳到 Step 1 分析。 + +--- + +### Step 1:快速研判(默认在 1 轮内完成) + +优先检查以下 8 个维度: + +1. **已知矿工/高 CPU 进程**:`KNOWN_MINER_PROCESSES`、`CPU_TOP_PROCESSES` +2. **异常外联**:`NETWORK_ESTABLISHED`、`SUSPICIOUS_NETWORK_TO_KNOWN_PORTS` +3. **临时目录可执行落地**:`TMP_EXECUTABLES` +4. **持久化痕迹**:`CRON_JOBS`、`SYSTEMD_RUNNING_SERVICES` +5. **认证与登录异常**:`RECENT_AUTH_EVENTS` +6. **SSH 密钥异常**:`SSH_AUTHORIZED_KEYS_ROOT` +7. **运行时隐藏/注入迹象**:`OPEN_FILES_DELETED`、`LD_SO_PRELOAD` +8. 
**近期可疑落地文件**:`RECENTLY_MODIFIED_FILES` + +**直接判为高可疑的快速信号:** +- `KNOWN_MINER_PROCESSES` 非空 +- `SUSPICIOUS_NETWORK_TO_KNOWN_PORTS` 非空 +- `TMP_EXECUTABLES` 非空 +- `LD_SO_PRELOAD` 非空 +- `OPEN_FILES_DELETED` 非空 + +**判定原则:** +- 无明显异常:输出 `CLEAN` +- 有单点异常但证据不足:输出 `SUSPICIOUS` +- 有多项高置信指标互相印证:输出 `COMPROMISED` + +--- + +### Step 2:仅做少量定点补充(必要时) + +只有在 Step 1 发现高置信可疑项时,才允许继续;并且总共只补充 **最多 3 组** 定点命令。 + +**对可疑进程:** +```bash +ls -la /proc/<PID>/exe +cat /proc/<PID>/cmdline | tr '\0' ' ' +ss -tunap | grep <PID> +``` + +**对可疑文件:** +```bash +sha256sum <可疑文件路径> +ls -la <可疑文件路径> +``` + +**对可疑计划任务或服务:** +```bash +systemctl status <服务名> --no-pager +cat <cron/unit 文件路径> +``` + +如果补充命令已经足够支撑结论,立即停止,不再扩展取证面。 + +--- + +### Step 3:高信号 IoC 才查询情报 + +按需查询,不批量滥查: + +- 外部 IP:`threatbook_mcp_ip_query`,必要时补 `virustotal_ip_query` +- 域名:`threatbook_mcp_domain_query`,必要时补 `virustotal_domain_query` +- 可疑样本哈希:`threatbook_mcp_hash_query`,必要时补 `virustotal_file_query` + +--- + +## 输出要求 + +报告必须简短,优先回答: + +1. 这台主机 **现在是否明显可疑** +2. **最关键的 1-3 个证据** 是什么 +3. 需要用户 **下一步做什么** + +使用以下格式: + +```markdown +## Host Quick Assessment + +**Target**: [主机 IP/hostname] +**Verdict**: CLEAN / SUSPICIOUS / COMPROMISED +**Confidence**: HIGH / MEDIUM / LOW + +### Summary +[用 2-3 句话直接说明结论] + +### Key Evidence +- [证据 1] +- [证据 2] +- [证据 3] + +### IoCs +- IPs: [列表] +- Domains: [列表] +- File Hashes: [列表] +- Paths: [列表] + +### Next Actions +1. [立即建议] +2. 
[后续建议] +``` + +## 约束 + +- **只读**:不修改目标主机 +- **不安装工具**:不在目标主机安装任何软件 +- **不打扰业务**:避免耗时长、扫描面大的命令 +- **先结论后证据**:输出以快速决策为导向 +- **证据不足时不要夸大**:无法证明入侵时,如实给出 `SUSPICIOUS` diff --git a/.flocks/plugins/agents/host-forensics-fast/scripts/triage_fast.sh b/.flocks/plugins/agents/host-forensics-fast/scripts/triage_fast.sh new file mode 100644 index 0000000..de83b66 --- /dev/null +++ b/.flocks/plugins/agents/host-forensics-fast/scripts/triage_fast.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +# Host compromise fast triage script +# ----------------------------------------------------------------------- +# Lightweight, read-only first-pass collection for quick host assessment. +# Focuses on the highest-signal indicators so the caller can decide fast +# whether to stop, escalate, or continue with deeper investigation. +# ----------------------------------------------------------------------- + +LANG=C +export LANG + +_s() { printf '\n### %s ###\n' "$1"; } + +_s "TRIAGE_FAST_START" +date -u +hostname +uname -a +uptime + +_s "CPU_TOP_PROCESSES" +ps aux --sort=-%cpu 2>/dev/null | head -15 || ps aux 2>/dev/null | head -15 + +_s "KNOWN_MINER_PROCESSES" +ps aux 2>/dev/null | grep -iE 'xmrig|minerd|cpuminer|cgminer|bfgminer|ethminer|nbminer|phoenixminer|t-rex|gminer|kinsing' | grep -v grep + +_s "NETWORK_ESTABLISHED" +ss -tunap 2>/dev/null | grep -v "127\.0\.0\.1\|::1" | grep ESTAB | head -25 || \ + netstat -tunap 2>/dev/null | grep -v "127\.0\.0\.1\|::1" | grep ESTABLISHED | head -25 + +_s "SUSPICIOUS_NETWORK_TO_KNOWN_PORTS" +ss -tunap 2>/dev/null | grep -E ':3333|:4444|:5555|:14444|:45700|:8899|:9999' | grep ESTAB | head -15 + +_s "LISTENING_PORTS" +ss -tlnup 2>/dev/null | head -20 || netstat -tlnup 2>/dev/null | head -20 + +_s "TMP_EXECUTABLES" +find /tmp /dev/shm /var/tmp -type f -executable 2>/dev/null | head -20 + +_s "CRON_JOBS" +crontab -l 2>/dev/null +echo '---' +cat /etc/crontab 2>/dev/null +echo '---' +cat /etc/cron.d/* 2>/dev/null | head -40 + +_s "SYSTEMD_RUNNING_SERVICES" 
+systemctl list-units --type=service --state=running --no-pager 2>/dev/null | head -25 + +_s "SSH_AUTHORIZED_KEYS_ROOT" +cat /root/.ssh/authorized_keys 2>/dev/null + +_s "LD_SO_PRELOAD" +cat /etc/ld.so.preload 2>/dev/null + +_s "OPEN_FILES_DELETED" +lsof 2>/dev/null | grep '(deleted)' | head -15 + +_s "RECENT_AUTH_EVENTS" +grep -E 'Failed password|Accepted password|Accepted publickey|Invalid user|ROOT' \ + /var/log/auth.log 2>/dev/null | tail -50 || \ +grep -E 'Failed password|Accepted password|Accepted publickey|Invalid user|ROOT' \ + /var/log/secure 2>/dev/null | tail -50 + +_s "RECENTLY_MODIFIED_FILES" +find /root /home /tmp /var/tmp /dev/shm /etc /usr/local /opt -maxdepth 3 -type f -mtime -3 2>/dev/null | head -40 + +_s "TRIAGE_FAST_COMPLETE" +date -u diff --git a/.flocks/plugins/workflows/loop_host_forensics_fast/meta.json b/.flocks/plugins/workflows/loop_host_forensics_fast/meta.json new file mode 100644 index 0000000..0a779ae --- /dev/null +++ b/.flocks/plugins/workflows/loop_host_forensics_fast/meta.json @@ -0,0 +1,10 @@ +{ + "name": "loop_host_forensics_fast", + "description": "从文件或 inputs 读取主机列表,循环调用 host-forensics-fast 子 Agent;每台主机结果立即落盘为独立文件,末步仅生成轻量索引与清单,避免全量 summary 超时", + "category": "default", + "status": "active", + "createdBy": null, + "createdAt": 1775787114059, + "updatedAt": 1775817769342, + "id": "loop_host_forensics_fast" +} \ No newline at end of file diff --git a/.flocks/plugins/workflows/loop_host_forensics_fast/workflow.json b/.flocks/plugins/workflows/loop_host_forensics_fast/workflow.json new file mode 100644 index 0000000..4feb09d --- /dev/null +++ b/.flocks/plugins/workflows/loop_host_forensics_fast/workflow.json @@ -0,0 +1,82 @@ +{ + "id": "loop_host_forensics_fast", + "name": "loop_host_forensics_fast", + "description": "从文件或 inputs 读取主机列表;可选 ssh_user。每台巡检结束立即写入 host_triage/ 下独立 md;循环态仅保留包含 success/verdict/per_host_md 的轻量结果索引。每台巡检前先做 SSH 预检,超时仅重试一次;末步同时生成 JSON/CSV 索引,避免最后节点 summary 因全量结果过大而超时。", + "start": "init_hosts", + "nodes": [ 
+ { + "id": "init_hosts", + "type": "python", + "name": "初始化主机列表与日志", + "description": "解析 hosts、ssh_user;创建 output_dir 与 host_triage/;写总日志头并初始化轻量结果索引。", + "code": "import datetime\nimport os\nfrom pathlib import Path\nfrom flocks.workspace.manager import WorkspaceManager\n\nhosts = inputs.get(\"hosts\")\nif hosts is None:\n hosts = []\nif isinstance(hosts, str):\n hosts = [hosts.strip()] if str(hosts).strip() else []\nelif not isinstance(hosts, list):\n hosts = []\n\nhosts_file = str(inputs.get(\"hosts_file\") or \"\").strip()\nif hosts_file:\n p = Path(hosts_file).expanduser()\n if p.is_file():\n for line in p.read_text(encoding=\"utf-8\").splitlines():\n line = line.strip()\n if not line or line.startswith(\"#\"):\n continue\n hosts.append(line)\n\nseen = set()\nuniq = []\nfor h in hosts:\n hs = str(h).strip()\n if not hs or hs in seen:\n continue\n seen.add(hs)\n uniq.append(hs)\nhosts = uniq\n\nsu = inputs.get(\"ssh_user\")\nif isinstance(su, str):\n su = su.strip()\nelse:\n su = \"\"\n\nws = WorkspaceManager.get_instance()\noday = datetime.date.today().isoformat()\noutput_dir = str(ws.get_workspace_dir() / \"outputs\" / oday)\nos.makedirs(output_dir, exist_ok=True)\nper_host_dir = os.path.join(output_dir, \"host_triage\")\nbatch_report_path = os.path.join(output_dir, \"batch_host_triage_log.md\")\n\noutputs[\"hosts\"] = hosts\noutputs[\"ssh_user\"] = su\noutputs[\"host_idx\"] = 0\noutputs[\"triage_results\"] = []\noutputs[\"output_dir\"] = output_dir\noutputs[\"per_host_dir\"] = per_host_dir\noutputs[\"batch_report_path\"] = batch_report_path\noutputs[\"should_continue\"] = \"continue\" if hosts else \"exit\"\n\nif hosts:\n os.makedirs(per_host_dir, exist_ok=True)\n header = (\n \"# 批量主机快速巡检日志\\n\\n\"\n \"- 开始日期: \" + oday + \"\\n\"\n \"- 主机数量: \" + str(len(hosts)) + \"\\n\"\n \"- 逐台报告目录: `host_triage/`(每台一个独立 `.md`)\\n\"\n \"- 工作流状态: 仅保留逐台结果的轻量索引,完整正文只写入单机报告文件\\n\"\n )\n if su:\n header += \"- SSH 用户: `\" + su + \"`(`ssh_run_script` 的 host 使用 
`user@<目标>`)\\n\"\n else:\n header += \"- SSH 用户: (未指定 `ssh_user`,仅下列主机标识、Agent/工具默认账户,一般为 root)\\n\"\n header += \"\\n---\\n\\n\"\n with open(batch_report_path, \"w\", encoding=\"utf-8\") as f:\n f.write(header)" + }, + { + "id": "loop_check", + "type": "loop", + "name": "循环判断", + "select_key": "should_continue", + "description": "should_continue 为 continue 则巡检下一台,为 exit 则结束循环" + }, + { + "id": "inspect_host", + "type": "python", + "name": "单台快速巡检", + "description": "每台主机先做 SSH 预检;预检通过后再调 task(host-forensics-fast)。超时仅重试一次;循环态保留文件路径、执行状态、verdict 与失败分类。", + "code": "import os\nimport re\nimport time\n\nhosts = inputs.get(\"hosts\", [])\nidx = int(inputs.get(\"host_idx\", 0))\nhost = hosts[idx] if 0 <= idx < len(hosts) else \"\"\nhost = str(host).strip()\n\nsu = inputs.get(\"ssh_user\")\nif isinstance(su, str):\n su = su.strip()\nelse:\n su = \"\"\n\nconnect_host = host\nconnect_user = \"\"\nif \"@\" in host:\n user_part, host_part = host.split(\"@\", 1)\n connect_user = str(user_part).strip()\n connect_host = str(host_part).strip() or host\n ssh_target = host\n user_hint = (\n \"主机列表项已含 `user@host` 形式。SSH 工具调用必须使用:host=`\"\n + connect_host\n + \"`,username=`\"\n + connect_user\n + \"`。\"\n )\nelif su:\n connect_user = su\n connect_host = host\n ssh_target = su + \"@\" + host\n user_hint = (\n \"工作流已指定 `ssh_user`=`\"\n + su\n + \"`。SSH 工具调用必须使用:host=`\"\n + connect_host\n + \"`,username=`\"\n + connect_user\n + \"`。\"\n )\nelse:\n connect_host = host\n ssh_target = host\n user_hint = (\n \"未指定 `ssh_user`。SSH 工具调用请使用 host=`\"\n + connect_host\n + \"`,username 留空使用默认账户(一般为 root)。\"\n )\n\n\ndef _extract_verdict(markdown_text):\n if not markdown_text:\n return \"UNKNOWN\"\n match = re.search(\n r\"(?im)^\\s*\\*{0,2}Verdict\\*{0,2}\\s*:\\s*\"\n r\"(CLEAN|SUSPICIOUS|COMPROMISED|UNKNOWN)\\b\",\n markdown_text,\n )\n if not match:\n return \"UNKNOWN\"\n return str(match.group(1)).upper()\n\n\ndef _is_timeout_error(message):\n text = str(message or \"\").lower()\n return (\n 
\"timed out\" in text\n or \"timeout\" in text\n or \"超时\" in text\n or \"节点执行超时\" in text\n )\n\n\ndef _classify_error(message):\n text = str(message or \"\")\n lower = text.lower()\n if not text.strip():\n return \"unknown\"\n if \"permission denied\" in lower or \"auth failed\" in lower or \"authentication failed\" in lower:\n return \"auth_failed\"\n if \"host key verification failed\" in lower or \"host key\" in lower:\n return \"host_key_verification_failed\"\n if \"connection refused\" in lower:\n return \"connection_refused\"\n if \"no route to host\" in lower:\n return \"no_route_to_host\"\n if \"network is unreachable\" in lower:\n return \"network_unreachable\"\n if \"name or service not known\" in lower or \"could not resolve\" in lower or \"nodename nor servname provided\" in lower:\n return \"dns_resolution_failed\"\n if \"connection reset\" in lower:\n return \"connection_reset\"\n if \"broken pipe\" in lower or \"connection lost\" in lower or \"disconnect\" in lower:\n return \"connection_lost\"\n if \"kex\" in lower or \"key exchange\" in lower or \"protocol error\" in lower:\n return \"ssh_handshake_failed\"\n if _is_timeout_error(text):\n if \"connect\" in lower or \"ssh connection failed\" in lower:\n return \"connect_timeout\"\n return \"execution_timeout\"\n if \"ssh connection failed\" in lower:\n return \"ssh_connection_failed\"\n return \"unknown\"\n\n\nidx1 = idx + 1\nper_host_dir = str(inputs.get(\"per_host_dir\") or \"\").strip()\nif not per_host_dir:\n od = str(inputs.get(\"output_dir\") or \"\").strip()\n if od:\n per_host_dir = os.path.join(od, \"host_triage\")\n else:\n per_host_dir = os.path.join(os.path.dirname(inputs.get(\"batch_report_path\") or \".\") or \".\", \"host_triage\")\nos.makedirs(per_host_dir, exist_ok=True)\n\n\ndef _slug(s):\n t = re.sub(r\"[^0-9A-Za-z._@-]+\", \"_\", str(s)).strip(\"_\")\n t = t.replace(\"@\", \"_at_\")\n return (t[:56] if t else \"host\")\n\n\nnh = len(hosts)\nbase = \"{:04d}_{}\".format(idx1, 
_slug(ssh_target))\nper_host_md = os.path.join(per_host_dir, base + \".md\")\n\npreflight_timeout_s = 20\npreflight_res = tool.run_safe(\n \"ssh_host_cmd\",\n host=connect_host,\n username=(connect_user or None),\n command=\"echo FLOCKS_SSH_OK\",\n timeout=preflight_timeout_s,\n)\npreflight_output = preflight_res.get(\"output\") or preflight_res.get(\"text\") or \"\"\npreflight_error = preflight_res.get(\"error\") or \"\"\npreflight_ok = bool(preflight_res.get(\"success\")) and \"FLOCKS_SSH_OK\" in str(preflight_output)\n\ntext = \"\"\nok = False\nerr = \"\"\nverdict = \"UNKNOWN\"\nfailure_category = \"\"\nattempts = 0\n\nif preflight_ok:\n desc = \"Fast triage item \" + str(idx1)\n prompt = (\n \"你是 host-forensics-fast 工作模式:对下列目标执行 Linux 主机快速安全巡检(首轮研判)。\\n\"\n \"请使用 ssh_run_script,script_path 为 `.flocks/plugins/agents/host-forensics-fast/scripts/triage_fast.sh`。\\n\"\n + user_hint\n + \"\\n\\n本次 SSH 工具参数必须使用:\\n\"\n + \"- host: \"\n + connect_host\n + \"\\n\"\n + (\"- username: \" + connect_user + \"\\n\" if connect_user else \"- username: (留空,使用默认账户)\\n\")\n + \"\\n请输出简洁 Markdown:结论、可疑项、风险判断、后续建议。\"\n )\n while attempts < 2:\n attempts += 1\n res = tool.run_safe(\n \"task\",\n description=desc + \" attempt \" + str(attempts),\n prompt=prompt,\n subagent_type=\"host-forensics-fast\",\n run_in_background=False,\n )\n text = res.get(\"text\") or \"\"\n ok = bool(res.get(\"success\"))\n err = res.get(\"error\") or \"\"\n if ok:\n verdict = _extract_verdict(text)\n break\n if not _is_timeout_error(err) or attempts >= 2:\n failure_category = _classify_error(err)\n break\n time.sleep(3)\n if not ok and not failure_category:\n failure_category = _classify_error(err)\nelse:\n err = preflight_error or \"SSH preflight failed\"\n failure_category = _classify_error(err)\n\nlines = [\n \"# 单主机快速巡检结果\",\n \"\",\n \"- 批次内序号: {} / {}\".format(idx1, nh),\n \"- 列表项 host: `{}`\".format(host),\n \"- ssh_target: `{}`\".format(ssh_target),\n \"- ssh_host: `{}`\".format(connect_host),\n 
\"- ssh_user: `{}`\".format(connect_user or \"(default)\"),\n \"- success: {}\".format(ok),\n \"- verdict: {}\".format(verdict),\n \"- failure_category: {}\".format(failure_category or \"\"),\n \"- inspect_attempts: {}\".format(attempts),\n \"\",\n]\nif not preflight_ok:\n lines.extend(\n [\n \"## SSH 预检失败\",\n \"\",\n \"- 分类: `{}`\".format(failure_category),\n \"- 错误:\",\n \"\",\n \"```\",\n str(err),\n \"```\",\n \"\",\n ]\n )\nelif err:\n lines.extend([\"## 错误\", \"\", \"```\", str(err), \"```\", \"\"])\nelse:\n lines.extend([\"## 子 Agent 输出\", \"\", (text if text else \"_(无正文)_\"), \"\"])\nwith open(per_host_md, \"w\", encoding=\"utf-8\") as f:\n f.write(\"\\n\".join(lines))\n\ntr = inputs.get(\"triage_results\", [])\ntr = list(tr) if isinstance(tr, list) else []\ntr.append(\n {\n \"host\": host,\n \"ssh_user\": connect_user or su,\n \"ssh_target\": ssh_target,\n \"ssh_host\": connect_host,\n \"success\": ok,\n \"verdict\": verdict,\n \"failure_category\": failure_category,\n \"inspect_attempts\": attempts,\n \"error\": err,\n \"per_host_md\": per_host_md,\n }\n)\noutputs[\"triage_results\"] = tr\n\nbatch_report_path = inputs.get(\"batch_report_path\", \"\")\nsection = (\n \"\\n## [{}/{}] `{}`\\n\\n\".format(idx1, nh, ssh_target)\n + \"- 单独报告: `{}`\\n\".format(per_host_md)\n + \"- 执行结果: {}\\n\".format(\"成功\" if ok else \"失败\")\n + \"- 判定结果: `{}`\\n\".format(verdict)\n + \"- 失败分类: `{}`\\n\".format(failure_category or \"\")\n + \"- 尝试次数: {}\\n\\n---\\n\".format(attempts)\n)\nif batch_report_path:\n with open(batch_report_path, \"a\", encoding=\"utf-8\") as f:\n f.write(section)\n\noutputs[\"last_host\"] = host\noutputs[\"last_ssh_target\"] = ssh_target\noutputs[\"last_success\"] = ok\noutputs[\"last_verdict\"] = verdict\noutputs[\"last_failure_category\"] = failure_category\noutputs[\"last_per_host_md\"] = per_host_md" + }, + { + "id": "advance_index", + "type": "python", + "name": "下一台主机", + "description": "host_idx 自增并更新 should_continue", + "code": "hosts = 
inputs.get(\"hosts\", [])\nidx = int(inputs.get(\"host_idx\", 0)) + 1\noutputs[\"host_idx\"] = idx\noutputs[\"should_continue\"] = \"continue\" if idx < len(hosts) else \"exit\"\n" + }, + { + "id": "finalize_summary", + "type": "python", + "name": "生成索引与清单", + "description": "无 LLM:写 batch_host_triage_index.md、manifest.json、轻量 results.json、results.csv,并汇总 success/verdict 与失败分类。", + "code": "import csv\nimport datetime\nimport json\nimport os\nfrom flocks.workspace.manager import WorkspaceManager\n\nresults = inputs.get(\"triage_results\") or []\nif not isinstance(results, list):\n results = []\n\nws = WorkspaceManager.get_instance()\noday = datetime.date.today().isoformat()\noutput_dir = str(ws.get_workspace_dir() / \"outputs\" / oday)\nos.makedirs(output_dir, exist_ok=True)\nindex_path = os.path.join(output_dir, \"batch_host_triage_index.md\")\nmanifest_path = os.path.join(output_dir, \"batch_host_triage_manifest.json\")\nresults_json_path = os.path.join(output_dir, \"batch_host_triage_results.json\")\nresults_csv_path = os.path.join(output_dir, \"batch_host_triage_results.csv\")\nbatch_report_path = inputs.get(\"batch_report_path\", \"\")\nper_host_dir = str(inputs.get(\"per_host_dir\") or \"\").strip() or os.path.join(output_dir, \"host_triage\")\nhosts = inputs.get(\"hosts\", [])\nif not isinstance(hosts, list):\n hosts = []\n\nsu = inputs.get(\"ssh_user\")\nif isinstance(su, str):\n su = su.strip()\nelse:\n su = \"\"\n\nverdict_order = [\"CLEAN\", \"SUSPICIOUS\", \"COMPROMISED\", \"UNKNOWN\"]\nverdict_counts = {name: 0 for name in verdict_order}\nfailure_counts = {}\nfor r in results:\n if not isinstance(r, dict):\n continue\n verdict = str(r.get(\"verdict\") or \"UNKNOWN\").upper()\n if verdict not in verdict_counts:\n verdict = \"UNKNOWN\"\n verdict_counts[verdict] += 1\n failure_category = str(r.get(\"failure_category\") or \"\").strip()\n if failure_category:\n failure_counts[failure_category] = failure_counts.get(failure_category, 0) + 1\n\nn_ok = sum(1 
for r in results if isinstance(r, dict) and r.get(\"success\"))\nn_total = len(results)\n\nlines = [\n \"# 批量主机快速巡检 — 索引\",\n \"\",\n \"每台主机的完整输出已写在 `host_triage/` 下独立 `.md` 文件中;本文件只保留目录信息与轻量状态(**无 LLM 调用**,避免大批量巡检末步超时)。\",\n \"\",\n \"- 日期: `{}`\".format(oday),\n \"- 逐台目录: `{}`\".format(per_host_dir),\n \"- 合计: {} 台\".format(n_total),\n \"- 执行成功: {} 台\".format(n_ok),\n \"- 执行失败: {} 台\".format(n_total - n_ok),\n \"- CSV 快速视图: `{}`\".format(results_csv_path),\n \"- CLEAN: {} 台\".format(verdict_counts[\"CLEAN\"]),\n \"- SUSPICIOUS: {} 台\".format(verdict_counts[\"SUSPICIOUS\"]),\n \"- COMPROMISED: {} 台\".format(verdict_counts[\"COMPROMISED\"]),\n \"- UNKNOWN: {} 台\".format(verdict_counts[\"UNKNOWN\"]),\n \"\",\n]\nif failure_counts:\n lines.extend([\"## 失败分类统计\", \"\"])\n for name in sorted(failure_counts):\n lines.append(\"- {}: {} 台\".format(name, failure_counts[name]))\n lines.append(\"\")\nlines.extend([\"## 逐台文件与状态\", \"\"])\nif not results:\n lines.append(\"_无可巡检记录(未执行子 Agent 或主机列表为空)。_\")\nelse:\n for i, r in enumerate(results, 1):\n if not isinstance(r, dict):\n continue\n p = r.get(\"per_host_md\") or \"\"\n lab = r.get(\"ssh_target\") or r.get(\"host\", \"\")\n ok = r.get(\"success\", False)\n verdict = str(r.get(\"verdict\") or \"UNKNOWN\").upper()\n if verdict not in verdict_counts:\n verdict = \"UNKNOWN\"\n failure_category = str(r.get(\"failure_category\") or \"\").strip()\n attempts = int(r.get(\"inspect_attempts\") or 0)\n bn = os.path.basename(p) if p else \"(未生成文件)\"\n exec_status = \"成功\" if ok else \"失败\"\n line = \"{}. 
`{}` — 执行: {} — 判定: `{}` — 尝试: {} — 文件: `{}`\".format(\n i,\n lab,\n exec_status,\n verdict,\n attempts,\n bn,\n )\n if failure_category:\n line += \" — 失败分类: `{}`\".format(failure_category)\n lines.append(line)\n if p:\n lines.append(\" - 路径: `{}`\".format(p))\n\nindex_body = \"\\n\".join(lines)\nwith open(index_path, \"w\", encoding=\"utf-8\") as f:\n f.write(index_body)\n\nitems = []\nfor r in results:\n if not isinstance(r, dict):\n continue\n verdict = str(r.get(\"verdict\") or \"UNKNOWN\").upper()\n if verdict not in verdict_counts:\n verdict = \"UNKNOWN\"\n items.append(\n {\n \"host\": r.get(\"host\"),\n \"ssh_user\": r.get(\"ssh_user\"),\n \"ssh_target\": r.get(\"ssh_target\"),\n \"ssh_host\": r.get(\"ssh_host\"),\n \"success\": r.get(\"success\"),\n \"verdict\": verdict,\n \"failure_category\": r.get(\"failure_category\"),\n \"inspect_attempts\": r.get(\"inspect_attempts\"),\n \"error\": r.get(\"error\"),\n \"per_host_md\": r.get(\"per_host_md\"),\n }\n )\nmanifest = {\n \"date\": oday,\n \"output_dir\": output_dir,\n \"per_host_dir\": per_host_dir,\n \"hosts\": hosts,\n \"ssh_user\": su,\n \"batch_report_path\": batch_report_path,\n \"index_path\": index_path,\n \"results_csv_path\": results_csv_path,\n \"items\": items,\n}\nwith open(manifest_path, \"w\", encoding=\"utf-8\") as f:\n json.dump(manifest, f, ensure_ascii=False, indent=2)\n\nbundle = {\n \"date\": oday,\n \"hosts\": hosts,\n \"ssh_user\": su,\n \"output_dir\": output_dir,\n \"per_host_dir\": per_host_dir,\n \"batch_report_path\": batch_report_path,\n \"index_path\": index_path,\n \"manifest_path\": manifest_path,\n \"results_csv_path\": results_csv_path,\n \"triage_results\": items,\n}\nwith open(results_json_path, \"w\", encoding=\"utf-8\") as f:\n json.dump(bundle, f, ensure_ascii=False, indent=2)\n\nwith open(results_csv_path, \"w\", encoding=\"utf-8\", newline=\"\") as f:\n writer = csv.DictWriter(\n f,\n fieldnames=[\n \"host\",\n \"ssh_user\",\n \"ssh_target\",\n \"ssh_host\",\n 
\"success\",\n \"verdict\",\n \"failure_category\",\n \"inspect_attempts\",\n \"error\",\n \"per_host_md\",\n ],\n )\n writer.writeheader()\n for item in items:\n writer.writerow(item)\n\nsummary = (\n \"批量巡检完成:共 {} 台,执行成功 {} 台,执行失败 {} 台;\"\n \"判定结果为 CLEAN {} 台、SUSPICIOUS {} 台、COMPROMISED {} 台、UNKNOWN {} 台。\"\n \"逐台完整结果已写入 `{}`,索引见 `{}`,快速筛选 CSV 见 `{}`。\"\n).format(\n n_total,\n n_ok,\n n_total - n_ok,\n verdict_counts[\"CLEAN\"],\n verdict_counts[\"SUSPICIOUS\"],\n verdict_counts[\"COMPROMISED\"],\n verdict_counts[\"UNKNOWN\"],\n per_host_dir,\n index_path,\n results_csv_path,\n)\noutputs[\"executive_summary\"] = summary\noutputs[\"index_path\"] = index_path\noutputs[\"manifest_path\"] = manifest_path\noutputs[\"results_json_path\"] = results_json_path\noutputs[\"results_csv_path\"] = results_csv_path\noutputs[\"batch_report_path\"] = batch_report_path\noutputs[\"per_host_dir\"] = per_host_dir" + } + ], + "edges": [ + { + "from": "init_hosts", + "to": "loop_check", + "order": 0 + }, + { + "from": "loop_check", + "to": "inspect_host", + "label": "continue", + "order": 0 + }, + { + "from": "loop_check", + "to": "finalize_summary", + "label": "exit", + "order": 1 + }, + { + "from": "inspect_host", + "to": "advance_index", + "order": 0 + }, + { + "from": "advance_index", + "to": "loop_check", + "order": 0 + } + ], + "metadata": { + "node_timeout_s": 900, + "sampleInputs": { + "hosts": [ + "", + "" + ], + "ssh_user": "" + } + } +} diff --git a/.flocks/plugins/workflows/loop_host_forensics_fast/workflow.md b/.flocks/plugins/workflows/loop_host_forensics_fast/workflow.md new file mode 100644 index 0000000..e124bec --- /dev/null +++ b/.flocks/plugins/workflows/loop_host_forensics_fast/workflow.md @@ -0,0 +1,131 @@ +# 批量主机快速巡检(循环子 Agent) + +## 业务场景 + +对多台 Linux 主机依次执行快速安全巡检(首轮研判):每台主机由子 Agent `host-forensics-fast` 完成轻量 triage 与结论输出。适用于批量资产排查、挖矿/异常快速过筛等场景。 + +本工作流的关键目标是避免“循环结束后最后一个节点拿到全量巡检正文再做 summary”导致超时,因此策略改为: + +- 每巡检完一台主机,立即把完整结果写入 `host_triage/` 下的独立 Markdown 文件。 +- 
工作流循环态只保留轻量索引信息,不在 `triage_results` 中累计完整正文。 +- 轻量索引额外沉淀 `verdict`,便于后续只读索引文件就做全局概况,或直接筛出异常主机。 +- 每台巡检前先做一次 SSH 预检;若预检失败,直接记录失败分类,不再进入重型巡检。 +- 对单台巡检仅在“超时”场景自动重试 1 次。 +- 末步不再做 LLM 汇总,只生成索引文件、manifest、轻量结果 JSON 和 CSV,并返回一段很短的执行摘要。 + +## 输入参数(工作流 `inputs`) + +| 字段 | 类型 | 说明 | +|------|------|------| +| `hosts_file` | 可选 string | 主机列表文件绝对路径(或 `~` 开头)。每行一个 SSH 目标(主机名或可解析地址),空行与 `#` 开头行忽略。 | +| `hosts` | 可选 list[str] | 直接内嵌的主机列表;可与文件合并(先读文件再并入列表,去重保序)。 | +| `hosts` 为 string | 少见 | 单主机字符串时视为单元素列表。 | +| `ssh_user` | 可选 string | 指定 SSH 登录用户。若提供且列表项不含 `user@host` 形式,则工作流会在内部拆分成 `host=<主机>`、`username=<ssh_user>` 调用 SSH 工具。若不提供,则使用列表中的主机标识本身(由 Agent/工具默认账户连接,一般为 root)。 | + +若两者都未提供有效主机,工作流跳过巡检循环,仅输出无主机的轻量结果。 + +若 `hosts` 中某项已含 `@`(如 `user@target-host`),则不再与 `ssh_user` 拼接,该项按原样作为 SSH 目标。 + +## 流程步骤 + +### 1. 初始化(`init_hosts`) + +- 工具/模型:Python(`WorkspaceManager` + 文件系统) +- 输入:`hosts_file`、`hosts`、可选 `ssh_user` +- 处理逻辑: + - 解析主机列表并去重保序。 + - 规范化 `ssh_user`。 + - 创建输出目录与 `host_triage/` 目录。 + - 初始化 `batch_host_triage_log.md` 文件头。 + - 初始化循环状态:`host_idx=0`、`triage_results=[]`、`should_continue`。 +- 输出:`hosts`、`ssh_user`、`host_idx`、`triage_results`、`output_dir`、`per_host_dir`、`batch_report_path`、`should_continue` + +说明:这里的 `triage_results` 从现在开始只保存轻量索引,不保存每台机器的完整正文;逐台结果至少包含 `success`、`verdict`、`failure_category`、`per_host_md`。 + +### 2. 循环判断(`loop_check`) + +- 工具/模型:`loop` 节点,`select_key` 为 `should_continue` +- 决策分支: + - `continue`:进入单台巡检 + - `exit`:进入末步索引生成 + +### 3. 
单台巡检(`inspect_host`) + +- 工具/模型:Python + `ssh_host_cmd` 预检 + `task`(`subagent_type=host-forensics-fast`) +- 输入:`hosts`、`host_idx`、`ssh_user`、`per_host_dir`、`batch_report_path`、`triage_results` +- 处理逻辑: + - 取当前 `hosts[host_idx]`,计算 `ssh_target`,并归一化出 `ssh_host` / `ssh_user`。 + - 先用 `ssh_host_cmd("echo FLOCKS_SSH_OK")` 做轻量 SSH 预检。 + - 若预检失败:按错误文本归类(如 `auth_failed`、`connect_timeout`、`connection_refused` 等),直接写入索引与单机报告。 + - 若预检通过:构造 prompt,明确要求子 Agent 调用 SSH 工具时分别传 `host` 和 `username`。 + - 调用 `tool.run_safe('task', ...)` 执行巡检;若仅因超时失败,则自动重试 1 次。 + - 将本轮完整输出立即写入 `host_triage/NNNN_slug.md`。 + - 从子 Agent 输出中提取 `Verdict`,未识别时回退为 `UNKNOWN`。 + - 向 `triage_results` 仅追加轻量字段:`{host, ssh_user, ssh_target, ssh_host, success, verdict, failure_category, inspect_attempts, error, per_host_md}`。 + - 向 `batch_host_triage_log.md` 追加一段索引信息,不再追加完整正文。 +- 输出:`triage_results`、`last_host`、`last_ssh_target`、`last_success`、`last_per_host_md` + +### 4. 前进下标(`advance_index`) + +- 工具/模型:Python +- 处理逻辑:`host_idx += 1`;若仍小于 `len(hosts)` 则 `should_continue='continue'`,否则为 `exit` +- 输出:`host_idx`、`should_continue` + +### 5. 
生成索引与清单(`finalize_summary`) + +- 工具/模型:Python +- 输入:`triage_results`、`hosts`、`ssh_user`、`per_host_dir`、`batch_report_path` +- 处理逻辑: + - 不调用 LLM。 + - 基于轻量索引生成 `batch_host_triage_index.md`,其中包含执行状态、`verdict` 分布和失败分类统计。 + - 生成 `batch_host_triage_manifest.json`。 + - 生成 `batch_host_triage_results.json`,其中 `triage_results` 仅包含轻量字段,不包含每台完整正文。 + - 生成 `batch_host_triage_results.csv`,用于快速筛选与表格查看。 + - 输出一段短 `executive_summary`,说明总数、成功失败数、`verdict` 分布和结果文件位置。 +- 输出:`executive_summary`、`index_path`、`manifest_path`、`results_json_path`、`results_csv_path`、`batch_report_path`、`per_host_dir` + +## 文件输出约定 + +- 目录:`~/.flocks/workspace/outputs/<执行当日 YYYY-MM-DD>/` +- 逐台完整结果:`host_triage/<序号>_<主机>.md` +- 循环日志:`batch_host_triage_log.md` +- 末步索引:`batch_host_triage_index.md` +- 末步清单:`batch_host_triage_manifest.json` +- 末步轻量结果:`batch_host_triage_results.json` +- 末步快速视图:`batch_host_triage_results.csv` + +## 设计说明 + +- 大字段正文只写磁盘,不在循环状态里累积。 +- 末步索引、`results.json` 和 `results.csv` 会同步保留每台主机的 `success`、`verdict`、失败分类与 `per_host_md`,便于后续快速概览和异常筛选。 +- 本工作流建议运行时使用 `node_timeout_s=900`;工作流元数据已内置该默认值。 +- 最后一个节点只处理轻量索引,因此主机数很多时也不容易超时。 +- 若需要查看某台机器的完整分析,直接打开对应的 `host_triage/*.md` 即可。 + +## 样例 inputs + +指定 SSH 用户: + +```json +{ + "hosts": ["", ""], + "ssh_user": "" +} +``` + +不指定用户: + +```json +{ + "hosts": [""] +} +``` + +从文件合并列表: + +```json +{ + "hosts_file": "/path/to/hosts.txt", + "hosts": ["", ""] +} +``` diff --git a/flocks/agent/agents/rex/prompt_builder.py b/flocks/agent/agents/rex/prompt_builder.py index 6210f58..81fbe27 100644 --- a/flocks/agent/agents/rex/prompt_builder.py +++ b/flocks/agent/agents/rex/prompt_builder.py @@ -649,6 +649,10 @@ def _build_security_priority_section(available_agents: List["AvailableAgent"]) - "intent": "网络流量日志 / NDR 告警分析", "signals": '"流量日志", "NDR", "告警分析", "网络攻击", "攻击是否成功", "network traffic", "alert analysis"', }, + "host-forensics-fast": { + "intent": "Linux 主机快速排查 / 首轮研判", + "signals": '"快速排查", "首轮排查", "快速研判", "快速看一下主机", "先看主机是否异常", "host triage", "quick triage"', 
+ }, "host-forensics": { "intent": "Linux 主机入侵检测 / 取证", "signals": '"主机入侵", "挖矿", "后门", "webshell", "主机异常", "主机安全检查", "host compromise", "forensics"', diff --git a/flocks/server/routes/session.py b/flocks/server/routes/session.py index 15f52e4..825e3de 100644 --- a/flocks/server/routes/session.py +++ b/flocks/server/routes/session.py @@ -135,6 +135,12 @@ def _session_to_response(session: SessionModel) -> SessionResponse: ) +def _is_hidden_from_session_manager(session: SessionModel) -> bool: + """Return whether a session should be excluded from manager listings.""" + metadata = session.metadata if isinstance(session.metadata, dict) else {} + return bool(metadata.get("hideFromSessionManager")) + + # ============================================================================= # Session CRUD Routes # ============================================================================= @@ -191,6 +197,8 @@ async def list_sessions( term = search.lower() if search else None for session in all_sessions: + if _is_hidden_from_session_manager(session): + continue if directory is not None and session.directory != directory: continue if roots and session.parent_id: diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index 6920982..d368bc5 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -8,6 +8,7 @@ import asyncio import hashlib import json +import os import shutil import threading import time @@ -33,6 +34,8 @@ stop_workflow_service, ) from flocks.session.recorder import Recorder +from flocks.session.message import Message, MessageRole +from flocks.session.session import Session from flocks.workflow.workflow_lint import lint_workflow from flocks.workflow.compiler import compile_workflow from flocks.workflow.fs_store import ( @@ -42,9 +45,11 @@ workflow_scan_dirs as _all_scan_dirs, ) from flocks.workflow.io import load_workflow, dump_workflow +from flocks.workflow.tools import get_tool_registry from flocks.config.config 
import Config from flocks.storage.storage import Storage from flocks.server.routes.event import publish_event +from flocks.tool import ToolContext from flocks.utils.log import Log @@ -114,6 +119,9 @@ class WorkflowRunRequest(BaseModel): inputs: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Input parameters") timeout_s: Optional[float] = Field(None, alias="timeoutS", description="Timeout in seconds") trace: bool = Field(False, description="Enable tracing") + session_id: Optional[str] = Field(None, alias="sessionId", description="Optional parent session ID") + message_id: Optional[str] = Field(None, alias="messageId", description="Optional parent message ID") + agent: Optional[str] = Field(None, description="Optional agent name for tool context") class WorkflowExecutionResponse(BaseModel): @@ -186,6 +194,87 @@ def _read_workflow_from_fs(workflow_id: str) -> Optional[Dict[str, Any]]: return shared_read_workflow_from_fs(workflow_id) +async def _build_workflow_tool_context( + *, + workflow_id: str, + action_name: str, + session_id: Optional[str] = None, + message_id: Optional[str] = None, + agent: Optional[str] = None, +) -> ToolContext: + """Build a real ToolContext for workflow execution. + + Prefer the caller-provided session/message. When absent, create a temporary + parent session and synthetic user message so workflow-internal tools such as + `task` can resolve a valid parent session. 
+ """ + effective_session_id = str(session_id or "").strip() + effective_message_id = str(message_id or "").strip() + effective_agent = str(agent or "").strip() + + workspace_dir = os.getcwd() + project_id = "default" + try: + from flocks.project.instance import Instance + + workspace_dir = str(getattr(Instance, "directory", None) or workspace_dir) + project = getattr(Instance, "project", None) + if project is not None and getattr(project, "id", None): + project_id = str(project.id) + except Exception: + workspace_dir = str(_find_workspace_root()) + + parent_session = None + if effective_session_id: + parent_session = await Session.get_by_id(effective_session_id) + if not parent_session: + raise HTTPException(status_code=400, detail=f"Parent session not found: {effective_session_id}") + workspace_dir = str(getattr(parent_session, "directory", None) or workspace_dir) + if getattr(parent_session, "project_id", None): + project_id = str(parent_session.project_id) + if not effective_agent: + effective_agent = str(getattr(parent_session, "agent", None) or "rex") + else: + parent_session = await Session.create( + project_id=project_id, + directory=workspace_dir, + title=f"Workflow {action_name}: {workflow_id}", + agent=effective_agent or "rex", + category="task", + metadata={ + "workflowTempParent": True, + "hideFromSessionManager": True, + "workflowId": workflow_id, + "workflowAction": action_name, + }, + ) + effective_session_id = parent_session.id + workspace_dir = str(getattr(parent_session, "directory", None) or workspace_dir) + if not effective_agent: + effective_agent = str(getattr(parent_session, "agent", None) or "rex") + + if not effective_message_id: + message = await Message.create( + session_id=effective_session_id, + role=MessageRole.USER, + content=f"[Workflow {action_name}] {workflow_id}", + agent=effective_agent or "rex", + synthetic=True, + ) + effective_message_id = message.id + + return ToolContext( + session_id=effective_session_id, + 
message_id=effective_message_id, + agent=effective_agent or "rex", + event_publish_callback=publish_event, + extra={ + "workspace_dir": workspace_dir, + "main_session_key": effective_session_id, + }, + ) + + def _write_workflow_to_fs( workflow_id: str, workflow_json: Dict[str, Any], @@ -367,6 +456,7 @@ async def _run_workflow_execution_task( req: WorkflowRunRequest, exec_id: str, cancel_event: threading.Event, + tool_context: Optional[ToolContext] = None, ) -> None: """Execute a workflow in the background and keep the execution record updated.""" exec_key = _workflow_execution_key(exec_id) @@ -398,6 +488,7 @@ def _on_step_complete(step_result) -> None: trace=req.trace, on_step_complete=_on_step_complete, cancel=cancel_event.is_set, + tool_context=tool_context, ) duration = time.time() - start_time @@ -716,6 +807,13 @@ async def run_workflow_endpoint(workflow_id: str, req: WorkflowRunRequest): raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") workflow_json = data["workflowJson"] + tool_context = await _build_workflow_tool_context( + workflow_id=workflow_id, + action_name="run", + session_id=req.session_id, + message_id=req.message_id, + agent=req.agent, + ) # Create execution record exec_id = str(uuid.uuid4()) @@ -741,6 +839,7 @@ async def run_workflow_endpoint(workflow_id: str, req: WorkflowRunRequest): req=req, exec_id=exec_id, cancel_event=cancel_event, + tool_context=tool_context, ) ) _active_workflow_executions[exec_id] = ActiveWorkflowExecution( @@ -1405,6 +1504,9 @@ class RunNodeRequest(BaseModel): node_id: str = Field(..., description="Node ID to execute") inputs: Dict[str, Any] = Field(default_factory=dict, description="Input data for the node") + session_id: Optional[str] = Field(None, alias="sessionId", description="Optional parent session ID") + message_id: Optional[str] = Field(None, alias="messageId", description="Optional parent message ID") + agent: Optional[str] = Field(None, description="Optional agent name for tool 
context") class RunNodeResponse(BaseModel): @@ -1434,6 +1536,13 @@ async def run_single_node(workflow_id: str, req: RunNodeRequest): raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") workflow_json = data["workflowJson"] + tool_context = await _build_workflow_tool_context( + workflow_id=workflow_id, + action_name=f"run-node:{req.node_id}", + session_id=req.session_id, + message_id=req.message_id, + agent=req.agent, + ) try: from flocks.workflow.models import Workflow as WfModel @@ -1441,7 +1550,10 @@ async def run_single_node(workflow_id: str, req: RunNodeRequest): from flocks.workflow.repl_runtime import PythonExecRuntime wf = WfModel.from_dict(workflow_json) - engine = WorkflowEngine(wf, runtime=PythonExecRuntime()) + engine = WorkflowEngine( + wf, + runtime=PythonExecRuntime(tool_registry=get_tool_registry(tool_context=tool_context)), + ) step_result = await asyncio.to_thread(engine.run_node, req.node_id, req.inputs) diff --git a/flocks/workflow/engine.py b/flocks/workflow/engine.py index 109e018..ec57d0e 100644 --- a/flocks/workflow/engine.py +++ b/flocks/workflow/engine.py @@ -108,7 +108,7 @@ class WorkflowEngine: trace: bool = False mutate_workflow: bool = False workflow_path: Optional[str] = None - node_timeout_s: Optional[float] = 120.0 + node_timeout_s: Optional[float] = 300.0 _depth: int = 0 max_parallel_workers: int = 4 workflow_loader: Optional[Callable[[str], "Workflow"]] = field(default=None, repr=False) diff --git a/flocks/workflow/llm.py b/flocks/workflow/llm.py index dce541b..a420a2a 100644 --- a/flocks/workflow/llm.py +++ b/flocks/workflow/llm.py @@ -1,6 +1,7 @@ import asyncio from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass +import time from typing import Any, Dict, Optional from flocks.config.config import Config @@ -266,6 +267,9 @@ def ask( *, model: Optional[str] = None, provider_id: Optional[str] = None, + timeout_s: Optional[float] = None, + max_retries: int = 0, + 
retry_delay_s: float = 1.0, ) -> str: if model is not None or provider_id is not None: return LLMClient( @@ -273,11 +277,19 @@ def ask( base_url=self.base_url, model=model if model is not None else self.model, provider_id=provider_id if provider_id is not None else self.provider_id, - ).ask(prompt, temperature=temperature) + ).ask( + prompt, + temperature=temperature, + timeout_s=timeout_s, + max_retries=max_retries, + retry_delay_s=retry_delay_s, + ) targets = self._build_candidate_targets() preflight_errors: list[tuple[_ResolvedTarget, str]] = [] runtime_errors: list[tuple[_ResolvedTarget, Exception]] = [] + retry_count = max(0, int(max_retries)) + delay_s = max(0.0, float(retry_delay_s)) for target in targets: validation_error = self._validate_target(target) @@ -288,21 +300,36 @@ def ask( provider = self._prepare_provider(target.provider_id) async def _call(): - return await provider.chat( + coro = provider.chat( model_id=target.model_id, messages=[ChatMessage(role="user", content=prompt + "请使用中文输出。")], temperature=temperature, ) - - try: - response = _run_coro_sync(_call()) - except Exception as exc: - runtime_errors.append((target, exc)) - continue - - self.provider_id = target.provider_id - self.model = target.model_id - return str(getattr(response, "content", "") or "") + if timeout_s is not None and float(timeout_s) > 0: + return await asyncio.wait_for(coro, timeout=float(timeout_s)) + return await coro + + last_exc: Optional[Exception] = None + for attempt in range(retry_count + 1): + try: + response = _run_coro_sync(_call()) + self.provider_id = target.provider_id + self.model = target.model_id + return str(getattr(response, "content", "") or "") + except asyncio.TimeoutError as exc: + total_attempts = retry_count + 1 + last_exc = TimeoutError( + f"LLM call timed out after {timeout_s}s " + f"(attempt {attempt + 1}/{total_attempts})" + ) + except Exception as exc: + last_exc = exc + + if attempt < retry_count and delay_s > 0: + time.sleep(delay_s) + + if 
last_exc is not None: + runtime_errors.append((target, last_exc)) self._raise_resolution_error( targets=targets, @@ -336,10 +363,16 @@ def ask( *, model: Optional[str] = None, provider_id: Optional[str] = None, + timeout_s: Optional[float] = None, + max_retries: int = 0, + retry_delay_s: float = 1.0, ) -> str: return get_llm_client(model=model, provider_id=provider_id).ask( prompt, temperature=temperature, + timeout_s=timeout_s, + max_retries=max_retries, + retry_delay_s=retry_delay_s, ) diff --git a/flocks/workflow/repl_runtime.py b/flocks/workflow/repl_runtime.py index 4bc7a9f..eb98bf9 100644 --- a/flocks/workflow/repl_runtime.py +++ b/flocks/workflow/repl_runtime.py @@ -380,13 +380,25 @@ def run_safe(self, name, **kwargs): return _rpc_call({{"kind": "tool_safe", "name": name, "kwargs": kwargs}}) class _LLMProxy: - def ask(self, prompt, temperature=0.2, model=None, provider_id=None): + def ask( + self, + prompt, + temperature=0.2, + model=None, + provider_id=None, + timeout_s=None, + max_retries=0, + retry_delay_s=1.0, + ): return _rpc_call({{ "kind": "llm", "prompt": prompt, "temperature": temperature, "model": model, "provider_id": provider_id, + "timeout_s": timeout_s, + "max_retries": max_retries, + "retry_delay_s": retry_delay_s, }}) def get_path(path, data=None): @@ -509,15 +521,35 @@ def _handle_rpc_request(self, *, msg: Dict[str, Any], token: str) -> Dict[str, A temperature = 0.2 model = rpc.get("model") provider_id = rpc.get("provider_id") + timeout_raw = rpc.get("timeout_s") + max_retries_raw = rpc.get("max_retries", 0) + retry_delay_raw = rpc.get("retry_delay_s", 1.0) if model is not None and not isinstance(model, str): raise RuntimeError("LLM model must be a string when provided") if provider_id is not None and not isinstance(provider_id, str): raise RuntimeError("LLM provider_id must be a string when provided") + timeout_s = None + if timeout_raw is not None: + try: + timeout_s = float(timeout_raw) + except Exception as exc: + raise RuntimeError("LLM 
timeout_s must be a number when provided") from exc + try: + max_retries = int(max_retries_raw) + except Exception as exc: + raise RuntimeError("LLM max_retries must be an integer when provided") from exc + try: + retry_delay_s = float(retry_delay_raw) + except Exception as exc: + raise RuntimeError("LLM retry_delay_s must be a number when provided") from exc output = get_lazy_llm().ask( prompt, temperature=temperature, model=model, provider_id=provider_id, + timeout_s=timeout_s, + max_retries=max_retries, + retry_delay_s=retry_delay_s, ) return {"type": "rpc_result", "token": token, "id": req_id, "ok": True, "output": output} diff --git a/flocks/workflow/runner.py b/flocks/workflow/runner.py index 3740880..0770af3 100644 --- a/flocks/workflow/runner.py +++ b/flocks/workflow/runner.py @@ -82,6 +82,43 @@ def _resolve_workflow_runtime_preference(tool_context: Optional[Any]) -> Literal return "host" # type: ignore[return-value] +def _resolve_workflow_node_timeout( + requested_timeout_s: Optional[float], + workflow_metadata: Optional[Dict[str, Any]], +) -> Optional[float]: + """Resolve effective per-node timeout with workflow metadata fallback.""" + if requested_timeout_s is None: + return None + if requested_timeout_s not in (300, 300.0): + return requested_timeout_s + if not isinstance(workflow_metadata, dict): + return requested_timeout_s + + candidates = [ + workflow_metadata.get("node_timeout_s"), + workflow_metadata.get("nodeTimeoutS"), + ] + runtime_defaults = workflow_metadata.get("runtime_defaults") + if isinstance(runtime_defaults, dict): + candidates.extend( + [ + runtime_defaults.get("node_timeout_s"), + runtime_defaults.get("nodeTimeoutS"), + ] + ) + + for value in candidates: + if value is None: + continue + try: + resolved = float(value) + except (TypeError, ValueError): + continue + if resolved > 0: + return resolved + return requested_timeout_s + + def _resolve_sandbox_payload_from_config(tool_context: Optional[Any]) -> Optional[Dict[str, Any]]: 
"""Resolve sandbox payload directly from config for workflow sandbox default.""" config_data = _load_config_data() @@ -205,7 +242,7 @@ def run_workflow( workflow: WorkflowSource, inputs: Optional[Dict[str, Any]] = None, timeout_s: Optional[float] = None, - node_timeout_s: Optional[float] = 120.0, + node_timeout_s: Optional[float] = 300.0, trace: bool = False, use_llm: Optional[bool] = None, tool_registry: Optional[Any] = None, @@ -295,6 +332,11 @@ def run_workflow( except Exception: pass + effective_node_timeout_s = _resolve_workflow_node_timeout( + node_timeout_s, + wf.metadata, + ) + reqs = requirements_from_workflow_metadata(wf.metadata) if ensure_requirements: _logger.info("检查依赖包...") @@ -330,14 +372,20 @@ def run_workflow( (requirements_installer or RequirementsInstaller(installer="auto")).ensure_installed(reqs) rt = PythonExecRuntime(tool_registry=registry) - _logger.info(f"创建执行引擎 (use_llm={effective_use_llm}, trace={trace}, node_timeout={node_timeout_s}s, parallel_workers={max_parallel_workers})") + _logger.info( + "创建执行引擎 (use_llm=%s, trace=%s, node_timeout=%ss, parallel_workers=%s)", + effective_use_llm, + trace, + effective_node_timeout_s, + max_parallel_workers, + ) engine = WorkflowEngine( wf, runtime=rt, use_llm=bool(effective_use_llm), trace=trace, workflow_path=workflow_path_for_engine, - node_timeout_s=node_timeout_s, + node_timeout_s=effective_node_timeout_s, max_parallel_workers=max_parallel_workers, ) diff --git a/tests/agent/test_agent.py b/tests/agent/test_agent.py index d8b2205..8d64742 100644 --- a/tests/agent/test_agent.py +++ b/tests/agent/test_agent.py @@ -18,7 +18,7 @@ BUILTIN_AGENTS = [ "rex", "hephaestus", "plan", "explore", "oracle", "librarian", "metis", "momus", "multimodal-looker", - "self-enhance", "rex-junior", "host-forensics", + "self-enhance", "rex-junior", "host-forensics", "host-forensics-fast", ] @@ -102,7 +102,7 @@ async def test_self_enhance_agent(self): @pytest.mark.asyncio async def test_security_agents(self): - for name 
in ["host-forensics"]: + for name in ["host-forensics", "host-forensics-fast"]: agent = await Agent.get(name) assert agent is not None assert agent.mode == "subagent" diff --git a/tests/agent/test_agent_factory.py b/tests/agent/test_agent_factory.py index 29c231c..8518db3 100644 --- a/tests/agent/test_agent_factory.py +++ b/tests/agent/test_agent_factory.py @@ -437,12 +437,12 @@ async def test_static_prompt_agents(self): async def test_plugin_agent_has_prompt_if_installed(self): """Plugin agents installed in ~/.flocks/plugins/agents/ should load their prompt.""" from flocks.agent.registry import Agent - # host-forensics is a plugin agent (native=False); skip if not installed locally - agent = await Agent.get("host-forensics") - if agent is None: - pytest.skip("host-forensics plugin not installed in this environment") - assert agent.prompt is not None, "host-forensics should have a prompt.md" - assert len(agent.prompt) > 20 + for name in ["host-forensics", "host-forensics-fast"]: + agent = await Agent.get(name) + if agent is None: + pytest.skip(f"{name} plugin not installed in this environment") + assert agent.prompt is not None, f"{name} should have a prompt.md" + assert len(agent.prompt) > 20 @pytest.mark.asyncio async def test_plan_agent_has_no_prompt(self): diff --git a/tests/integration/test_ssh_run_script.py b/tests/integration/test_ssh_run_script.py index f76569c..fa2e899 100644 --- a/tests/integration/test_ssh_run_script.py +++ b/tests/integration/test_ssh_run_script.py @@ -216,6 +216,23 @@ def test_deep_scan_script_passes(self): violations = _scan_script_safety(content) assert violations == [], "deep_scan.sh has safety violations:\n" + "\n".join(violations) + def test_fast_triage_script_passes(self): + """The bundled triage_fast.sh must pass safety checks.""" + from pathlib import Path + fast_triage_path = ( + Path(__file__).parents[2] + / ".flocks" + / "plugins" + / "agents" + / "host-forensics-fast" + / "scripts" + / "triage_fast.sh" + ) + if 
fast_triage_path.exists(): + content = fast_triage_path.read_text() + violations = _scan_script_safety(content) + assert violations == [], "triage_fast.sh has safety violations:\n" + "\n".join(violations) + # --------------------------------------------------------------------------- # _extract_sections diff --git a/tests/workflow/test_loop_host_forensics_fast_workflow.py b/tests/workflow/test_loop_host_forensics_fast_workflow.py new file mode 100644 index 0000000..678d06b --- /dev/null +++ b/tests/workflow/test_loop_host_forensics_fast_workflow.py @@ -0,0 +1,248 @@ +import csv +import json +from pathlib import Path + +from flocks.workspace.manager import WorkspaceManager + + +WORKFLOW_PATH = ( + Path(__file__).resolve().parents[2] + / ".flocks" + / "plugins" + / "workflows" + / "loop_host_forensics_fast" + / "workflow.json" +) + + +def _load_workflow() -> dict: + return json.loads(WORKFLOW_PATH.read_text(encoding="utf-8")) + + +def test_inspect_host_extracts_verdict_into_lightweight_result(tmp_path: Path) -> None: + workflow = _load_workflow() + inspect_host = next(node for node in workflow["nodes"] if node["id"] == "inspect_host") + + per_host_dir = tmp_path / "host_triage" + batch_report_path = tmp_path / "batch_host_triage_log.md" + + class DummyTool: + def run_safe(self, *args, **kwargs) -> dict: + if args == ("ssh_host_cmd",): + assert kwargs["host"] == "10.0.0.8" + assert kwargs["username"] == "root" + return {"success": True, "output": "FLOCKS_SSH_OK\n"} + assert args == ("task",) + assert kwargs["subagent_type"] == "host-forensics-fast" + assert "- host: 10.0.0.8" in kwargs["prompt"] + assert "- username: root" in kwargs["prompt"] + return { + "success": True, + "text": ( + "## Host Quick Assessment\n\n" + "**Target**: 10.0.0.8\n" + "**Verdict**: SUSPICIOUS\n" + "**Confidence**: HIGH\n\n" + "### Summary\n存在异常外联,需要继续排查。\n" + ), + } + + env = { + "inputs": { + "hosts": ["10.0.0.8"], + "host_idx": 0, + "ssh_user": "root", + "per_host_dir": str(per_host_dir), 
+ "batch_report_path": str(batch_report_path), + "triage_results": [], + }, + "outputs": {}, + "tool": DummyTool(), + } + + exec(inspect_host["code"], env, env) + + result = env["outputs"]["triage_results"][0] + assert result["success"] is True + assert result["verdict"] == "SUSPICIOUS" + assert result["failure_category"] == "" + assert result["inspect_attempts"] == 1 + assert result["per_host_md"].endswith(".md") + assert Path(result["per_host_md"]).exists() + assert env["outputs"]["last_verdict"] == "SUSPICIOUS" + + report_text = Path(result["per_host_md"]).read_text(encoding="utf-8") + assert "- verdict: SUSPICIOUS" in report_text + + +def test_inspect_host_records_preflight_failure_category(tmp_path: Path) -> None: + workflow = _load_workflow() + inspect_host = next(node for node in workflow["nodes"] if node["id"] == "inspect_host") + + class DummyTool: + def run_safe(self, *args, **kwargs) -> dict: + assert args == ("ssh_host_cmd",) + return { + "success": False, + "error": "SSH connection failed: Permission denied (publickey,password)", + } + + env = { + "inputs": { + "hosts": ["10.0.0.9"], + "host_idx": 0, + "ssh_user": "root", + "per_host_dir": str(tmp_path / "host_triage"), + "batch_report_path": str(tmp_path / "batch_host_triage_log.md"), + "triage_results": [], + }, + "outputs": {}, + "tool": DummyTool(), + } + + exec(inspect_host["code"], env, env) + + result = env["outputs"]["triage_results"][0] + assert result["success"] is False + assert result["verdict"] == "UNKNOWN" + assert result["failure_category"] == "auth_failed" + assert result["inspect_attempts"] == 0 + + report_text = Path(result["per_host_md"]).read_text(encoding="utf-8") + assert "## SSH 预检失败" in report_text + assert "auth_failed" in report_text + + +def test_inspect_host_retries_once_on_timeout(tmp_path: Path) -> None: + workflow = _load_workflow() + inspect_host = next(node for node in workflow["nodes"] if node["id"] == "inspect_host") + + class DummyTool: + def __init__(self) -> None: 
+ self.task_calls = 0 + + def run_safe(self, *args, **kwargs) -> dict: + if args == ("ssh_host_cmd",): + return {"success": True, "output": "FLOCKS_SSH_OK\n"} + assert args == ("task",) + self.task_calls += 1 + if self.task_calls == 1: + return { + "success": False, + "error": "节点执行超时 (300s)", + } + return { + "success": True, + "text": ( + "## Host Quick Assessment\n\n" + "**Target**: 10.0.0.10\n" + "**Verdict**: CLEAN\n" + "**Confidence**: MEDIUM\n\n" + "### Summary\n当前未见明显异常。\n" + ), + } + + tool = DummyTool() + env = { + "inputs": { + "hosts": ["10.0.0.10"], + "host_idx": 0, + "ssh_user": "root", + "per_host_dir": str(tmp_path / "host_triage"), + "batch_report_path": str(tmp_path / "batch_host_triage_log.md"), + "triage_results": [], + }, + "outputs": {}, + "tool": tool, + } + + exec(inspect_host["code"], env, env) + + result = env["outputs"]["triage_results"][0] + assert tool.task_calls == 2 + assert result["success"] is True + assert result["verdict"] == "CLEAN" + assert result["inspect_attempts"] == 2 + assert result["failure_category"] == "" + + +def test_finalize_summary_persists_verdict_in_results_json( + tmp_path: Path, + monkeypatch, +) -> None: + workflow = _load_workflow() + finalize_summary = next( + node for node in workflow["nodes"] if node["id"] == "finalize_summary" + ) + + workspace_root = tmp_path / "workspace" + monkeypatch.setenv("FLOCKS_WORKSPACE_DIR", str(workspace_root)) + + previous_instance = WorkspaceManager._instance + WorkspaceManager._instance = None + try: + env = { + "inputs": { + "triage_results": [ + { + "host": "10.0.0.8", + "ssh_user": "root", + "ssh_target": "root@10.0.0.8", + "ssh_host": "10.0.0.8", + "success": True, + "verdict": "CLEAN", + "failure_category": "", + "inspect_attempts": 1, + "error": "", + "per_host_md": str(tmp_path / "host_triage" / "0001.md"), + }, + { + "host": "10.0.0.9", + "ssh_user": "root", + "ssh_target": "root@10.0.0.9", + "ssh_host": "10.0.0.9", + "success": False, + "verdict": "UNKNOWN", + 
"failure_category": "auth_failed", + "inspect_attempts": 0, + "error": "ssh timeout", + "per_host_md": str(tmp_path / "host_triage" / "0002.md"), + }, + ], + "hosts": ["10.0.0.8", "10.0.0.9"], + "ssh_user": "root", + "per_host_dir": str(tmp_path / "host_triage"), + "batch_report_path": str(tmp_path / "batch_host_triage_log.md"), + }, + "outputs": {}, + } + + exec(finalize_summary["code"], env, env) + + results_payload = json.loads( + Path(env["outputs"]["results_json_path"]).read_text(encoding="utf-8") + ) + manifest_payload = json.loads( + Path(env["outputs"]["manifest_path"]).read_text(encoding="utf-8") + ) + csv_rows = list( + csv.DictReader( + Path(env["outputs"]["results_csv_path"]).open(encoding="utf-8", newline="") + ) + ) + index_text = Path(env["outputs"]["index_path"]).read_text(encoding="utf-8") + + assert results_payload["triage_results"][0]["verdict"] == "CLEAN" + assert results_payload["triage_results"][1]["verdict"] == "UNKNOWN" + assert results_payload["triage_results"][1]["failure_category"] == "auth_failed" + assert manifest_payload["items"][0]["verdict"] == "CLEAN" + assert manifest_payload["items"][1]["verdict"] == "UNKNOWN" + assert manifest_payload["results_csv_path"].endswith(".csv") + assert csv_rows[0]["verdict"] == "CLEAN" + assert csv_rows[1]["failure_category"] == "auth_failed" + assert "- CLEAN: 1 台" in index_text + assert "- UNKNOWN: 1 台" in index_text + assert "- auth_failed: 1 台" in index_text + assert "判定: `CLEAN`" in index_text + finally: + WorkspaceManager._instance = previous_instance diff --git a/tests/workflow/test_workflow_llm.py b/tests/workflow/test_workflow_llm.py index 5c6e74c..27c9214 100644 --- a/tests/workflow/test_workflow_llm.py +++ b/tests/workflow/test_workflow_llm.py @@ -21,7 +21,7 @@ class _FakeProvider: def __init__( self, provider_id: str, - behavior: str, + behavior: str | list[str], *, configured: bool = True, models: list[str] | None = None, @@ -45,8 +45,17 @@ def get_models(self): async def chat(self, 
model_id: str, messages, **kwargs): self.calls += 1 - if self._behavior == "error": + behavior = self._behavior + if isinstance(behavior, list): + current = behavior.pop(0) if behavior else "ok" + else: + current = behavior + + if current == "error": raise RuntimeError("simulated failure") + if current == "timeout": + await asyncio.sleep(0.05) + return _FakeResponse("late") return _FakeResponse(f"{self.id}:{model_id}") @@ -190,6 +199,28 @@ async def _resolve_default_llm(): assert fallback.calls == 1 +def test_llm_retries_then_succeeds(monkeypatch): + provider = _FakeProvider("demo", ["error", "error", "ok"], models=["m"]) + _patch_provider(monkeypatch, {"demo": provider}) + + client = LLMClient(provider_id="demo", model="m") + out = client.ask("hello", max_retries=2, retry_delay_s=0) + + assert out == "demo:m" + assert provider.calls == 3 + + +def test_llm_timeout_retries_then_raises(monkeypatch): + provider = _FakeProvider("demo", "timeout", models=["m"]) + _patch_provider(monkeypatch, {"demo": provider}) + + client = LLMClient(provider_id="demo", model="m") + with pytest.raises(ValueError, match="timed out after 0.01s"): + client.ask("hello", timeout_s=0.01, max_retries=2, retry_delay_s=0) + + assert provider.calls == 3 + + def test_llm_raises_clear_error_when_default_is_unavailable(monkeypatch): provider = _FakeProvider("fallback", "ok", configured=False, models=["fallback-model"]) _patch_provider(monkeypatch, {"fallback": provider}) diff --git a/tests/workflow/test_workflow_node_timeout.py b/tests/workflow/test_workflow_node_timeout.py index e19e563..5da9e45 100644 --- a/tests/workflow/test_workflow_node_timeout.py +++ b/tests/workflow/test_workflow_node_timeout.py @@ -31,7 +31,7 @@ def test_node_timeout_skips_node_and_records_error(): engine = WorkflowEngine( workflow, runtime=rt, - node_timeout_s=0.3, + node_timeout_s=1.0, stop_on_error=False, ) result = engine.run(initial_inputs={}) @@ -43,7 +43,7 @@ def test_node_timeout_skips_node_and_records_error(): 
assert step_slow.node_id == "slow" assert step_slow.error is not None assert "节点执行超时" in step_slow.error - assert "0.3" in step_slow.error + assert "1.0" in step_slow.error assert step_slow.outputs == {} step_fast = result.history[1] @@ -97,3 +97,57 @@ def test_run_workflow_node_timeout_param(): assert len(result.history) == 1 assert result.history[0].get("error") is not None assert "节点执行超时" in result.history[0]["error"] + + +def test_run_workflow_uses_metadata_node_timeout_default(): + """Workflow metadata can override the historical 300s default.""" + workflow = { + "name": "metadata_timeout", + "start": "s", + "nodes": [ + { + "id": "s", + "type": "python", + "code": "import time; time.sleep(0.2); outputs['ok'] = 1", + "description": "Slow-ish", + }, + ], + "edges": [], + "metadata": {"node_timeout_s": 0.05}, + } + result = run_workflow( + workflow=workflow, + inputs={}, + ensure_requirements=False, + ) + assert result.status == "SUCCEEDED" + assert len(result.history) == 1 + assert "节点执行超时" in (result.history[0].get("error") or "") + + +def test_run_workflow_explicit_node_timeout_overrides_metadata(): + """Explicit caller timeout should win over workflow metadata.""" + workflow = { + "name": "metadata_timeout_override", + "start": "s", + "nodes": [ + { + "id": "s", + "type": "python", + "code": "import time; time.sleep(0.2); outputs['ok'] = 1", + "description": "Slow-ish", + }, + ], + "edges": [], + "metadata": {"node_timeout_s": 0.05}, + } + result = run_workflow( + workflow=workflow, + inputs={}, + node_timeout_s=1.0, + ensure_requirements=False, + ) + assert result.status == "SUCCEEDED" + assert len(result.history) == 1 + assert result.history[0].get("error") is None + assert result.history[0]["outputs"]["ok"] == 1 diff --git a/webui/src/api/workflow.ts b/webui/src/api/workflow.ts index d48f1e0..d0fa194 100644 --- a/webui/src/api/workflow.ts +++ b/webui/src/api/workflow.ts @@ -47,13 +47,31 @@ export interface WorkflowEdge { const?: Record; } +export 
interface WorkflowOutputSchema { + type?: string | string[]; + title?: string; + description?: string; + properties?: Record; + required?: string[]; + items?: WorkflowOutputSchema | WorkflowOutputSchema[]; + enum?: Array; + additionalProperties?: boolean | WorkflowOutputSchema; + [key: string]: any; +} + +export interface WorkflowMetadata { + sampleInputs?: Record; + outputSchema?: WorkflowOutputSchema; + [key: string]: any; +} + export interface WorkflowJSON { version?: string; name?: string; start: string; nodes: WorkflowNode[]; edges: WorkflowEdge[]; - metadata?: Record; + metadata?: WorkflowMetadata; } export interface Workflow { @@ -79,6 +97,18 @@ export interface Workflow { }; } +export interface WorkflowExecutionStep { + node_id: string; + node_type?: string; + type?: string; + inputs: Record; + outputs: Record; + stdout?: string; + error?: string; + traceback?: string; + duration_ms?: number; +} + export interface WorkflowExecution { id: string; workflowId: string; @@ -88,10 +118,20 @@ export interface WorkflowExecution { startedAt: number; finishedAt?: number; duration?: number; - executionLog: any[]; + executionLog: WorkflowExecutionStep[]; errorMessage?: string; } +export interface WorkflowNodeExecution { + node_id: string; + outputs: Record; + stdout: string; + error?: string; + traceback?: string; + duration_ms?: number; + success: boolean; +} + export interface WorkflowService { workflowId: string; workflowName: string; @@ -195,15 +235,7 @@ export const workflowAPI = { } | null>(`/api/workflow/${id}/kafka-config`), runNode: (id: string, data: { nodeId: string; inputs?: Record }) => - client.post<{ - node_id: string; - outputs: Record; - stdout: string; - error?: string; - traceback?: string; - duration_ms?: number; - success: boolean; - }>(`/api/workflow/${id}/run-node`, { node_id: data.nodeId, inputs: data.inputs ?? {} }), + client.post(`/api/workflow/${id}/run-node`, { node_id: data.nodeId, inputs: data.inputs ?? 
{} }), getSampleInputs: (id: string) => client.get<{ sampleInputs: Record }>(`/api/workflow/${id}/sample-inputs`), diff --git a/webui/src/locales/en-US/workflow.json b/webui/src/locales/en-US/workflow.json index 976c2ef..f53a49c 100644 --- a/webui/src/locales/en-US/workflow.json +++ b/webui/src/locales/en-US/workflow.json @@ -84,6 +84,9 @@ "descPlaceholder": "Node function description...", "code": "Code", "codePlaceholder": "# Python code...", + "expandCodeEditor": "Expand Editor", + "expandedCodeEditorTitle": "Expanded Editor", + "closeExpandedEditor": "Close Editor", "branchKey": "Branch Key select_key", "branchKeyPlaceholder": "Key name", "enableJoin": "Enable Join Merge", @@ -104,6 +107,29 @@ "selectWorkflow": "-- Select --", "inputMapping": "Input Mapping inputs_mapping", "inputConst": "Static Inputs inputs_const", + "runtimeSection": "Latest Run", + "runtimeLatest": "Showing latest match", + "runtimeCount": "{{count}} matches", + "noRuntimeData": "No recent execution data yet", + "nodeNotExecutedYet": "This node has not executed in the latest run", + "runtimeStatus": "Execution status: {{status}}", + "runtimeInputs": "Runtime Inputs", + "runtimeOutputs": "Runtime Outputs", + "runNodeSection": "Run Node", + "runNodeHint": "Execute this node in isolation", + "runNodeUnsupported": "This node type is not supported yet", + "runNodeUnsupportedDesc": "Branch and Loop nodes are not supported for isolated execution yet.", + "runNodeInputs": "Execution Inputs", + "useLatestInputs": "Use latest inputs", + "restoreSuggestedInputs": "Restore suggested inputs", + "runNodeAction": "Run Node", + "runningNode": "Running...", + "runNodeSuccess": "Run Succeeded", + "runNodeError": "Run Error", + "runNodeStdout": "Stdout", + "runNodeTraceback": "Traceback", + "runNodeInputObjectRequired": "Execution inputs must be a JSON object", + "runNodeFailed": "Failed to run node", "jsonFormatError": "JSON format error", "saveFailed": "Save failed", "saving": "Saving…", @@ -130,10 +156,14 
@@ "run": { "testSection": "Test", "inputParams": "Input Parameters (JSON)", + "rootObjectRequired": "Input parameters must be a JSON object", "running": "Running...", "testRun": "Test Run", "stopRun": "Stop Run", "outputResults": "Output Results", + "savingSampleInputs": "Saving inputs", + "sampleInputsSaved": "Inputs saved", + "sampleInputsSaveFailed": "Failed to save sample inputs", "executionLog": "Execution Log ({{count}})", "jsonFormatError": "JSON format error, please check input", "runFailed": "Run failed", diff --git a/webui/src/locales/zh-CN/workflow.json b/webui/src/locales/zh-CN/workflow.json index 524dbce..885f8f6 100644 --- a/webui/src/locales/zh-CN/workflow.json +++ b/webui/src/locales/zh-CN/workflow.json @@ -84,6 +84,9 @@ "descPlaceholder": "节点功能描述...", "code": "代码", "codePlaceholder": "# Python 代码...", + "expandCodeEditor": "放大编辑", + "expandedCodeEditorTitle": "大编辑器", + "closeExpandedEditor": "收起编辑", "branchKey": "分支键 select_key", "branchKeyPlaceholder": "键名", "enableJoin": "启用 Join 合并", @@ -104,6 +107,29 @@ "selectWorkflow": "-- 选择 --", "inputMapping": "输入映射 inputs_mapping", "inputConst": "静态入参 inputs_const", + "runtimeSection": "最近一次运行", + "runtimeLatest": "显示最后一次", + "runtimeCount": "共匹配 {{count}} 次", + "noRuntimeData": "暂无最近运行数据", + "nodeNotExecutedYet": "最近一次执行中,该节点尚未运行", + "runtimeStatus": "执行状态:{{status}}", + "runtimeInputs": "真实输入", + "runtimeOutputs": "真实输出", + "runNodeSection": "单节点执行", + "runNodeHint": "隔离执行当前节点", + "runNodeUnsupported": "当前节点类型暂不支持", + "runNodeUnsupportedDesc": "Branch 和 Loop 节点暂不支持单节点执行。", + "runNodeInputs": "执行输入", + "useLatestInputs": "使用最近一次输入", + "restoreSuggestedInputs": "恢复建议输入", + "runNodeAction": "执行节点", + "runningNode": "执行中...", + "runNodeSuccess": "执行成功", + "runNodeError": "执行错误", + "runNodeStdout": "标准输出", + "runNodeTraceback": "错误堆栈", + "runNodeInputObjectRequired": "执行输入必须是 JSON 对象", + "runNodeFailed": "节点执行失败", "jsonFormatError": "JSON 格式错误", "saveFailed": "保存失败", "saving": "保存中\u2026", @@ -130,10 
+156,14 @@ "run": { "testSection": "测试", "inputParams": "输入参数(JSON)", + "rootObjectRequired": "输入参数必须是 JSON 对象", "running": "运行中...", "testRun": "测试运行", "stopRun": "停止运行", "outputResults": "输出结果", + "savingSampleInputs": "正在保存输入", + "sampleInputsSaved": "输入已保存", + "sampleInputsSaveFailed": "保存输入参数失败", "executionLog": "执行日志 ({{count}})", "jsonFormatError": "JSON 格式错误,请检查输入", "runFailed": "运行失败", diff --git a/webui/src/pages/WorkflowDetail/NodeInfoPanel.test.tsx b/webui/src/pages/WorkflowDetail/NodeInfoPanel.test.tsx new file mode 100644 index 0000000..65f8248 --- /dev/null +++ b/webui/src/pages/WorkflowDetail/NodeInfoPanel.test.tsx @@ -0,0 +1,322 @@ +import React from 'react'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { render, screen } from '@testing-library/react'; +import userEvent from '@testing-library/user-event'; +import NodeInfoPanel from './NodeInfoPanel'; + +const { workflowAPI } = vi.hoisted(() => ({ + workflowAPI: { + list: vi.fn(), + update: vi.fn(), + runNode: vi.fn(), + }, +})); + +vi.mock('@/api/workflow', async () => { + const actual = await vi.importActual('@/api/workflow'); + return { + ...actual, + workflowAPI, + }; +}); + +vi.mock('react-i18next', () => ({ + useTranslation: () => ({ + t: (key: string, params?: Record) => { + const translations: Record = { + 'detail.nodeInfo.inputSources': '输入来源', + 'detail.nodeInfo.startNode': '起点节点', + 'detail.nodeInfo.triggerOnly': '仅触发', + 'detail.nodeInfo.outputDests': '输出去向', + 'detail.nodeInfo.outputKeyLabel': '键', + 'detail.nodeInfo.routeByPath': '按路径路由', + 'detail.nodeInfo.endNode': '终点节点', + 'detail.nodeInfo.runtimeSection': '最近一次运行', + 'detail.nodeInfo.runtimeLatest': '显示最后一次', + 'detail.nodeInfo.runtimeCount': `共匹配 ${params?.count ?? 0} 次`, + 'detail.nodeInfo.noRuntimeData': '暂无最近运行数据', + 'detail.nodeInfo.nodeNotExecutedYet': '最近一次执行中,该节点尚未运行', + 'detail.nodeInfo.runtimeStatus': `执行状态:${params?.status ?? 
''}`, + 'detail.nodeInfo.runtimeInputs': '真实输入', + 'detail.nodeInfo.runtimeOutputs': '真实输出', + 'detail.nodeInfo.runNodeSection': '单节点执行', + 'detail.nodeInfo.runNodeHint': '隔离执行当前节点', + 'detail.nodeInfo.runNodeUnsupported': '当前节点类型暂不支持', + 'detail.nodeInfo.runNodeUnsupportedDesc': 'Branch 和 Loop 节点暂不支持单节点执行。', + 'detail.nodeInfo.runNodeInputs': '执行输入', + 'detail.nodeInfo.useLatestInputs': '使用最近一次输入', + 'detail.nodeInfo.restoreSuggestedInputs': '恢复建议输入', + 'detail.nodeInfo.runNodeAction': '执行节点', + 'detail.nodeInfo.runningNode': '执行中...', + 'detail.nodeInfo.runNodeSuccess': '执行成功', + 'detail.nodeInfo.runNodeError': '执行错误', + 'detail.nodeInfo.runNodeStdout': '标准输出', + 'detail.nodeInfo.runNodeTraceback': '错误堆栈', + 'detail.nodeInfo.runNodeInputObjectRequired': '执行输入必须是 JSON 对象', + 'detail.nodeInfo.runNodeFailed': '节点执行失败', + 'detail.nodeInfo.description': '描述', + 'detail.nodeInfo.descPlaceholder': 'desc', + 'detail.nodeInfo.code': '代码', + 'detail.nodeInfo.codePlaceholder': 'code', + 'detail.nodeInfo.expandCodeEditor': '放大编辑', + 'detail.nodeInfo.expandedCodeEditorTitle': '大编辑器', + 'detail.nodeInfo.closeExpandedEditor': '收起编辑', + 'detail.nodeInfo.startBadge': '起点', + 'detail.nodeInfo.close': '关闭', + 'detail.nodeInfo.saveNode': '保存节点', + 'detail.nodeInfo.saving': '保存中', + 'detail.nodeInfo.saved': '已保存', + 'detail.nodeInfo.saveFailed': '保存失败', + }; + return translations[key] ?? 
key; + }, + }), +})); + +describe('NodeInfoPanel', () => { + beforeEach(() => { + vi.clearAllMocks(); + workflowAPI.list.mockResolvedValue({ data: [] }); + workflowAPI.update.mockResolvedValue({ data: {} }); + workflowAPI.runNode.mockResolvedValue({ + data: { + node_id: 'node-1', + outputs: { result: 'ok' }, + stdout: 'done', + success: true, + duration_ms: 120, + }, + }); + }); + + const workflow = { + id: 'wf-1', + name: 'Demo Workflow', + category: 'default', + workflowJson: { + start: 'node-1', + nodes: [ + { + id: 'node-1', + type: 'python' as const, + code: 'outputs["result"] = inputs.get("host")', + description: 'demo node', + }, + ], + edges: [], + }, + status: 'draft' as const, + createdAt: Date.now(), + updatedAt: Date.now(), + stats: { + callCount: 0, + successCount: 0, + errorCount: 0, + totalRuntime: 0, + avgRuntime: 0, + thumbsUp: 0, + thumbsDown: 0, + }, + }; + + it('shows runtime inputs and outputs for the latest matching step', () => { + render( + {}} + onSaved={() => {}} + /> + ); + + expect(screen.getByText('最近一次运行')).toBeInTheDocument(); + expect(screen.getByText('真实输入')).toBeInTheDocument(); + expect(screen.getByText('真实输出')).toBeInTheDocument(); + expect(screen.getByDisplayValue(/demo.local/)).toBeInTheDocument(); + expect(screen.getByText(/"result": "ok"/)).toBeInTheDocument(); + }); + + it('shows empty runtime hint when there is no latest execution', () => { + render( + {}} + onSaved={() => {}} + /> + ); + + expect(screen.getByText('暂无最近运行数据')).toBeInTheDocument(); + }); + + it('toggles the runtime section content', async () => { + const user = userEvent.setup(); + + render( + {}} + onSaved={() => {}} + /> + ); + + const runtimeToggle = screen.getByRole('button', { name: /最近一次运行/ }); + expect(screen.getByText('真实输入')).toBeInTheDocument(); + + await user.click(runtimeToggle); + expect(screen.queryByText('真实输入')).not.toBeInTheDocument(); + expect(screen.queryByText('真实输出')).not.toBeInTheDocument(); + + await user.click(runtimeToggle); + 
expect(screen.getByText('真实输入')).toBeInTheDocument(); + expect(screen.getByText('真实输出')).toBeInTheDocument(); + }); + + it('runs a single node with latest runtime inputs', async () => { + const user = userEvent.setup(); + + render( + {}} + onSaved={() => {}} + /> + ); + + expect(screen.getByDisplayValue(/demo.local/)).toBeInTheDocument(); + await user.click(screen.getByRole('button', { name: '执行节点' })); + + expect(workflowAPI.runNode).toHaveBeenCalledWith('wf-1', { + nodeId: 'node-1', + inputs: { host: 'demo.local' }, + }); + expect(await screen.findByText('执行成功')).toBeInTheDocument(); + expect(screen.getByText(/done/)).toBeInTheDocument(); + }); + + it('opens the expanded code editor and keeps code in sync', async () => { + const user = userEvent.setup(); + + render( + {}} + onSaved={() => {}} + /> + ); + + await user.click(screen.getByRole('button', { name: '放大编辑' })); + expect(screen.getByRole('dialog', { name: '大编辑器' })).toBeInTheDocument(); + + const expandedEditor = screen.getByRole('dialog', { name: '大编辑器' }).querySelector('textarea'); + expect(expandedEditor).not.toBeNull(); + await user.clear(expandedEditor!); + await user.type(expandedEditor!, 'print("expanded")'); + + expect(screen.getAllByDisplayValue('print("expanded")').length).toBeGreaterThanOrEqual(1); + + await user.click(screen.getByRole('button', { name: '收起编辑' })); + expect(screen.queryByRole('dialog', { name: '大编辑器' })).not.toBeInTheDocument(); + expect(screen.getByDisplayValue('print("expanded")')).toBeInTheDocument(); + }); + + it('places run node section after the code editor for code nodes', () => { + render( + {}} + onSaved={() => {}} + /> + ); + + const codeLabel = screen.getByText('代码'); + const runNodeLabel = screen.getByText('单节点执行'); + + expect( + codeLabel.compareDocumentPosition(runNodeLabel) & Node.DOCUMENT_POSITION_FOLLOWING, + ).toBeTruthy(); + }); + + it('shows unsupported message for branch nodes', () => { + render( + {}} + onSaved={() => {}} + /> + ); + + 
expect(screen.getByText('当前节点类型暂不支持')).toBeInTheDocument(); + expect(screen.getByText('Branch 和 Loop 节点暂不支持单节点执行。')).toBeInTheDocument(); + }); +}); diff --git a/webui/src/pages/WorkflowDetail/NodeInfoPanel.tsx b/webui/src/pages/WorkflowDetail/NodeInfoPanel.tsx index ee28dd8..a020386 100644 --- a/webui/src/pages/WorkflowDetail/NodeInfoPanel.tsx +++ b/webui/src/pages/WorkflowDetail/NodeInfoPanel.tsx @@ -4,8 +4,8 @@ */ import { useState, useEffect } from 'react'; import { useTranslation } from 'react-i18next'; -import { X, AlertCircle, Save, Loader2 } from 'lucide-react'; -import { workflowAPI, Workflow, WorkflowEdge, WorkflowNode } from '@/api/workflow'; +import { X, AlertCircle, Save, Loader2, ChevronDown, ChevronRight, Play, RotateCcw, Maximize2 } from 'lucide-react'; +import { workflowAPI, Workflow, WorkflowEdge, WorkflowExecution, WorkflowNode, WorkflowNodeExecution } from '@/api/workflow'; // ───────────────────────────────────────────── // Constants @@ -37,6 +37,50 @@ function inferOutputKey(node: WorkflowNode): string { } } +function canRunNode(node: WorkflowNode): boolean { + return node.type !== 'branch' && node.type !== 'loop'; +} + +function getLatestNodeInputs(nodeId: string, latestExecution?: WorkflowExecution | null): Record | null { + const runtimeSteps = latestExecution?.executionLog?.filter((step) => step.node_id === nodeId) ?? []; + const latestStep = runtimeSteps[runtimeSteps.length - 1]; + if (latestStep?.inputs && Object.keys(latestStep.inputs).length > 0) { + return latestStep.inputs; + } + return null; +} + +function buildSuggestedNodeInputs( + node: WorkflowNode, + workflow: Workflow, + latestExecution?: WorkflowExecution | null, +): Record { + const runtimeInputs = getLatestNodeInputs(node.id, latestExecution); + if (runtimeInputs) { + return runtimeInputs; + } + + if (node.id === workflow.workflowJson.start) { + return workflow.workflowJson.metadata?.sampleInputs ?? 
{}; + } + + const incoming = workflow.workflowJson.edges.filter((edge) => edge.to === node.id); + const suggested: Record = {}; + for (const edge of incoming) { + if (edge.const) { + Object.assign(suggested, edge.const); + } + if (edge.mapping) { + for (const key of Object.keys(edge.mapping)) { + if (!(key in suggested)) { + suggested[key] = ''; + } + } + } + } + return suggested; +} + // ───────────────────────────────────────────── // Atoms // ───────────────────────────────────────────── @@ -182,20 +226,290 @@ function DataFlow({ node, edges }: { node: WorkflowNode; edges: WorkflowEdge[] } export interface NodeInfoPanelProps { node: WorkflowNode; workflow: Workflow; + latestExecution?: WorkflowExecution | null; width?: number; onClose: () => void; onSaved: (updated: Workflow) => void; } -export default function NodeInfoPanel({ node, workflow, width = 260, onClose, onSaved }: NodeInfoPanelProps) { +function RuntimeJsonBlock({ label, value, tone }: { + label: string; + value: Record; + tone: 'amber' | 'green'; +}) { + const bgClass = tone === 'amber' ? 'bg-amber-50 border-amber-100 text-amber-900' : 'bg-green-50 border-green-100 text-green-900'; + + return ( +
+ {label} +
+
+          {JSON.stringify(value, null, 2)}
+        
+
+
+ ); +} + +function RuntimeSection({ nodeId, latestExecution }: { nodeId: string; latestExecution?: WorkflowExecution | null }) { + const { t } = useTranslation('workflow'); + const [expanded, setExpanded] = useState(true); + const runtimeSteps = latestExecution?.executionLog?.filter((step) => step.node_id === nodeId) ?? []; + const latestStep = runtimeSteps[runtimeSteps.length - 1]; + + return ( +
+ + + {expanded && ( + <> + {!latestExecution && ( +

{t('detail.nodeInfo.noRuntimeData')}

+ )} + + {latestExecution && !latestStep && ( +

{t('detail.nodeInfo.nodeNotExecutedYet')}

+ )} + + {latestStep && ( +
+
+ {t('detail.nodeInfo.runtimeStatus', { status: latestExecution?.status ?? 'unknown' })} +
+ {latestStep.inputs && Object.keys(latestStep.inputs).length > 0 && ( + + )} + {latestStep.outputs && Object.keys(latestStep.outputs).length > 0 && ( + + )} + {latestStep.error && ( +
+

{latestStep.error}

+
+ )} +
+ )} + + )} +
+ ); +} + +function NodeRunSection({ + node, + workflow, + latestExecution, +}: { + node: WorkflowNode; + workflow: Workflow; + latestExecution?: WorkflowExecution | null; +}) { + const { t } = useTranslation('workflow'); + const supported = canRunNode(node); + const [expanded, setExpanded] = useState(true); + const suggestedInputs = buildSuggestedNodeInputs(node, workflow, latestExecution); + const latestRuntimeInputs = getLatestNodeInputs(node.id, latestExecution); + const [rawInputs, setRawInputs] = useState(() => JSON.stringify(suggestedInputs, null, 2)); + const [inputError, setInputError] = useState(''); + const [running, setRunning] = useState(false); + const [result, setResult] = useState(null); + + useEffect(() => { + setRawInputs(JSON.stringify(buildSuggestedNodeInputs(node, workflow, latestExecution), null, 2)); + setInputError(''); + setResult(null); + }, [node, workflow, latestExecution]); + + const handleRun = async () => { + if (!supported) { + return; + } + let parsed: Record; + try { + const candidate = JSON.parse(rawInputs || '{}'); + if (typeof candidate !== 'object' || candidate === null || Array.isArray(candidate)) { + setInputError(t('detail.nodeInfo.runNodeInputObjectRequired')); + return; + } + parsed = candidate; + setInputError(''); + } catch { + setInputError(t('detail.nodeInfo.jsonFormatError')); + return; + } + + setRunning(true); + try { + const response = await workflowAPI.runNode(workflow.id, { nodeId: node.id, inputs: parsed }); + setResult(response.data); + } catch (error: any) { + setResult({ + node_id: node.id, + outputs: {}, + stdout: '', + error: error?.response?.data?.detail || error?.message || t('detail.nodeInfo.runNodeFailed'), + success: false, + }); + } finally { + setRunning(false); + } + }; + + return ( +
+ + + {expanded && ( +
+ {!supported ? ( +

{t('detail.nodeInfo.runNodeUnsupportedDesc')}

+ ) : ( + <> +
+
+ {t('detail.nodeInfo.runNodeInputs')} +
+ {latestRuntimeInputs && ( + + )} + +
+
+