【Claude】缓存机制与性能调优指南 — 已解决

【Claude】缓存机制与性能调优指南 — 已解决

适用版本:Claude Code v1.0.x 及以上
受影响场景:重复请求优化、长上下文缓存、API 延迟降低、批量任务加速
阅读时长:约 25 分钟


目录

  1. 问题现象
  2. 原理深挖:Prompt Caching 机制
  3. 根因分析:性能瓶颈六大根源
  4. 多方案解决:从缓存到调优
  5. 验证回归:性能调优验证
  6. 避坑最佳实践
  7. 附录:缓存参数速查表

1. 问题现象

1.1 典型问题表现

问题一:相同上下文重复发送导致高成本

# 场景:10 轮对话,每轮系统提示 + CLAUDE.md = 15K tokens # 每轮都重新发送这 15K tokens # 10 轮 = 150K tokens 重复发送 × $3/M = $0.45 # 如果启用缓存:只首次全价,后续 10% → 节省 80%+

问题二:首次响应延迟过高

# 冷启动:系统提示 + 工具定义 + CLAUDE.md + 用户输入 # = 30K tokens 输入 → API 处理 3-5 秒 # 用户感知延迟高

问题三:批量任务慢

# 100 个文件逐个分析 # 每次都重新构建上下文 → 重复开销 # 总耗时: 100 × 5s = 500s

问题四:缓存未命中不知原因

# 启用了 cache_control 但 cache_creation_input_tokens 总是 0 # 不知道为什么缓存不生效

问题五:缓存过期导致重复计费

# 缓存 TTL 5 分钟 # 第 6 分钟发送请求 → 缓存已过期 → 重新全价计费 # 长对话中频繁过期

2. 原理深挖:Prompt Caching 机制

2.1 什么是 Prompt Caching

Anthropic API 提供的 Prompt Caching 允许将请求的前缀部分缓存,后续请求如果前缀相同,只需支付 10% 的输入 Token 费用。

无缓存请求: 请求 1: [系统提示 10K][CLAUDE.md 5K][对话历史 5K] → 全价 20K tokens 请求 2: [系统提示 10K][CLAUDE.md 5K][对话历史 8K] → 全价 23K tokens 请求 3: [系统提示 10K][CLAUDE.md 5K][对话历史 12K] → 全价 27K tokens 总计: 70K tokens × $3/M = $0.21 有缓存请求: 请求 1: [系统提示 10K][CLAUDE.md 5K (cached)][对话历史 5K] → 15K 全价 + 10K 缓存写入(1.25倍) = $0.056 请求 2: [系统提示 10K (cached)][CLAUDE.md 5K (cached)][对话历史 8K] → 3K 全价 + 15K 缓存读取(0.1倍) = $0.011 请求 3: [系统提示 10K (cached)][CLAUDE.md 5K (cached)][对话历史 12K] → 4K 全价 + 15K 缓存读取(0.1倍) = $0.015 总计: ~$0.082 (节省 61%)

2.2 缓存定价

操作价格倍率说明
正常输入1.0×标准 Token 价格
缓存写入1.25×首次缓存,稍贵
缓存读取0.1×命中缓存,极便宜
Sonnet 4 示例: 正常输入: $3.00/M tokens 缓存写入: $3.75/M tokens (1.25×) 缓存读取: $0.30/M tokens (0.1×)

2.3 缓存规则

缓存条件: 1. 前缀必须完全匹配(包括空格、换行) 2. 最小缓存长度: 1024 tokens (Sonnet/Opus) / 2048 tokens (Haiku) 3. 缓存 TTL: 5 分钟 4. 最多 4 个 cache_control 断点 5. 缓存按组织级别隔离 缓存位置: 请求结构: [system][messages[0]][messages[1]]... cache_control 只能放在 content block 的最后一个 system: { "type": "text", "text": "...", "cache_control": {"type": "ephemeral"} ← 缓存点 1 } messages: [ {"role": "user", "content": [ {"type": "text", "text": "固定前缀", "cache_control": {"type": "ephemeral"}}, ← 缓存点 2 {"type": "text", "text": "变化部分"} ]} ]

2.4 Claude Code 的缓存策略

Claude Code 内部自动使用 Prompt Caching:

Claude Code 缓存层次: Layer 1: System Prompt (系统提示) - 工具定义 + 行为规则 + 安全约束 - ~10K tokens - 始终缓存(每次请求前缀相同) Layer 2: CLAUDE.md (记忆文件) - 项目配置和规范 - ~3K tokens - 始终缓存(会话内不变) Layer 3: 对话历史 - 前面的对话消息 - 增量缓存(新消息追加到尾部) - 最近的 N 轮对话被缓存 Layer 4: 当前用户输入 - 不缓存(每次不同)

