first commit

This commit is contained in:
闫旭隆
2025-11-02 18:06:38 +08:00
commit 233e0ff245
40 changed files with 8876 additions and 0 deletions

530
tests/EXECUTION_ANALYSIS.md Normal file
View File

@ -0,0 +1,530 @@
# 智能深度研究系统 - 执行过程详细分析
**基于**: `llm_calls_20251031_150543.json`
**测试问题**: "Python asyncio最佳实践"
**深度模式**: quick
**总LLM调用次数**: 5次
**总耗时**: 49.49秒
---
## 整体架构回顾
```
┌─────────────────────────────────────────────────────────────┐
│ LangGraph 执行引擎 │
│ (持续调用Agent直到任务完成或无工具调用) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ResearchCoordinator (主Agent) │
│ - 协调整个研究流程 │
│ - 通过虚拟文件系统管理状态 │
│ - 使用task工具调用SubAgent │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 虚拟文件系统 (State) │
│ - /question.txt │
│ - /config.json │
│ - /search_queries.json │
│ - /iteration_N/*.json │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 6个SubAgent (通过task调用) │
│ 1. intent-analyzer 4. content-analyzer │
│ 2. search-orchestrator 5. confidence-evaluator │
│ 3. source-validator 6. report-generator │
└─────────────────────────────────────────────────────────────┘
```
---
## 详细执行流程分析
### 🔵 LLM调用 #1 - 主Agent初始化 (步骤1)
**时间**: 15:04:53 → 15:05:01 (8秒)
**执行者**: ResearchCoordinator (主Agent)
**Token消耗**: 输入5,920 + 输出103 = 6,023 tokens
#### 输入
```
[SystemMessage] 完整的系统提示词
- 研究问题: "Python asyncio最佳实践"
- 7步执行流程说明
- 重要提示: write_file和task不能在同一个响应中调用
[HumanMessage] "请开始研究这个问题Python asyncio最佳实践"
```
#### 主Agent的思考过程
根据系统提示词主Agent理解到
1. 需要先执行步骤1初始化虚拟文件系统
2. 写入`/question.txt``/config.json`
3. **不能**在同一响应中调用task工具
#### 输出
```json
{
"tool_calls": [
{
"name": "write_file",
"file_path": "/question.txt",
"content": "Python asyncio最佳实践"
},
{
"name": "write_file",
"file_path": "/config.json",
"content": "{\"depth_mode\": \"quick\", \"report_format\": \"technical\", ...}"
}
]
}
```
#### 关键点
- ✅ 主Agent **只**调用了write_file没有调用task
- ✅ 两个文件被并行写入
- 🔄 LangGraph看到有工具调用会继续执行下一轮
#### State更新
```
虚拟文件系统:
├── /question.txt ✅ (新增)
└── /config.json ✅ (新增)
```
---
### 🔵 LLM调用 #2 - 主Agent启动SubAgent (步骤2)
**时间**: 15:05:01 → 15:05:06 (4.5秒)
**执行者**: ResearchCoordinator (主Agent)
**Token消耗**: 输入6,046 + 输出69 = 6,115 tokens
#### 输入
```
[SystemMessage] 完整的系统提示词 (相同)
[HumanMessage] "请开始研究这个问题Python asyncio最佳实践"
[AIMessage] 之前的响应 (write_file × 2)
[ToolMessage] "Updated file /question.txt" ✅
[ToolMessage] "Updated file /config.json" ✅
```
#### 主Agent的思考过程
主Agent看到
1. 两个文件已成功写入ToolMessage确认
2. 步骤1已完成应该进入步骤2
3. 现在可以调用task工具启动intent-analyzer
#### 输出
```json
{
"content": "已将研究问题和配置写入虚拟文件系统。现在,我将调用 `intent-analyzer` SubAgent 来分析研究问题并生成搜索查询。",
"tool_calls": [
{
"name": "task",
"args": {
"description": "分析研究问题并生成搜索查询",
"subagent_type": "intent-analyzer"
}
}
]
}
```
#### 关键点
- ✅ 主Agent在**独立的响应**中调用task
- ✅ 此时虚拟文件系统已包含两个文件
- 🚀 task工具触发SubAgent创建和执行
---
### 🔵 LLM调用 #3 - SubAgent读取文件
**时间**: 15:05:06 → 15:05:08 (2.7秒)
**执行者**: intent-analyzer (SubAgent)
**Token消耗**: 输入2,724 + 输出49 = 2,773 tokens
#### 输入
```
[SystemMessage] intent-analyzer的专用系统提示词
- 任务: 分析问题并生成搜索查询
- 需要读取: /question.txt, /config.json
- 需要输出: /search_queries.json
[HumanMessage] "分析研究问题并生成搜索查询"
```
#### SubAgent的思考过程
SubAgent按照提示词
1. 需要先读取输入文件
2. 使用ls查看文件系统结构
3. 使用read_file读取两个文件
#### 输出
```json
{
"tool_calls": [
{"name": "ls"},
{"name": "read_file", "file_path": "/question.txt"},
{"name": "read_file", "file_path": "/config.json"}
]
}
```
#### 关键机制解析
**🔑 核心问题SubAgent如何访问主Agent写入的文件**
根据DeepAgents源码 (`deepagents/middleware/subagents.py:332`):
```python
def _validate_and_prepare_state(subagent_type, description, runtime):
# 创建SubAgent的state从主Agent的state复制
subagent_state = {
k: v for k, v in runtime.state.items()
if k not in ("messages", "todos") # 排除这两个键
}
subagent_state["messages"] = [HumanMessage(content=description)]
return subagent, subagent_state
```
**执行时序**:
```
1. 主Agent调用 #1: write_file × 2
→ 返回 Command(update={"files": {...}})
2. LangGraph应用State更新
→ runtime.state["files"] = {
"/question.txt": {...},
"/config.json": {...}
}
3. 主Agent调用 #2: task(...)
→ task工具内部调用 _validate_and_prepare_state()
→ 复制 runtime.state (包括files) 给SubAgent
4. SubAgent启动时state中已包含这两个文件 ✅
```
这就是为什么**必须分两个响应**
- ❌ 如果在同一响应中调用write_file和task两个工具会**并行**执行
- ❌ 当task开始复制state时write_file的更新还没被应用
- ✅ 分两个响应后第二次调用时state已包含更新
---
### 🔵 LLM调用 #4 - SubAgent分析并生成查询
**时间**: 15:05:08 → 15:05:28 (19.4秒)
**执行者**: intent-analyzer (SubAgent)
**Token消耗**: 输入2,861 + 输出404 = 3,265 tokens
#### 输入
```
[SystemMessage] intent-analyzer的系统提示词
[HumanMessage] "分析研究问题并生成搜索查询"
[AIMessage] 之前的工具调用 (ls, read_file × 2)
[ToolMessage] "[\"/question.txt\", \"/config.json\"]" ✅
[ToolMessage] " 1\tPython asyncio最佳实践" ✅
[ToolMessage] " 1\t{\"depth_mode\": \"quick\", ...}" ✅
```
#### SubAgent的思考过程
SubAgent成功读取到文件内容
1. **问题**: "Python asyncio最佳实践"
2. **配置**: quick模式 → 生成3个查询
3. **领域分析**: 技术(编程)
4. **查询策略**: 基础概念 + 最佳实践 + 实现细节
#### 输出
```json
{
"content": "已经读取了原始研究问题和配置文件。研究问题是关于...",
"tool_calls": [
{
"name": "write_file",
"file_path": "/search_queries.json",
"content": "{\n \"original_question\": \"Python asyncio最佳实践\",\n \"queries\": [\n {\"query\": \"What is Python asyncio and how does it work?\", ...},\n {\"query\": \"Best practices for using Python asyncio\", ...},\n {\"query\": \"How to implement best practices with Python asyncio in real projects\", ...}\n ]\n}"
}
]
}
```
#### 生成的3个搜索查询
1. **基础概念**: "What is Python asyncio and how does it work?"
2. **最佳实践**: "Best practices for using Python asyncio"
3. **实现细节**: "How to implement best practices with Python asyncio in real projects"
#### State更新
```
虚拟文件系统:
├── /question.txt
├── /config.json
└── /search_queries.json ✅ (新增SubAgent写入)
```
---
### 🔵 LLM调用 #5 - SubAgent完成并返回
**时间**: 15:05:28 → 15:05:43 (14.6秒)
**执行者**: intent-analyzer (SubAgent)
**Token消耗**: 输入3,278 + 输出241 = 3,519 tokens
#### 输入
```
[SystemMessage] intent-analyzer的系统提示词
[HumanMessage] "分析研究问题并生成搜索查询"
[之前的所有消息和工具调用...]
[ToolMessage] "Updated file /search_queries.json" ✅
```
#### SubAgent的思考过程
SubAgent看到
1. 文件已成功写入
2. 任务已完成生成了3个查询
3. 可以总结并返回结果
#### 输出
```json
{
"content": "搜索查询已成功生成并写入`/search_queries.json`文件中。以下是生成的搜索查询:\n\n- **原始问题**: Python asyncio最佳实践\n- **领域**: 技术(编程)\n...",
"tool_calls": [] // ← 空数组,没有更多工具调用
}
```
#### 关键点
- ✅ SubAgent没有工具调用tool_calls为空
- 🔄 LangGraph检测到SubAgent完成返回结果给主Agent
- 📦 SubAgent的state更新包括/search_queries.json合并回主Agent
#### SubAgent返回机制
根据`deepagents/middleware/subagents.py:316-323`:
```python
def _return_command_with_state_update(result, tool_call_id):
# 排除messages和todos其他state都返回给主Agent
state_update = {
k: v for k, v in result.items()
if k not in ("messages", "todos")
}
return Command(update={
**state_update, # files会被合并
"messages": [ToolMessage(result["messages"][-1].text, tool_call_id)]
})
```
#### 最终State
```
虚拟文件系统 (主Agent):
├── /question.txt (主Agent写入)
├── /config.json (主Agent写入)
└── /search_queries.json (SubAgent写入已合并) ✅
```
---
## 执行流程图
```mermaid
sequenceDiagram
participant User
participant LangGraph
participant 主Agent
participant State as 虚拟文件系统
participant SubAgent as intent-analyzer
User->>LangGraph: "研究: Python asyncio最佳实践"
Note over LangGraph,主Agent: 🔵 LLM调用 #1 (8秒)
LangGraph->>主Agent: SystemMessage + HumanMessage
主Agent->>主Agent: 理解: 需执行步骤1 - 初始化
主Agent->>State: write_file(/question.txt)
主Agent->>State: write_file(/config.json)
State-->>主Agent: ToolMessage × 2
Note over LangGraph,State: State更新: files包含2个文件
Note over LangGraph,主Agent: 🔵 LLM调用 #2 (4.5秒)
LangGraph->>主Agent: 之前的消息 + ToolMessage
主Agent->>主Agent: 理解: 步骤1完成进入步骤2
主Agent->>LangGraph: task(intent-analyzer)
Note over LangGraph,SubAgent: task工具复制state给SubAgent
LangGraph->>SubAgent: 创建SubAgent (state包含2个文件)
Note over LangGraph,SubAgent: 🔵 LLM调用 #3 (2.7秒)
LangGraph->>SubAgent: SystemMessage + HumanMessage
SubAgent->>SubAgent: 理解: 需读取输入文件
SubAgent->>State: ls()
SubAgent->>State: read_file(/question.txt)
SubAgent->>State: read_file(/config.json)
State-->>SubAgent: ToolMessage × 3 ✅ 文件存在!
Note over LangGraph,SubAgent: 🔵 LLM调用 #4 (19.4秒)
LangGraph->>SubAgent: 之前的消息 + ToolMessage
SubAgent->>SubAgent: 分析问题生成3个查询
SubAgent->>State: write_file(/search_queries.json)
State-->>SubAgent: ToolMessage
Note over LangGraph,SubAgent: 🔵 LLM调用 #5 (14.6秒)
LangGraph->>SubAgent: 之前的消息 + ToolMessage
SubAgent->>SubAgent: 理解: 任务完成
SubAgent-->>LangGraph: 无工具调用 (完成)
Note over LangGraph,State: SubAgent state合并回主Agent
LangGraph->>主Agent: ToolMessage (SubAgent结果)
Note over 主Agent: 继续步骤3...
主Agent-->>User: (测试在此停止)
```
---
## Token消耗分析
| 调用 | 执行者 | 输入Token | 输出Token | 总计 | 占比 |
|------|--------|-----------|-----------|------|------|
| #1 | 主Agent | 5,920 | 103 | 6,023 | 31.2% |
| #2 | 主Agent | 6,046 | 69 | 6,115 | 31.7% |
| #3 | SubAgent | 2,724 | 49 | 2,773 | 14.4% |
| #4 | SubAgent | 2,861 | 404 | 3,265 | 16.9% |
| #5 | SubAgent | 3,278 | 241 | 3,519 | 18.2% |
| **总计** | | **20,829** | **866** | **19,295** | **100%** |
**关键观察**:
- 主Agent的Token消耗主要在系统提示词非常详细
- SubAgent的输入Token较少专用提示词更简洁
- 输出Token主要用于JSON生成调用#4
---
## 关键技术要点总结
### ✅ 成功解决的问题
1. **虚拟文件系统共享**
- SubAgent能成功读取主Agent写入的文件
- 通过state复制机制实现
2. **工具调用顺序**
- write_file在第一个响应
- task在第二个响应
- 确保state更新已应用
3. **SubAgent生命周期**
- 创建 → 接收任务描述
- 执行 → 读取文件、处理、写入结果
- 返回 → state合并回主Agent
### 🎯 设计亮点
1. **声明式流程控制**
- 通过系统提示词定义流程
- 不使用Python while循环
- LLM自主决策下一步
2. **文件驱动的状态管理**
- 所有状态通过虚拟文件系统
- 跨Agent通信通过文件
- 易于调试和追踪
3. **降级运行策略**
- 部分失败不影响整体
- 提示词中明确说明
---
## 后续步骤预测
如果测试继续运行,预期流程:
```
✅ 步骤1: 初始化 (已完成)
✅ 步骤2: 意图分析 (已完成)
⏭️ 步骤3.1: 并行搜索
- 主Agent调用search-orchestrator
- 使用Tavily API搜索3个查询
- 写入/iteration_1/search_results.json
⏭️ 步骤3.2: 来源验证
- 主Agent调用source-validator
- Tier 1-4分级
- 写入/iteration_1/sources.json
⏭️ 步骤3.3: 内容分析
- 主Agent调用content-analyzer
- 提取信息,交叉验证
- 写入/iteration_1/findings.json
⏭️ 步骤3.4: 置信度评估
- 主Agent调用confidence-evaluator
- 计算置信度 (50%+30%+20%)
- 写入/iteration_decision.json
- 决策: FINISH 或 CONTINUE
⏭️ 步骤7: 报告生成
- 主Agent调用report-generator
- 读取所有iteration数据
- 写入/final_report.md
```
---
## 性能优化建议
基于当前执行情况:
1. **系统提示词优化**
- 主Agent的提示词非常长5,920 tokens
- 可以精简部分重复说明
- 预期节省 ~20% Token
2. **并行SubAgent调用**
- 当前是串行步骤3.1 → 3.2 → 3.3
- 某些步骤可以并行(如果依赖允许)
- 预期减少 30-40% 时间
3. **缓存机制**
- 相同问题的搜索结果可缓存
- 减少API调用次数
---
## 总结
**测试成功证明**:
- 虚拟文件系统在主Agent和SubAgent之间正确共享
- 工具调用顺序控制有效
- 基于提示词的流程控制可行
🎯 **下一步工作**:
1. 完成剩余SubAgent的测试
2. 实现完整的端到端流程
3. 添加错误处理和降级策略
4. 性能优化
📊 **当前进度**: 2/7步 (28.6%)
- ✅ 步骤1: 初始化
- ✅ 步骤2: 意图分析
- ⏳ 步骤3-7: 待实现
---
**生成时间**: 2025-10-31
**测试数据**: `llm_calls_20251031_150543.json`

