【Claude】缓存机制与性能调优指南 — 已解决
【Claude】缓存机制与性能调优指南 — 已解决
适用版本:Claude Code v1.0.x 及以上
受影响场景:重复请求优化、长上下文缓存、API 延迟降低、批量任务加速
阅读时长:约 25 分钟
目录
- 问题现象
- 原理深挖:Prompt Caching 机制
- 根因分析:性能瓶颈六大根源
- 多方案解决:从缓存到调优
- 验证回归:性能调优验证
- 避坑最佳实践
- 附录:缓存参数速查表
1. 问题现象
1.1 典型问题表现
问题一:相同上下文重复发送导致高成本
# 场景:10 轮对话,每轮系统提示 + CLAUDE.md = 15K tokens # 每轮都重新发送这 15K tokens # 10 轮 = 150K tokens 重复发送 × $3/M = $0.45 # 如果启用缓存:只首次全价,后续 10% → 节省 80%+问题二:首次响应延迟过高
# 冷启动:系统提示 + 工具定义 + CLAUDE.md + 用户输入 # = 30K tokens 输入 → API 处理 3-5 秒 # 用户感知延迟高问题三:批量任务慢
# 100 个文件逐个分析 # 每次都重新构建上下文 → 重复开销 # 总耗时: 100 × 5s = 500s问题四:缓存未命中不知原因
# 启用了 cache_control 但 cache_creation_input_tokens 总是 0 # 不知道为什么缓存不生效问题五:缓存过期导致重复计费
# 缓存 TTL 5 分钟 # 第 6 分钟发送请求 → 缓存已过期 → 重新全价计费 # 长对话中频繁过期2. 原理深挖:Prompt Caching 机制
2.1 什么是 Prompt Caching
Anthropic API 提供的 Prompt Caching 允许将请求的前缀部分缓存,后续请求如果前缀相同,只需支付 10% 的输入 Token 费用。
无缓存请求: 请求 1: [系统提示 10K][CLAUDE.md 5K][对话历史 5K] → 全价 20K tokens 请求 2: [系统提示 10K][CLAUDE.md 5K][对话历史 8K] → 全价 23K tokens 请求 3: [系统提示 10K][CLAUDE.md 5K][对话历史 12K] → 全价 27K tokens 总计: 70K tokens × $3/M = $0.21 有缓存请求: 请求 1: [系统提示 10K][CLAUDE.md 5K (cached)][对话历史 5K] → 15K 全价 + 10K 缓存写入(1.25倍) = $0.056 请求 2: [系统提示 10K (cached)][CLAUDE.md 5K (cached)][对话历史 8K] → 3K 全价 + 15K 缓存读取(0.1倍) = $0.011 请求 3: [系统提示 10K (cached)][CLAUDE.md 5K (cached)][对话历史 12K] → 4K 全价 + 15K 缓存读取(0.1倍) = $0.015 总计: ~$0.082 (节省 61%)2.2 缓存定价
| 操作 | 价格倍率 | 说明 |
|---|---|---|
| 正常输入 | 1.0× | 标准 Token 价格 |
| 缓存写入 | 1.25× | 首次缓存,稍贵 |
| 缓存读取 | 0.1× | 命中缓存,极便宜 |
Sonnet 4 示例: 正常输入: $3.00/M tokens 缓存写入: $3.75/M tokens (1.25×) 缓存读取: $0.30/M tokens (0.1×)2.3 缓存规则
缓存条件: 1. 前缀必须完全匹配(包括空格、换行) 2. 最小缓存长度: 1024 tokens (Sonnet/Opus) / 2048 tokens (Haiku) 3. 缓存 TTL: 5 分钟 4. 最多 4 个 cache_control 断点 5. 缓存按组织级别隔离 缓存位置: 请求结构: [system][messages[0]][messages[1]]... cache_control 只能放在 content block 的最后一个 system: { "type": "text", "text": "...", "cache_control": {"type": "ephemeral"} ← 缓存点 1 } messages: [ {"role": "user", "content": [ {"type": "text", "text": "固定前缀", "cache_control": {"type": "ephemeral"}}, ← 缓存点 2 {"type": "text", "text": "变化部分"} ]} ]2.4 Claude Code 的缓存策略
Claude Code 内部自动使用 Prompt Caching:
Claude Code 缓存层次: Layer 1: System Prompt (系统提示) - 工具定义 + 行为规则 + 安全约束 - ~10K tokens - 始终缓存(每次请求前缀相同) Layer 2: CLAUDE.md (记忆文件) - 项目配置和规范 - ~3K tokens - 始终缓存(会话内不变) Layer 3: 对话历史 - 前面的对话消息 - 增量缓存(新消息追加到尾部) - 最近的 N 轮对话被缓存 Layer 4: 当前用户输入 - 不缓存(每次不同)2.5 缓存命中与未命中
# API 响应中的缓存指标 response = client.messages.create(...) response.usage.cache_creation_input_tokens # 缓存写入的 Token 数 response.usage.cache_read_input_tokens # 缓存读取的 Token 数 response.usage.input_tokens # 未缓存的输入 Token 数 # 判断缓存状态 if response.usage.cache_read_input_tokens > 0: print("✓ 缓存命中") elif response.usage.cache_creation_input_tokens > 0: print("→ 缓存写入(首次)") else: print("✗ 无缓存")3. 根因分析:性能瓶颈六大根源
3.1 根源一:未利用 Prompt Caching
SDK 直接调用时不自动启用缓存,需要手动设置cache_control。
3.2 根源二:上下文顺序不固定
如果系统提示或对话前缀在每次请求中有微小变化(多一个空格、顺序变化),缓存就会失效。
3.3 根源三:缓存过期
5 分钟 TTL 内没有发送请求,缓存自动清除。长对话中如果思考时间超过 5 分钟,每次都要重新缓存。
3.4 根源四:批量任务无复用
批量处理多个文件时,如果每个请求都构建新的上下文,无法复用缓存。
3.5 根源五:工具定义重复发送
每次 API 调用都发送完整的工具定义(~5K tokens),即使工具列表没有变化。
3.6 根源六:网络延迟未优化
没有使用流式输出、连接复用、就近接入等网络优化手段。
4. 多方案解决:从缓存到调优
4.1 方案一:SDK 手动启用缓存
import anthropic client = anthropic.Anthropic(api_key="sk-ant-xxx") SYSTEM_PROMPT = """你是一个代码审查助手。遵循以下规范: 1. 检查安全漏洞 2. 检查性能问题 3. 检查代码风格 ...(长系统提示 2000 tokens) """ def cached_chat(user_message, messages=None): """启用 Prompt Caching 的调用""" # 系统提示加缓存标记 system_block = { "type": "text", "text": SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"} } # 构建消息 if messages is None: messages = [] messages.append({"role": "user", "content": user_message}) # 对历史消息的最后一条加缓存 if len(messages) >= 2: # 对话历史的前缀加缓存 history_text = json.dumps(messages[:-1]) # 实际操作中,cache_control 加在 content block 上 response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, system=[system_block], # 系统提示缓存 messages=messages, tools=get_tools_with_cache() # 工具定义缓存 ) # 报告缓存状态 cache_read = response.usage.cache_read_input_tokens cache_write = response.usage.cache_creation_input_tokens non_cached = response.usage.input_tokens if cache_read > 0: print(f"✓ 缓存命中: {cache_read:,} tokens (0.1×)") if cache_write > 0: print(f"→ 缓存写入: {cache_write:,} tokens (1.25×)") print(f" 非缓存: {non_cached:,} tokens (1.0×)") return response # 第一次调用: 缓存写入 cached_chat("审查 auth.py") # 输出: → 缓存写入: 10000 tokens (1.25×) # 5 分钟内第二次调用: 缓存命中 cached_chat("审查 user.py") # 输出: ✓ 缓存命中: 10000 tokens (0.1×)4.2 方案二:多断点缓存策略
def multi_breakpoint_cache(system_prompt, claude_md, conversation_history, current_input): """ 多断点缓存策略 在 system、CLAUDE.md、对话历史三个位置设置缓存断点 最大化缓存命中率 """ messages = [] # 将对话历史转为 content blocks if conversation_history: history_content = [] for msg in conversation_history: history_content.append({ "type": "text", "text": f"[{msg['role']}]: {msg['content']}\n" }) # 在历史末尾加缓存断点 history_content[-1]["cache_control"] = {"type": "ephemeral"} messages.append({ "role": "user", "content": history_content }) # 当前输入 messages.append({"role": "user", "content": current_input}) # 系统提示 + CLAUDE.md 两个缓存断点 system_blocks = [ { "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} # 断点 1 }, { "type": "text", "text": claude_md, "cache_control": {"type": "ephemeral"} # 断点 2 } ] response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, system=system_blocks, messages=messages ) return response # 使用 system = "你是代码助手..." # 固定 claude_md = open("CLAUDE.md").read() # 固定 history = [ {"role": "user", "content": "分析 auth.py"}, {"role": "assistant", "content": "auth.py 分析完成..."} ] # 第一次:写入缓存 multi_breakpoint_cache(system, claude_md, history, "继续分析 user.py") # 第二次(5分钟内):命中缓存 history.append({"role": "assistant", "content": "user.py 分析完成..."}) multi_breakpoint_cache(system, claude_md, history, "分析 order.py")4.3 方案三:缓存预热策略
""" 缓存预热:在批量任务开始前先写入缓存 """ def warmup_cache(system_prompt, tools_definition): """预热缓存,确保后续请求命中""" # 发送一个最小请求来写入缓存 response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=10, # 最小输出 system=[{ "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} }], tools=tools_definition, messages=[{"role": "user", "content": "ok"}] ) cache_written = response.usage.cache_creation_input_tokens print(f"✓ 缓存预热完成: {cache_written:,} tokens 已缓存") return response def batch_process_with_cache(files, system_prompt): """批量处理 + 缓存复用""" # 1. 预热缓存 warmup_cache(system_prompt, get_tools()) # 2. 批量处理(5 分钟内完成以确保缓存命中) results = [] for filepath in files: content = open(filepath).read() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=2048, system=[{ "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} }], messages=[{ "role": "user", "content": f"审查: {filepath}\n\n{content}" }] ) cache_read = response.usage.cache_read_input_tokens cost = calculate_cost(response) results.append({ "file": filepath, "result": response.content[0].text, "cache_hit": cache_read > 0, "cost": cost }) status = "✓" if cache_read > 0 else "✗" print(f" {status} {filepath}: ${cost:.4f}") # 汇总 total_cost = sum(r["cost"] for r in results) cache_hits = sum(1 for r in results if r["cache_hit"]) print(f"\n总计: {len(results)} 文件, ${total_cost:.4f}, " f"缓存命中 {cache_hits}/{len(results)}") return results # 使用 files = [f"src/{f}" for f in os.listdir("src") if f.endswith(".py")] batch_process_with_cache(files, SYSTEM_PROMPT)4.4 方案四:缓存保活策略
""" 缓存保活:在长对话中定期发送请求保持缓存不过期 """ import threading import time class CacheKeepAlive: """缓存保活管理器""" def __init__(self, client, system_prompt, interval=240): """ 参数: interval: 保活间隔(秒),默认 240s(4分钟,小于 5分钟 TTL) """ self.client = client self.system_prompt = system_prompt self.interval = interval self._running = False self._thread = None def _keepalive_loop(self): """保活循环""" while self._running: time.sleep(self.interval) if not self._running: break # 发送最小请求保持缓存 try: self.client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1, system=[{ "type": "text", "text": self.system_prompt, "cache_control": {"type": "ephemeral"} }], messages=[{"role": "user", "content": "."}] ) print(f" [keepalive] 缓存已刷新") except Exception as e: print(f" [keepalive] 失败: {e}") def start(self): """启动保活""" self._running = True self._thread = threading.Thread(target=self._keepalive_loop, daemon=True) self._thread.start() print("✓ 缓存保活已启动") def stop(self): """停止保活""" self._running = False if self._thread: self._thread.join(timeout=5) print("✓ 缓存保活已停止") # 使用 keepalive = CacheKeepAlive(client, SYSTEM_PROMPT, interval=240) keepalive.start() # 长对话中,即使用户思考超过 5 分钟,缓存也不会过期 # ... 对话 ... # ... 用户思考 10 分钟 ... # 缓存仍然有效! keepalive.stop()4.5 方案五:性能基准测试
""" 性能基准测试:对比缓存前后的延迟和成本 """ import time import anthropic client = anthropic.Anthropic(api_key="sk-ant-xxx") SYSTEM_PROMPT = "你是代码助手。" * 500 # ~2500 tokens def benchmark_no_cache(calls=5): """无缓存基准""" total_time = 0 total_cost = 0 for i in range(calls): start = time.time() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=100, system=SYSTEM_PROMPT, # 字符串形式,无缓存 messages=[{"role": "user", "content": f"test {i}"}] ) elapsed = time.time() - start input_t = response.usage.input_tokens output_t = response.usage.output_tokens cost = (input_t * 3 + output_t * 15) / 1_000_000 total_time += elapsed total_cost += cost print(f" #{i+1}: {elapsed:.2f}s, ${cost:.4f}") print(f"\n无缓存: 平均 {total_time/calls:.2f}s, ${total_cost:.4f}") return total_time / calls, total_cost def benchmark_with_cache(calls=5): """有缓存基准""" total_time = 0 total_cost = 0 for i in range(calls): start = time.time() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=100, system=[{ "type": "text", "text": SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"} }], messages=[{"role": "user", "content": f"test {i}"}] ) elapsed = time.time() - start input_t = response.usage.input_tokens output_t = response.usage.output_tokens cache_read = response.usage.cache_read_input_tokens cache_write = response.usage.cache_creation_input_tokens # 计算实际成本 cost = ( input_t * 3 + cache_write * 3.75 + cache_read * 0.3 + output_t * 15 ) / 1_000_000 cache_status = "命中" if cache_read > 0 else "写入" if cache_write > 0 else "无" total_time += elapsed total_cost += cost print(f" #{i+1}: {elapsed:.2f}s, ${cost:.4f} (缓存:{cache_status})") print(f"\n有缓存: 平均 {total_time/calls:.2f}s, ${total_cost:.4f}") return total_time / calls, total_cost # 运行基准 print("=== 无缓存 ===") no_cache_time, no_cache_cost = benchmark_no_cache() print("\n=== 有缓存 ===") with_cache_time, with_cache_cost = benchmark_with_cache() print(f"\n=== 对比 ===") print(f"延迟: {no_cache_time:.2f}s → {with_cache_time:.2f}s " f"({(1-with_cache_time/no_cache_time)*100:.0f}% 提升)") print(f"成本: ${no_cache_cost:.4f} → ${with_cache_cost:.4f} " f"({(1-with_cache_cost/no_cache_cost)*100:.0f}% 节省)")4.6 方案六:网络层优化
""" 网络层优化:连接复用、超时调优、就近接入 """ import anthropic import httpx # 1. 连接复用(SDK 默认已启用) # httpx.Client 自动复用 TCP 连接 # 2. 自定义 HTTP 客户端(调优连接池) custom_http = httpx.Client( timeout=httpx.Timeout( connect=5.0, # 连接超时 read=60.0, # 读取超时 write=10.0, # 写入超时 pool=5.0 # 连接池等待 ), limits=httpx.Limits( max_connections=10, max_keepalive_connections=5, keepalive_expiry=30.0 ), http2=True # 启用 HTTP/2 多路复用 ) client = anthropic.Anthropic( api_key="sk-ant-xxx", http_client=custom_http ) # 3. 流式输出减少首字节延迟 def stream_first_byte(prompt): """使用流式输出,获取首个 token 的延迟""" start = time.time() first_byte_time = None with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, messages=[{"role": "user", "content": prompt}] ) as stream: for event in stream: if event.type == "content_block_delta" and first_byte_time is None: first_byte_time = time.time() - start print(f" 首字节延迟: {first_byte_time:.2f}s") break total_time = time.time() - start print(f" 总耗时: {total_time:.2f}s") # 流式 vs 非流式: # 非流式: 等待完整响应 → 用户等待 total_time # 流式: 首字节后即可开始阅读 → 用户感知延迟 first_byte_time4.7 方案七:并发优化
""" 并发优化:多请求并行处理 """ import anthropic from concurrent.futures import ThreadPoolExecutor, as_completed import threading # 线程安全的统计 class CostTracker: def __init__(self): self._lock = threading.Lock() self.total_cost = 0 self.total_tokens = 0 def add(self, cost, tokens): with self._lock: self.total_cost += cost self.total_tokens += tokens tracker = CostTracker() def process_file(filepath, client, system_prompt): """处理单个文件(可并发)""" content = open(filepath).read() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=2048, system=[{ "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} }], messages=[{"role": "user", "content": f"审查: {filepath}\n{content}"}] ) cost = (response.usage.input_tokens * 3 + response.usage.output_tokens * 15) / 1_000_000 tracker.add(cost, response.usage.input_tokens + response.usage.output_tokens) return filepath, response.content[0].text def concurrent_batch(files, system_prompt, max_workers=5): """并发批量处理""" client = anthropic.Anthropic(api_key="sk-ant-xxx") start = time.time() results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(process_file, f, client, system_prompt): f for f in files } for future in as_completed(futures): filepath = futures[future] try: result = future.result() results.append(result) print(f" ✓ {filepath}") except Exception as e: print(f" ✗ {filepath}: {e}") elapsed = time.time() - start print(f"\n并发 {max_workers}: {len(results)} 文件, {elapsed:.1f}s, " f"${tracker.total_cost:.4f}") return results # 对比不同并发度 import os files = [f"src/{f}" for f in os.listdir("src") if f.endswith(".py")][:20] for workers in [1, 3, 5, 10]: tracker = CostTracker() concurrent_batch(files, SYSTEM_PROMPT, max_workers=workers)5. 验证回归:性能调优验证
5.1 缓存命中率验证
def verify_cache(): """验证缓存是否生效""" system = "你是助手。" * 500 # >1024 tokens # 第一次请求 r1 = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=10, system=[{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}], messages=[{"role": "user", "content": "hi"}] ) assert r1.usage.cache_creation_input_tokens > 0, "首次应写入缓存" print(f"✓ 首次缓存写入: {r1.usage.cache_creation_input_tokens} tokens") # 第二次请求(应命中) r2 = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=10, system=[{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}], messages=[{"role": "user", "content": "hello"}] ) assert r2.usage.cache_read_input_tokens > 0, "第二次应命中缓存" print(f"✓ 缓存命中: {r2.usage.cache_read_input_tokens} tokens") # 成本对比 cost1 = r1.usage.cache_creation_input_tokens * 3.75 / 1_000_000 cost2 = r2.usage.cache_read_input_tokens * 0.3 / 1_000_000 print(f" 首次: ${cost1:.4f}, 命中: ${cost2:.4f}") print(f" 节省: {(1-cost2/cost1)*100:.0f}%") verify_cache()5.2 验证清单
| # | 验证项 | 预期 | 方法 |
|---|---|---|---|
| 1 | 缓存写入 | cache_creation > 0 | 首次请求检查 |
| 2 | 缓存命中 | cache_read > 0 | 5分钟内二次请求 |
| 3 | 缓存过期 | 5分钟后 cache_read=0 | 等待后请求 |
| 4 | 成本降低 | >50% | 对比有/无缓存 |
| 5 | 延迟降低 | >20% | 对比响应时间 |
| 6 | 并发安全 | 无竞态 | 多线程测试 |
| 7 | 保活有效 | 超时后仍命中 | keepalive 测试 |
| 8 | 流式首字节 | <2s | stream 测试 |
6. 避坑最佳实践
6.1 缓存使用原则
原则 1: 固定前缀 — 系统提示和 CLAUDE.md 保持不变 原则 2: 5分钟窗口 — 批量任务在 5 分钟内完成 原则 3: 最小 1024 — 缓存内容至少 1024 tokens 原则 4: 最多 4 断点 — cache_control 最多 4 个 原则 5: 保活策略 — 长对话用 keepalive 原则 6: 预热缓存 — 批量前先 warmup 原则 7: 监控命中 — 检查 cache_read_input_tokens 原则 8: 并发复用 — 多线程共享缓存6.2 缓存失效原因
| 原因 | 检查方法 | 解决 |
|---|---|---|
| 前缀变化 | diff 前缀 | 保持完全一致 |
| TTL 过期 | 间隔 >5 分钟 | 用 keepalive |
| 内容 <1024 tokens | 检查长度 | 增加缓存内容 |
| 超过 4 断点 | 检查 cache_control 数 | 减少断点 |
| 不同组织 | API Key 不同 | 同一组织 |
6.3 性能调优检查清单
- 系统提示使用 cache_control
- CLAUDE.md 使用 cache_control
- 对话历史增量缓存
- 批量任务在 5 分钟内完成
- 长对话使用 keepalive
- 流式输出减少感知延迟
- 并发处理批量任务
- HTTP/2 连接复用
- 监控缓存命中率
- 定期成本对比
7. 附录:缓存参数速查表
7.1 缓存定价
| 模型 | 正常输入 | 缓存写入 (1.25×) | 缓存读取 (0.1×) |
|---|---|---|---|
| Opus 4 | $15/M | $18.75/M | $1.50/M |
| Sonnet 4 | $3/M | $3.75/M | $0.30/M |
| Haiku 3.5 | $0.25/M | $0.31/M | $0.025/M |
7.2 缓存限制
| 限制 | 值 |
|---|---|
| 最小缓存长度 (Opus/Sonnet) | 1,024 tokens |
| 最小缓存长度 (Haiku) | 2,048 tokens |
| 缓存 TTL | 5 分钟 |
| 最大 cache_control 断点 | 4 |
| 缓存隔离 | 按组织 |
7.3 性能优化手段对比
| 手段 | 成本影响 | 延迟影响 | 复杂度 |
|---|---|---|---|
| Prompt Caching | -50~90% | -20~40% | 低 |
| 流式输出 | 0% | -60% 感知 | 低 |
| 并发处理 | 0% | -N倍 | 中 |
| keepalive | +1% | 0% | 中 |
| HTTP/2 | 0% | -10% | 低 |
| 模型降级 | -80% | -50% | 低 |
结语
Prompt Caching 是 Anthropic API 提供的强大成本优化机制。通过固定前缀、多断点缓存、缓存预热、保活策略、并发复用等手段,可以在长对话和批量任务中节省 50-90% 的输入 Token 成本。
核心要点回顾:
- 固定前缀:系统提示和 CLAUDE.md 保持不变,确保缓存命中
- 多断点缓存:在 system、CLAUDE.md、对话历史设置缓存断点
- 缓存预热:批量任务前先 warmup,后续请求全部命中
- 保活策略:长对话中定期请求保持缓存不过期
- 并发复用:多线程共享同一缓存前缀
- 流式输出:减少用户感知延迟
- 监控命中:检查
cache_read_input_tokens确认缓存生效 - 成本对比:定期对比有/无缓存的成本差异