Files
AIEC-RAG---/AIEC-RAG/retriver/langgraph/langchain_components.py
2025-09-25 10:33:37 +08:00

952 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LangChain组件实现
包含LLM、提示词模板、输出解析器等
"""
import os
import sys
import json
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
from langchain_core.language_models import BaseLLM
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import BaseOutputParser, PydanticOutputParser
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
# 尝试导入LangSmith的traceable装饰器
try:
from langsmith import traceable
LANGSMITH_AVAILABLE = True
except ImportError:
def traceable(name=None):
def decorator(func):
return func
return decorator
LANGSMITH_AVAILABLE = False
# 添加路径
project_root = os.path.join(os.path.dirname(__file__), '..', '..')
sys.path.append(project_root)
from retriver.langgraph.dashscope_llm import DashScopeLLM
from prompt_loader import get_prompt_loader
class SufficiencyCheckResult(BaseModel):
"""充分性检查结果"""
is_sufficient: bool = Field(description="是否足够回答用户查询")
confidence: float = Field(description="置信度0-1之间", ge=0, le=1)
reason: str = Field(description="判断理由")
sub_queries: Optional[List[str]] = Field(default=None, description="如果不充分,生成的子查询列表")
class QueryComplexityResult(BaseModel):
"""查询复杂度判断结果"""
is_complex: bool = Field(description="查询是否复杂")
complexity_level: str = Field(description="复杂度级别simple/complex")
confidence: float = Field(description="置信度0-1之间", ge=0, le=1)
reason: str = Field(description="判断理由")
class OneAPILLM(BaseLLM):
"""
LangChain包装的DashScope LLM保持原有接口兼容性
直接使用阿里云DashScope原生API
支持流式输出
"""
# Pydantic字段声明
oneapi_generator: Any = Field(default=None, description="DashScope LLM实例", exclude=True)
def __init__(self, dashscope_llm: DashScopeLLM, **kwargs):
super().__init__(**kwargs)
self.oneapi_generator = dashscope_llm # 保持接口兼容性
def _generate(
self,
messages: List[str],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
):
"""生成响应"""
# 简化处理,取第一个消息
message = messages[0] if messages else ""
# 直接调用DashScope LLM
try:
response_text = self.oneapi_generator._call_api(
message,
stop=stop,
**kwargs
)
except Exception as e:
print(f"[ERROR] DashScope API调用失败: {e}")
response_text = f"API调用失败: {str(e)}"
from langchain_core.outputs import LLMResult, Generation
# 获取Token使用信息
token_usage = getattr(self.oneapi_generator, 'last_token_usage', {})
total_usage = getattr(self.oneapi_generator, 'total_token_usage', {})
# 打印Token信息到控制台
if token_usage:
print(f"[?] LLM调用Token消耗: 输入{token_usage.get('prompt_tokens', 0)}, "
f"输出{token_usage.get('completion_tokens', 0)}, "
f"总计{token_usage.get('total_tokens', 0)} "
f"[累计: {total_usage.get('total_tokens', 0)}]")
result = LLMResult(
generations=[[Generation(text=response_text)]],
llm_output={
"model_name": self.oneapi_generator.model_name,
"token_usage": {
"current_call": token_usage,
"cumulative_total": total_usage,
"prompt_tokens": token_usage.get('prompt_tokens', 0),
"completion_tokens": token_usage.get('completion_tokens', 0),
"total_tokens": token_usage.get('total_tokens', 0)
}
}
)
return result
def _llm_type(self) -> str:
return "oneapi_llm"
def invoke(
self,
input: Any,
config: Optional[dict] = None,
**kwargs: Any
) -> Any:
"""
统一的调用接口,支持流式和非流式
Args:
input: 输入文本
config: 配置字典包含stream_callback等
**kwargs: 其他参数
Returns:
生成的文本或LLMResult对象
"""
# 直接委托给底层的DashScopeLLM的invoke方法
return self.oneapi_generator.invoke(input, config=config, **kwargs)
@property
def _identifying_params(self) -> Dict[str, Any]:
return {
"model_name": self.oneapi_generator.model_name,
"api_url": getattr(self.oneapi_generator, 'api_url', 'dashscope')
}
def stream(self, prompt: str, **kwargs):
"""
流式生成文本
Args:
prompt: 输入提示词
**kwargs: 其他参数
Yields:
每个token字符串
"""
# 直接调用底层DashScope的流式方法
return self.oneapi_generator.stream(prompt, **kwargs)
class SufficiencyCheckParser(BaseOutputParser[SufficiencyCheckResult]):
"""充分性检查结果解析器"""
def parse(self, text: str) -> SufficiencyCheckResult:
"""解析LLM输出为结构化结果"""
try:
# 清理markdown代码块标记
cleaned_text = text.strip()
if cleaned_text.startswith('```json'):
cleaned_text = cleaned_text[7:] # 移除 ```json
elif cleaned_text.startswith('```'):
cleaned_text = cleaned_text[3:] # 移除 ```
if cleaned_text.endswith('```'):
cleaned_text = cleaned_text[:-3] # 移除结尾的 ```
cleaned_text = cleaned_text.strip()
# 尝试解析JSON
data = json.loads(cleaned_text)
return SufficiencyCheckResult(**data)
except (json.JSONDecodeError, ValueError):
# 如果JSON解析失败使用规则解析
return self._rule_based_parse(text)
def _rule_based_parse(self, text: str) -> SufficiencyCheckResult:
"""基于规则的解析"""
text_lower = text.lower().strip()
# 判断是否充分
is_sufficient = any(keyword in text_lower for keyword in [
"充分", "足够", "sufficient", "enough", "adequate"
]) and not any(keyword in text_lower for keyword in [
"不充分", "不足够", "insufficient", "not enough", "inadequate"
])
# 提取置信度
confidence = 0.7 # 默认置信度
# 提取理由
reason = text[:200] if len(text) <= 200 else text[:200] + "..."
# 提取子查询
sub_queries = None
if not is_sufficient:
# 简单提取子查询(这里可以更复杂的实现)
lines = text.split('\n')
queries = []
for line in lines:
if '?' in line and len(line.strip()) > 10:
queries.append(line.strip())
sub_queries = queries[:2] if queries else None
return SufficiencyCheckResult(
is_sufficient=is_sufficient,
confidence=confidence,
reason=reason,
sub_queries=sub_queries
)
class QueryComplexityParser(BaseOutputParser[QueryComplexityResult]):
"""查询复杂度判断结果解析器"""
def parse(self, text: str) -> QueryComplexityResult:
"""解析LLM输出为结构化结果"""
try:
# 清理markdown代码块标记
cleaned_text = text.strip()
if cleaned_text.startswith('```json'):
cleaned_text = cleaned_text[7:] # 移除 ```json
elif cleaned_text.startswith('```'):
cleaned_text = cleaned_text[3:] # 移除 ```
if cleaned_text.endswith('```'):
cleaned_text = cleaned_text[:-3] # 移除结尾的 ```
cleaned_text = cleaned_text.strip()
# 尝试解析JSON
data = json.loads(cleaned_text)
return QueryComplexityResult(**data)
except (json.JSONDecodeError, ValueError):
# 如果JSON解析失败使用规则解析
return self._rule_based_parse(text)
def _rule_based_parse(self, text: str) -> QueryComplexityResult:
"""基于规则的解析"""
text_lower = text.lower().strip()
# 判断是否复杂
is_complex = any(keyword in text_lower for keyword in [
"复杂", "complex", "推理", "多步", "关联", "综合", "分析"
]) or any(keyword in text_lower for keyword in [
"需要", "require", "关系", "连接", "因果", "比较"
])
# 如果包含简单标识,则不复杂
if any(keyword in text_lower for keyword in [
"简单", "simple", "直接", "单一", "基础"
]):
is_complex = False
complexity_level = "complex" if is_complex else "simple"
confidence = 0.8 # 默认置信度
reason = text[:200] if len(text) <= 200 else text[:200] + "..."
return QueryComplexityResult(
is_complex=is_complex,
complexity_level=complexity_level,
confidence=confidence,
reason=reason
)
@dataclass
class ConceptExplorationResult:
"""概念探索结果数据类"""
selected_concept: str
exploration_reason: str
confidence: float
expected_knowledge: str
@dataclass
class ConceptContinueResult:
"""概念探索继续结果数据类"""
selected_node: str
selected_relation: str
exploration_reason: str
confidence: float
expected_knowledge: str
@dataclass
class ConceptSufficiencyResult:
"""概念探索充分性检查结果数据类"""
is_sufficient: bool
confidence: float
reason: str
missing_aspects: List[str]
coverage_score: float
@dataclass
class ConceptSubQueryResult:
"""概念探索子查询生成结果数据类"""
sub_query: str
focus_aspects: List[str]
expected_improvements: str
confidence: float
class ConceptExplorationParser:
"""概念探索初始选择解析器"""
def parse(self, text: str) -> ConceptExplorationResult:
"""解析概念探索选择结果"""
try:
data = json.loads(text.strip())
return ConceptExplorationResult(
selected_concept=data.get("selected_concept", ""),
exploration_reason=data.get("exploration_reason", ""),
confidence=float(data.get("confidence", 0.5)),
expected_knowledge=data.get("expected_knowledge", "")
)
except (json.JSONDecodeError, ValueError, KeyError) as e:
# 如果解析失败,返回默认结果
return ConceptExplorationResult(
selected_concept="",
exploration_reason=f"解析失败: {str(e)}",
confidence=0.1,
expected_knowledge=""
)
class ConceptContinueParser:
"""概念探索继续解析器"""
def parse(self, text: str) -> ConceptContinueResult:
"""解析概念探索继续结果"""
try:
data = json.loads(text.strip())
return ConceptContinueResult(
selected_node=data.get("selected_node", ""),
selected_relation=data.get("selected_relation", ""),
exploration_reason=data.get("exploration_reason", ""),
confidence=float(data.get("confidence", 0.5)),
expected_knowledge=data.get("expected_knowledge", "")
)
except (json.JSONDecodeError, ValueError, KeyError) as e:
# 如果解析失败,返回默认结果
return ConceptContinueResult(
selected_node="",
selected_relation="",
exploration_reason=f"解析失败: {str(e)}",
confidence=0.1,
expected_knowledge=""
)
class ConceptSufficiencyParser:
"""概念探索充分性检查解析器"""
def parse(self, text: str) -> ConceptSufficiencyResult:
"""解析概念探索充分性检查结果"""
try:
data = json.loads(text.strip())
return ConceptSufficiencyResult(
is_sufficient=bool(data.get("is_sufficient", False)),
confidence=float(data.get("confidence", 0.5)),
reason=data.get("reason", ""),
missing_aspects=data.get("missing_aspects", []),
coverage_score=float(data.get("coverage_score", 0.0))
)
except (json.JSONDecodeError, ValueError, KeyError) as e:
# 如果解析失败,返回不充分的默认结果
return ConceptSufficiencyResult(
is_sufficient=False,
confidence=0.1,
reason=f"解析失败: {str(e)}",
missing_aspects=["解析错误"],
coverage_score=0.0
)
class ConceptSubQueryParser:
"""概念探索子查询生成解析器"""
def parse(self, text: str) -> ConceptSubQueryResult:
"""解析概念探索子查询生成结果"""
try:
data = json.loads(text.strip())
return ConceptSubQueryResult(
sub_query=data.get("sub_query", ""),
focus_aspects=data.get("focus_aspects", []),
expected_improvements=data.get("expected_improvements", ""),
confidence=float(data.get("confidence", 0.5))
)
except (json.JSONDecodeError, ValueError, KeyError) as e:
# 如果解析失败,返回默认结果
return ConceptSubQueryResult(
sub_query="",
focus_aspects=["解析错误"],
expected_improvements=f"解析失败: {str(e)}",
confidence=0.1
)
# 获取全局提示词加载器
_prompt_loader = get_prompt_loader()
# 动态加载提示词模板
QUERY_COMPLEXITY_CHECK_PROMPT = _prompt_loader.get_prompt_template('query_complexity_check')
# 保留原始定义作为备份(已注释)
'''
QUERY_COMPLEXITY_CHECK_PROMPT = PromptTemplate(
input_variables=["query"],
template="""
作为一个智能查询分析助手,请分析用户查询的复杂度,判断该查询是否需要生成多方面的多个子查询来回答。
用户查询:{query}
请根据以下标准判断查询复杂度:
【复杂查询Complex特征 - 优先判断】:
1. **多问句查询**:包含多个问号(?),涉及多个不同的问题或主题
2. **跨领域查询**:涉及多个不同领域或行业的知识(如金融+技术+风险管理等)
3. **复合问题**:一个查询中包含多个子查询,即使每个子查询本身较简单
4. **关系型查询**:询问实体间的关系、比较、关联等
5. **因果推理**:询问原因、结果、影响等
6. **综合分析**:需要综合多个信息源进行分析
7. **推理链查询**:需要通过知识图谱的路径推理才能回答
8. **列表型查询**:要求列举多个项目或要素的问题
【简单查询Simple特征】
1. **单一问句**:只包含一个问号,聚焦于单一主题
2. **单领域查询**:仅涉及一个明确的领域或概念
3. **直接定义查询**:询问单一概念的定义、特点、属性等
4. **单一实体信息查询**:询问某个具体事物的基本信息
5. **可以通过文档中的连续文本段落直接回答的单一问题**
请以JSON格式返回判断结果
{{
"is_complex": false, // true表示复杂查询false表示简单查询
"complexity_level": "simple", // "simple""complex"
"confidence": 0.9, // 置信度0-1之间
"reason": "这是一个复杂查询,需要生成多方面的个子查询来回答..."
}}
请确保返回有效的JSON格式。
"""
)
'''
# 动态加载充分性检查提示词
SUFFICIENCY_CHECK_PROMPT = _prompt_loader.get_prompt_template('sufficiency_check')
# 保留原始定义作为备份(已注释)
'''
SUFFICIENCY_CHECK_PROMPT = PromptTemplate(
input_variables=["query", "passages", "decomposed_sub_queries"],
template="""
作为一个智能问答助手,请判断仅从给定的信息是否已经足够回答用户的查询。
用户查询:{query}
已生成的子查询:{decomposed_sub_queries}
检索到的信息(包括原始查询和子查询的结果):
{passages}
请分析这些信息是否包含足够的内容来完整回答用户的查询。注意检索结果包含两部分:
1. 【事件信息】- 来自知识图谱的事件节点,包含事件描述和上下文
2. 【段落信息】- 来自文档的段落内容,包含详细的文本描述
如果信息充分请返回JSON格式
{{
"is_sufficient": true,
"confidence": 0.9,
"reason": "事件信息和段落信息包含了回答查询所需的关键内容..."
}}
如果信息不充分请返回JSON格式并详细说明缺失的信息
{{
"is_sufficient": false,
"confidence": 0.5,
"reason": "检索信息缺少某些关键内容具体包括1) 缺少XXX的详细描述2) 缺少XXX的具体实例3) 缺少XXX的应用场景等..."
}}
请确保返回有效的JSON格式。
"""
)
'''
# 动态加载查询分解提示词
QUERY_DECOMPOSITION_PROMPT = _prompt_loader.get_prompt_template('query_decomposition')
# 保留原始定义作为备份(已注释)
'''
QUERY_DECOMPOSITION_PROMPT = PromptTemplate(
input_variables=["original_query"],
template="""
你是一个专业的查询分解助手。你的任务是将用户的复合查询分解为2个独立的、适合向量检索的子查询。
用户原始查询:{original_query}
【重要】向量检索特点:
- 向量检索通过语义相似度匹配,找到与查询最相关的文档段落
- 每个子查询应该聚焦一个明确的主题或概念
- 子查询应该是完整的、可独立检索的问题
【分解策略】:
1. **识别查询结构**
- 仔细查看查询中是否有多个问号()分隔的不同问题
- 查找连接词:"""以及""还有""另外"
- 查找标点符号:句号(。)、分号()等分隔符
2. **按主题分解**
- 如果查询包含多个独立主题,将其分解为独立的问题
- 每个子查询保持完整性,包含必要的上下文信息
【示例】:
输入:"混沌工程是什么?混沌工程的基础建设是由什么整合的?"
分析:这是两个关于混沌工程的不同问题
输出:["混沌工程是什么", "混沌工程的基础建设是由什么整合的"]
输入:"2023年某年度报告中提到存在数据安全问题的支付平台可能涉及哪些外部技术组件混沌工程的定义是什么"
分析:这包含两个完全不同的主题
输出:["2023年某年度报告中提到存在数据安全问题的支付平台可能涉及哪些外部技术组件", "混沌工程的定义是什么"]
输入:"什么是人工智能?它的主要应用有哪些?"
分析:两个相关但独立的问题
输出:["什么是人工智能", "人工智能的主要应用有哪些"]
输入:"区块链技术的优势和挑战是什么?"
分析:单一主题但包含两个方面
输出:["区块链技术的优势是什么", "区块链技术面临哪些挑战"]
【要求】:
1. [NOTE] 必须使用自然语言,类似人类会问的问题
2. [ERROR] 禁止使用SQL语句、代码或技术查询语法
3. [SEARCH] 严格按照查询的自然分割点进行分解
4. [?] 每个子查询必须是完整的自然语言问题
5. [BLOCKED] 绝不要添加"详细信息""相关内容""补充信息"等后缀
6. [INFO] 保持原始查询中的所有关键信息(时间、地点、对象等)
7. [TARGET] 确保两个子查询可以独立进行向量检索
请严格按照JSON格式返回自然语言查询
{{
"sub_queries": ["自然语言子查询1", "自然语言子查询2"]
}}
"""
)
'''
# 动态加载子查询生成提示词
SUB_QUERY_GENERATION_PROMPT = _prompt_loader.get_prompt_template('sub_query_generation')
# 保留原始定义作为备份(已注释)
'''
SUB_QUERY_GENERATION_PROMPT = PromptTemplate(
input_variables=["original_query", "existing_passages", "previous_sub_queries", "insufficiency_reason"],
template="""
基于用户的原始查询、已有的检索结果和充分性检查反馈生成2个相关的子查询来获取缺失信息。
原始查询:{original_query}
之前生成的子查询:{previous_sub_queries}
已有检索结果(包含事件信息和段落信息):
{existing_passages}
充分性检查反馈(信息不充分的原因):
{insufficiency_reason}
请根据充分性检查反馈中指出的缺失信息生成2个具体的自然语言子查询来补充这些缺失内容。注意已有检索结果包含
1. 【事件信息】- 来自知识图谱的事件节点
2. 【段落信息】- 来自文档的段落内容
【重要要求】:
1. [NOTE] 必须使用自然语言表达,类似人类会问的问题
2. [ERROR] 禁止使用SQL语句、代码或技术查询语法
3. [OK] 使用疑问句形式,如"什么是...""如何...""有哪些..."
4. [TARGET] 直接针对充分性检查中指出的缺失信息
5. [LINK] 与原始查询高度相关
6. [INFO] 查询要具体明确,能够获取到具体的信息
7. [BLOCKED] 避免与之前已生成的子查询重复
8. [TARGET] 确保每个子查询都能独立检索到有价值的信息
【示例格式】:
- [OK] 正确:"混沌工程的基础建设具体包括哪些组件?"
- [ERROR] 错误:"SELECT * FROM chaos_engineering WHERE type='infrastructure'"
- [OK] 正确:"供应链风险管理中常见的网络安全威胁有哪些?"
- [ERROR] 错误:"SELECT risks FROM supply_chain WHERE category='network'"
请以JSON格式返回自然语言查询
{{
"sub_queries": ["自然语言子查询1", "自然语言子查询2"]
}}
"""
)
'''
# 动态加载简单答案提示词
SIMPLE_ANSWER_PROMPT = _prompt_loader.get_prompt_template('simple_answer')
# 保留原始定义作为备份(已注释)
'''
SIMPLE_ANSWER_PROMPT = PromptTemplate(
input_variables=["query", "passages"],
template="""
基于检索到的信息,请为用户的查询提供一个准确、简洁的答案。
用户查询:{query}
检索到的相关信息:
{passages}
请基于这些信息回答用户的查询。注意检索结果包含:
1. 【事件信息】- 来自知识图谱的事件节点,提供事件相关的上下文
2. 【段落信息】- 来自文档的段落内容,提供详细的文本描述
要求:
1. 直接回答用户的查询
2. 严格基于提供的信息,不要编造内容
3. 综合利用事件信息和段落信息
4. 如果信息不足以完整回答,请明确说明
5. 答案要简洁明了,重点突出
答案:
"""
)
'''
# 动态加载最终答案提示词
FINAL_ANSWER_PROMPT = _prompt_loader.get_prompt_template('final_answer')
# 保留原始定义作为备份(已注释)
'''
FINAL_ANSWER_PROMPT = PromptTemplate(
input_variables=["original_query", "all_passages", "sub_queries"],
template="""
基于所有检索到的信息,请为用户的查询提供一个完整、准确的答案。
用户查询:{original_query}
子查询历史:{sub_queries}
所有检索到的信息:
{all_passages}
请基于这些信息提供一个全面的答案。注意检索结果包含:
1. 【事件信息】- 来自知识图谱的事件节点,提供事件相关的上下文
2. 【段落信息】- 来自文档的段落内容,提供详细的文本描述
要求:
1. 直接回答用户的查询
2. 综合利用事件信息和段落信息,不要编造内容
3. 如果信息仍然不足,请明确说明
4. 答案要结构清晰,逻辑连贯
答案:
"""
)
'''
def create_oneapi_llm(
oneapi_key: Optional[str] = None,
oneapi_base_url: Optional[str] = None,
model_name: Optional[str] = None
) -> OneAPILLM:
"""
创建DashScope LLM的便捷函数保持原有接口兼容性
Args:
oneapi_key: 阿里云DashScope API Key
oneapi_base_url: 不再使用,保持兼容性
model_name: 模型名称
Returns:
OneAPI LLM实例内部使用DashScope
"""
# 从环境变量获取配置默认使用qwen-max
api_key = oneapi_key or os.getenv("ONEAPI_KEY")
model_name = model_name or os.getenv("ONEAPI_MODEL_MAX", "qwen-max")
# 创建DashScope LLM
dashscope_llm = DashScopeLLM(
api_key=api_key,
model_name=model_name
)
return OneAPILLM(dashscope_llm)
def format_passages(passages: List[str]) -> str:
"""格式化段落列表为字符串"""
if not passages:
return "无相关段落"
formatted = []
for i, passage in enumerate(passages, 1):
formatted.append(f"段落{i}{passage}")
return "\n\n".join(formatted)
def format_mixed_passages(documents: List[Document]) -> str:
"""格式化混合的文档列表(事件+段落)为字符串"""
if not documents:
return "无相关信息"
event_docs = [doc for doc in documents if doc.metadata.get('node_type') == 'event']
text_docs = [doc for doc in documents if doc.metadata.get('node_type') == 'text']
formatted_parts = []
# 格式化事件信息
if event_docs:
formatted_parts.append("【事件信息】")
for i, doc in enumerate(event_docs, 1):
formatted_parts.append(f"事件{i}{doc.page_content}")
# 格式化段落信息
if text_docs:
if formatted_parts: # 如果有事件信息,添加分隔
formatted_parts.append("") # 空行分隔
formatted_parts.append("【段落信息】")
for i, doc in enumerate(text_docs, 1):
formatted_parts.append(f"段落{i}{doc.page_content}")
return "\n\n".join(formatted_parts)
def format_sub_queries(sub_queries: List[str]) -> str:
"""格式化子查询列表为字符串"""
if not sub_queries:
return "无子查询"
return "".join(sub_queries)
def format_triplets(triplets: List[Dict[str, str]]) -> str:
"""格式化三元组列表为字符串"""
if not triplets:
return "无三元组信息"
formatted = []
for i, triplet in enumerate(triplets, 1):
source = triplet.get('source', 'unknown')
relation = triplet.get('relation', 'unknown')
target = triplet.get('target', 'unknown')
formatted.append(f"{i}. ({source}) --[{relation}]--> ({target})")
return "\n".join(formatted)
def format_exploration_path(path: List[Dict[str, str]]) -> str:
"""格式化探索路径为字符串"""
if not path:
return "无探索路径"
path_str = []
for step in path:
source = step.get('source', 'unknown')
relation = step.get('relation', 'unknown')
target = step.get('target', 'unknown')
reason = step.get('reason', '')
path_str.append(f"({source}) --[{relation}]--> ({target})")
if reason:
path_str.append(f" 探索原因: {reason}")
return " -> ".join([step.split(" 探索原因:")[0] for step in path_str if not step.strip().startswith("探索原因:")])
# 概念探索相关提示词模板
CONCEPT_EXPLORATION_INIT_PROMPT = PromptTemplate(
input_variables=["node_name", "connected_concepts", "user_query", "insufficiency_reason"],
template="""
作为一个知识图谱概念探索专家你需要分析一个Node节点及其连接的Concept节点判断出最值得探索的概念方向。
**探索起点Node节点**: {node_name}
**该Node连接的Concept节点列表**:
{connected_concepts}
**用户原始查询**: {user_query}
**上次充分性检查不通过的原因**: {insufficiency_reason}
**任务**: 根据用户查询和不充分的原因从连接的Concept列表中选择一个最值得深入探索的概念这个概念应该最有可能帮助回答用户查询或补充缺失的信息。
**判断标准**:
1. 与用户查询的相关性最高
2. 最有可能补充当前缺失的关键信息
3. 具有较强的延展性,能够引出更多相关知识
请以JSON格式返回结果
{{
"selected_concept": "最值得探索的概念名称",
"exploration_reason": "选择这个概念的详细原因",
"confidence": 0.9, // 置信度0-1之间
"expected_knowledge": "期望从这个概念探索中获得什么知识"
}}
只返回JSON格式的结果不要添加其他内容。
"""
)
CONCEPT_EXPLORATION_CONTINUE_PROMPT = PromptTemplate(
input_variables=["current_node", "neighbor_triplets", "exploration_path", "user_query"],
template="""
作为一个知识图谱概念探索专家,你正在进行概念探索,需要决定下一步的探索方向。
**当前节点**: {current_node}
**当前节点的邻居三元组**:
{neighbor_triplets}
**已有探索路径**:
{exploration_path}
**用户原始查询**: {user_query}
**任务**: 从当前节点的邻居中选择一个最值得继续探索的节点,确保:
1. 不重复之前探索过的节点
2. 选择的节点与用户查询最相关
3. 能够获得更深入的知识
请以JSON格式返回结果
{{
"selected_node": "选择的下一个节点名称",
"selected_relation": "到达该节点的关系",
"exploration_reason": "选择这个节点的详细原因",
"confidence": 0.9, // 置信度0-1之间
"expected_knowledge": "期望从这个节点探索中获得什么知识"
}}
只返回JSON格式的结果不要添加其他内容。
"""
)
CONCEPT_EXPLORATION_SUFFICIENCY_PROMPT = PromptTemplate(
input_variables=["user_query", "all_passages", "exploration_knowledge", "insufficiency_reason"],
template="""
作为一个智能信息充分性评估专家,请综合评估当前信息是否足够回答用户查询。
**用户查询**: {user_query}
**现有检索信息**:
{all_passages}
**概念探索获得的知识**:
{exploration_knowledge}
**上次不充分的原因**: {insufficiency_reason}
**评估任务**:
1. 综合分析检索信息(事件信息+段落信息)和探索知识是否能够完整回答用户查询
2. 判断是否还有关键信息缺失
3. 如果仍然不充分,明确指出还需要什么信息
**评估标准**:
- 信息的完整性:能否覆盖查询的所有方面
- 信息的准确性:提供的信息是否准确可靠
- 信息的相关性:信息与查询的匹配度
- 逻辑连贯性:信息间是否形成完整的逻辑链
请以JSON格式返回评估结果
{{
"is_sufficient": false, // true表示信息充分false表示不充分
"confidence": 0.85, // 判断的置信度0-1之间
"reason": "详细说明充分或不充分的原因",
"missing_aspects": ["缺失的关键信息1", "缺失的关键信息2"], // 如果不充分,列出缺失的关键方面
"coverage_score": 0.7 // 当前信息对查询的覆盖程度0-1之间
}}
只返回JSON格式的结果不要添加其他内容。
"""
)
CONCEPT_EXPLORATION_SUB_QUERY_PROMPT = PromptTemplate(
input_variables=["user_query", "missing_aspects", "exploration_results"],
template="""
作为一个智能查询分析专家,根据概念探索的结果和仍然缺失的信息,生成新的子查询来进行第二轮概念探索。
**原始用户查询**: {user_query}
**仍然缺失的关键信息**: {missing_aspects}
**第一轮探索结果**: {exploration_results}
**任务**: 生成一个更精确的子查询,这个查询应该:
1. 专门针对缺失的关键信息
2. 比原始查询更具体和聚焦
3. 能够引导第二轮探索找到缺失的关键信息
**生成原则**:
- 保持与原始查询的相关性
- 专注于最关键的缺失方面
- 使用明确、具体的表述
- 避免过于宽泛或模糊
请以JSON格式返回结果
{{
"sub_query": "针对缺失信息生成的新查询",
"focus_aspects": ["查询重点关注的方面1", "方面2"],
"expected_improvements": "期望这个子查询如何改善信息充分性",
"confidence": 0.9
}}
只返回JSON格式的结果不要添加其他内容。
"""
)
# 导出列表
__all__ = [
'OneAPILLM',
'SufficiencyCheckParser',
'QueryComplexityParser',
'ConceptExplorationParser',
'ConceptContinueParser',
'ConceptSufficiencyParser',
'ConceptSubQueryParser',
'QueryComplexityResult',
'SufficiencyCheckResult',
'ConceptExplorationResult',
'ConceptContinueResult',
'ConceptSufficiencyResult',
'ConceptSubQueryResult',
'QUERY_COMPLEXITY_CHECK_PROMPT',
'SUFFICIENCY_CHECK_PROMPT',
'QUERY_DECOMPOSITION_PROMPT',
'SUB_QUERY_GENERATION_PROMPT',
'SIMPLE_ANSWER_PROMPT',
'FINAL_ANSWER_PROMPT',
'CONCEPT_EXPLORATION_INIT_PROMPT',
'CONCEPT_EXPLORATION_CONTINUE_PROMPT',
'CONCEPT_EXPLORATION_SUFFICIENCY_PROMPT',
'CONCEPT_EXPLORATION_SUB_QUERY_PROMPT',
'create_oneapi_llm',
'format_passages',
'format_mixed_passages',
'format_sub_queries',
'format_triplets',
'format_exploration_path'
]