0
tests/__init__.py Normal file
View File

156
tests/analyze_llm_calls.py Normal file
View File

@ -0,0 +1,156 @@
"""
分析LLM调用记录
使用方法:
python tests/analyze_llm_calls.py tests/llm_calls_20251031_150543.json
"""
import sys
import json
def analyze_llm_calls(json_file):
"""分析LLM调用记录"""
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print("\n" + "="*80)
print("LLM调用分析报告")
print("="*80)
print(f"\n总调用次数: {data['total_calls']}")
for i, call in enumerate(data['calls'], 1):
print(f"\n{''*80}")
print(f"调用 #{i}")
print(''*80)
# 时间信息
start = call.get('timestamp_start', 'N/A')
end = call.get('timestamp_end', 'N/A')
print(f"时间: {start} -> {end}")
# 消息数
messages = call.get('messages', [[]])
if messages:
msg_count = len(messages[0])
print(f"输入消息数: {msg_count}")
# 显示最后一条消息类型
if messages[0]:
last_msg = messages[0][-1]
print(f"最后一条输入消息: {last_msg['type']}")
# 响应信息
response = call.get('response', {})
generations = response.get('generations', [])
if generations:
gen = generations[0]
msg = gen.get('message', {})
print(f"响应类型: {msg.get('type', 'N/A')}")
# 内容
content = msg.get('content', '')
if content:
preview = content[:100].replace('\n', ' ')
print(f"响应内容: {preview}...")
# 工具调用
tool_calls = msg.get('tool_calls', [])
if tool_calls:
print(f"工具调用: {len(tool_calls)}")
for tc in tool_calls:
print(f" - {tc['name']}")
else:
print("工具调用: 无")
# Token使用
llm_output = response.get('llm_output', {})
token_usage = llm_output.get('token_usage', {})
if token_usage:
print(f"Token使用: {token_usage.get('prompt_tokens', 0)} input + {token_usage.get('completion_tokens', 0)} output = {token_usage.get('total_tokens', 0)} total")
print("\n" + "="*80)
print("执行流程总结")
print("="*80)
# 分析执行流程
call_summaries = []
for i, call in enumerate(data['calls'], 1):
response = call.get('response', {})
generations = response.get('generations', [])
if generations:
msg = generations[0].get('message', {})
tool_calls = msg.get('tool_calls', [])
if tool_calls:
tools = [tc['name'] for tc in tool_calls]
call_summaries.append(f"调用#{i}: {', '.join(tools)}")
else:
content_preview = msg.get('content', '')[:50].replace('\n', ' ')
call_summaries.append(f"调用#{i}: 返回文本 ({content_preview}...)")
for summary in call_summaries:
print(f" {summary}")
# 判断是否完成
print("\n" + "="*80)
print("状态判断")
print("="*80)
last_call = data['calls'][-1]
last_response = last_call.get('response', {})
last_generations = last_response.get('generations', [])
if last_generations:
last_msg = last_generations[0].get('message', {})
last_tool_calls = last_msg.get('tool_calls', [])
if not last_tool_calls:
print("⚠️ 最后一次调用没有工具调用")
print("原因: SubAgent返回了纯文本响应导致主Agent停止")
print("影响: Agent停止执行未完成完整流程")
print("\n预期行为: 主Agent应该继续执行步骤3并行搜索")
else:
print("✅ 最后一次调用有工具调用,流程继续")
else:
print("❌ 无法判断状态")
# 检查是否完成意图分析
search_queries_created = False
for call in data['calls']:
response = call.get('response', {})
generations = response.get('generations', [])
if generations:
msg = generations[0].get('message', {})
tool_calls = msg.get('tool_calls', [])
for tc in tool_calls:
if tc['name'] == 'write_file' and '/search_queries.json' in str(tc.get('args', {})):
search_queries_created = True
print("\n" + "="*80)
print("步骤完成情况")
print("="*80)
print(f"✅ 步骤1: 初始化 - 已完成 (/question.txt, /config.json)")
print(f"✅ 步骤2: 意图分析 - {'已完成' if search_queries_created else '未完成'} (/search_queries.json)")
print(f"❌ 步骤3: 并行搜索 - 未开始")
print(f"❌ 后续步骤 - 未开始")
print("\n" + "="*80)
print("建议")
print("="*80)
print("1. 问题根源: intent-analyzer SubAgent完成后返回纯文本导致主Agent停止")
print("2. 解决方案: 修改主Agent的系统提示词明确要求在SubAgent返回后继续执行下一步")
print("3. 或者: 检查LangGraph的recursion_limit配置确保允许足够的步骤数")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("使用方法: python analyze_llm_calls.py <json_file>")
sys.exit(1)
json_file = sys.argv[1]
analyze_llm_calls(json_file)

