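Detailed Workflow Extraction Guide
This guide describes a general-purpose workflow extraction algorithm that applies across architectural patterns.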
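⚠️ Important constraint
When calling the Task tool, the prompt must begin with the following constraint:
⚠️ Important constraint: this analysis returns text only. Do not generate any files (.md, .txt, etc.).
All Mermaid diagrams, checklists, and conclusions belong in your text reply. Do not use Write or any other file-creation tool.
The Explore agent returns text results only and must not generate any files.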
Core idea
Do not assume an architecture type up front. Extract the execution-flow characteristics from the code itself.
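Algorithm overview
1. Locate entry points (CLI, web, scheduled tasks)
2. Extract the function call chain (recursive tracing)
3. Identify business functions (exclude helper functions)
4. Analyze flow patterns (sequential, branching, looping, concurrent)
5. Generate Mermaid diagrams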
Step 1: Locate entry points
1.1 CLI applications
Python:
grep -rnE "if __name__ == ['\"]__main__['\"]" --include="*.py"
grep -rn "@click.command\|@click.group" --include="*.py"
grep -rn "argparse.ArgumentParser" --include="*.py"
Example:
# main.py:45
if __name__ == "__main__":
    main()  # ← entry function
Go:
grep -rn "func main()" --include="*.go"
1.2 Web applications
FastAPI/Flask:
grep -rnE "@app\.(get|post|put|delete)" --include="*.py"
grep -rn "@router\." --include="*.py"
Example:
# routes.py:42
@app.post("/api/research")  # ← entry point
def research_endpoint(query: str):
    ...
Express.js:
grep -rnE "app\.(get|post)" --include="*.js"
grep -rn "router\." --include="*.js"
1.3 Scheduled tasks
Celery:
grep -rn "@celery.task\|@shared_task" --include="*.py"
Airflow:
grep -rn "DAG(" --include="*.py"
Cron/APScheduler:
grep -rn "@schedule\|@cron" --include="*.py"
1.4 Message consumers
RabbitMQ/Kafka:
grep -rn "basic_consume\|KafkaConsumer" --include="*.py"
Example:
# consumer.py:25
def callback(ch, method, properties, body):  # ← entry point
    process_message(body)
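The grep searches in this step can also be run from Python, which makes it easier to label each hit with its entry-point kind. The following is a minimal sketch under stated assumptions: `ENTRY_PATTERNS` and `find_entry_points` are hypothetical names, and the regexes only cover the frameworks listed above.

```python
import re

# Hypothetical pattern table mirroring the grep commands above; not exhaustive.
ENTRY_PATTERNS = {
    "cli": re.compile(
        r"""if __name__ == ['"]__main__['"]|@click\.(command|group)|argparse\.ArgumentParser"""
    ),
    "web": re.compile(r"@(app|router)\.(get|post|put|delete)\b"),
    "scheduled": re.compile(r"@celery\.task|@shared_task|\bDAG\("),
    "consumer": re.compile(r"basic_consume|KafkaConsumer"),
}


def find_entry_points(source: str, filename: str = "<memory>"):
    """Return (kind, 'file:line', stripped line) for every line matching a pattern."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for kind, pattern in ENTRY_PATTERNS.items():
            if pattern.search(line):
                hits.append((kind, f"{filename}:{lineno}", line.strip()))
                break  # one label per line is enough
    return hits


sample = '''from fastapi import FastAPI
app = FastAPI()

@app.post("/api/research")
def research_endpoint(query: str):
    ...

if __name__ == "__main__":
    main()
'''

for hit in find_entry_points(sample, "routes.py"):
    print(hit)
```

Recording the `file:line` location at this stage means later steps can annotate diagram nodes without searching again.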
Step 2: Extract the function call chain
2.1 Read the entry function
Start from the entry point identified in Step 1:
# Example: routes.py:42
@app.post("/api/research")
def research_endpoint(query: str):
    task = create_task(query)          # call 1
    results = executor.run(task)       # call 2
    return format_response(results)    # call 3
Extracted call list:
research_endpoint
├── create_task
├── executor.run
└── format_response
2.2 Recursive tracing
Read the implementation of executor.run:
# executor.py:78
def run(self, task):
    data = self.fetch_data(task)       # call 2.1
    processed = self.process(data)     # call 2.2
    return self.finalize(processed)    # call 2.3
Updated call tree:
research_endpoint
├── create_task
├── executor.run
│   ├── fetch_data
│   ├── process
│   └── finalize
└── format_response
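2.3 Limit the recursion depth
Problem: recursive or mutually recursive functions can make the trace run forever
Solution: cap the maximum depth
MAX_DEPTH = 5  # trace at most 5 levels

def trace_calls(func_name, depth=0):
    if depth >= MAX_DEPTH:
        return []
    # extract_calls_from_function stands for whatever call extractor is in use
    calls = extract_calls_from_function(func_name)
    result = []
    for call in calls:
        result.append(call)
        result.extend(trace_calls(call, depth + 1))
    return result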
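The tracing idea above can be made concrete with Python's standard `ast` module. This is a sketch, not a definitive implementation: `index_functions`, `extract_calls`, and the `path` parameter are hypothetical additions, calls are resolved by bare name only (so `self.fetch_data()` is treated as `fetch_data`), and a cycle check on the current path is added alongside the depth cap.

```python
import ast

MAX_DEPTH = 5  # same cap as in the text


def index_functions(source: str):
    """Map every function name defined in the source to its AST node."""
    return {
        node.name: node
        for node in ast.walk(ast.parse(source))
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    }


def extract_calls(func_node):
    """Return the names called inside a function, in source order."""
    calls = []
    for node in ast.walk(func_node):
        if isinstance(node, ast.Call):
            target = node.func
            if isinstance(target, ast.Name):
                calls.append((node.lineno, node.col_offset, target.id))
            elif isinstance(target, ast.Attribute):
                calls.append((node.lineno, node.col_offset, target.attr))
    return [name for _, _, name in sorted(calls)]


def trace_calls(name, functions, depth=0, path=()):
    """Build a nested call tree; stop at MAX_DEPTH, on cycles, or at unknown names."""
    if depth >= MAX_DEPTH or name in path or name not in functions:
        return {}
    return {
        callee: trace_calls(callee, functions, depth + 1, path + (name,))
        for callee in extract_calls(functions[name])
    }


sample = '''
def research_endpoint(query):
    task = create_task(query)
    results = run(task)
    return format_response(results)

def run(task):
    data = fetch_data(task)
    return run(data)  # direct recursion, cut off by the cycle check
'''

functions = index_functions(sample)
print(trace_calls("research_endpoint", functions))
```

Functions that are not defined in the analyzed source (library calls, for instance) appear as leaves with an empty subtree, which keeps the tree focused on project code.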
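Step 3: Identify business functions
3.1 Exclude helper functions
Rules:
| Function characteristic | Keep? |
|---|---|
| Private function _helper() | ❌ Exclude |
| Utility function format_date() | ❌ Exclude |
| Getter/Setter | ❌ Exclude |
| Function body < 5 lines | ❌ Exclude |
| Contains a business keyword | ✅ Keep |
| Calls a database or external API | ✅ Keep |
| Operates on a core data model | ✅ Keep |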
3.2 Business keyword list
BUSINESS_KEYWORDS = [
    # Processing verbs
    "process", "handle", "execute", "run",
    # CRUD operations
    "create", "update", "delete", "query", "fetch",
    # Business logic
    "calculate", "analyze", "generate", "transform",
    "search", "filter", "validate", "verify",
    # Workflow
    "orchestrate", "coordinate", "schedule"
]
Example decisions:
✅ process_order()      # contains "process"
✅ create_user()        # contains "create"
✅ analyze_data()       # contains "analyze"
❌ format_string()      # utility function
❌ _internal_helper()   # private function
❌ get_config()         # getter
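The rules and keyword list above can be combined into a single predicate. The sketch below is one possible reading, with assumptions stated plainly: `is_business_function` and `UTILITY_PREFIXES` are hypothetical, the order in which rules are checked is a guess (the source does not say which rule wins on conflict), and keywords are matched against whole underscore-separated words so that `run` does not match `prune`.

```python
BUSINESS_KEYWORDS = {
    "process", "handle", "execute", "run",
    "create", "update", "delete", "query", "fetch",
    "calculate", "analyze", "generate", "transform",
    "search", "filter", "validate", "verify",
    "orchestrate", "coordinate", "schedule",
}

# Assumption: these prefixes approximate "utility function" and "getter/setter".
UTILITY_PREFIXES = ("format_", "get_", "set_")


def is_business_function(name: str, body_lines: int, has_io: bool = False) -> bool:
    """Apply the keep/exclude rules; I/O calls are assumed to override exclusions."""
    if has_io:
        return True   # calls a database or external API
    if name.startswith("_"):
        return False  # private function
    if name.startswith(UTILITY_PREFIXES):
        return False  # utility function or getter/setter
    if body_lines < 5:
        return False  # too short to carry business logic
    return any(word in BUSINESS_KEYWORDS for word in name.lower().split("_"))


for candidate in ["process_order", "create_user", "format_string", "_internal_helper", "get_config"]:
    print(candidate, is_business_function(candidate, body_lines=10))
```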
3.3 Detect database and API calls
Database call:
def create_user(data):
    user = User(**data)
    db.session.add(user)  # ✅ database operation
    db.session.commit()
    return user
External API call:
def fetch_weather(city):
    response = requests.get(  # ✅ external API
        f"https://api.weather.com/{city}"
    )
    return response.json()
Search patterns:
# Database
grep -n "db\.session\|query(\|execute(" file.py
# HTTP requests
grep -n "requests\.\|httpx\.\|fetch(" file.py
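The same search patterns can be applied to a function's source text directly, so the result can feed the keep/exclude decision. This is a rough sketch: `IO_PATTERNS` and `detect_io` are hypothetical names, and text matching like this will also fire on comments and strings.

```python
import re

# Hypothetical pattern table mirroring the grep commands above.
IO_PATTERNS = {
    "database": re.compile(r"db\.session|query\(|execute\("),
    "http": re.compile(r"\brequests\.|\bhttpx\.|\bfetch\("),
}


def detect_io(source: str):
    """Return the sorted I/O kinds whose pattern appears anywhere in the source."""
    return sorted(kind for kind, pattern in IO_PATTERNS.items() if pattern.search(source))


create_user_src = '''
def create_user(data):
    user = User(**data)
    db.session.add(user)
    db.session.commit()
    return user
'''

print(detect_io(create_user_src))
```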
Step 4: Analyze flow patterns
4.1 Sequential flow
Code signature:
def process():
    step1()
    step2()
    step3()
Detection: consecutive function calls with no branching
Diagram: flowchart TD (top to bottom)
4.2 Conditional branching
Code signature:
def process(data):
    if validate(data):
        path_a()
    else:
        path_b()
Detection: if/else, match/case, ternary expressions
Diagram: flowchart TD (with diamond decision nodes)
4.3 Iterative refinement loop
Code signature:
def optimize(data):
    while not is_good_enough(data):
        data = improve(data)
    return data
Detection: a while loop guarded by a quality or exit condition
Diagram: flowchart TD (with a loop-back edge)
4.4 State machine
Code signature:
state = "init"
if state == "init":
    state = "processing"
elif state == "processing":
    if condition:
        state = "done"
    else:
        state = "error"
Detection: a state variable plus state-transition logic
Diagram: stateDiagram-v2
4.5 Concurrent orchestration
Python asyncio:
results = await asyncio.gather(
    task1(),
    task2(),
    task3()
)
JavaScript Promise.all:
const results = await Promise.all([
  fetchUser(),
  fetchOrders(),
  fetchProducts()
]);
Detection: asyncio.gather, Promise.all, WaitGroup
Diagram: graph TB + subgraph (parallel tasks grouped together)
4.6 Graph orchestration (LangGraph)
Code signature:
graph = StateGraph(State)
graph.add_node("a", func_a)
graph.add_node("b", func_b)
graph.add_edge("a", "b")
graph.add_conditional_edges("b", router, {
    "continue": "a",
    "end": END
})
Detection: add_node, add_edge, add_conditional_edges
Diagram: stateDiagram-v2
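Several of the signatures in this step can be detected mechanically for Python sources by walking the AST. The sketch below is illustrative only: `detect_patterns` and its pattern labels are hypothetical, `match`/`case` and state-machine detection are left out for brevity, and any call to an attribute named `gather` is assumed to be `asyncio.gather`.

```python
import ast

GRAPH_METHODS = {"add_node", "add_edge", "add_conditional_edges"}


def detect_patterns(source: str):
    """Return the set of flow patterns whose syntactic signature appears in the source."""
    patterns = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.If, ast.IfExp)):
            patterns.add("branch")      # if/else or ternary expression
        elif isinstance(node, ast.While):
            patterns.add("loop")        # while + exit condition
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
            if node.func.attr == "gather":
                patterns.add("concurrent")
            elif node.func.attr in GRAPH_METHODS:
                patterns.add("graph")
    # No signature found: treat the code as a plain sequence of calls.
    return patterns or {"sequential"}


sample = '''
async def execute(task):
    results = await asyncio.gather(search_a(task), search_b(task))
    if len(results) > 5:
        return detailed(results)
    return summary(results)
'''

print(sorted(detect_patterns(sample)))
```

A function can match several patterns at once, which is why the result is a set and not a single label.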
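Step 5: Generate Mermaid diagrams
⚠️ Mermaid syntax constraints (version 11.x):
- stateDiagram-v2: do not use the -- separator; <br/> is not supported
- sequenceDiagram: every alt/loop/par block must be closed with a matching end
- All diagram types: use <br/> for line breaks (except stateDiagram-v2)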
5.1 Choose the diagram type
def select_diagram_type(flow_pattern):
    if flow_pattern == "state_machine":
        return "stateDiagram-v2"
    elif flow_pattern == "concurrent":
        return "graph TB"  # with subgraph
    elif flow_pattern == "sequential_with_conditions":
        return "flowchart TD"
    elif flow_pattern == "linear_pipeline":
        return "flowchart LR"
    else:
        return "graph TB"  # default
5.2 Generate a state diagram (multi-agent)
Input data:
nodes = [
    {"name": "researcher", "file": "agents/nodes.py", "line": 45},
    {"name": "writer", "file": "agents/nodes.py", "line": 78}
]
edges = [
    {"from": "researcher", "to": "writer"},
    {
        "from": "writer",
        "to": "researcher",
        "condition": "quality < 7"
    }
]
Generator code:
def generate_state_diagram(nodes, edges):
    mermaid = "stateDiagram-v2\n"
    # The first node is treated as the initial state
    mermaid += f"    [*] --> {nodes[0]['name']}\n"
    for edge in edges:
        if "condition" in edge:
            mermaid += f"    {edge['from']} --> {edge['to']}: {edge['condition']}\n"
        else:
            mermaid += f"    {edge['from']} --> {edge['to']}\n"
    # Attach a note with the source location to each node
    for node in nodes:
        mermaid += f"\n    note right of {node['name']}\n"
        mermaid += f"        {node['file']}:{node['line']}\n"
        mermaid += "    end note\n"
    return mermaid
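Before a generated diagram is returned, it can be checked against the syntax constraints listed at the start of this step. The following is a minimal sketch under assumptions: `check_mermaid` is a hypothetical helper, the `--` separator is taken to mean a line consisting only of `--`, and the block keywords beyond alt/loop/par (`opt`, `critical`, `break`, `rect`, `box`) are included on the understanding that Mermaid also closes them with `end`.

```python
# Assumption: sequenceDiagram keywords that open a block closed by `end`.
BLOCK_OPENERS = {"alt", "loop", "par", "opt", "critical", "break", "rect", "box"}


def check_mermaid(diagram: str):
    """Return a list of constraint violations (an empty list means none were found)."""
    lines = [line.strip() for line in diagram.splitlines() if line.strip()]
    if not lines:
        return ["empty diagram"]
    kind = lines[0].split()[0]
    problems = []
    if kind == "stateDiagram-v2":
        for line in lines[1:]:
            if line == "--":
                problems.append("stateDiagram-v2: '--' separator found")
            if "<br/>" in line:
                problems.append("stateDiagram-v2: '<br/>' found")
    elif kind == "sequenceDiagram":
        depth = 0
        for line in lines[1:]:
            first = line.split()[0]
            if first in BLOCK_OPENERS:
                depth += 1
            elif first == "end":
                if depth == 0:
                    problems.append("sequenceDiagram: 'end' without an open block")
                else:
                    depth -= 1
        if depth > 0:
            problems.append(f"sequenceDiagram: {depth} block(s) missing 'end'")
    return problems


print(check_mermaid("stateDiagram-v2\n    [*] --> researcher\n    researcher --> writer"))
print(check_mermaid("sequenceDiagram\n    loop retry\n    A->>B: ping"))
```

This only catches the specific constraints named in this guide; it is not a Mermaid parser, so a clean result does not guarantee the diagram renders.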
5.3 Generate a flowchart (sequential + branching)
Input data:
steps = [
    {"name": "extract", "type": "process"},
    {"name": "validate", "type": "decision"},
    {"name": "transform", "type": "process"},
    {"name": "load", "type": "process"}
]
connections = [
    {"from": "extract", "to": "validate"},
    {"from": "validate", "to": "transform", "condition": "valid"},
    {"from": "validate", "to": "error_handler", "condition": "invalid"},
    {"from": "transform", "to": "load"}
]
Generator code:
def generate_flowchart(steps, connections):
    mermaid = "flowchart TD\n"
    # Define nodes
    for step in steps:
        if step["type"] == "decision":
            mermaid += f"    {step['name']}{{{step['name']}}}\n"  # diamond
        else:
            mermaid += f"    {step['name']}[{step['name']}]\n"  # rectangle
    # Connect nodes; targets not listed in steps (error_handler) get a default shape
    for conn in connections:
        if "condition" in conn:
            mermaid += f"    {conn['from']} -->|{conn['condition']}| {conn['to']}\n"
        else:
            mermaid += f"    {conn['from']} --> {conn['to']}\n"
    return mermaid
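Section 4.5 maps concurrent orchestration to graph TB with a subgraph, but no generator is given for that case. A possible sketch follows; `generate_concurrent_graph` and its parameters are hypothetical, and it assumes a simple fan-out from one step into parallel tasks followed by a fan-in to one step.

```python
def generate_concurrent_graph(before, tasks, after, group="parallel_tasks"):
    """Emit a graph TB diagram with the parallel tasks grouped in one subgraph."""
    lines = ["graph TB", f"    subgraph {group}"]
    lines += [f"        {task}[{task}]" for task in tasks]
    lines.append("    end")
    for task in tasks:
        lines.append(f"    {before} --> {task}")  # fan-out
        lines.append(f"    {task} --> {after}")   # fan-in
    return "\n".join(lines) + "\n"


print(generate_concurrent_graph("create_task", ["search_google", "search_arxiv"], "merge"))
```

Grouping the tasks in a subgraph makes the parallel section visually distinct from the sequential steps around it.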
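Complete example
Example code
# routes.py:42
@app.post("/api/research")
async def research_endpoint(query: str):
    task = create_research_task(query)
    results = await execute_research(task)
    return results

# research.py:15
async def execute_research(task):
    # Concurrent search: gather must be awaited and yields one result per coroutine
    google_hits, arxiv_hits = await asyncio.gather(
        search_google(task.query),
        search_arxiv(task.query)
    )
    # Merge before counting (assumes each search returns a list)
    search_results = google_hits + arxiv_hits
    # Generate the report
    if len(search_results) > 5:
        report = generate_detailed_report(search_results)
    else:
        report = generate_summary(search_results)
    return report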
Extraction result
Entry point: research_endpoint (routes.py:42)
Call chain:
research_endpoint
├── create_research_task
└── execute_research
    ├── search_google (concurrent)
    ├── search_arxiv (concurrent)
    └── generate_detailed_report OR generate_summary (conditional)
Flow pattern: sequential + concurrent + conditional branching
Generated Mermaid diagram
flowchart TD
    Start([API request]) --> CreateTask[Create research task]
    CreateTask --> Concurrent{Concurrent search}
    Concurrent --> Google[Search Google]
    Concurrent --> ArXiv[Search ArXiv]
    Google --> Merge[Merge results]
    ArXiv --> Merge
    Merge --> Decision{Result count}
    Decision -->|more than 5| Detailed[Generate detailed report]
    Decision -->|5 or fewer| Summary[Generate summary]
    Detailed --> End([Return result])
    Summary --> End
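Best practices
- Breadth before depth: understand the overall flow first
- Limit the recursion depth: avoid infinite loops
- Filter out helper functions: focus on business logic only
- Annotate code locations: makes results easy to verify
- Prefer visualization: diagrams are easier to grasp than prose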