# RAG串行查询优化方案

## 🔴 当前问题

RAG后端处理请求是**串行**的,导致:

- 10个并发请求需要 10 × 30秒 = 300秒
- 服务器利用率低
- 用户等待时间长

## 🚀 优化方案

### 1. 立即优化 - 应用层并发处理

#### 方案A: FastAPI异步处理

```python
# app.py or main.py
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor
import asyncio
import uvicorn

app = FastAPI()

# Thread pool that runs the blocking RAG pipeline, so the event loop
# stays free to accept new requests while earlier ones are in flight.
executor = ThreadPoolExecutor(max_workers=4)


@app.post("/retrieve")
async def retrieve_async(query: dict):
    """Handle a RAG query without blocking the event loop.

    The existing synchronous pipeline is dispatched to the thread
    pool, so up to ``max_workers`` requests run concurrently.
    """
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor,
        process_rag_query,  # existing synchronous handler
        query,
    )
    return result


def process_rag_query(query):
    """Existing synchronous RAG pipeline: retrieve, then generate.

    NOTE(review): `vector_search` and `llm_generate` are assumed to be
    defined elsewhere in the project — confirm before running.
    """
    # Vector retrieval
    docs = vector_search(query['query'])
    # LLM answer generation
    answer = llm_generate(query['query'], docs)
    return {"answer": answer}


# Launch with multiple worker processes.
# FIX: uvicorn only honors `workers` when the app is given as an
# import string — passing the app object runs a single worker.
if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=8081, workers=4)
```

#### 方案B: 使用Gunicorn多进程

```python
# gunicorn_config.py — Gunicorn settings for the RAG backend
bind = "0.0.0.0:8081"
workers = 4  # number of worker processes (true parallelism across cores)
worker_class = "uvicorn.workers.UvicornWorker"  # async worker: each process also multiplexes I/O
worker_connections = 1000  # max simultaneous connections per worker
timeout = 120  # seconds before an unresponsive worker is killed; RAG calls are slow
keepalive = 5  # seconds to hold idle keep-alive connections open
```

启动命令:

```bash
gunicorn app:app -c gunicorn_config.py
```

### 2. 架构优化 - 队列+Worker模式

```python
# queue_worker.py
import json
import queue
import uuid  # FIX: was used below but never imported
from threading import Thread

import redis


class RAGQueueProcessor:
    """Decouples request intake from processing.

    Requests are enqueued in an in-process queue and drained by
    worker threads; results are parked in Redis for later pickup.
    """

    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379)
        self.task_queue = queue.Queue()
        self.workers = []

    def start_workers(self, num_workers=4):
        """Spawn `num_workers` threads that drain the task queue."""
        for i in range(num_workers):
            # daemon=True so stray workers don't block process exit
            worker = Thread(target=self.worker_loop, args=(i,), daemon=True)
            worker.start()
            self.workers.append(worker)

    def worker_loop(self, worker_id):
        """Worker loop: process tasks until a None sentinel arrives."""
        while True:
            task = self.task_queue.get()
            if task is None:
                break

            task_id, query = task
            # NOTE(review): `process_query` is the existing synchronous
            # RAG pipeline, assumed to be defined elsewhere — confirm.
            result = self.process_query(query)

            # Store the result; expires after 5 minutes
            self.redis_client.setex(
                f"result:{task_id}",
                300,
                json.dumps(result)
            )

    def submit_task(self, query):
        """Enqueue a query and return a ticket id for result polling."""
        task_id = str(uuid.uuid4())
        self.task_queue.put((task_id, query))
        return task_id


# FastAPI integration — create the processor the endpoints rely on
# (the original example referenced `processor` without defining it).
processor = RAGQueueProcessor()
processor.start_workers(num_workers=4)


@app.post("/retrieve")
async def submit_query(query: dict):
    task_id = processor.submit_task(query)
    return {"task_id": task_id, "status": "processing"}


@app.get("/result/{task_id}")
async def get_result(task_id: str):
    # FIX: read via the processor's client — there is no module-level
    # `redis_client` in this example.
    result = processor.redis_client.get(f"result:{task_id}")
    if result:
        return json.loads(result)
    return {"status": "processing"}
```