308
tests/debug_llm_calls.py Normal file
View File

@ -0,0 +1,308 @@
"""
记录LLM调用的详细信息 - 保存为JSON文件
使用方法:
export PYTHONIOENCODING=utf-8 && python tests/debug_llm_calls.py
"""
import sys
import os
import json
from datetime import datetime
from typing import Any, Dict, List
from uuid import UUID
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult
from src.agents.coordinator import create_research_coordinator
from src.config import Config
class LLMCallLogger(BaseCallbackHandler):
"""记录所有LLM调用的回调处理器"""
def __init__(self):
self.calls: List[Dict[str, Any]] = []
self.current_call = None
self.call_count = 0
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""LLM开始时调用"""
self.call_count += 1
self.current_call = {
"call_id": self.call_count,
"timestamp_start": datetime.now().isoformat(),
"prompts": prompts,
"kwargs": {k: str(v) for k, v in kwargs.items() if k != "invocation_params"},
}
print(f"\n{'='*80}")
print(f"🔵 LLM调用 #{self.call_count} 开始 - {datetime.now().strftime('%H:%M:%S')}")
print('='*80)
if prompts:
print(f"Prompt长度: {len(prompts[0])} 字符")
print(f"Prompt预览: {prompts[0][:200]}...")
def on_chat_model_start(
self,
serialized: Dict[str, Any],
messages: List[List[BaseMessage]],
**kwargs: Any
) -> None:
"""Chat模型开始时调用"""
self.call_count += 1
self.current_call = {
"call_id": self.call_count,
"timestamp_start": datetime.now().isoformat(),
"messages": [
[
{
"type": type(msg).__name__,
"content": msg.content if hasattr(msg, 'content') else str(msg),
"tool_calls": getattr(msg, 'tool_calls', None)
}
for msg in msg_list
]
for msg_list in messages
],
"kwargs": {k: str(v) for k, v in kwargs.items() if k not in ["invocation_params", "tags", "metadata"]},
}
print(f"\n{'='*80}")
print(f"🔵 Chat模型调用 #{self.call_count} 开始 - {datetime.now().strftime('%H:%M:%S')}")
print('='*80)
if messages:
print(f"消息数量: {len(messages[0])}")
for i, msg in enumerate(messages[0][-3:], 1):
msg_type = type(msg).__name__
print(f" {i}. {msg_type}: {str(msg.content)[:100] if hasattr(msg, 'content') else 'N/A'}...")
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""LLM结束时调用"""
if self.current_call:
self.current_call["timestamp_end"] = datetime.now().isoformat()
# 提取响应
generations = []
for gen_list in response.generations:
for gen in gen_list:
gen_info = {
"text": gen.text if hasattr(gen, 'text') else None,
}
if hasattr(gen, 'message'):
msg = gen.message
gen_info["message"] = {
"type": type(msg).__name__,
"content": msg.content if hasattr(msg, 'content') else None,
"tool_calls": [
{
"name": tc.get("name"),
"args": tc.get("args"),
"id": tc.get("id")
}
for tc in (msg.tool_calls if hasattr(msg, 'tool_calls') and msg.tool_calls else [])
] if hasattr(msg, 'tool_calls') else None
}
generations.append(gen_info)
self.current_call["response"] = {
"generations": generations,
"llm_output": response.llm_output,
}
self.calls.append(self.current_call)
print(f"\n✅ LLM调用 #{self.current_call['call_id']} 完成")
if generations:
gen = generations[0]
if gen.get("message"):
msg = gen["message"]
print(f"响应类型: {msg['type']}")
if msg.get('content'):
print(f"内容: {msg['content'][:150]}...")
if msg.get('tool_calls'):
print(f"工具调用: {len(msg['tool_calls'])}")
for tc in msg['tool_calls'][:3]:
print(f" - {tc['name']}")
self.current_call = None
def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
"""LLM出错时调用"""
if self.current_call:
self.current_call["timestamp_end"] = datetime.now().isoformat()
self.current_call["error"] = str(error)
self.calls.append(self.current_call)
print(f"\n❌ LLM调用 #{self.current_call['call_id']} 出错: {error}")
self.current_call = None
def save_to_file(self, filepath: str):
"""保存记录到JSON文件"""
with open(filepath, 'w', encoding='utf-8') as f:
json.dump({
"total_calls": len(self.calls),
"calls": self.calls
}, f, ensure_ascii=False, indent=2)
print(f"\n💾 已保存 {len(self.calls)} 次LLM调用记录到: {filepath}")
def test_with_llm_logging(question: str, depth: str = "quick", max_steps: int = 10):
"""
测试研究流程记录所有LLM调用
Args:
question: 研究问题
depth: 深度模式
max_steps: 最大执行步骤数(防止无限循环)
"""
print("\n" + "🔬 " * 40)
print("智能深度研究系统 - LLM调用记录模式")
print("🔬 " * 40)
print(f"\n研究问题: {question}")
print(f"深度模式: {depth}")
print(f"最大步骤数: {max_steps}")
print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 创建日志记录器
logger = LLMCallLogger()
# 创建Agent带callback
print("\n" + "="*80)
print("创建Agent...")
print("="*80)
try:
# 获取LLM并添加callback
llm = Config.get_llm()
llm.callbacks = [logger]
# 创建Agent
agent = create_research_coordinator(
question=question,
depth=depth,
format="technical",
min_tier=3
)
print("✅ Agent创建成功")
except Exception as e:
print(f"❌ Agent创建失败: {e}")
import traceback
traceback.print_exc()
return
# 执行研究
print("\n" + "="*80)
print(f"执行研究流程(最多{max_steps}步)...")
print("="*80)
try:
start_time = datetime.now()
step_count = 0
# 使用stream模式但限制步骤数
for chunk in agent.stream(
{
"messages": [
{
"role": "user",
"content": f"请开始研究这个问题:{question}"
}
]
},
config={"callbacks": [logger]}
):
step_count += 1
print(f"\n{''*80}")
print(f"📍 步骤 #{step_count} - {datetime.now().strftime('%H:%M:%S')}")
print(''*80)
# 显示state更新
if isinstance(chunk, dict):
if 'messages' in chunk:
print(f" 消息: {len(chunk['messages'])}")
if 'files' in chunk:
print(f" 文件: {len(chunk['files'])}")
for path in list(chunk['files'].keys())[:3]:
print(f" - {path}")
# 限制步骤数
if step_count >= max_steps:
print(f"\n⚠️ 达到最大步骤数 {max_steps},停止执行")
break
# 超时保护
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > 120: # 2分钟
print(f"\n⚠️ 超过2分钟停止执行")
break
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print("\n" + "="*80)
print("执行结束")
print("="*80)
print(f"总步骤数: {step_count}")
print(f"LLM调用次数: {len(logger.calls)}")
print(f"总耗时: {duration:.2f}")
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断")
except Exception as e:
print(f"\n\n❌ 执行失败: {e}")
import traceback
traceback.print_exc()
finally:
# 保存日志
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = "tests"
os.makedirs(output_dir, exist_ok=True)
log_file = os.path.join(output_dir, f"llm_calls_{timestamp}.json")
logger.save_to_file(log_file)
# 也保存一份摘要
summary_file = os.path.join(output_dir, f"llm_calls_summary_{timestamp}.txt")
with open(summary_file, 'w', encoding='utf-8') as f:
f.write(f"LLM调用记录摘要\n")
f.write(f"{'='*80}\n\n")
f.write(f"总调用次数: {len(logger.calls)}\n")
f.write(f"执行时长: {duration:.2f}\n\n")
for i, call in enumerate(logger.calls, 1):
f.write(f"\n{''*80}\n")
f.write(f"调用 #{i}\n")
f.write(f"{''*80}\n")
f.write(f"开始: {call['timestamp_start']}\n")
f.write(f"结束: {call.get('timestamp_end', 'N/A')}\n")
if 'messages' in call:
f.write(f"消息数: {len(call['messages'][0]) if call['messages'] else 0}\n")
if 'response' in call:
gens = call['response'].get('generations', [])
if gens:
gen = gens[0]
if gen.get('message'):
msg = gen['message']
f.write(f"响应类型: {msg['type']}\n")
if msg.get('tool_calls'):
f.write(f"工具调用: {[tc['name'] for tc in msg['tool_calls']]}\n")
if 'error' in call:
f.write(f"错误: {call['error']}\n")
print(f"📄 摘要已保存到: {summary_file}")
if __name__ == "__main__":
question = "Python asyncio最佳实践"
# 只执行前几步不做完整research
test_with_llm_logging(question, depth="quick", max_steps=10)