2.5 缓存命中与未命中

# API 响应中的缓存指标 response = client.messages.create(...) response.usage.cache_creation_input_tokens # 缓存写入的 Token 数 response.usage.cache_read_input_tokens # 缓存读取的 Token 数 response.usage.input_tokens # 未缓存的输入 Token 数 # 判断缓存状态 if response.usage.cache_read_input_tokens > 0: print("✓ 缓存命中") elif response.usage.cache_creation_input_tokens > 0: print("→ 缓存写入(首次)") else: print("✗ 无缓存")

3. 根因分析:性能瓶颈六大根源

3.1 根源一:未利用 Prompt Caching

SDK 直接调用时不自动启用缓存,需要手动设置cache_control

3.2 根源二:上下文顺序不固定

如果系统提示或对话前缀在每次请求中有微小变化(多一个空格、顺序变化),缓存就会失效。

3.3 根源三:缓存过期

5 分钟 TTL 内没有发送请求,缓存自动清除。长对话中如果思考时间超过 5 分钟,每次都要重新缓存。

3.4 根源四:批量任务无复用

批量处理多个文件时,如果每个请求都构建新的上下文,无法复用缓存。

3.5 根源五:工具定义重复发送

每次 API 调用都发送完整的工具定义(~5K tokens),即使工具列表没有变化。

3.6 根源六:网络延迟未优化

没有使用流式输出、连接复用、就近接入等网络优化手段。


4. 多方案解决:从缓存到调优

4.1 方案一:SDK 手动启用缓存

import anthropic client = anthropic.Anthropic(api_key="sk-ant-xxx") SYSTEM_PROMPT = """你是一个代码审查助手。遵循以下规范: 1. 检查安全漏洞 2. 检查性能问题 3. 检查代码风格 ...(长系统提示 2000 tokens) """ def cached_chat(user_message, messages=None): """启用 Prompt Caching 的调用""" # 系统提示加缓存标记 system_block = { "type": "text", "text": SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"} } # 构建消息 if messages is None: messages = [] messages.append({"role": "user", "content": user_message}) # 对历史消息的最后一条加缓存 if len(messages) >= 2: # 对话历史的前缀加缓存 history_text = json.dumps(messages[:-1]) # 实际操作中,cache_control 加在 content block 上 response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, system=[system_block], # 系统提示缓存 messages=messages, tools=get_tools_with_cache() # 工具定义缓存 ) # 报告缓存状态 cache_read = response.usage.cache_read_input_tokens cache_write = response.usage.cache_creation_input_tokens non_cached = response.usage.input_tokens if cache_read > 0: print(f"✓ 缓存命中: {cache_read:,} tokens (0.1×)") if cache_write > 0: print(f"→ 缓存写入: {cache_write:,} tokens (1.25×)") print(f" 非缓存: {non_cached:,} tokens (1.0×)") return response # 第一次调用: 缓存写入 cached_chat("审查 auth.py") # 输出: → 缓存写入: 10000 tokens (1.25×) # 5 分钟内第二次调用: 缓存命中 cached_chat("审查 user.py") # 输出: ✓ 缓存命中: 10000 tokens (0.1×)

4.2 方案二:多断点缓存策略

def multi_breakpoint_cache(system_prompt, claude_md, conversation_history, current_input): """ 多断点缓存策略 在 system、CLAUDE.md、对话历史三个位置设置缓存断点 最大化缓存命中率 """ messages = [] # 将对话历史转为 content blocks if conversation_history: history_content = [] for msg in conversation_history: history_content.append({ "type": "text", "text": f"[{msg['role']}]: {msg['content']}\n" }) # 在历史末尾加缓存断点 history_content[-1]["cache_control"] = {"type": "ephemeral"} messages.append({ "role": "user", "content": history_content }) # 当前输入 messages.append({"role": "user", "content": current_input}) # 系统提示 + CLAUDE.md 两个缓存断点 system_blocks = [ { "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} # 断点 1 }, { "type": "text", "text": claude_md, "cache_control": {"type": "ephemeral"} # 断点 2 } ] response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, system=system_blocks, messages=messages ) return response # 使用 system = "你是代码助手..." # 固定 claude_md = open("CLAUDE.md").read() # 固定 history = [ {"role": "user", "content": "分析 auth.py"}, {"role": "assistant", "content": "auth.py 分析完成..."} ] # 第一次:写入缓存 multi_breakpoint_cache(system, claude_md, history, "继续分析 user.py") # 第二次(5分钟内):命中缓存 history.append({"role": "assistant", "content": "user.py 分析完成..."}) multi_breakpoint_cache(system, claude_md, history, "分析 order.py")

