# RAG Serial Query Optimization Plan

## 🔴 Current Problem

The RAG backend handles requests **serially**, which causes:
- 10 concurrent requests to take 10 × 30 s = 300 s
- Low server utilization
- Long user wait times

## 🚀 Optimization Plan

### 1. Immediate Optimization - Application-Level Concurrency

#### Option A: FastAPI async handling

```python
# app.py or main.py
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor
import asyncio
import uvicorn

app = FastAPI()

# Thread pool shared by request handlers
executor = ThreadPoolExecutor(max_workers=4)

@app.post("/retrieve")
async def retrieve_async(query: dict):
    """Handle a RAG query without blocking the event loop."""
    # Run the blocking RAG pipeline in the thread pool
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        process_rag_query,  # existing synchronous handler
        query
    )
    return result

def process_rag_query(query):
    """Existing RAG processing logic."""
    # Vector retrieval (existing function)
    docs = vector_search(query['query'])
    # LLM generation (existing function)
    answer = llm_generate(query['query'], docs)
    return {"answer": answer}

# Run with multiple workers
if __name__ == "__main__":
    # workers > 1 requires an import string instead of the app object
    uvicorn.run("app:app", host="0.0.0.0", port=8081, workers=4)
```

#### Option B: Gunicorn multi-process

```python
# gunicorn_config.py
bind = "0.0.0.0:8081"
workers = 4  # number of worker processes
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 120
keepalive = 5
```

Start command:
```bash
gunicorn app:app -c gunicorn_config.py
```

### 2. Architectural Optimization - Queue + Worker Pattern

```python
# queue_worker.py
import json
import queue
import uuid
from threading import Thread

import redis

class RAGQueueProcessor:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379)
        self.task_queue = queue.Queue()
        self.workers = []

    def start_workers(self, num_workers=4):
        """Start several worker threads to handle requests."""
        for i in range(num_workers):
            worker = Thread(target=self.worker_loop, args=(i,))
            worker.start()
            self.workers.append(worker)

    def worker_loop(self, worker_id):
        """Worker loop: pull tasks off the queue and process them."""
        while True:
            task = self.task_queue.get()
            if task is None:
                break
            task_id, query = task
            # process_query() is the existing synchronous RAG pipeline
            result = self.process_query(query)
            # Store the result in Redis
            self.redis_client.setex(
                f"result:{task_id}",
                300,  # expire after 5 minutes
                json.dumps(result)
            )

    def submit_task(self, query):
        """Submit a task to the queue and return its id."""
        task_id = str(uuid.uuid4())
        self.task_queue.put((task_id, query))
        return task_id

# FastAPI integration
processor = RAGQueueProcessor()
processor.start_workers(num_workers=4)

@app.post("/retrieve")
async def submit_query(query: dict):
    task_id = processor.submit_task(query)
    return {"task_id": task_id, "status": "processing"}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    result = processor.redis_client.get(f"result:{task_id}")
    if result:
        return json.loads(result)
    return {"status": "processing"}
```

(A client-side sketch of this submit-and-poll flow appears after section 3.)

### 3. Cache Optimization - Redis Cache Layer

```python
# cache_layer.py
import hashlib
import json

import redis

class RAGCache:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
        self.ttl = 3600  # cache entries for 1 hour

    def get_cache_key(self, query):
        """Build a cache key from the query text."""
        return f"rag:{hashlib.md5(query.encode()).hexdigest()}"

    def get(self, query):
        """Return the cached result, or None on a miss."""
        key = self.get_cache_key(query)
        result = self.redis_client.get(key)
        if result:
            return json.loads(result)
        return None

    def set(self, query, result):
        """Cache a result."""
        key = self.get_cache_key(query)
        self.redis_client.setex(
            key,
            self.ttl,
            json.dumps(result)
        )

# Usage in the main application
cache = RAGCache()

@app.post("/retrieve")
async def retrieve_with_cache(query: dict):
    # Check the cache first
    cached = cache.get(query['query'])
    if cached:
        return cached
    # Process the query (existing async handler, e.g. section 1's executor path)
    result = await process_query_async(query)
    # Store the result in the cache
    cache.set(query['query'], result)
    return result
```
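For reference, here is a minimal client-side sketch of the submit-and-poll flow exposed by the queue + worker API in section 2. It is an illustration only: the base URL, the `ask` helper, and the polling interval and timeout are assumptions, while the `/retrieve` and `/result/{task_id}` endpoints and their response shapes come from the code above.

```python
# client_poll.py -- illustrative client for the queue + worker API in section 2
import time

import requests

BASE_URL = "http://localhost:8081"  # assumed service address


def ask(question: str, timeout_s: float = 120.0, poll_interval_s: float = 1.0) -> dict:
    """Submit a query, then poll /result/{task_id} until it is ready or times out."""
    resp = requests.post(f"{BASE_URL}/retrieve", json={"query": question})
    resp.raise_for_status()
    task_id = resp.json()["task_id"]

    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        result = requests.get(f"{BASE_URL}/result/{task_id}").json()
        if result.get("status") != "processing":
            return result  # the worker has stored the final answer in Redis
        time.sleep(poll_interval_s)  # back off between polls
    raise TimeoutError(f"task {task_id} did not finish within {timeout_s}s")


if __name__ == "__main__":
    print(ask("What is retrieval-augmented generation?"))
```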
### 4. Docker Compose Deployment

```yaml
# docker-compose.yml
version: '3.8'

services:
  nginx:
    image: nginx:alpine
    ports:
      - "8081:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - rag1
      - rag2
      - rag3

  rag1:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=1
      - REDIS_HOST=redis
    depends_on:
      - redis

  rag2:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=2
      - REDIS_HOST=redis
    depends_on:
      - redis

  rag3:
    image: aiec-rag:latest
    environment:
      - WORKER_ID=3
      - REDIS_HOST=redis
    depends_on:
      - redis

  redis:
    image: redis:alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:
```

### 5. Nginx Load-Balancing Configuration

```nginx
# nginx.conf
upstream rag_backend {
    least_conn;  # least-connections strategy
    server rag1:8081;
    server rag2:8081;
    server rag3:8081;
}

server {
    listen 80;

    location / {
        proxy_pass http://rag_backend;
        proxy_connect_timeout 120s;
        proxy_send_timeout 120s;
        proxy_read_timeout 120s;

        # Keep upstream connections alive
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}
```

## 📊 Expected Performance Gains

| Optimization | Response-time improvement | Concurrency | Effort |
|---------|------------|---------|---------|
| FastAPI async | 30% | 2-3× | Low |
| Gunicorn multi-process | 50% | 4× | Low |
| Queue + worker | 60% | 10× | Medium |
| Redis cache | 90% (on a cache hit) | 100× | Low |
| Load balancing | 70% | N× | Medium |

These figures are rough estimates; actual gains depend on the query mix, cache hit rate, and hardware.

## 🎯 Recommended Implementation Order

1. **Step 1**: Add the Redis cache (immediate payoff)
2. **Step 2**: Run with Gunicorn multi-process (simple and effective)
3. **Step 3**: Implement async handling (small code change)
4. **Step 4**: Deploy multiple instances behind a load balancer (good scalability)
5. **Step 5**: Optimize the model and algorithms (long-term work)

## 💡 Quick Start Commands

```bash
# 1. Start Redis
docker run -d -p 6379:6379 redis:alpine

# 2. Change the launch command to use 4 workers
gunicorn app:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8081 \
  --timeout 120

# 3. Or use uvicorn with multiple worker processes
uvicorn app:app --host 0.0.0.0 --port 8081 --workers 4
```
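To check the gains above against the serial baseline from the problem statement (10 concurrent requests ≈ 10 × 30 s), a rough load check like the following can be run against the service. This is a sketch under assumptions: the URL, payload shape (`{"query": ...}`), request count, and file name are illustrative, not part of the existing codebase.

```python
# load_check.py -- rough concurrency check against the serial baseline
import time
from concurrent.futures import ThreadPoolExecutor

import requests

URL = "http://localhost:8081/retrieve"  # assumed service address
QUERIES = [f"test question {i}" for i in range(10)]


def one_request(q: str) -> float:
    """Send a single query and return its latency in seconds."""
    start = time.monotonic()
    requests.post(URL, json={"query": q}, timeout=300)
    return time.monotonic() - start


if __name__ == "__main__":
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=10) as pool:
        latencies = list(pool.map(one_request, QUERIES))
    wall = time.monotonic() - start
    # Serial backend: wall time is roughly the sum of per-request latencies.
    # With 4 workers it should approach max(latency) * ceil(10 / 4) instead.
    print(f"wall={wall:.1f}s  sum={sum(latencies):.1f}s  max={max(latencies):.1f}s")
```

Comparing the wall-clock time before and after each change gives a concrete read on the estimates in the table above.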