190
tests/debug_research.py Normal file
View File

@ -0,0 +1,190 @@
"""
调试研究流程 - 详细追踪Agent执行情况
使用方法:
export PYTHONIOENCODING=utf-8 && python tests/debug_research.py
"""
import sys
import os
import json
from datetime import datetime
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.agents.coordinator import create_research_coordinator
from src.config import Config
def print_step(step_num: int, title: str):
"""打印步骤标题"""
print("\n" + "="*80)
print(f"步骤 {step_num}: {title}")
print("="*80)
def print_substep(title: str):
"""打印子步骤"""
print(f"\n>>> {title}")
print("-"*60)
def print_file_content(file_path: str, content: any, max_length: int = 500):
"""打印文件内容"""
print(f"\n📄 文件: {file_path}")
if isinstance(content, dict) or isinstance(content, list):
content_str = json.dumps(content, ensure_ascii=False, indent=2)
else:
content_str = str(content)
if len(content_str) > max_length:
print(content_str[:max_length] + "...")
else:
print(content_str)
def debug_research(question: str, depth: str = "quick"):
"""
调试研究流程,显示详细执行日志
Args:
question: 研究问题
depth: 深度模式使用quick模式加快调试
"""
print("\n" + "🔬 "* 40)
print("智能深度研究系统 - 调试模式")
print("🔬 " * 40)
print(f"\n研究问题: {question}")
print(f"深度模式: {depth}")
print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 验证API配置
print_step(0, "验证API配置")
print(f"DashScope API Key: {Config.DASHSCOPE_API_KEY[:20]}..." if Config.DASHSCOPE_API_KEY else "❌ 未配置")
print(f"Tavily API Key: {Config.TAVILY_API_KEY[:20]}..." if Config.TAVILY_API_KEY else "❌ 未配置")
print(f"LLM模型: {Config.LLM_MODEL}")
# 创建Agent
print_step(1, "创建ResearchCoordinator Agent")
try:
agent = create_research_coordinator(
question=question,
depth=depth,
format="technical",
min_tier=3
)
print("✅ Agent创建成功")
print(f"Agent类型: {type(agent)}")
except Exception as e:
print(f"❌ Agent创建失败: {e}")
import traceback
traceback.print_exc()
return
# 执行研究
print_step(2, "执行研究流程")
print("调用 agent.invoke() ...")
print("注意:这可能需要几分钟,请耐心等待...\n")
try:
# 记录开始时间
start_time = datetime.now()
# 执行Agent
result = agent.invoke({
"messages": [
{
"role": "user",
"content": f"请开始研究这个问题:{question}"
}
]
})
# 记录结束时间
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print_step(3, "执行完成")
print(f"✅ 研究完成!")
print(f"⏱️ 总耗时: {duration:.2f}秒 ({duration/60:.2f}分钟)")
# 显示结果
print_step(4, "结果分析")
print(f"结果类型: {type(result)}")
print(f"结果键: {result.keys() if isinstance(result, dict) else 'N/A'}")
# 尝试提取消息
if isinstance(result, dict) and 'messages' in result:
messages = result['messages']
print(f"\n消息数量: {len(messages)}")
# 显示最后几条消息
print("\n最后3条消息:")
for i, msg in enumerate(messages[-3:], 1):
print(f"\n--- 消息 {i} ---")
if hasattr(msg, 'content'):
content = msg.content
if len(content) > 300:
print(content[:300] + "...")
else:
print(content)
else:
print(msg)
# 尝试访问虚拟文件系统
print_step(5, "虚拟文件系统检查")
print("注意需要根据DeepAgents实际API来访问虚拟文件系统")
print("这部分功能待实现...")
# 保存完整结果到文件
print_step(6, "保存调试结果")
output_dir = "outputs/debug"
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = os.path.join(output_dir, f"debug_{timestamp}.json")
debug_data = {
"question": question,
"depth": depth,
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"duration_seconds": duration,
"result": str(result), # 转换为字符串以便保存
}
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(debug_data, f, ensure_ascii=False, indent=2)
print(f"✅ 调试结果已保存到: {output_file}")
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断执行")
print(f"已执行时间: {(datetime.now() - start_time).total_seconds():.2f}")
except Exception as e:
print(f"\n\n❌ 执行失败: {e}")
import traceback
traceback.print_exc()
# 保存错误信息
output_dir = "outputs/debug"
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
error_file = os.path.join(output_dir, f"error_{timestamp}.txt")
with open(error_file, 'w', encoding='utf-8') as f:
f.write(f"Question: {question}\n")
f.write(f"Depth: {depth}\n")
f.write(f"Error: {str(e)}\n\n")
f.write(traceback.format_exc())
print(f"错误信息已保存到: {error_file}")
if __name__ == "__main__":
# 使用简单的问题和quick模式进行调试
question = "Python asyncio最佳实践"
debug_research(question, depth="quick")