4.3 方案三:缓存预热策略

""" 缓存预热:在批量任务开始前先写入缓存 """ def warmup_cache(system_prompt, tools_definition): """预热缓存,确保后续请求命中""" # 发送一个最小请求来写入缓存 response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=10, # 最小输出 system=[{ "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} }], tools=tools_definition, messages=[{"role": "user", "content": "ok"}] ) cache_written = response.usage.cache_creation_input_tokens print(f"✓ 缓存预热完成: {cache_written:,} tokens 已缓存") return response def batch_process_with_cache(files, system_prompt): """批量处理 + 缓存复用""" # 1. 预热缓存 warmup_cache(system_prompt, get_tools()) # 2. 批量处理(5 分钟内完成以确保缓存命中) results = [] for filepath in files: content = open(filepath).read() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=2048, system=[{ "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} }], messages=[{ "role": "user", "content": f"审查: {filepath}\n\n{content}" }] ) cache_read = response.usage.cache_read_input_tokens cost = calculate_cost(response) results.append({ "file": filepath, "result": response.content[0].text, "cache_hit": cache_read > 0, "cost": cost }) status = "✓" if cache_read > 0 else "✗" print(f" {status} {filepath}: ${cost:.4f}") # 汇总 total_cost = sum(r["cost"] for r in results) cache_hits = sum(1 for r in results if r["cache_hit"]) print(f"\n总计: {len(results)} 文件, ${total_cost:.4f}, " f"缓存命中 {cache_hits}/{len(results)}") return results # 使用 files = [f"src/{f}" for f in os.listdir("src") if f.endswith(".py")] batch_process_with_cache(files, SYSTEM_PROMPT)

4.4 方案四:缓存保活策略

""" 缓存保活:在长对话中定期发送请求保持缓存不过期 """ import threading import time class CacheKeepAlive: """缓存保活管理器""" def __init__(self, client, system_prompt, interval=240): """ 参数: interval: 保活间隔(秒),默认 240s(4分钟,小于 5分钟 TTL) """ self.client = client self.system_prompt = system_prompt self.interval = interval self._running = False self._thread = None def _keepalive_loop(self): """保活循环""" while self._running: time.sleep(self.interval) if not self._running: break # 发送最小请求保持缓存 try: self.client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1, system=[{ "type": "text", "text": self.system_prompt, "cache_control": {"type": "ephemeral"} }], messages=[{"role": "user", "content": "."}] ) print(f" [keepalive] 缓存已刷新") except Exception as e: print(f" [keepalive] 失败: {e}") def start(self): """启动保活""" self._running = True self._thread = threading.Thread(target=self._keepalive_loop, daemon=True) self._thread.start() print("✓ 缓存保活已启动") def stop(self): """停止保活""" self._running = False if self._thread: self._thread.join(timeout=5) print("✓ 缓存保活已停止") # 使用 keepalive = CacheKeepAlive(client, SYSTEM_PROMPT, interval=240) keepalive.start() # 长对话中,即使用户思考超过 5 分钟,缓存也不会过期 # ... 对话 ... # ... 用户思考 10 分钟 ... # 缓存仍然有效! keepalive.stop()

4.5 方案五:性能基准测试