### 3. 缓存优化 - Redis缓存层

```python
# cache_layer.py
import hashlib
import redis
import json


class RAGCache:
    """Redis-backed cache keyed on an MD5 digest of the query text."""

    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
        self.ttl = 3600  # entries live for one hour

    def get_cache_key(self, query):
        """Derive the Redis key for a query string."""
        digest = hashlib.md5(query.encode()).hexdigest()
        return f"rag:{digest}"

    def get(self, query):
        """Return the cached result for *query*, or None on a miss."""
        cached = self.redis_client.get(self.get_cache_key(query))
        if not cached:
            return None
        return json.loads(cached)

    def set(self, query, result):
        """Cache *result* under the query's key with the default TTL."""
        self.redis_client.setex(
            self.get_cache_key(query),
            self.ttl,
            json.dumps(result)
        )


# Usage in the main application
cache = RAGCache()


@app.post("/retrieve")
async def retrieve_with_cache(query: dict):
    # Serve straight from cache when possible
    hit = cache.get(query['query'])
    if hit:
        return hit

    # Cache miss: run the full pipeline, then remember the answer
    result = await process_query_async(query)
    cache.set(query['query'], result)
    return result
```

### 4. Docker Compose部署方案

```yaml
# docker-compose.yml
version: '3.8'

services:
  # Nginx fronts the three RAG replicas and load-balances between them
  nginx:
    image: nginx:alpine
    ports:
      - "8081:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - rag1
      - rag2
      - rag3

  # Three identical RAG backend replicas (same image, distinct WORKER_ID)
  rag1:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=1
      - REDIS_HOST=redis
    depends_on:
      - redis

  rag2:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=2
      - REDIS_HOST=redis
    depends_on:
      - redis

  rag3:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=3
      - REDIS_HOST=redis
    depends_on:
      - redis

  # Shared Redis for caching and task results; data persisted in a named volume
  redis:
    image: redis:alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:
```

### 5. Nginx负载均衡配置

```nginx
# nginx.conf
upstream rag_backend {
    least_conn;  # route each request to the backend with the fewest active connections
    server rag1:8081;
    server rag2:8081;
    server rag3:8081;
}

server {
    listen 80;

    location / {
        proxy_pass http://rag_backend;
        # Generous timeouts: a RAG answer can take tens of seconds
        proxy_connect_timeout 120s;
        proxy_send_timeout 120s;
        proxy_read_timeout 120s;

        # Keep upstream connections alive (HTTP/1.1, no Connection: close)
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}
```

## 📊 性能提升预期

| 优化方案 | 响应时间改善 | 并发能力 | 实施难度 |
|---------|------------|---------|---------|
| FastAPI异步 | 30% | 2-3倍 | 简单 |
| 多进程Gunicorn | 50% | 4倍 | 简单 |
| 队列+Worker | 60% | 10倍 | 中等 |
| Redis缓存 | 90%(命中时) | 100倍 | 简单 |
| 负载均衡 | 70% | N倍 | 中等 |

## 🎯 推荐实施顺序

1. **第一步**:添加Redis缓存(立即见效)
2. **第二步**:使用Gunicorn多进程(简单有效)
3. **第三步**:实现异步处理(代码改动小)
4. **第四步**:部署多实例+负载均衡(扩展性好)
5. **第五步**:优化模型和算法(长期优化)

## 💡 快速启动命令

```bash
# 1. Start Redis
docker run -d -p 6379:6379 redis:alpine

# 2. Updated launch command (4 worker processes)
gunicorn app:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8081 \
    --timeout 120

# 3. Or run uvicorn directly with multiple workers
uvicorn app:app --host 0.0.0.0 --port 8081 --workers 4
```