194
tests/debug_research_v2.py Normal file
View File

@ -0,0 +1,194 @@
"""
调试研究流程 V2 - 检查虚拟文件系统
使用方法:
export PYTHONIOENCODING=utf-8 && python tests/debug_research_v2.py
"""
import sys
import os
import json
from datetime import datetime
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.agents.coordinator import create_research_coordinator
from src.config import Config
def debug_research_with_files(question: str, depth: str = "quick"):
"""
调试研究流程,重点检查虚拟文件系统
Args:
question: 研究问题
depth: 深度模式
"""
print("\n" + "🔬 " * 40)
print("智能深度研究系统 - 调试模式 V2")
print("🔬 " * 40)
print(f"\n研究问题: {question}")
print(f"深度模式: {depth}")
print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 创建Agent
print("\n" + "="*80)
print("创建ResearchCoordinator Agent")
print("="*80)
try:
agent = create_research_coordinator(
question=question,
depth=depth,
format="technical",
min_tier=3
)
print("✅ Agent创建成功")
except Exception as e:
print(f"❌ Agent创建失败: {e}")
import traceback
traceback.print_exc()
return
# 执行研究
print("\n" + "="*80)
print("执行研究流程")
print("="*80)
try:
start_time = datetime.now()
result = agent.invoke({
"messages": [
{
"role": "user",
"content": f"请开始研究这个问题:{question}"
}
]
})
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"\n✅ 执行完成!耗时: {duration:.2f}")
# 分析结果
print("\n" + "="*80)
print("结果分析")
print("="*80)
print(f"\n结果类型: {type(result)}")
print(f"结果键: {list(result.keys())}")
# 检查消息
if 'messages' in result:
messages = result['messages']
print(f"\n📨 消息数量: {len(messages)}")
print("\n所有消息内容:")
for i, msg in enumerate(messages, 1):
print(f"\n{'='*60}")
print(f"消息 #{i}")
print('='*60)
# 检查消息类型
msg_type = type(msg).__name__
print(f"类型: {msg_type}")
# 提取内容
if hasattr(msg, 'content'):
content = msg.content
print(f"内容长度: {len(content)} 字符")
# 显示内容
if len(content) > 500:
print(f"\n内容预览:\n{content[:500]}...")
else:
print(f"\n完整内容:\n{content}")
# 检查其他属性
if hasattr(msg, 'additional_kwargs'):
kwargs = msg.additional_kwargs
if kwargs:
print(f"\n额外参数: {kwargs}")
if hasattr(msg, 'tool_calls'):
tool_calls = msg.tool_calls
if tool_calls:
print(f"\n工具调用: {tool_calls}")
# 检查文件系统
if 'files' in result:
files = result['files']
print("\n" + "="*80)
print("虚拟文件系统")
print("="*80)
print(f"\n📁 文件数量: {len(files)}")
for file_path, file_info in files.items():
print(f"\n{'='*60}")
print(f"文件: {file_path}")
print('='*60)
# 显示文件信息
if isinstance(file_info, dict):
for key, value in file_info.items():
if key == 'content':
if len(str(value)) > 300:
print(f"{key}: {str(value)[:300]}...")
else:
print(f"{key}: {value}")
else:
print(f"{key}: {value}")
else:
if len(str(file_info)) > 300:
print(f"内容: {str(file_info)[:300]}...")
else:
print(f"内容: {file_info}")
# 保存完整结果
print("\n" + "="*80)
print("保存调试结果")
print("="*80)
output_dir = "outputs/debug"
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 保存JSON结果
output_file = os.path.join(output_dir, f"debug_v2_{timestamp}.json")
with open(output_file, 'w', encoding='utf-8') as f:
# 序列化结果
serialized_result = {
"question": question,
"depth": depth,
"duration_seconds": duration,
"messages": [
{
"type": type(msg).__name__,
"content": msg.content if hasattr(msg, 'content') else str(msg)
}
for msg in result.get('messages', [])
],
"files": {
path: str(content)
for path, content in result.get('files', {}).items()
}
}
json.dump(serialized_result, f, ensure_ascii=False, indent=2)
print(f"✅ 调试结果已保存到: {output_file}")
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断执行")
except Exception as e:
print(f"\n\n❌ 执行失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
question = "Python asyncio最佳实践"
debug_research_with_files(question, depth="quick")

129
tests/debug_with_stream.py Normal file
View File

@ -0,0 +1,129 @@
"""
带流式输出的调试脚本 - 实时显示Agent的执行情况
使用方法:
export PYTHONIOENCODING=utf-8 && python tests/debug_with_stream.py
"""
import sys
import os
from datetime import datetime
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.agents.coordinator import create_research_coordinator
from src.config import Config
def stream_research(question: str, depth: str = "quick"):
"""
调试研究流程,实时显示执行情况
Args:
question: 研究问题
depth: 深度模式
"""
print("\n" + "🔬 " * 40)
print("智能深度研究系统 - 流式调试模式")
print("🔬 " * 40)
print(f"\n研究问题: {question}")
print(f"深度模式: {depth}")
print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 创建Agent
print("\n" + "="*80)
print("创建Agent...")
print("="*80)
try:
agent = create_research_coordinator(
question=question,
depth=depth,
format="technical",
min_tier=3
)
print("✅ Agent创建成功")
except Exception as e:
print(f"❌ Agent创建失败: {e}")
import traceback
traceback.print_exc()
return
# 执行研究使用stream模式
print("\n" + "="*80)
print("开始执行(流式模式)...")
print("="*80)
try:
start_time = datetime.now()
# 使用stream方法实时显示
step_count = 0
for chunk in agent.stream({
"messages": [
{
"role": "user",
"content": f"请开始研究这个问题:{question}"
}
]
}):
step_count += 1
print(f"\n{'='*60}")
print(f"步骤 #{step_count} - {datetime.now().strftime('%H:%M:%S')}")
print('='*60)
# 显示当前chunk的内容
if isinstance(chunk, dict):
# 检查是否有新消息
if 'messages' in chunk:
messages = chunk['messages']
if messages:
last_msg = messages[-1]
msg_type = type(last_msg).__name__
print(f"消息类型: {msg_type}")
if hasattr(last_msg, 'content'):
content = last_msg.content
if content:
print(f"内容: {content[:200]}")
if hasattr(last_msg, 'tool_calls') and last_msg.tool_calls:
print(f"工具调用:")
for tc in last_msg.tool_calls:
print(f" - {tc.get('name', 'unknown')}")
# 检查是否有文件更新
if 'files' in chunk:
files = chunk['files']
print(f"文件系统: {len(files)} 个文件")
for path in list(files.keys())[:5]:
print(f" - {path}")
# 超时保护
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > 120: # 2分钟
print("\n⚠️ 超过2分钟停止...")
break
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print("\n" + "="*80)
print("执行完成")
print("="*80)
print(f"总步骤数: {step_count}")
print(f"总耗时: {duration:.2f}")
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断")
except Exception as e:
print(f"\n\n❌ 执行失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
question = "Python asyncio最佳实践"
stream_research(question, depth="quick")

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,50 @@
LLM调用记录摘要
================================================================================
总调用次数: 5
执行时长: 49.49秒
────────────────────────────────────────────────────────────────────────────────
调用 #1
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:04:53.546542
结束: 2025-10-31T15:05:01.620812
消息数: 2
响应类型: AIMessage
工具调用: ['write_file', 'write_file']
────────────────────────────────────────────────────────────────────────────────
调用 #2
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:05:01.645324
结束: 2025-10-31T15:05:06.144999
消息数: 5
响应类型: AIMessage
工具调用: ['task']
────────────────────────────────────────────────────────────────────────────────
调用 #3
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:05:06.162121
结束: 2025-10-31T15:05:08.895694
消息数: 2
响应类型: AIMessage
工具调用: ['ls', 'read_file', 'read_file']
────────────────────────────────────────────────────────────────────────────────
调用 #4
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:05:08.920379
结束: 2025-10-31T15:05:28.363429
消息数: 6
响应类型: AIMessage
工具调用: ['write_file']
────────────────────────────────────────────────────────────────────────────────
调用 #5
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:05:28.383429
结束: 2025-10-31T15:05:43.011375
消息数: 8
响应类型: AIMessage

View File

@ -0,0 +1,41 @@
LLM调用记录摘要
================================================================================
总调用次数: 4
执行时长: 10.83秒
────────────────────────────────────────────────────────────────────────────────
调用 #1
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:54:08.326370
结束: 2025-10-31T15:54:12.078242
消息数: 2
响应类型: AIMessage
工具调用: ['write_file', 'task']
────────────────────────────────────────────────────────────────────────────────
调用 #2
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:54:12.104980
结束: 2025-10-31T15:54:14.650206
消息数: 2
响应类型: AIMessage
工具调用: ['ls', 'read_file', 'read_file']
────────────────────────────────────────────────────────────────────────────────
调用 #3
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:54:14.681994
结束: 2025-10-31T15:54:16.817896
消息数: 6
响应类型: AIMessage
────────────────────────────────────────────────────────────────────────────────
调用 #4
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T15:54:16.836410
结束: 2025-10-31T15:54:19.120601
消息数: 5
响应类型: AIMessage
工具调用: ['ls']

View File

@ -0,0 +1,86 @@
LLM调用记录摘要
================================================================================
总调用次数: 9
执行时长: 63.84秒
────────────────────────────────────────────────────────────────────────────────
调用 #1
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:05:27.194390
结束: 2025-10-31T16:05:34.197522
消息数: 2
响应类型: AIMessage
工具调用: ['write_file', 'write_file']
────────────────────────────────────────────────────────────────────────────────
调用 #2
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:05:34.227598
结束: 2025-10-31T16:05:38.551273
消息数: 5
响应类型: AIMessage
工具调用: ['task']
────────────────────────────────────────────────────────────────────────────────
调用 #3
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:05:38.571280
结束: 2025-10-31T16:05:41.055201
消息数: 2
响应类型: AIMessage
工具调用: ['ls', 'read_file', 'read_file']
────────────────────────────────────────────────────────────────────────────────
调用 #4
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:05:41.124345
结束: 2025-10-31T16:05:46.426078
消息数: 6
响应类型: AIMessage
工具调用: ['write_todos']
────────────────────────────────────────────────────────────────────────────────
调用 #5
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:05:46.441981
结束: 2025-10-31T16:05:52.572892
消息数: 8
响应类型: AIMessage
工具调用: ['write_todos']
────────────────────────────────────────────────────────────────────────────────
调用 #6
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:05:52.590619
结束: 2025-10-31T16:06:06.265340
消息数: 10
响应类型: AIMessage
工具调用: ['write_todos']
────────────────────────────────────────────────────────────────────────────────
调用 #7
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:06:06.286920
结束: 2025-10-31T16:06:17.218848
消息数: 12
响应类型: AIMessage
工具调用: ['write_file']
────────────────────────────────────────────────────────────────────────────────
调用 #8
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:06:17.235858
结束: 2025-10-31T16:06:20.406293
消息数: 14
响应类型: AIMessage
工具调用: ['write_todos']
────────────────────────────────────────────────────────────────────────────────
调用 #9
────────────────────────────────────────────────────────────────────────────────
开始: 2025-10-31T16:06:20.425967
结束: 2025-10-31T16:06:30.994058
消息数: 16
响应类型: AIMessage

195
tests/test_coordinator.py Normal file
View File

@ -0,0 +1,195 @@
"""
ResearchCoordinator测试
测试主Agent的完整执行流程
"""
import sys
import os
# 添加src目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.agents.coordinator import create_research_coordinator, run_research
from src.config import Config
def test_coordinator_creation():
"""测试ResearchCoordinator创建"""
print("=" * 60)
print("测试1: ResearchCoordinator创建")
print("=" * 60)
try:
# 测试默认参数
agent = create_research_coordinator(
question="什么是Python asyncio?",
depth="quick"
)
print("✓ ResearchCoordinator创建成功")
print(f" Agent类型: {type(agent)}")
return True
except Exception as e:
print(f"✗ ResearchCoordinator创建失败: {e}")
import traceback
traceback.print_exc()
return False
def test_config_validation():
"""测试配置验证"""
print("\n" + "=" * 60)
print("测试2: 配置验证")
print("=" * 60)
# 测试无效的深度模式
try:
agent = create_research_coordinator(
question="测试问题",
depth="invalid_depth"
)
print("✗ 应该抛出ValueError但没有")
return False
except ValueError as e:
print(f"✓ 正确捕获无效深度模式: {e}")
# 测试无效的min_tier
try:
agent = create_research_coordinator(
question="测试问题",
min_tier=5
)
print("✗ 应该抛出ValueError但没有")
return False
except ValueError as e:
print(f"✓ 正确捕获无效min_tier: {e}")
# 测试无效的格式
try:
agent = create_research_coordinator(
question="测试问题",
format="invalid_format"
)
print("✗ 应该抛出ValueError但没有")
return False
except ValueError as e:
print(f"✓ 正确捕获无效格式: {e}")
return True
def test_simple_research_dry_run():
"""测试简单研究流程dry run不执行真实搜索"""
print("\n" + "=" * 60)
print("测试3: 简单研究流程(模拟)")
print("=" * 60)
print("\n注意: 这个测试需要API密钥才能执行真实的Agent调用")
print("如果API密钥未配置将跳过此测试\n")
# 检查API密钥
try:
Config.validate()
except ValueError as e:
print(f"⚠️ 跳过测试:{e}")
return True # 不算失败
try:
# 创建Agent但不执行
agent = create_research_coordinator(
question="Python装饰器的作用",
depth="quick",
format="technical"
)
print("✓ Agent创建成功准备就绪")
print(" 如需运行完整测试请确保API密钥已配置")
print(" 然后运行python -m tests.test_integration")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_depth_configs():
"""测试三种深度模式的配置"""
print("\n" + "=" * 60)
print("测试4: 深度模式配置")
print("=" * 60)
depth_modes = ["quick", "standard", "deep"]
for depth in depth_modes:
try:
agent = create_research_coordinator(
question="测试问题",
depth=depth
)
depth_config = Config.get_depth_config(depth)
print(f"\n{depth}模式配置正确:")
print(f" - 最大迭代: {depth_config['max_iterations']}")
print(f" - 置信度阈值: {depth_config['confidence_threshold']}")
print(f" - 目标来源数: {depth_config['target_sources']}")
print(f" - 并行搜索数: {depth_config['parallel_searches']}")
except Exception as e:
print(f"{depth}模式配置失败: {e}")
return False
return True
def main():
"""运行所有测试"""
print("\n")
print("=" * 60)
print("ResearchCoordinator测试套件")
print("=" * 60)
print("\n")
results = []
# 测试1: 创建
results.append(("创建测试", test_coordinator_creation()))
# 测试2: 配置验证
results.append(("配置验证", test_config_validation()))
# 测试3: 简单研究流程
results.append(("简单研究流程", test_simple_research_dry_run()))
# 测试4: 深度模式配置
results.append(("深度模式配置", test_depth_configs()))
# 总结
print("\n" + "=" * 60)
print("测试总结")
print("=" * 60)
for test_name, passed in results:
status = "✓ 通过" if passed else "✗ 失败"
print(f"{test_name}: {status}")
all_passed = all(result[1] for result in results)
print("\n" + "=" * 60)
if all_passed:
print("✓ 所有测试通过ResearchCoordinator实现正确。")
else:
print("✗ 部分测试失败,请检查实现。")
print("=" * 60 + "\n")
return all_passed
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

199
tests/test_minimal_agent.py Normal file
View File

@ -0,0 +1,199 @@
"""
最小化测试 - 理解DeepAgents的工作机制
使用方法:
export PYTHONIOENCODING=utf-8 && python tests/test_minimal_agent.py
"""
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from deepagents import create_deep_agent
from src.config import Config
def test_minimal_agent():
"""测试最简单的Agent执行"""
print("\n" + "="*80)
print("最小化测试 - 主Agent写文件")
print("="*80)
# 创建一个最简单的主Agent
main_system_prompt = """你是一个简单的测试Agent。
你的任务:
1. 使用 write_file 工具写入一个文件到 `/test.txt`,内容为 "Hello World"
2. 使用 read_file 工具读取 `/test.txt`
3. 告诉用户文件内容
**重要**:完成后明确说"任务完成"
"""
agent = create_deep_agent(
model=Config.get_llm(),
subagents=[], # 不使用SubAgent
system_prompt=main_system_prompt,
)
print("✅ Agent创建成功")
print("\n开始执行...")
try:
result = agent.invoke({
"messages": [
{
"role": "user",
"content": "请开始执行任务"
}
]
})
print("\n" + "="*80)
print("执行结果")
print("="*80)
# 检查消息
if 'messages' in result:
print(f"\n消息数量: {len(result['messages'])}")
# 显示最后一条消息
last_msg = result['messages'][-1]
print(f"\n最后一条消息:")
if hasattr(last_msg, 'content'):
print(last_msg.content)
# 检查文件系统
if 'files' in result:
print(f"\n文件数量: {len(result['files'])}")
for path, info in result['files'].items():
print(f"\n文件: {path}")
if isinstance(info, dict) and 'content' in info:
print(f"内容: {info['content']}")
else:
print(f"内容: {info}")
print("\n✅ 测试完成")
except Exception as e:
print(f"\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
def test_agent_with_subagent():
"""测试主Agent和SubAgent的文件共享"""
print("\n" + "="*80)
print("测试主Agent和SubAgent的文件共享")
print("="*80)
# 定义一个简单的SubAgent
subagent_config = {
"name": "file-reader",
"description": "读取文件并返回内容",
"system_prompt": """你是一个文件读取Agent。
你的任务:
1. 使用 read_file 工具读取 `/test.txt` 文件
2. 告诉用户文件内容
**重要**
- 如果文件不存在,明确说"文件不存在"
- 如果文件存在,告诉用户文件内容
- 完成后明确说"任务完成"
""",
"tools": [],
}
# 主Agent
main_system_prompt = """你是一个测试协调Agent。
你的任务:
1. 使用 write_file 工具写入一个文件到 `/test.txt`,内容为 "Hello from Main Agent"
2. 使用 task 工具调用 file-reader SubAgenttask(description="读取测试文件", subagent_type="file-reader")
3. 等待SubAgent返回结果
4. 告诉用户SubAgent读取的内容
**重要**:完成后明确说"所有任务完成"
"""
agent = create_deep_agent(
model=Config.get_llm(),
subagents=[subagent_config],
system_prompt=main_system_prompt,
)
print("✅ Agent创建成功1主 + 1子")
print("\n开始执行...")
try:
result = agent.invoke({
"messages": [
{
"role": "user",
"content": "请开始执行任务"
}
]
})
print("\n" + "="*80)
print("执行结果")
print("="*80)
# 检查消息
if 'messages' in result:
print(f"\n消息数量: {len(result['messages'])}")
# 显示所有消息内容
print("\n所有消息:")
for i, msg in enumerate(result['messages'], 1):
print(f"\n--- 消息 #{i} ---")
msg_type = type(msg).__name__
print(f"类型: {msg_type}")
if hasattr(msg, 'content'):
content = msg.content
if len(content) > 200:
print(f"内容: {content[:200]}...")
else:
print(f"内容: {content}")
if hasattr(msg, 'tool_calls') and msg.tool_calls:
print(f"工具调用: {msg.tool_calls}")
# 检查文件系统
if 'files' in result:
print(f"\n文件系统:")
print(f"文件数量: {len(result['files'])}")
for path, info in result['files'].items():
print(f"\n 文件: {path}")
if isinstance(info, dict) and 'content' in info:
print(f" 内容: {info['content']}")
else:
print(f" 内容: {info}")
print("\n✅ 测试完成")
except Exception as e:
print(f"\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
print("\n🧪 DeepAgents最小化测试")
print("="*80)
# 测试1单个Agent的文件操作
test_minimal_agent()
print("\n\n")
# 测试2主Agent和SubAgent的文件共享
test_agent_with_subagent()

237
tests/test_phase1_setup.py Normal file
View File

@ -0,0 +1,237 @@
"""
Phase 1 基础设施测试
测试项:
1. 依赖包导入
2. API密钥配置
3. LLM连接
4. 批量搜索工具
"""
import sys
import os
# 添加src目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
def test_imports():
"""测试所有必要的包是否能正确导入"""
print("=" * 60)
print("测试 1: 检查依赖包导入")
print("=" * 60)
try:
import deepagents
print("✓ deepagents 导入成功")
except ImportError as e:
print(f"✗ deepagents 导入失败: {e}")
return False
try:
import langchain
print("✓ langchain 导入成功")
except ImportError as e:
print(f"✗ langchain 导入失败: {e}")
return False
try:
import tavily
print("✓ tavily 导入成功")
except ImportError as e:
print(f"✗ tavily 导入失败: {e}")
return False
try:
from dotenv import load_dotenv
print("✓ python-dotenv 导入成功")
except ImportError as e:
print(f"✗ python-dotenv 导入失败: {e}")
return False
try:
import click
print("✓ click 导入成功")
except ImportError as e:
print(f"✗ click 导入失败: {e}")
return False
try:
from rich import print as rprint
print("✓ rich 导入成功")
except ImportError as e:
print(f"✗ rich 导入失败: {e}")
return False
print("\n所有依赖包导入成功!\n")
return True
def test_config():
"""测试配置是否正确"""
print("=" * 60)
print("测试 2: 检查配置")
print("=" * 60)
try:
from src.config import Config
print(f"LLM模型: {Config.LLM_MODEL}")
print(f"LLM温度: {Config.LLM_TEMPERATURE}")
print(f"最大Tokens: {Config.LLM_MAX_TOKENS}")
print(f"默认深度模式: {Config.DEFAULT_DEPTH}")
print(f"最大并行搜索数: {Config.MAX_PARALLEL_SEARCHES}")
print(f"搜索超时: {Config.SEARCH_TIMEOUT}")
# 检查API密钥
if Config.DASHSCOPE_API_KEY and Config.DASHSCOPE_API_KEY != "your_dashscope_api_key_here":
print("✓ DASHSCOPE_API_KEY 已配置")
else:
print("✗ DASHSCOPE_API_KEY 未配置或使用默认值")
print(" 请在.env文件中设置真实的API密钥")
return False
if Config.TAVILY_API_KEY and Config.TAVILY_API_KEY != "your_tavily_api_key_here":
print("✓ TAVILY_API_KEY 已配置")
else:
print("✗ TAVILY_API_KEY 未配置或使用默认值")
print(" 请在.env文件中设置真实的API密钥")
return False
print("\n配置检查通过!\n")
return True
except Exception as e:
print(f"✗ 配置检查失败: {e}\n")
return False
def test_llm_connection():
"""测试LLM连接"""
print("=" * 60)
print("测试 3: 检查LLM连接")
print("=" * 60)
try:
from src.config import Config
llm = Config.get_llm()
print(f"LLM实例创建成功: {llm.model_name}")
# 发送一个简单的测试消息
print("发送测试消息...")
response = llm.invoke("你好,请用一句话介绍你自己。")
print(f"LLM响应: {response.content[:100]}...")
print("\n✓ LLM连接测试成功\n")
return True
except Exception as e:
print(f"✗ LLM连接测试失败: {e}\n")
return False
def test_search_tools():
"""测试批量搜索工具"""
print("=" * 60)
print("测试 4: 检查批量搜索工具")
print("=" * 60)
try:
from src.tools.search_tools import batch_internet_search
# 测试并行搜索
test_queries = [
"Python programming",
"Machine learning basics",
"Web development tutorial"
]
print(f"执行 {len(test_queries)} 个并行搜索...")
print(f"查询: {test_queries}")
result = batch_internet_search.invoke({
"queries": test_queries,
"max_results_per_query": 3
})
print(f"\n搜索结果统计:")
print(f" 总查询数: {result['total_queries']}")
print(f" 成功查询: {result['successful_queries']}")
print(f" 失败查询: {result['failed_queries']}")
print(f" 总结果数: {result['total_results']}")
print(f" 去重后结果数: {result['unique_results']}")
if result['errors']:
print(f"\n错误信息:")
for error in result['errors']:
print(f" - {error}")
if result['success'] and result['unique_results'] > 0:
print(f"\n前3个搜索结果:")
for i, res in enumerate(result['results'][:3], 1):
print(f" {i}. {res.get('title', 'N/A')}")
print(f" URL: {res.get('url', 'N/A')}")
print(f" 得分: {res.get('score', 'N/A')}")
print("\n✓ 批量搜索工具测试成功!\n")
return True
else:
print("\n✗ 批量搜索工具测试失败:未返回有效结果\n")
return False
except Exception as e:
print(f"✗ 批量搜索工具测试失败: {e}\n")
import traceback
traceback.print_exc()
return False
def main():
"""运行所有测试"""
print("\n")
print("=" * 60)
print("Phase 1 基础设施测试")
print("=" * 60)
print("\n")
results = []
# 测试1: 导入检查
results.append(("依赖包导入", test_imports()))
# 测试2: 配置检查
results.append(("配置检查", test_config()))
# 测试3: LLM连接如果配置通过
if results[-1][1]:
results.append(("LLM连接", test_llm_connection()))
# 测试4: 搜索工具(如果配置通过)
if results[1][1]:
results.append(("批量搜索工具", test_search_tools()))
# 总结
print("=" * 60)
print("测试总结")
print("=" * 60)
for test_name, passed in results:
status = "✓ 通过" if passed else "✗ 失败"
print(f"{test_name}: {status}")
all_passed = all(result[1] for result in results)
print("\n" + "=" * 60)
if all_passed:
print("✓ 所有测试通过Phase 1 基础设施搭建完成。")
else:
print("✗ 部分测试失败,请检查配置和依赖。")
print("=" * 60 + "\n")
return all_passed
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

253
tests/test_subagents.py Normal file
View File

@ -0,0 +1,253 @@
"""
SubAgent配置测试
测试所有SubAgent配置是否符合DeepAgents框架规范
"""
import sys
import os
# 添加src目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import pytest
from src.agents.subagents import (
get_subagent_configs,
validate_subagent_config,
get_validated_subagent_configs
)
class TestSubAgentConfigs:
"""SubAgent配置测试类"""
def test_subagent_count(self):
"""测试SubAgent数量"""
configs = get_subagent_configs()
assert len(configs) == 6, f"应该有6个SubAgent实际有{len(configs)}"
def test_required_fields(self):
"""测试所有必需字段是否存在"""
configs = get_subagent_configs()
required_fields = ["name", "description", "system_prompt"]
for config in configs:
for field in required_fields:
assert field in config, f"SubAgent {config.get('name', 'unknown')} 缺少必需字段: {field}"
def test_name_format(self):
"""测试name是否使用kebab-case格式"""
configs = get_subagent_configs()
for config in configs:
name = config["name"]
# 检查是否只包含小写字母和连字符
assert all(c.islower() or c == '-' for c in name), \
f"SubAgent name必须使用kebab-case格式: {name}"
# 不应该以连字符开始或结束
assert not name.startswith('-') and not name.endswith('-'), \
f"SubAgent name不应该以连字符开始或结束: {name}"
def test_system_prompt_not_empty(self):
"""测试system_prompt不为空"""
configs = get_subagent_configs()
for config in configs:
system_prompt = config.get("system_prompt", "")
assert system_prompt.strip(), \
f"SubAgent {config['name']} 的system_prompt不能为空"
# 检查system_prompt应该相当详细至少500字符
assert len(system_prompt) > 500, \
f"SubAgent {config['name']} 的system_prompt过短应该>500字符"
def test_no_prompt_field(self):
"""测试配置中不应该使用'prompt'字段(常见错误)"""
configs = get_subagent_configs()
for config in configs:
assert "prompt" not in config, \
f"SubAgent {config['name']} 使用了错误的字段'prompt',应该使用'system_prompt'"
def test_description_present(self):
"""测试description字段存在且有意义"""
configs = get_subagent_configs()
for config in configs:
description = config.get("description", "")
assert description.strip(), \
f"SubAgent {config['name']} 的description不能为空"
# 描述应该简洁10-100字符
assert 10 <= len(description) <= 200, \
f"SubAgent {config['name']} 的description长度不合适应该10-200字符"
def test_tools_field_type(self):
"""测试tools字段类型正确"""
configs = get_subagent_configs()
for config in configs:
if "tools" in config:
assert isinstance(config["tools"], list), \
f"SubAgent {config['name']} 的tools字段应该是列表"
def test_specific_subagent_names(self):
"""测试6个SubAgent的具体名称"""
configs = get_subagent_configs()
expected_names = {
"intent-analyzer",
"search-orchestrator",
"source-validator",
"content-analyzer",
"confidence-evaluator",
"report-generator"
}
actual_names = {config["name"] for config in configs}
assert actual_names == expected_names, \
f"SubAgent名称不匹配。期望: {expected_names}, 实际: {actual_names}"
def test_system_prompt_mentions_files(self):
"""测试system_prompt是否提到虚拟文件系统路径"""
configs = get_subagent_configs()
# 某些SubAgent应该在system_prompt中提到文件路径
file_related_agents = [
"intent-analyzer",
"search-orchestrator",
"source-validator",
"content-analyzer",
"confidence-evaluator",
"report-generator"
]
for config in configs:
if config["name"] in file_related_agents:
system_prompt = config["system_prompt"]
# 检查是否提到虚拟文件系统(以/开头的路径)
assert "/" in system_prompt, \
f"SubAgent {config['name']} 的system_prompt应该提到虚拟文件系统路径"
def test_search_orchestrator_has_tools(self):
"""测试search-orchestrator应该有搜索工具"""
configs = get_subagent_configs()
search_orchestrator = next(
(c for c in configs if c["name"] == "search-orchestrator"),
None
)
assert search_orchestrator is not None, "未找到search-orchestrator"
assert "tools" in search_orchestrator, "search-orchestrator应该有tools字段"
assert len(search_orchestrator["tools"]) > 0, \
"search-orchestrator应该至少有一个工具"
def test_validate_function(self):
"""测试validate_subagent_config函数"""
# 有效配置
valid_config = {
"name": "test-agent",
"description": "测试agent",
"system_prompt": "这是一个测试prompt"
}
assert validate_subagent_config(valid_config) == True
# 缺少必需字段
invalid_config = {
"name": "test-agent",
"description": "测试agent"
# 缺少system_prompt
}
with pytest.raises(ValueError, match="缺少必需字段"):
validate_subagent_config(invalid_config)
# 错误的name格式
invalid_name_config = {
"name": "TestAgent", # 应该是kebab-case
"description": "测试agent",
"system_prompt": "测试"
}
with pytest.raises(ValueError, match="kebab-case"):
validate_subagent_config(invalid_name_config)
def test_get_validated_configs(self):
"""测试get_validated_subagent_configs函数"""
configs = get_validated_subagent_configs()
assert len(configs) == 6, "应该返回6个经过验证的SubAgent配置"
def test_system_prompt_structure(self):
"""测试system_prompt是否有良好的结构"""
configs = get_subagent_configs()
for config in configs:
system_prompt = config["system_prompt"]
# 应该有清晰的任务说明
assert any(keyword in system_prompt for keyword in ["任务", "流程", "步骤"]), \
f"SubAgent {config['name']} 的system_prompt应该包含任务说明"
# 应该有输入输出说明
assert any(keyword in system_prompt for keyword in ["输入", "输出", "读取", "写入"]), \
f"SubAgent {config['name']} 的system_prompt应该包含输入输出说明"
def test_confidence_evaluator_mentions_formula(self):
"""测试confidence-evaluator是否提到置信度计算公式"""
configs = get_subagent_configs()
confidence_evaluator = next(
(c for c in configs if c["name"] == "confidence-evaluator"),
None
)
assert confidence_evaluator is not None
system_prompt = confidence_evaluator["system_prompt"]
# 应该提到公式和百分比
assert "50%" in system_prompt and "30%" in system_prompt and "20%" in system_prompt, \
"confidence-evaluator应该包含置信度计算公式50%+30%+20%"
def test_source_validator_mentions_tiers(self):
"""测试source-validator是否提到Tier分级"""
configs = get_subagent_configs()
source_validator = next(
(c for c in configs if c["name"] == "source-validator"),
None
)
assert source_validator is not None
system_prompt = source_validator["system_prompt"]
# 应该提到Tier 1-4
for tier in ["Tier 1", "Tier 2", "Tier 3", "Tier 4"]:
assert tier in system_prompt or tier.replace(" ", "") in system_prompt, \
f"source-validator应该包含{tier}分级说明"
def print_subagent_summary():
"""打印SubAgent配置摘要"""
print("\n" + "=" * 60)
print("SubAgent配置摘要")
print("=" * 60)
configs = get_subagent_configs()
for i, config in enumerate(configs, 1):
print(f"\n{i}. {config['name']}")
print(f" 描述: {config['description']}")
print(f" System Prompt长度: {len(config['system_prompt'])} 字符")
if "tools" in config:
print(f" 工具数量: {len(config['tools'])}")
else:
print(f" 工具数量: 0")
print("\n" + "=" * 60)
if __name__ == "__main__":
# 运行测试
print("运行SubAgent配置测试...\n")
# 打印摘要
print_subagent_summary()
# 使用pytest运行测试
pytest.main([__file__, "-v", "--tb=short"])