""" 性能基准测试:对比缓存前后的延迟和成本 """ import time import anthropic client = anthropic.Anthropic(api_key="sk-ant-xxx") SYSTEM_PROMPT = "你是代码助手。" * 500 # ~2500 tokens def benchmark_no_cache(calls=5): """无缓存基准""" total_time = 0 total_cost = 0 for i in range(calls): start = time.time() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=100, system=SYSTEM_PROMPT, # 字符串形式,无缓存 messages=[{"role": "user", "content": f"test {i}"}] ) elapsed = time.time() - start input_t = response.usage.input_tokens output_t = response.usage.output_tokens cost = (input_t * 3 + output_t * 15) / 1_000_000 total_time += elapsed total_cost += cost print(f" #{i+1}: {elapsed:.2f}s, ${cost:.4f}") print(f"\n无缓存: 平均 {total_time/calls:.2f}s, ${total_cost:.4f}") return total_time / calls, total_cost def benchmark_with_cache(calls=5): """有缓存基准""" total_time = 0 total_cost = 0 for i in range(calls): start = time.time() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=100, system=[{ "type": "text", "text": SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"} }], messages=[{"role": "user", "content": f"test {i}"}] ) elapsed = time.time() - start input_t = response.usage.input_tokens output_t = response.usage.output_tokens cache_read = response.usage.cache_read_input_tokens cache_write = response.usage.cache_creation_input_tokens # 计算实际成本 cost = ( input_t * 3 + cache_write * 3.75 + cache_read * 0.3 + output_t * 15 ) / 1_000_000 cache_status = "命中" if cache_read > 0 else "写入" if cache_write > 0 else "无" total_time += elapsed total_cost += cost print(f" #{i+1}: {elapsed:.2f}s, ${cost:.4f} (缓存:{cache_status})") print(f"\n有缓存: 平均 {total_time/calls:.2f}s, ${total_cost:.4f}") return total_time / calls, total_cost # 运行基准 print("=== 无缓存 ===") no_cache_time, no_cache_cost = benchmark_no_cache() print("\n=== 有缓存 ===") with_cache_time, with_cache_cost = benchmark_with_cache() print(f"\n=== 对比 ===") print(f"延迟: {no_cache_time:.2f}s → {with_cache_time:.2f}s " f"({(1-with_cache_time/no_cache_time)*100:.0f}% 提升)") print(f"成本: ${no_cache_cost:.4f} → ${with_cache_cost:.4f} " f"({(1-with_cache_cost/no_cache_cost)*100:.0f}% 节省)")

4.6 方案六:网络层优化

""" 网络层优化:连接复用、超时调优、就近接入 """ import anthropic import httpx # 1. 连接复用(SDK 默认已启用) # httpx.Client 自动复用 TCP 连接 # 2. 自定义 HTTP 客户端(调优连接池) custom_http = httpx.Client( timeout=httpx.Timeout( connect=5.0, # 连接超时 read=60.0, # 读取超时 write=10.0, # 写入超时 pool=5.0 # 连接池等待 ), limits=httpx.Limits( max_connections=10, max_keepalive_connections=5, keepalive_expiry=30.0 ), http2=True # 启用 HTTP/2 多路复用 ) client = anthropic.Anthropic( api_key="sk-ant-xxx", http_client=custom_http ) # 3. 流式输出减少首字节延迟 def stream_first_byte(prompt): """使用流式输出,获取首个 token 的延迟""" start = time.time() first_byte_time = None with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, messages=[{"role": "user", "content": prompt}] ) as stream: for event in stream: if event.type == "content_block_delta" and first_byte_time is None: first_byte_time = time.time() - start print(f" 首字节延迟: {first_byte_time:.2f}s") break total_time = time.time() - start print(f" 总耗时: {total_time:.2f}s") # 流式 vs 非流式: # 非流式: 等待完整响应 → 用户等待 total_time # 流式: 首字节后即可开始阅读 → 用户感知延迟 first_byte_time

4.7 方案七:并发优化

""" 并发优化:多请求并行处理 """ import anthropic from concurrent.futures import ThreadPoolExecutor, as_completed import threading # 线程安全的统计 class CostTracker: def __init__(self): self._lock = threading.Lock() self.total_cost = 0 self.total_tokens = 0 def add(self, cost, tokens): with self._lock: self.total_cost += cost self.total_tokens += tokens tracker = CostTracker() def process_file(filepath, client, system_prompt): """处理单个文件(可并发)""" content = open(filepath).read() response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=2048, system=[{ "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} }], messages=[{"role": "user", "content": f"审查: {filepath}\n{content}"}] ) cost = (response.usage.input_tokens * 3 + response.usage.output_tokens * 15) / 1_000_000 tracker.add(cost, response.usage.input_tokens + response.usage.output_tokens) return filepath, response.content[0].text def concurrent_batch(files, system_prompt, max_workers=5): """并发批量处理""" client = anthropic.Anthropic(api_key="sk-ant-xxx") start = time.time() results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(process_file, f, client, system_prompt): f for f in files } for future in as_completed(futures): filepath = futures[future] try: result = future.result() results.append(result) print(f" ✓ {filepath}") except Exception as e: print(f" ✗ {filepath}: {e}") elapsed = time.time() - start print(f"\n并发 {max_workers}: {len(results)} 文件, {elapsed:.1f}s, " f"${tracker.total_cost:.4f}") return results # 对比不同并发度 import os files = [f"src/{f}" for f in os.listdir("src") if f.endswith(".py")][:20] for workers in [1, 3, 5, 10]: tracker = CostTracker() concurrent_batch(files, SYSTEM_PROMPT, max_workers=workers)

