Async Python in Practice: Building a High-Concurrency AI API Call Pipeline

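For an application that calls a hosted model API, the main performance bottleneck on the client side is not model inference but network IO. This article uses plain Python code to show how to build a high-concurrency API call pipeline with asyncio, with complete implementations of batch requests, concurrency control, and result aggregation.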

1. Why AI applications need async

Start with a synchronous example:

python

# Synchronous calls: handle the prompts one at a time
import time
from openai import OpenAI

client = OpenAI(api_key="your-key")

def process_batch(prompts):
    results = []
    for prompt in prompts:
        # Each call blocks until the full response has arrived
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        results.append(response.choices[0].message.content)
    return results

# 10 requests at about 1.5 s each, so about 15 s in total
prompts = [f"Write sentence {i} about autumn" for i in range(10)]
start = time.time()
results = process_batch(prompts)
print(f"Sync elapsed: {time.time() - start:.1f}s")  # ~15s

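Processed serially, the 10 requests take about 15 seconds. Yet each request spends most of that time waiting for the network response (IO wait) while the CPU sits idle. Async programming puts that idle time to use.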
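To make the "CPU is idle" point concrete, here is a minimal sketch that compares wall-clock time with CPU time for one blocking call. It assumes the network wait can be modelled with `time.sleep`; `simulated_request` is a hypothetical stand-in, not part of the OpenAI client.

```python
import time

def simulated_request(latency_s: float = 0.2) -> str:
    """Hypothetical stand-in for a blocking API call; the sleep models the network wait."""
    time.sleep(latency_s)
    return "ok"

wall_start = time.perf_counter()  # wall-clock time
cpu_start = time.process_time()   # CPU time consumed by this process
simulated_request()
wall_elapsed = time.perf_counter() - wall_start
cpu_elapsed = time.process_time() - cpu_start

# Wall time is close to the latency, CPU time is close to zero
print(f"wall: {wall_elapsed:.3f}s, cpu: {cpu_elapsed:.3f}s")
```

The gap between the two numbers is the time an event loop could spend driving other requests.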

2. asyncio basics: from sync to async

2.1 Converting to async

python

import asyncio
import time
from openai import AsyncOpenAI  # async client

# Configuration example for a custom endpoint:
# client = AsyncOpenAI(
#     api_key="your-key",
#     base_url="https://api.moyu.info/v1"
# )
client = AsyncOpenAI(api_key="your-key")

async def process_one(prompt):
    """Async function for a single request."""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch_async(prompts):
    """Process several requests concurrently."""
    tasks = [process_one(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    return results

# Run
prompts = [f"Write sentence {i} about autumn" for i in range(10)]
start = time.time()
results = asyncio.run(process_batch_async(prompts))
print(f"Async elapsed: {time.time() - start:.1f}s")  # ~2s

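Run concurrently, the 10 requests drop from about 15 seconds to about 2 seconds. That is the value of async.

2.2 Core asyncio concepts

Three concepts are enough: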

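Concept | Analogy | Notes
async def | Defines a function that "can pause" | await can be used inside the function
await | "Pause here and continue once the result is back" | Only valid inside async def
asyncio.gather | "Do several things at once" | Runs multiple coroutines concurrently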

python

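import asyncio

async def fetch_data(item_id):
    print(f"  start fetching {item_id}")
    await asyncio.sleep(1)  # simulated network wait
    print(f"  finished {item_id}")
    return f"data-{item_id}"

async def main():
    # gather: the three tasks run at the same time
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    print(results)  # ['data-1', 'data-2', 'data-3']

asyncio.run(main())  # pass the coroutine object, so main must be called

# Output:
#   start fetching 1
#   start fetching 2
#   start fetching 3
#   finished 1
#   finished 2
#   finished 3
# ['data-1', 'data-2', 'data-3']
# Total time is about 1 second (not 3)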
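One property of `asyncio.gather` worth seeing in isolation: results come back in the order the awaitables were passed in, not in the order they finish. The sketch below uses a hypothetical `fake_call` with simulated delays, chosen so that the first call finishes last.

```python
import asyncio

async def fake_call(index: int, delay_s: float) -> str:
    """Hypothetical stand-in for an API call with a known latency."""
    await asyncio.sleep(delay_s)
    return f"result-{index}"

async def demo_order() -> list:
    # The first call is the slowest, so it completes last
    delays = [0.03, 0.02, 0.01]
    return await asyncio.gather(*(fake_call(i, d) for i, d in enumerate(delays)))

ordered = asyncio.run(demo_order())
print(ordered)  # ['result-0', 'result-1', 'result-2']
```

This is why the batch functions in this article can pair results with prompts by position.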

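3. Concurrency control: Semaphore

3.1 Why concurrency control is needed

asyncio.gather starts every request at once. With 1,000 requests, 1,000 concurrent calls can trigger API rate limiting (HTTP 429) and can also exhaust client memory.

A Semaphore caps the number of requests in flight: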

python

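import asyncio

# process_one is the single-request coroutine defined in section 2.1

async def process_with_concurrency(prompts, max_concurrent=5):
    """Limit the maximum number of concurrent requests."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_process(prompt):
        async with semaphore:  # take a slot; wait if all slots are taken
            return await process_one(prompt)

    tasks = [limited_process(p) for p in prompts]
    return await asyncio.gather(*tasks)

async def main():
    # 100 requests, but at most 5 running at any moment
    prompts = [f"Question {i}" for i in range(100)]
    return await process_with_concurrency(prompts, max_concurrent=5)

# await is only valid inside a coroutine, so run through asyncio.run
results = asyncio.run(main())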
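To check that the semaphore really caps concurrency, the following sketch counts how many simulated calls are in flight at once. `measure_peak` and `fake_call` are hypothetical names, and `asyncio.sleep` stands in for the network wait.

```python
import asyncio

async def measure_peak(n_requests: int, max_concurrent: int) -> int:
    """Return the highest number of simulated calls observed in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_call() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated network wait
            in_flight -= 1

    await asyncio.gather(*(fake_call() for _ in range(n_requests)))
    return peak

peak = asyncio.run(measure_peak(n_requests=50, max_concurrent=5))
print(peak)  # 5
```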

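3.2 Adjusting concurrency dynamically

Adjust concurrency according to how the API responds: raise it after a run of successful requests and cut it as soon as errors such as rate limiting appear: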

python

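import asyncio

# process_one is the single-request coroutine defined in section 2.1

class AdaptiveConcurrency:
    """Adaptive concurrency controller."""

    def __init__(self, initial=5, min_val=1, max_val=20):
        self.current = initial
        self.min_val = min_val
        self.max_val = max_val
        self.success_count = 0
        self.error_count = 0

    def on_success(self):
        self.success_count += 1
        # After 10 consecutive successes, try a higher limit
        if self.success_count >= 10:
            self.current = min(self.max_val, self.current + 1)
            self.success_count = 0
            print(f"[concurrency up] → {self.current}")

    def on_error(self):
        self.error_count += 1
        self.success_count = 0
        # Halve the limit immediately on an error
        self.current = max(self.min_val, self.current // 2)
        print(f"[concurrency down] → {self.current}")

# Usage
controller = AdaptiveConcurrency(initial=5)

async def adaptive_process(prompts):
    results = []
    i = 0
    while i < len(prompts):
        # Each wave runs controller.current requests concurrently; creating
        # a new Semaphore per request would let only one request run at a time
        wave = prompts[i:i + controller.current]
        outcomes = await asyncio.gather(
            *(process_one(p) for p in wave), return_exceptions=True
        )
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                controller.on_error()
                results.append(None)
            else:
                controller.on_success()
                results.append(outcome)
        i += len(wave)
    return results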
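The adjustment rule (add one after 10 consecutive successes, halve on any error) can be traced without any network calls. The sketch below restates that rule as a pure function; `next_limit` is a hypothetical helper that mirrors the defaults of AdaptiveConcurrency, not a replacement for it.

```python
def next_limit(current, streak, ok, *, min_val=1, max_val=20, step_after=10):
    """Return (new_limit, new_streak) after one request outcome."""
    if not ok:
        # Multiplicative decrease: halve on any error and reset the streak
        return max(min_val, current // 2), 0
    streak += 1
    if streak >= step_after:
        # Additive increase: one more slot after a full streak of successes
        return min(max_val, current + 1), 0
    return current, streak

limit, streak = 5, 0
trace = []
outcomes = [True] * 10 + [False] + [True] * 10
for ok in outcomes:
    limit, streak = next_limit(limit, streak, ok)
    trace.append(limit)

# Limit after the first 10 successes, after the error, and at the end
print(trace[9], trace[10], trace[-1])  # 6 3 4
```

The asymmetry is deliberate: the limit recovers slowly but backs off quickly, so a single rate-limit response costs more than ten successes gain.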

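4. Batch requests and result aggregation

4.1 Processing in batches

Send large numbers of requests in batches with a pause between batches, which avoids sustained high concurrency: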

python

import asyncio

# process_one is the single-request coroutine defined in section 2.1

async def process_in_batches(prompts, batch_size=10, interval=0.5):
    """Process in batches, pausing 0.5 s between batches."""
    all_results = []
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]

        # Run this batch concurrently
        tasks = [process_one(p) for p in batch]
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle results (separate successes from failures)
        for prompt, result in zip(batch, batch_results):
            if isinstance(result, Exception):
                print(f"  Failed: {prompt[:20]}... - {result}")
                all_results.append(None)
            else:
                all_results.append(result)

        print(f"  Finished batch {i // batch_size + 1}")

        # Pause between batches
        if i + batch_size < len(prompts):
            await asyncio.sleep(interval)
    return all_results

async def main():
    # 1000 requests, 10 per batch, 0.5 s between batches
    prompts = [f"Question {i}" for i in range(1000)]
    return await process_in_batches(prompts, batch_size=10, interval=0.5)

# await is only valid inside a coroutine, so run through asyncio.run
results = asyncio.run(main())
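The key detail in the batch function is `return_exceptions=True`: a failed request comes back as an exception object in its slot instead of aborting the whole batch. A minimal sketch with a hypothetical `flaky_call` that fails for every third item:

```python
import asyncio

async def flaky_call(i: int) -> str:
    """Hypothetical stand-in for an API call that sometimes fails."""
    await asyncio.sleep(0.01)
    if i % 3 == 0:
        raise RuntimeError(f"simulated failure for item {i}")
    return f"ok-{i}"

async def run_batch(items):
    outcomes = await asyncio.gather(
        *(flaky_call(i) for i in items), return_exceptions=True
    )
    # Exceptions arrive as values in their original positions
    return [None if isinstance(o, Exception) else o for o in outcomes]

batch_results = asyncio.run(run_batch(range(6)))
print(batch_results)  # [None, 'ok-1', 'ok-2', None, 'ok-4', 'ok-5']
```

Without `return_exceptions=True`, the first failure would propagate out of `gather` and the successful results of that batch would be lost to the caller.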

4.2 Real-time aggregation of streamed results

Run several streaming requests at once and merge their output in real time:

python

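import asyncio

async def stream_one(client, prompt, queue, index):
    """Run one streaming request and put its chunks on the queue."""
    try:
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                await queue.put((index, chunk.choices[0].delta.content))
    finally:
        # Always send the end marker, even on error, so the reader cannot hang
        await queue.put((index, None))

async def merge_streams(prompts):
    """Merge the output of several streaming requests."""
    queue = asyncio.Queue()

    # Start all streaming requests and keep references to the tasks
    # (client is the AsyncOpenAI instance from section 2.1)
    tasks = [
        asyncio.create_task(stream_one(client, prompt, queue, i))
        for i, prompt in enumerate(prompts)
    ]

    # Read from the queue and merge
    completed = 0
    results = [""] * len(prompts)
    while completed < len(prompts):
        index, content = await queue.get()
        if content is None:
            completed += 1
        else:
            results[index] += content
            # Live output (could be pushed to a frontend instead)
            print(f"[{index}] {content}", end="", flush=True)

    # Collect the tasks; a failed stream keeps whatever text it produced
    await asyncio.gather(*tasks, return_exceptions=True)
    return results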
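The queue-based merge can be exercised without an API key. The sketch below replaces the streaming response with a hypothetical async generator, `fake_stream`, and keeps the same protocol: `(index, chunk)` items, with `None` as the end marker for each stream.

```python
import asyncio

async def fake_stream(words, delay_s):
    """Hypothetical stand-in for a streaming response."""
    for word in words:
        await asyncio.sleep(delay_s)
        yield word

async def pump(index, stream, queue):
    """Forward one stream into the shared queue, then send its end marker."""
    try:
        async for chunk in stream:
            await queue.put((index, chunk))
    finally:
        await queue.put((index, None))

async def merge(streams):
    queue = asyncio.Queue()
    producers = [
        asyncio.create_task(pump(i, s, queue)) for i, s in enumerate(streams)
    ]
    merged = [""] * len(streams)
    remaining = len(streams)
    while remaining:
        index, chunk = await queue.get()
        if chunk is None:
            remaining -= 1
        else:
            merged[index] += chunk
    await asyncio.gather(*producers)
    return merged

merged = asyncio.run(
    merge([fake_stream(["a", "b", "c"], 0.01), fake_stream(["x", "y"], 0.015)])
)
print(merged)  # ['abc', 'xy']
```

Chunks from the two streams interleave on the queue, but the index keeps each stream's text in its own slot.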

4.3 Batch processing with timeouts

Give each request a timeout. A request that times out is skipped without affecting the others:

python

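import asyncio

# process_one is the single-request coroutine defined in section 2.1

async def process_with_timeout(prompts, timeout=10):
    """Wait at most `timeout` seconds (10 by default) for each request."""

    async def timed_process(prompt):
        try:
            return await asyncio.wait_for(
                process_one(prompt),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # wait_for cancels the request; return a marker in its place
            return f"[timeout] {prompt[:20]}..."

    tasks = [timed_process(p) for p in prompts]
    return await asyncio.gather(*tasks)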
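A small offline check of the same pattern, with simulated latencies in place of real requests. `slow_call` and `with_timeout` are hypothetical names; this version returns `None` on timeout instead of a marker string.

```python
import asyncio

async def slow_call(delay_s: float) -> str:
    """Hypothetical stand-in for a request with a known latency."""
    await asyncio.sleep(delay_s)
    return f"done after {delay_s}s"

async def with_timeout(delay_s: float, timeout_s: float):
    try:
        return await asyncio.wait_for(slow_call(delay_s), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None  # the slow call was cancelled by wait_for

async def demo_timeouts():
    # The first call beats the 0.1 s limit, the second does not
    return await asyncio.gather(with_timeout(0.01, 0.1), with_timeout(0.5, 0.1))

timeout_results = asyncio.run(demo_timeouts())
print(timeout_results)  # ['done after 0.01s', None]
```

Because each request has its own `wait_for`, the whole demo finishes in about 0.1 s rather than waiting 0.5 s for the slow call.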

5. Error handling and retries

5.1 Retry with exponential backoff

python

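import asyncio

# process_one is the single-request coroutine defined in section 2.1

async def process_with_retry(prompt, max_retries=3):
    """Retry with exponential backoff."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return await process_one(prompt)
        except Exception as e:
            last_error = e
            if attempt == max_retries - 1:
                break  # no attempts left, so do not wait again
            wait = 2 ** attempt  # 1s, 2s, 4s, ...
            # Wait longer when rate limited (429)
            if "429" in str(e):
                wait *= 2
            print(f"  Retry {attempt + 1}/{max_retries}, waiting {wait}s: {e}")
            await asyncio.sleep(wait)
    raise last_error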
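The backoff schedule itself is plain arithmetic and can be computed up front. The sketch below reproduces the 1s, 2s, 4s sequence and the doubling for 429 responses. The `jitter` parameter is an assumption added here, not part of the retry code in this section: random jitter is a common addition so that many clients do not retry in lockstep.

```python
import random

def backoff_delays(max_retries=3, base=1.0, rate_limited=False, jitter=0.0, rng=None):
    """Return the wait before each retry; jitter is a fraction of each delay."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = base * (2 ** attempt)
        if rate_limited:
            delay *= 2  # wait longer after a 429
        # Assumption: optional random jitter of up to `jitter` * delay
        delay += rng.uniform(0, jitter * delay)
        delays.append(delay)
    return delays

print(backoff_delays())                   # [1.0, 2.0, 4.0]
print(backoff_delays(rate_limited=True))  # [2.0, 4.0, 8.0]
```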

5.2 Circuit breaker

Pause requests after consecutive failures to avoid a cascade of failures:

python

import time

# process_one is the single-request coroutine defined in section 2.1

class CircuitBreaker:
    def __init__(self, threshold=5, reset_time=30):
        self.failures = 0
        self.threshold = threshold
        self.reset_time = reset_time
        self.last_failure = 0
        self.state = "closed"  # closed / open / half_open

    def can_proceed(self):
        if self.state == "open":
            # After reset_time seconds, let requests through again as a probe
            if time.monotonic() - self.last_failure > self.reset_time:
                self.state = "half_open"
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure = time.monotonic()
        if self.failures >= self.threshold:
            self.state = "open"
            print(f"[circuit open] {self.failures} consecutive failures, pausing requests")

breaker = CircuitBreaker(threshold=5)

async def protected_process(prompt):
    if not breaker.can_proceed():
        return "Service temporarily unavailable, please retry later"
    try:
        result = await process_one(prompt)
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise
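The state transitions are easier to follow with a clock that can be moved by hand. The sketch below is a hypothetical `DemoBreaker` with the same state machine as CircuitBreaker, plus an injectable `clock` argument (an addition for testing, assumed here) so the 30-second reset does not have to be waited out.

```python
import time

class DemoBreaker:
    """Same state machine as CircuitBreaker, with an injectable clock."""

    def __init__(self, threshold=3, reset_time=30.0, clock=time.monotonic):
        self.threshold = threshold
        self.reset_time = reset_time
        self.clock = clock
        self.failures = 0
        self.last_failure = 0.0
        self.state = "closed"

    def can_proceed(self):
        if self.state == "open":
            if self.clock() - self.last_failure > self.reset_time:
                self.state = "half_open"  # allow a probe request
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure = self.clock()
        if self.failures >= self.threshold:
            self.state = "open"

now = [0.0]  # fake clock, advanced manually
demo_breaker = DemoBreaker(threshold=3, reset_time=30.0, clock=lambda: now[0])
states = []
for _ in range(3):
    demo_breaker.record_failure()
states.append((demo_breaker.state, demo_breaker.can_proceed()))  # tripped
now[0] = 31.0
states.append((demo_breaker.can_proceed(), demo_breaker.state))  # probe allowed
demo_breaker.record_success()
states.append(demo_breaker.state)                                # recovered
print(states)  # [('open', False), (True, 'half_open'), 'closed']
```

A failed probe in the half_open state calls `record_failure` again, and since the failure count is still at or above the threshold, the breaker reopens immediately.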

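6. The complete concurrent pipeline

Combine the preceding components into a concurrent call pipeline that can serve as a starting point for production use: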

python

import asyncio
import time
from dataclasses import dataclass
from typing import List, Optional

from openai import AsyncOpenAI

# CircuitBreaker is the class defined in section 5.2

@dataclass
class BatchConfig:
    max_concurrent: int = 5        # maximum concurrency
    batch_size: int = 20           # requests per batch
    batch_interval: float = 0.3    # pause between batches (s)
    timeout: float = 15.0          # per-request timeout (s)
    max_retries: int = 3           # maximum attempts per request
    retry_base_delay: float = 1.0  # base retry delay (s)

class AIPipeline:
    """Concurrent call pipeline for an AI API."""

    def __init__(self, client, config: BatchConfig):
        self.client = client
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.breaker = CircuitBreaker(threshold=5)
        self.stats = {"success": 0, "failed": 0, "retried": 0}

    async def _single_call(self, prompt: str) -> Optional[str]:
        """Single request with concurrency control, timeout, and retries."""
        if not self.breaker.can_proceed():
            return None

        async with self.semaphore:
            for attempt in range(self.config.max_retries):
                try:
                    response = await asyncio.wait_for(
                        self.client.chat.completions.create(
                            model="gpt-4o-mini",
                            messages=[{"role": "user", "content": prompt}]
                        ),
                        timeout=self.config.timeout
                    )
                    self.breaker.record_success()
                    self.stats["success"] += 1
                    return response.choices[0].message.content
                except asyncio.TimeoutError:
                    delay = self.config.retry_base_delay * (2 ** attempt)
                except Exception as e:
                    # Back off longer when rate limited (429)
                    if "429" in str(e):
                        delay = 2 * (2 ** attempt)
                    else:
                        delay = self.config.retry_base_delay * (2 ** attempt)

                # Wait and count a retry only if another attempt remains
                if attempt < self.config.max_retries - 1:
                    self.stats["retried"] += 1
                    await asyncio.sleep(delay)

        self.breaker.record_failure()
        self.stats["failed"] += 1
        return None

    async def process_batch(self, prompts: List[str]) -> List[Optional[str]]:
        """Batch processing: batches + concurrency + intervals."""
        all_results = []
        total = len(prompts)

        for i in range(0, total, self.config.batch_size):
            batch = prompts[i:i + self.config.batch_size]

            # Run this batch concurrently
            tasks = [self._single_call(p) for p in batch]
            batch_results = await asyncio.gather(*tasks)
            all_results.extend(batch_results)

            # Progress report
            done = min(i + self.config.batch_size, total)
            print(f"  Progress: {done}/{total} "
                  f"(success:{self.stats['success']} "
                  f"failed:{self.stats['failed']} "
                  f"retried:{self.stats['retried']})")

            # Pause between batches
            if done < total:
                await asyncio.sleep(self.config.batch_interval)

        return all_results

    def get_stats(self):
        return dict(self.stats)

# Usage example
async def main():
    client = AsyncOpenAI(api_key="your-key")
    config = BatchConfig(
        max_concurrent=5,
        batch_size=20,
        batch_interval=0.3,
        timeout=15.0,
        max_retries=3
    )
    pipeline = AIPipeline(client, config)

    # 100 requests
    prompts = [f"Explain in one sentence what {i} is" for i in range(100)]

    start = time.time()
    results = await pipeline.process_batch(prompts)
    elapsed = time.time() - start

    print(f"\nDone! Elapsed: {elapsed:.1f}s")
    print(f"Stats: {pipeline.get_stats()}")
    print(f"Throughput: {len(prompts) / elapsed:.1f} req/s")

asyncio.run(main())
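To try a pipeline like this without an API key or network access, one option is to pass in a stub client. The sketch below is a hypothetical `FakeClient` that imitates only the parts of the client the pipeline touches: an async `chat.completions.create(...)` whose return value exposes `choices[0].message.content`. The class names, the `fail_every` parameter, and the echo behaviour are all assumptions made for illustration.

```python
import asyncio
from types import SimpleNamespace

class FakeCompletions:
    """Hypothetical stub for the `chat.completions` object."""

    def __init__(self, latency_s=0.01, fail_every=0):
        self.latency_s = latency_s
        self.fail_every = fail_every  # 0 means never fail
        self.calls = 0

    async def create(self, *, model, messages, **kwargs):
        self.calls += 1
        await asyncio.sleep(self.latency_s)  # simulated network wait
        if self.fail_every and self.calls % self.fail_every == 0:
            raise RuntimeError("simulated failure")
        text = f"echo: {messages[-1]['content']}"
        message = SimpleNamespace(content=text)
        return SimpleNamespace(choices=[SimpleNamespace(message=message)])

class FakeClient:
    """Hypothetical stub exposing `client.chat.completions.create`."""

    def __init__(self, **kwargs):
        self.chat = SimpleNamespace(completions=FakeCompletions(**kwargs))

async def demo_fake():
    client = FakeClient()
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "hello"}],
    )
    return response.choices[0].message.content

fake_reply = asyncio.run(demo_fake())
print(fake_reply)  # echo: hello
```

Because the pipeline receives its client through the constructor, an object of this shape can be passed as the `client` argument to exercise the batching, retry, and statistics paths with simulated latency and failures.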

7. Performance comparison

Elapsed time for the same batch of 100 requests under different approaches:

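Approach | Time | Notes
Synchronous, serial | ~150s | One request at a time
Unlimited concurrency | ~2s | But triggers rate limiting
Semaphore(5) | ~30s | Stable but not fast
Batching + concurrency + retries | ~25s | Suitable for production
Adaptive concurrency | ~18s | Best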
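The serial and Semaphore(5) figures follow from simple arithmetic. The sketch below assumes every request takes the same 1.5 s used in section 1 and ignores retries, batch intervals, and client overhead, so it gives idealized lower bounds only. `ideal_seconds` is a hypothetical helper; it does not model the batching or adaptive rows.

```python
import math

def ideal_seconds(n_requests: int, latency_s: float, max_concurrent: int) -> float:
    """Lower bound: requests run in waves of at most max_concurrent."""
    waves = math.ceil(n_requests / max_concurrent)
    return waves * latency_s

print(ideal_seconds(100, 1.5, 1))    # 150.0  (serial)
print(ideal_seconds(100, 1.5, 5))    # 30.0   (Semaphore(5))
print(ideal_seconds(100, 1.5, 100))  # 1.5    (unlimited concurrency)
```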

8. Summary

Five key points for building a high-concurrency AI API call pipeline:

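  1. Use async: AsyncOpenAI plus asyncio.gather
  2. Limit concurrency: a Semaphore prevents rate limiting and memory exhaustion
  3. Batch with intervals: more stable than sustained high concurrency
  4. Retry and circuit-break: unstable networks are the norm, so fault tolerance is required
  5. Monitor the counts: successes, failures, and retries should be visible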

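Put together, the code in this article forms a concurrent pipeline that can be taken toward production use. Tune the BatchConfig parameters to match your own call volume.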