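# Workflow Extraction Guide

This guide describes a general-purpose workflow extraction algorithm that applies to a wide range of architectural patterns.

## ⚠️ Important Constraints

**When calling the Task tool, the prompt must begin with the following constraint:**

```
⚠️ Important constraint: this analysis returns text only. Do not generate any files (.md, .txt, etc.).
All Mermaid diagrams, checklists, and conclusions must be included in your text reply. Do not use Write or any other file-creation tool.
```

**The Explore agent returns text only and must not generate any files.**

---

## Core Idea

**Do not assume an architecture type up front; extract the execution-flow characteristics from the code itself.**

---

## Algorithm Overview

```
1. Locate entry points (CLI / web / scheduled tasks / message consumers)
2. Extract the function call chain (recursive tracing)
3. Identify business functions (exclude helpers)
4. Analyze flow patterns (sequential / branching / loop / concurrent)
5. Generate Mermaid diagrams
```

---

## Step 1: Locate Entry Points

### 1.1 CLI Applications

**Python**:
```bash
# -E enables the character class so both quote styles match
grep -rnE "if __name__ == ['\"]__main__['\"]" --include="*.py"
grep -rn "@click.command\|@click.group" --include="*.py"
grep -rn "argparse.ArgumentParser" --include="*.py"
```

**Example**:
```python
# main.py:45
if __name__ == "__main__":
    main()  # ← entry function
```

**Go**:
```bash
grep -rn "func main()" --include="*.go"
```

---

### 1.2 Web Applications

**FastAPI/Flask**:
```bash
# -E is required for the (a|b) alternation
grep -rnE "@app\.(get|post|put|delete)" --include="*.py"
grep -rn "@router\." --include="*.py"
```

**Example**:
```python
# routes.py:42
@app.post("/api/research")  # ← entry point
def research_endpoint(query: str):
    ...
```

**Express.js**:
```bash
grep -rnE "app\.(get|post)" --include="*.js"
grep -rn "router\." \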
--include="*.js"
```

---

### 1.3 Scheduled Tasks

**Celery**:
```bash
grep -rn "@celery.task\|@shared_task" --include="*.py"
```

**Airflow**:
```bash
grep -rn "DAG(" --include="*.py"
```

**Cron/APScheduler**:
```bash
grep -rn "@schedule\|@cron" --include="*.py"
```

---

### 1.4 Message Consumers

**RabbitMQ/Kafka**:
```bash
grep -rn "basic_consume\|KafkaConsumer" --include="*.py"
```

**Example**:
```python
# consumer.py:25
def callback(ch, method, properties, body):  # ← entry point
    process_message(body)
```

---

## Step 2: Extract the Function Call Chain

### 2.1 Read the Entry Function

Start from an entry point identified in step 1:

```python
# Example: routes.py:42
@app.post("/api/research")
def research_endpoint(query: str):
    task = create_task(query)          # call 1
    results = executor.run(task)       # call 2
    return format_response(results)    # call 3
```

**Extracted call list**:
```
research_endpoint
├── create_task
├── executor.run
└── format_response
```

---

### 2.2 Trace Recursively

Read the implementation of `executor.run`:

```python
# executor.py:78
def run(self, task):
    data = self.fetch_data(task)       # call 2.1
    processed = self.process(data)     # call 2.2
    return self.finalize(processed)    # call 2.3
```

**Updated call tree**:
```
research_endpoint
├── create_task
├── executor.run
│   ├── fetch_data
│   ├── process
│   └── finalize
└── format_response
```

---

### 2.3 Bound the Recursion Depth

**Problem**: Tracing can recurse without bound, for example when functions call each other in a cycle.

**Solution**: Cap the maximum depth.

```python
MAX_DEPTH = 5  # trace at most 5 levels

def trace_calls(func_name, depth=0):
    # Stop descending once the depth limit is reached
    if depth >= MAX_DEPTH:
        return []

    # extract_calls_from_function is a placeholder for whatever
    # reads the function body and lists the functions it calls
    calls = extract_calls_from_function(func_name)
    result = []

    for call in calls:
        result.append(call)
        result.extend(trace_calls(call, depth + 1))

    return result
```

---

## Step 3: Identify Business Functions

### 3.1 Exclude Helper Functions

**Rules**:

| Function trait | Keep? |
|---------|---------|
| Private function `_helper()` | ❌ Exclude |
| Utility function `format_date()` | ❌ Exclude |
| Getter/Setter | ❌ Exclude |
| Function body < 5 lines | ❌ Exclude |
| Contains a business keyword | ✅ Keep |
| Calls a database or external API | ✅ Keep |
| Handles a core data model | ✅ Keep |

---

### 3.2 Business Keyword List

```python
BUSINESS_KEYWORDS = [
    # Processing verbs
    "process", "handle", "execute", "run",

    # CRUD operations
    "create", "update", "delete", "query", "fetch",

    # Business logic
    "calculate", "analyze", "generate", "transform",
    "search", "filter", "validate", "verify",

    # Workflow
    "orchestrate", "coordinate", "schedule",
]
```

**Example judgments**:
```
✅ process_order()      # contains "process"
✅ create_user()        # contains "create"
✅ analyze_data()       # contains "analyze"

❌ format_string()      # utility function
❌ _internal_helper()   # private function
❌ get_config()         # getter
```

---

### 3.3 Detect Database/API Calls

**Database call**:
```python
def create_user(data):
    user = User(**data)
    db.session.add(user)      # ✅ database operation
    db.session.commit()
    return user
```

**External API call**:
```python
import requests

def fetch_weather(city):
    response = requests.get(  # ✅ external API
        f"https://api.weather.com/{city}"
    )
    return response.json()
```

**Search patterns**:
```bash
# Database
grep -n "db\.session\|query(\|execute(" file.py

# HTTP requests
grep -n "requests\.\|httpx\.\|fetch(" file.py
```

---

## Step 4: Analyze Flow Patterns

### 4.1 Sequential Flow

**Code signature**:
```python
def process():
    step1()
    step2()
    step3()
```

**Detection**: Consecutive function calls with no branching

**Diagram**: `flowchart TD` (top to bottom)

---

### 4.2 Conditional Branching

**Code signature**:
```python
def process(data):
    if validate(data):
        path_a()
    else:
        path_b()
```

**Detection**: `if/else`, `match/case`, ternary expressions

**Diagram**: `flowchart TD` (with diamond decision nodes)

---

### 4.3 Iterative Refinement Loop

**Code signature**:
```python
def optimize(data):
    while not is_good_enough(data):
        data = improve(data)
    return data
```

**Detection**: `while` plus a condition check

**Diagram**: `flowchart TD` (with a loop-back edge)

---

### 4.4 State Machine

**Code signature**:
```python
state = "init"

if state == "init":
    state = "processing"
elif state == "processing":
    if condition:
        state = "done"
    else:
        state = "error"
```

**Detection**: A state variable plus state-transition logic

**Diagram**: `stateDiagram-v2`

---

### 4.5 Concurrent Orchestration

**Python asyncio**:
```python
results = await asyncio.gather(
    task1(),
    task2(),
    task3()
)
```

**JavaScript Promise.all**:
```javascript
const results = await Promise.all([
    fetchUser(),
    fetchOrders(),
    fetchProducts()
]);
```

**Detection**: `asyncio.gather`, `Promise.all`, `WaitGroup`

**Diagram**: `graph TB` + subgraph (parallel tasks grouped together)

---

### 4.6 Graph Orchestration (LangGraph)

**Code signature**:
```python
graph = StateGraph(State)
graph.add_node("a", func_a)
graph.add_node("b", func_b)
graph.add_edge("a", "b")
graph.add_conditional_edges("b", router, {
    "continue": "a",
    "end": END
})
```

**Detection**: `add_node`,
`add_edge`, `add_conditional_edges`

**Diagram**: `stateDiagram-v2`

---

## Step 5: Generate Mermaid Diagrams

**⚠️ Mermaid syntax constraints (version 11.x)**:
- **stateDiagram-v2**: Do not use the `--` separator; `<br/>` is not supported
- **sequenceDiagram**: Every `alt/loop/par` block must be closed with a matching `end`
- **All types**: Use `<br/>` for line breaks (except in stateDiagram-v2)

### 5.1 Choose the Diagram Type

```python
def select_diagram_type(flow_pattern):
    if flow_pattern == "state_machine":
        return "stateDiagram-v2"
    elif flow_pattern == "concurrent":
        return "graph TB"  # with subgraph
    elif flow_pattern == "sequential_with_conditions":
        return "flowchart TD"
    elif flow_pattern == "linear_pipeline":
        return "flowchart LR"
    else:
        return "graph TB"  # default
```

---

### 5.2 Generate a State Diagram (Multi-Agent)

**Input data**:
```python
nodes = [
    {"name": "researcher", "file": "agents/nodes.py", "line": 45},
    {"name": "writer", "file": "agents/nodes.py", "line": 78}
]

edges = [
    {"from": "researcher", "to": "writer"},
    {
        "from": "writer",
        "to": "researcher",
        "condition": "quality < 7"
    }
]
```

**Generator code**:
```python
def generate_state_diagram(nodes, edges):
    mermaid = "stateDiagram-v2\n"
    # The first node is treated as the initial state
    mermaid += f"    [*] --> {nodes[0]['name']}\n"

    # Transitions, labelled with their condition when one is present
    for edge in edges:
        if "condition" in edge:
            mermaid += f"    {edge['from']} --> {edge['to']}: {edge['condition']}\n"
        else:
            mermaid += f"    {edge['from']} --> {edge['to']}\n"

    # Attach a note with the source location to each node
    for node in nodes:
        mermaid += f"\n    note right of {node['name']}\n"
        mermaid += f"        {node['file']}:{node['line']}\n"
        mermaid += "    end note\n"

    return mermaid
```

---

### 5.3 Generate a Flowchart (Sequential + Branching)

**Input data**:
```python
steps = [
    {"name": "extract", "type": "process"},
    {"name": "validate", "type": "decision"},
    {"name": "transform", "type": "process"},
    {"name": "load", "type": "process"}
]

connections = [
    {"from": "extract", "to": "validate"},
    {"from": "validate", "to": "transform", "condition": "valid"},
    {"from": "validate", "to": "error_handler", "condition": "invalid"},
    {"from": "transform", "to": "load"}
]
```

**Generator code**:
```python
def generate_flowchart(steps, connections):
    mermaid = "flowchart TD\n"

    # Define the nodes
    for step in steps:
        if step["type"] == "decision":
            mermaid += f"    {step['name']}{{{step['name']}}}\n"  # diamond
        else:
            mermaid += f"    {step['name']}[{step['name']}]\n"  # rectangle

    # Connect them, labelling conditional edges
    for conn in connections:
        if "condition" in conn:
            mermaid += f"    {conn['from']} -->|{conn['condition']}| {conn['to']}\n"
        else:
            mermaid += f"    {conn['from']} --> {conn['to']}\n"

    return mermaid
```

---

## Complete Example

### Example Code

```python
# routes.py:42
@app.post("/api/research")
async def research_endpoint(query: str):
    task = create_research_task(query)
    results = await execute_research(task)
    return results

# research.py:15
import asyncio

async def execute_research(task):
    # Run both searches concurrently; gather must be awaited
    google_results, arxiv_results = await asyncio.gather(
        search_google(task.query),
        search_arxiv(task.query),
    )
    # Merge the results (assumes each search returns a list)
    search_results = google_results + arxiv_results

    # Generate the report
    if len(search_results) > 5:
        report = generate_detailed_report(search_results)
    else:
        report = generate_summary(search_results)

    return report
```

---

### Extraction Result

**Entry point**: `research_endpoint` (routes.py:42)

**Call chain**:
```
research_endpoint
├── create_research_task
└── execute_research
    ├── search_google (concurrent)
    ├── search_arxiv (concurrent)
    └── generate_detailed_report OR generate_summary (conditional)
```

**Flow pattern**: Sequential + concurrent + conditional branching

---

### Generated Mermaid Diagram

```mermaid
flowchart TD
    Start([API request]) --> CreateTask[Create research task]
    CreateTask --> Concurrent{Concurrent search}

    Concurrent --> Google[Search Google]
    Concurrent --> ArXiv[Search ArXiv]

    Google --> Merge[Merge results]
    ArXiv --> Merge

    Merge --> Decision{Result count}
    Decision -->|"> 5 results"| Detailed[Generate detailed report]
    Decision -->|"≤ 5 results"| Summary[Generate summary]

    Detailed --> End([Return result])
    Summary --> End
```

## Best Practices

1. **Breadth first, then depth** - Understand the overall flow before drilling in
2. **Limit recursion depth** - Avoid infinite loops
3. **Filter out helper functions** - Focus on business logic only
4. **Annotate code locations** - Makes the result easy to verify
5. **Prefer visualization** - A diagram is easier to grasp than prose
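
Practice 2 (limit recursion depth) together with the call-chain tracing from step 2 can be sketched with Python's standard-library `ast` module. This is a minimal illustration, not a definitive implementation: `build_call_map`, `SAMPLE`, and the nested-dict output shape are hypothetical names chosen for this example, and calls are resolved by bare name only (a method call such as `executor.run` is matched as `run`, with no import or class resolution). Besides the depth cap, it tracks the functions already on the current path so that a recursive function cannot make the tracer loop.

```python
import ast

MAX_DEPTH = 5  # same cap as in step 2.3

# Hypothetical single-module input; `run` calls itself on purpose
SAMPLE = '''
def research_endpoint(query):
    task = create_task(query)
    results = run(task)
    return format_response(results)

def run(task):
    data = fetch_data(task)
    if not data:
        return run(task)
    return finalize(data)
'''


def build_call_map(source):
    """Map each top-level function to the names it calls, in source order."""
    call_map = {}
    for node in ast.parse(source).body:
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        calls = []
        for sub in ast.walk(node):
            if not isinstance(sub, ast.Call):
                continue
            if isinstance(sub.func, ast.Name):         # plain call: f(x)
                name = sub.func.id
            elif isinstance(sub.func, ast.Attribute):  # method call: obj.f(x)
                name = sub.func.attr
            else:
                continue
            calls.append((sub.lineno, sub.col_offset, name))
        # ast.walk is breadth-first, so sort by position to restore source order
        call_map[node.name] = [name for _, _, name in sorted(calls)]
    return call_map


def trace_calls(call_map, func_name, depth=0, path=None):
    """Return a nested {callee: subtree} dict, bounded by depth and by cycles."""
    if path is None:
        path = {func_name}
    if depth >= MAX_DEPTH:
        return {}
    tree = {}
    for callee in call_map.get(func_name, []):
        if callee in path:
            tree[callee] = {}  # already on the current path: record, do not recurse
        else:
            tree[callee] = trace_calls(call_map, callee, depth + 1, path | {callee})
    return tree


call_map = build_call_map(SAMPLE)
tree = trace_calls(call_map, "research_endpoint")
```

Functions that are not defined in the analyzed source (here `create_task`, `fetch_data`, and so on) simply become leaves, which matches how the call tree in step 2 stops at functions that have not been read yet.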