AIEC-RAG---/AIEC-RAG/rag-optimization-plan.md · 2025-09-25 10:33:37 +08:00

RAG Serial Query Optimization Plan

🔴 Current Problem

The RAG backend processes requests serially, which means:

  • 10 concurrent requests take 10 × 30 s = 300 s
  • Server utilization stays low
  • Users wait a long time
🚀 Optimization Options

1. Immediate optimization - application-level concurrency

Option A: Async handling in FastAPI
```python
# app.py or main.py
from concurrent.futures import ThreadPoolExecutor
import asyncio

import uvicorn
from fastapi import FastAPI

app = FastAPI()

# Thread pool for offloading blocking RAG calls
executor = ThreadPoolExecutor(max_workers=4)

@app.post("/retrieve")
async def retrieve_async(query: dict):
    """Handle a RAG query without blocking the event loop."""
    # Run the blocking pipeline in the thread pool
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor,
        process_rag_query,  # existing synchronous handler
        query,
    )
    return result

def process_rag_query(query):
    """Existing RAG pipeline (unchanged)."""
    docs = vector_search(query["query"])         # vector retrieval
    answer = llm_generate(query["query"], docs)  # LLM generation
    return {"answer": answer}

if __name__ == "__main__":
    # Note: multiple workers require an import string ("app:app"),
    # not the app object; uvicorn silently ignores `workers` otherwise.
    uvicorn.run("app:app", host="0.0.0.0", port=8081, workers=4)
```

Option B: Gunicorn with multiple worker processes

```python
# gunicorn_config.py
bind = "0.0.0.0:8081"
workers = 4  # number of worker processes
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 120
keepalive = 5
```
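If you are unsure what to set `workers` to, Gunicorn's documentation suggests `2 × CPU cores + 1` as a starting point; since RAG requests mostly wait on the LLM rather than burn CPU, a higher count may also work. A minimal sketch of the heuristic:

```python
import multiprocessing

# Gunicorn's documented rule of thumb for sizing worker processes
workers = multiprocessing.cpu_count() * 2 + 1
print(workers)
```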

Start it with:

```shell
gunicorn app:app -c gunicorn_config.py
```

2. Architecture optimization - queue + worker model

```python
# queue_worker.py
import json
import queue
import uuid
from threading import Thread

import redis

class RAGQueueProcessor:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379)
        self.task_queue = queue.Queue()
        self.workers = []

    def start_workers(self, num_workers=4):
        """Start several workers to drain the queue."""
        for i in range(num_workers):
            worker = Thread(target=self.worker_loop, args=(i,))
            worker.start()
            self.workers.append(worker)

    def worker_loop(self, worker_id):
        """Worker loop: pull tasks, process them, store results."""
        while True:
            task = self.task_queue.get()
            if task is None:  # sentinel: shut down
                break

            task_id, query = task
            result = self.process_query(query)  # existing RAG pipeline

            # Store the result with a 5-minute TTL
            self.redis_client.setex(
                f"result:{task_id}",
                300,
                json.dumps(result),
            )

    def submit_task(self, query):
        """Enqueue a task and return its id."""
        task_id = str(uuid.uuid4())
        self.task_queue.put((task_id, query))
        return task_id

# FastAPI integration (in the app module)
processor = RAGQueueProcessor()
processor.start_workers()

@app.post("/retrieve")
async def submit_query(query: dict):
    task_id = processor.submit_task(query)
    return {"task_id": task_id, "status": "processing"}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    result = processor.redis_client.get(f"result:{task_id}")
    if result:
        return json.loads(result)
    return {"status": "processing"}
```
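The submit/poll lifecycle can be exercised without Redis or FastAPI. In this self-contained sketch a plain dict stands in for the Redis result store and a stub replaces the RAG pipeline; the polling loop plays the role of the client hitting `GET /result/{task_id}`:

```python
import queue
import threading
import time
import uuid

results = {}              # stands in for Redis in this sketch
task_queue = queue.Queue()

def worker():
    while True:
        task = task_queue.get()
        if task is None:  # sentinel: shut down
            break
        task_id, query = task
        results[task_id] = {"answer": f"echo: {query}"}  # stub for the RAG pipeline

def submit(query):
    task_id = str(uuid.uuid4())
    task_queue.put((task_id, query))
    return task_id

t = threading.Thread(target=worker, daemon=True)
t.start()
tid = submit("hello")
while tid not in results:  # client-side polling
    time.sleep(0.01)
task_queue.put(None)       # shut the worker down
print(results[tid]["answer"])  # echo: hello
```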

3. Cache optimization - a Redis cache layer

```python
# cache_layer.py
import hashlib
import json

import redis

class RAGCache:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
        self.ttl = 3600  # cache entries live for 1 hour

    def get_cache_key(self, query):
        """Build the cache key from the query text."""
        return f"rag:{hashlib.md5(query.encode()).hexdigest()}"

    def get(self, query):
        """Return the cached result, or None on a miss."""
        key = self.get_cache_key(query)
        result = self.redis_client.get(key)
        if result:
            return json.loads(result)
        return None

    def set(self, query, result):
        """Store a result with the configured TTL."""
        key = self.get_cache_key(query)
        self.redis_client.setex(
            key,
            self.ttl,
            json.dumps(result)
        )

# Usage in the main application
cache = RAGCache()

@app.post("/retrieve")
async def retrieve_with_cache(query: dict):
    # Check the cache first
    cached = cache.get(query['query'])
    if cached:
        return cached

    # Process the query
    result = await process_query_async(query)

    # Store the result for later hits
    cache.set(query['query'], result)
    return result
```
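One caveat with hashing the raw query string: `"What is RAG?"` and `"  what is RAG? "` produce different MD5 digests and therefore miss the cache. A hypothetical `normalized_cache_key` helper (not part of `RAGCache` above) that canonicalizes the text before hashing raises the hit rate:

```python
import hashlib

def normalized_cache_key(query: str) -> str:
    """Lowercase and collapse whitespace so near-identical queries share one entry."""
    canonical = " ".join(query.lower().split())
    return f"rag:{hashlib.md5(canonical.encode()).hexdigest()}"

print(normalized_cache_key("What is RAG?") == normalized_cache_key("  what is   RAG? "))  # True
```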

4. Docker Compose deployment

```yaml
# docker-compose.yml
version: '3.8'

services:
  nginx:
    image: nginx:alpine
    ports:
      - "8081:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - rag1
      - rag2
      - rag3

  rag1:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=1
      - REDIS_HOST=redis
    depends_on:
      - redis

  rag2:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=2
      - REDIS_HOST=redis
    depends_on:
      - redis

  rag3:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=3
      - REDIS_HOST=redis
    depends_on:
      - redis

  redis:
    image: redis:alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:
```

5. Nginx load-balancing configuration

```nginx
# nginx.conf
upstream rag_backend {
    least_conn;  # route to the instance with the fewest active connections
    server rag1:8081;
    server rag2:8081;
    server rag3:8081;
}

server {
    listen 80;

    location / {
        proxy_pass http://rag_backend;
        proxy_connect_timeout 120s;
        proxy_send_timeout 120s;
        proxy_read_timeout 120s;

        # Keep upstream connections alive
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}
```

📊 Expected Performance Gains

| Optimization | Response-time improvement | Concurrency gain | Effort |
|---|---|---|---|
| FastAPI async | 30% | 2-3× | Easy |
| Gunicorn multi-process | 50% | 4× | Easy |
| Queue + workers | 60% | 10× | Medium |
| Redis cache | 90% (on cache hit) | 100× | Easy |
| Load balancing | 70% | N× | Medium |

🎯 Recommended Order of Implementation

  1. First: add the Redis cache (immediate payoff)
  2. Second: switch to Gunicorn with multiple workers (simple and effective)
  3. Third: implement async handling (small code change)
  4. Fourth: deploy multiple instances behind a load balancer (best scalability)
  5. Fifth: optimize the model and algorithms (long-term work)

💡 快速启动命令

# 1. 安装Redis
docker run -d -p 6379:6379 redis:alpine

# 2. 修改启动命令使用4个worker
gunicorn app:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8081 \
  --timeout 120

# 3. 或使用uvicorn多进程
uvicorn app:app --host 0.0.0.0 --port 8081 --workers 4