Spring Boot 4.1:用 MongoDB JobRepository 把 Agent 长任务做成可恢复批处理
同步 Agent 适合“回答问题”,Batch Agent 适合“完成一批可恢复任务”。当 Agent 开始处理成百上千个 item,Spring Batch 的 JobRepository、Step、Retry、Skip、Restart 比手写循环更接近生产系统需要的运行时。
分享日期:2026-06-23
主题:Java / Spring Boot 4.1.0 / Spring Batch 6.0.4 / MongoDB JobRepository / Spring AI 2.0.0 / Agent Job / Spring Cloud 2025.1.2
版本背景:截至 2026-06-23,Spring 官方项目页显示 Spring Boot4.1.0、Spring AI2.0.0、Spring Cloud2025.1.2为当前稳定入口;Spring Boot 4.1 文档已经把 Spring Batch 的 JobRepository 存储列为 In-memory、JDBC、MongoDB 三类。
1. 为什么今天值得关注
前几天我们连续看了 Spring AI 2.0 的 MCP、Agent 治理、可观测性、评估、递归 Advisors、Spring Modulith 事件一致性和 JSpecify 空安全。今天适合补一块更贴近生产运行的问题:Agent 的长耗时任务,应该怎么可靠地跑完、重试、恢复和审计。
很多 Agent Demo 都是一次 HTTP 调用:
String answer = chatClient.prompt() .user(userInput) .tools(agentTools) .call() .content();这适合问答、摘要、代码建议、轻量工具调用。一旦进入企业场景,任务经常变成长流程:
- 扫描一批需求文档,逐个生成测试用例。
- 给几万条工单做分类、补全、去重和风险标注。
- 对知识库做增量切片、Embedding、索引和质量评估。
- 让 Agent 调用多个内部系统,生成迁移计划或巡检报告。
- 对失败样本进行二次评估、人工复核和补偿执行。
这类任务有几个共同特征:耗时长、分片多、失败不可避免、需要断点续跑、必须能追溯。它们不应该只靠一个同步 Controller、一个@Async方法或一段手写 while 循环撑住。
Spring Boot 4.1 对 Spring Batch + MongoDB 的自动配置支持,给 Java 团队提供了一个新选择:如果团队本来已经把 Agent 运行态数据、文档片段、工具调用记录、向量元数据或审计事件放在 MongoDB 里,现在可以把 Batch 元数据也放进同一类基础设施,而不是为了 JobRepository 额外引入一套关系型数据库。
一句话:Agent 的“思考”和“工具调用”可以由 Spring AI 管,Agent 的“长任务生命周期”应该交给 Spring Batch 这类可恢复运行时来管。Boot 4.1 的 MongoDB JobRepository 让这件事更容易落到现有 NoSQL 技术栈里。
2. 版本坐标与事实边界
今天这篇分享基于下面几个官方事实:
- Spring Boot 4.1.0:当前 Spring Boot 项目页展示的稳定版本。Boot 4.1 Release Notes 明确新增 Spring Batch 使用 MongoDB 的自动配置,并提供新的 Batch MongoDB starter。
- Spring Batch 6.0.4:当前 Spring Batch 参考文档展示的稳定文档版本。Batch 文档说明
JobRepository用来持久化JobExecution、StepExecution等批处理元数据,并提供 JDBC 与 MongoDB 两类数据库实现。 - Spring AI 2.0.0:当前 Spring AI 项目页展示的稳定版本。Spring AI 工具调用文档说明,模型只会请求工具调用,真正执行工具的是客户端应用;
ChatClient中的工具调用生命周期由ToolCallingAdvisor和ToolCallingManager管理。 - Spring Cloud 2025.1.2:当前 Spring Cloud 项目页展示的稳定版本,对应
2025.1.xOakwood release train。Spring Cloud 项目页也提醒,Cloud 版本要按 Boot 版本兼容矩阵选择。
这里要分清两件事:
第一,MongoDB JobRepository 解决的是 Batch 元数据持久化,不是替代业务库、向量库或 Agent Memory。它记录的是作业实例、作业执行、步骤执行、执行上下文、状态、失败和重启信息。
第二,Spring Batch 不是让 Agent 变“智能”的框架。它解决的是长任务的工程问题:切分、事务、重试、跳过、恢复、并行、监控和审计。模型推理、工具定义、结构化输出、MCP、RAG 仍然属于 Spring AI 和业务层的职责。
3. MongoDB JobRepository 到底改变了什么
过去很多 Spring Batch 项目默认使用 JDBC JobRepository。这个选择成熟、稳定、易审计,也适合多数交易型系统。但 Agent 应用的运行态数据经常天然偏文档模型:
- 文档切片、解析结果、Embedding 元数据。
- 工具调用输入输出摘要。
- 模型响应、评估分数、引用来源。
- Agent 计划、步骤状态、人工复核备注。
- 多租户、多模型、多版本 Prompt 的执行上下文。
这些对象不一定适合拆成很多关系表。团队如果已经用 MongoDB 保存 Agent 运行态数据,再为了 Batch 元数据单独维护一套关系数据库,架构会更重。
Boot 4.1 的意义是把这个选择变成一等自动配置入口:当应用使用 Spring Batch 并引入对应 MongoDB 支持后,Boot 可以帮你自动配置 MongoDB-backed JobRepository,并提供 schema 初始化配置。
概念上可以这样理解:
flowchart LR Api[Agent API] --> Launcher[JobLauncher] Launcher --> Job[Spring Batch Job] Job --> Step1[Read Work Items] Step1 --> Step2[Call Spring AI Agent] Step2 --> Step3[Persist Result] Job --> Repo[(MongoDB JobRepository)] Step2 --> Audit[(Agent Audit Store)] Step3 --> Domain[(Business Data / Vector Store)]JobRepository不保存所有业务结果,它保存“任务跑到哪里了”。业务结果、审计摘要、向量数据、工具调用详情可以放在独立集合或独立系统里。这样才能避免 Batch 元数据集合被业务大对象撑爆。
4. 适合用 Batch 承载的 Agent 场景
不是所有 Agent 请求都要变成 Batch Job。下面这些场景更适合:
| 场景 | 为什么适合 Batch | Agent 负责什么 |
|---|---|---|
| 知识库增量入库 | 文件多、步骤固定、失败可重试 | 摘要、标签、切片质量判断 |
| 工单批量分类 | 记录多、可分片、需要跳过坏数据 | 意图识别、优先级判断、原因解释 |
| 代码仓库巡检 | 耗时长、需要报告和断点续跑 | 规则解释、修复建议、风险分级 |
| 数据迁移评估 | 多源读取、结果可汇总 | 字段映射建议、异常解释 |
| Agent 评估回归 | 样本集固定、指标可聚合 | 生成回答、LLM-as-a-judge 评分 |
不太适合的场景:
- 用户等在页面上的低延迟聊天。
- 一次性的小工具调用。
- 状态必须由外部分布式工作流系统统一编排的跨团队流程。
- 强事务、强一致、必须同步返回的业务动作。
一个实用判断标准:如果你已经在讨论“失败后从第几条继续”“跑一半怎么停”“重试几次算失败”“怎么查看每个分片的状态”,那就不要只靠普通异步任务了,应该考虑 Batch、Workflow 或队列式任务运行时。
5. 最小配置思路
在 Boot 4.1 中,Spring Batch 文档展示的 JobRepository 存储类型包括 In-memory、JDBC 和 MongoDB。MongoDB schema 初始化可以通过配置打开:
spring: batch: data: mongodb: schema: initialize: true生产环境建议谨慎使用自动初始化:
- 本地开发、测试环境可以打开,减少环境准备成本。
- 预发和生产更建议由变更脚本或平台流程创建集合和索引。
- 如果需要完全控制 Batch 配置,可以使用
@EnableBatchProcessing或继承DefaultBatchConfiguration,让 Boot 自动配置退让,再按 Spring Batch API 显式配置。
一个 Agent 批处理应用的依赖边界通常是:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-batch-data-mongodb</artifactId> </dependency> <dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-starter-model-openai</artifactId> </dependency> </dependencies>具体 artifact 名称以当前 Boot 4.1 依赖清单和 Initializr 生成结果为准。Release Notes 中提到的是新的 Batch MongoDB starter;项目落地时不要手写猜版本,优先让 Spring Boot BOM 管理依赖。
6. 把 Agent 调用放进 Step,而不是放进 Controller
常见错误是 Controller 里直接循环调用模型:
@PostMapping("/agent/import") String importDocuments(@RequestBody ImportRequest request) { for (String documentId : request.documentIds()) { agentService.analyze(documentId); } return "ok"; }这段代码的问题不是不能跑,而是运行态不可治理:HTTP 超时、进程重启、部分失败、重复提交、审计补偿都会变复杂。更合理的形态是:Controller 只负责提交 Job,Batch Step 负责逐条处理。
@RestController class AgentJobController { private final JobLauncher jobLauncher; private final Job documentAnalysisJob; AgentJobController(JobLauncher jobLauncher, Job documentAnalysisJob) { this.jobLauncher = jobLauncher; this.documentAnalysisJob = documentAnalysisJob; } @PostMapping("/agent/jobs/document-analysis") JobExecutionResponse launch(@RequestBody DocumentAnalysisRequest request) throws Exception { JobParameters parameters = new JobParametersBuilder() .addString("tenantId", request.tenantId()) .addString("batchId", request.batchId()) .addLong("submittedAt", System.currentTimeMillis()) .toJobParameters(); JobExecution execution = jobLauncher.run(documentAnalysisJob, parameters); return new JobExecutionResponse(execution.getJobId(), execution.getStatus().name()); } }Batch Job 则负责读取任务项、调用 Agent、写入结果:
@Bean Job documentAnalysisJob(JobRepository jobRepository, Step analyzeDocumentStep) { return new JobBuilder("documentAnalysisJob", jobRepository) .start(analyzeDocumentStep) .build(); } @Bean Step analyzeDocumentStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, ItemReader<DocumentTask> reader, ItemProcessor<DocumentTask, DocumentAnalysis> processor, ItemWriter<DocumentAnalysis> writer) { return new StepBuilder("analyzeDocumentStep", jobRepository) .<DocumentTask, DocumentAnalysis>chunk(20, transactionManager) .reader(reader) .processor(processor) .writer(writer) .faultTolerant() .retry(TransientAiException.class) .retryLimit(3) .skip(BadDocumentException.class) .skipLimit(100) .build(); }这里的关键点是:Agent 调用不再是一个不可恢复的循环,而是 Batch 的ItemProcessor。每条数据的成功、失败、重试和跳过都能进入 Batch 生命周期。
7. Agent Processor 的边界设计
ItemProcessor里可以调用 Spring AI,但不要把所有逻辑都塞进 Prompt。建议把边界拆清楚:
class DocumentAgentProcessor implements ItemProcessor<DocumentTask, DocumentAnalysis> { private final ChatClient chatClient; private final DocumentRepository documentRepository; private final AgentAuditService auditService; DocumentAgentProcessor(ChatClient chatClient, DocumentRepository documentRepository, AgentAuditService auditService) { this.chatClient = chatClient; this.documentRepository = documentRepository; this.auditService = auditService; } @Override public DocumentAnalysis process(DocumentTask task) { DocumentContent content = documentRepository.load(task.documentId()) .orElseThrow(() -> new BadDocumentException(task.documentId())); AgentPromptInput input = AgentPromptInput.from(task, content); auditService.beforeCall(task.batchId(), task.documentId(), input.summary()); DocumentAnalysisResult result = chatClient.prompt() .system(""" You are a document analysis agent. Return risk level, topic tags, summary, and citations. """) .user(input.promptText()) .call() .entity(DocumentAnalysisResult.class); auditService.afterCall(task.batchId(), task.documentId(), result.auditSummary()); return DocumentAnalysis.from(task, result); } }这个 Processor 应该遵守几条规则:
- 读业务数据要显式失败,不要让模型猜缺失字段。
- Prompt 输入要可摘要审计,但不要默认记录全文敏感内容。
- 模型输出要结构化,不要靠自然语言解析状态。
- 可重试异常和不可重试异常要分开。
- 写入业务结果要幂等,避免 Job 重启后重复产生副作用。
8. Tool Calling 与 Batch 重试不要互相打架
Spring AI 文档强调,模型只能请求工具调用,客户端应用负责执行工具并返回结果。ChatClient使用ToolCallingAdvisor管理工具调用生命周期。这对 Batch 场景很重要,因为会出现两层重试:
- Agent 内部重试:模型输出格式不合法、工具调用失败、Advisor 触发重试。
- Batch 外部重试:整个 item 处理失败,Step 对该 item 重新执行。
如果不设计边界,就可能重复调用高风险工具。例如发送通知、创建工单、扣减额度、提交审批。
建议把工具分三类:
| 工具类型 | 是否允许 Batch 重试 | 处理建议 |
|---|---|---|
| 只读查询 | 可以 | 超时、限流、临时网络错误可重试 |
| 可幂等写入 | 可以但要有幂等键 | 使用batchId + itemId + actionType作为幂等键 |
| 非幂等高风险动作 | 不建议自动重试 | 先生成计划,交给人工或独立审批流程执行 |
示例:让 Agent 只生成“建议动作”,不要直接执行动作:
public record RemediationPlan( String itemId, String riskLevel, List<String> proposedActions, boolean requiresHumanApproval) { }对于批量 Agent 作业,默认策略应该是“模型产出结构化计划,系统按策略执行”,而不是“模型想到什么工具就直接执行什么工具”。
9. JobParameters 要可追溯、可重放、可去重
Batch 的JobParameters决定一个 JobInstance 的身份。Agent 长任务里不要只塞时间戳,否则每次提交都会变成全新作业,难以判断重复提交。
建议包含:
tenantId:租户或业务域。batchId:业务批次 ID,来自上游提交。agentProfile:Agent 配置版本,例如document-risk-v3。promptVersion:Prompt 模板版本。modelProfile:模型配置逻辑名,不一定直接暴露真实供应商。inputSnapshot:输入数据快照 ID 或查询条件 hash。submittedBy:提交人或系统来源。
示例:
JobParameters parameters = new JobParametersBuilder() .addString("tenantId", request.tenantId(), true) .addString("batchId", request.batchId(), true) .addString("agentProfile", "document-risk-v3", true) .addString("promptVersion", "2026-06-23", true) .addString("inputSnapshot", request.inputSnapshot(), true) .addLong("submittedAt", System.currentTimeMillis(), false) .toJobParameters();这里最后一个布尔值表示参数是否用于识别 JobInstance。像submittedAt这类纯审计字段通常不应该参与身份识别,否则会破坏重复提交检测。
10. MongoDB 存储下的几个生产注意点
Spring Batch 文档提醒,MongoDB JobRepository 需要集合来保存批处理元数据,这些集合定义在 Spring Batch core jar 的schema-mongodb.jsonl中。文档还提醒 MongoDB 不推荐字段名里使用.,因此需要通过MongoTemplate的MappingMongoConverter做 map key 替换。
示例:
@Bean MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) { MongoTemplate template = new MongoTemplate(mongoDatabaseFactory); MappingMongoConverter converter = (MappingMongoConverter) template.getConverter(); converter.setMapKeyDotReplacement("_"); return template; }生产落地时再补几条:
- Batch 元数据集合和业务结果集合分开命名、分开权限。
- 给高频查询字段建立索引,例如 job name、status、create time、tenantId。
- 限制 ExecutionContext 大小,不要把完整 Prompt、完整文档或大模型响应塞进去。
- 对失败摘要、工具参数、模型输出做脱敏和保留期控制。
- 对大批量并发任务压测 MongoDB 写入延迟,避免元数据写入成为瓶颈。
- 明确备份恢复策略,因为 JobRepository 丢失会影响任务恢复语义。
MongoDB JobRepository 不是“随便存 JSON”的许可。Batch 元数据是运行时控制面,应该比普通业务日志更谨慎。
11. 可观测性:Batch 指标和 Agent 指标要能关联
Agent 长任务排障时,光看最终失败原因不够。建议建立三层观测:
第一层是 Batch 维度:
jobNamejobExecutionIdstepExecutionIdreadCountwriteCountskipCountretryCountstatusdurationMs
第二层是 Agent 维度:
conversationIdagentProfilepromptVersionmodelProfiletoolCallCountinputTokensoutputTokensestimatedCoststructuredOutputValid
第三层是业务维度:
tenantIdbatchIditemIdriskLevelreviewRequiredresultVersion
关键是把jobExecutionId、stepExecutionId、batchId、itemId贯穿起来。否则 Batch 只能告诉你“第 327 条失败”,Agent 日志只能告诉你“某次模型调用失败”,两边对不上。
一个简单的审计对象可以这样设计:
public record AgentBatchAuditEvent( String tenantId, String batchId, Long jobExecutionId, Long stepExecutionId, String itemId, String agentProfile, String promptVersion, String modelProfile, int toolCallCount, boolean structuredOutputValid, String status, String errorCode, long durationMs) { }注意这里仍然没有记录完整 Prompt 和完整回答。生产默认记录摘要、状态、引用 ID 和错误码,需要全文审计时再走受控采样或加密存储。
12. 和 Spring Cloud 的关系
Spring Cloud 不直接替你跑 Batch Job,但它对分布式 Agent 任务仍然有价值:
- Spring Cloud Config 管理 Agent 配置、模型 profile、Prompt 版本开关。
- Spring Cloud Gateway 提供统一入口、鉴权、限流和任务提交路由。
- Spring Cloud Circuit Breaker 包住外部模型、MCP Server、业务工具服务。
- Spring Cloud Stream 用于 Job 完成事件、人工复核事件、补偿事件的异步流转。
- Spring Cloud Kubernetes 帮助在 Kubernetes 中治理配置、发现和滚动部署。
这里有一个版本提醒:Spring Cloud 项目页当前把2025.1.xOakwood 标为 Boot4.0.x代际。真实项目升级时不要只看“最新版本”,要以 Spring Cloud BOM、release notes、start.spring.io 生成结果和兼容矩阵为准。Boot4.1.0是否能直接搭配某个 Cloud train,需要以官方兼容说明为准。
13. 推荐落地架构
一个相对稳妥的 Agent Batch 架构可以这样分层:
flowchart TB UI[Admin UI / API Client] --> Gateway[Spring Cloud Gateway] Gateway --> JobApi[Spring Boot Agent Job API] JobApi --> Batch[Spring Batch JobLauncher] Batch --> Repo[(MongoDB JobRepository)] Batch --> Worker[Batch Step Worker] Worker --> AI[Spring AI ChatClient] AI --> Tools[Tool Calling / MCP Tools] Worker --> Result[(Business Result Store)] Worker --> Audit[(Audit Events)] Audit --> Observe[Metrics / Traces / Logs] Batch --> Event[Job Completion Event]职责划分:
- Gateway 负责入口治理。
- Job API 负责提交、查询、停止、重启。
- Spring Batch 负责任务生命周期。
- Spring AI 负责模型调用、工具调用、结构化输出。
- MongoDB JobRepository 负责批处理元数据。
- 业务库负责结果数据。
- 审计和可观测系统负责排障、成本和合规。
14. 迁移建议:从@Async到 Batch
如果你现在已经有一批@AsyncAgent 任务,不建议一次性重写全部。可以按风险迁移:
第一步:把任务入口改成提交batchId,不要同步等待结果。
第二步:把原来的循环拆成ItemReader、ItemProcessor、ItemWriter。
第三步:把失败分类成可重试、可跳过、必须终止三类。
第四步:把模型输出改成结构化 DTO,并为 DTO 做校验。
第五步:把工具调用副作用加幂等键。
第六步:打通 Batch 执行 ID 和 Agent 审计 ID。
第七步:再考虑并行 step、partition、远程 worker 或事件驱动补偿。
一开始不要急着做复杂分布式。先让单 JVM、单 JobRepository、可恢复、可查询、可审计跑稳。Agent 长任务最先需要的是确定性和可恢复性,不是更复杂的调度拓扑。
15. 今日结论
Spring Boot 4.1 的 MongoDB Spring Batch 支持,不是一个孤立的存储选项,而是对 Agent 工程很实用的一块拼图。
它适合解决的问题是:
- Agent 长任务如何持久化执行状态。
- 失败后如何重试、跳过、恢复。
- 大批量样本如何分片处理。
- 批量模型调用如何审计和计量。
- NoSQL 技术栈下如何减少额外关系库依赖。
但它不应该被误用为:
- Prompt 全文仓库。
- 工具调用明细仓库。
- 向量数据库。
- 高风险业务动作的自动执行许可。
- 替代工作流平台的万能调度器。
参考资料
- Spring Boot 项目页:当前版本 4.1.0
- Spring Boot 4.1 Release Notes:MongoDB Support for Spring Batch
- Spring Boot Reference:Spring Batch
- Spring Batch Reference:Configuring a JobRepository
- Spring AI 项目页:当前版本 2.0.0
- Spring AI Reference:Tool Calling
- Spring Cloud 项目页:当前版本 2025.1.2 与兼容矩阵