5. 验证回归:性能调优验证

5.1 缓存命中率验证

def verify_cache(): """验证缓存是否生效""" system = "你是助手。" * 500 # >1024 tokens # 第一次请求 r1 = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=10, system=[{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}], messages=[{"role": "user", "content": "hi"}] ) assert r1.usage.cache_creation_input_tokens > 0, "首次应写入缓存" print(f"✓ 首次缓存写入: {r1.usage.cache_creation_input_tokens} tokens") # 第二次请求(应命中) r2 = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=10, system=[{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}], messages=[{"role": "user", "content": "hello"}] ) assert r2.usage.cache_read_input_tokens > 0, "第二次应命中缓存" print(f"✓ 缓存命中: {r2.usage.cache_read_input_tokens} tokens") # 成本对比 cost1 = r1.usage.cache_creation_input_tokens * 3.75 / 1_000_000 cost2 = r2.usage.cache_read_input_tokens * 0.3 / 1_000_000 print(f" 首次: ${cost1:.4f}, 命中: ${cost2:.4f}") print(f" 节省: {(1-cost2/cost1)*100:.0f}%") verify_cache()

5.2 验证清单

#验证项预期方法
1缓存写入cache_creation > 0首次请求检查
2缓存命中cache_read > 05分钟内二次请求
3缓存过期5分钟后 cache_read=0等待后请求
4成本降低>50%对比有/无缓存
5延迟降低>20%对比响应时间
6并发安全无竞态多线程测试
7保活有效超时后仍命中keepalive 测试
8流式首字节<2sstream 测试

6. 避坑最佳实践

6.1 缓存使用原则

原则 1: 固定前缀 — 系统提示和 CLAUDE.md 保持不变 原则 2: 5分钟窗口 — 批量任务在 5 分钟内完成 原则 3: 最小 1024 — 缓存内容至少 1024 tokens 原则 4: 最多 4 断点 — cache_control 最多 4 个 原则 5: 保活策略 — 长对话用 keepalive 原则 6: 预热缓存 — 批量前先 warmup 原则 7: 监控命中 — 检查 cache_read_input_tokens 原则 8: 并发复用 — 多线程共享缓存

6.2 缓存失效原因

原因检查方法解决
前缀变化diff 前缀保持完全一致
TTL 过期间隔 >5 分钟用 keepalive
内容 <1024 tokens检查长度增加缓存内容
超过 4 断点检查 cache_control 数减少断点
不同组织API Key 不同同一组织

6.3 性能调优检查清单

  • 系统提示使用 cache_control
  • CLAUDE.md 使用 cache_control
  • 对话历史增量缓存
  • 批量任务在 5 分钟内完成
  • 长对话使用 keepalive
  • 流式输出减少感知延迟
  • 并发处理批量任务
  • HTTP/2 连接复用
  • 监控缓存命中率
  • 定期成本对比

7. 附录:缓存参数速查表

7.1 缓存定价

模型正常输入缓存写入 (1.25×)缓存读取 (0.1×)
Opus 4$15/M$18.75/M$1.50/M
Sonnet 4$3/M$3.75/M$0.30/M
Haiku 3.5$0.25/M$0.31/M$0.025/M

7.2 缓存限制

限制
最小缓存长度 (Opus/Sonnet)1,024 tokens
最小缓存长度 (Haiku)2,048 tokens
缓存 TTL5 分钟
最大 cache_control 断点4
缓存隔离按组织

7.3 性能优化手段对比

手段成本影响延迟影响复杂度
Prompt Caching-50~90%-20~40%
流式输出0%-60% 感知
并发处理0%-N倍
keepalive+1%0%
HTTP/20%-10%
模型降级-80%-50%

结语

Prompt Caching 是 Anthropic API 提供的强大成本优化机制。通过固定前缀、多断点缓存、缓存预热、保活策略、并发复用等手段,可以在长对话和批量任务中节省 50-90% 的输入 Token 成本。

核心要点回顾:

  1. 固定前缀:系统提示和 CLAUDE.md 保持不变,确保缓存命中
  2. 多断点缓存:在 system、CLAUDE.md、对话历史设置缓存断点
  3. 缓存预热:批量任务前先 warmup,后续请求全部命中
  4. 保活策略:长对话中定期请求保持缓存不过期
  5. 并发复用:多线程共享同一缓存前缀
  6. 流式输出:减少用户感知延迟
  7. 监控命中:检查cache_read_input_tokens确认缓存生效
  8. 成本对比:定期对比有/无缓存的成本差异