Wiki-Framework 1.2.0 新能力:wiki-sse 服务端推送
为什么需要这个模块
做后台系统的时候,通知、审批进度、在线状态这类需求很常见。轮询能用,但费连接、也费服务端;WebSocket 能力强,接入成本却不低。SSE(Server-Sent Events)夹在中间——基于普通 HTTP,浏览器原生支持EventSource,服务端单向推数据正好够用。
1.2.0 之前,Wiki-Framework 在 Socket、WebSocket 上已经有封装,但缺一块「轻量推送」的能力。wiki-sse就是补这个空位的:基于 Spring MVC 的SseEmitter,把连接管理、心跳保活、单播/广播/组播、生命周期回调这些脏活累活收进一个 Service,业务侧只管「连上来」和「发消息」。
模块里有什么
整个模块目前三个核心类,职责很清晰:
类 | 干什么 |
|---|---|
| 连接池、心跳、消息推送,日常开发主要跟它打交道 |
| 连接建立、断开、超时、出错、心跳等事件的回调接口 |
| 约定好的事件名和标识,比如 |
依赖很轻:只拉了wiki-util、wiki-entity和 Spring WebMVC,不绑死整套wiki-all,按需引入就行。
<dependency> <groupId>com.framewiki</groupId> <artifactId>wiki-sse</artifactId> <version>1.2.0</version> </dependency>核心设计:连接怎么管
SseEmitterConnectionService内部用两个ConcurrentHashMap分别存连接和心跳任务:
sseEmitterMap:sessionId → SseEmitterheartbeatTasks:sessionId → ScheduledFuture
同一个sessionId重复连接时,会先关掉旧连接再建新连接,避免一个用户挂着两条长连接占资源。这点在实际项目里挺重要——用户刷新页面、网络闪断重连,都容易触发重复 connect。
连接建立后,服务会自动做三件事:
立刻推一条欢迎消息,
event.id为success,前端可以用来确认链路通了;启动心跳,默认每 25 秒发一次
heartbeat事件,数据是ping;挂上完成 / 超时 / 错误回调,统一走
SseEmitterCallback,该清理的清理干净。
心跳间隔特意设在 25 秒,注释里写得很直白:要比网关、负载均衡的空闲超时短一截,不然连接会被中间层悄悄掐掉,你还以为是客户端的问题。
// 连接建立后的核心流程(节选) public SseEmitter connect(String sessionId) { long timeout = 60 * 15 * 1000; // 15 分钟 SseEmitter sseEmitter = new SseEmitter(timeout); String cacheKey = KEY_PREFIX + sessionId; // 同 sessionId 重连:关掉旧的 SseEmitter oldEmitter = sseEmitterMap.put(cacheKey, sseEmitter); if (oldEmitter != null) { oldEmitter.complete(); // ...取消旧心跳 } startHeartbeat(sessionId, sseEmitter); sseEmitter.onCompletion(() -> sseEmitterCallback.onCompletion(sessionId)); sseEmitter.onTimeout(() -> { sseEmitterCallback.onTimeout(sessionId); disconnect(sessionId); }); sseEmitter.onError(e -> { sseEmitterCallback.onError(sessionId, e); disconnect(sessionId); }); sseEmitter.send(SseEmitter.event() .data("SSE连接建立成功", MediaType.TEXT_PLAIN) .id(EmitterConstant.SUCCESS)); sseEmitterCallback.onConnect(sessionId); return sseEmitter; }服务关闭时还有@PreDestroy,会把调度器、心跳任务、所有连接一并收掉,不会留僵尸线程。
消息怎么发:三种模式
推送 API 分三档,按场景选就行:
方法 | 场景 |
|---|---|
| 给某个会话单播 |
| 全体广播 |
| 按组播( |
单播失败(比如客户端已经断了)会自动disconnect,不会留着无效连接占 map。每条业务消息会带一个短 UUID 作为event.id,方便前端做去重或排查。
// 审批通过后,只推给当前用户 sseEmitterConnectionService.sendMessage(userId, "您的请假单已通过"); // 系统维护通知,推给所有在线连接 sseEmitterConnectionService.sendMessageAll("系统将于 22:00 维护,请提前保存"); // 某个项目组的消息,sessionId 约定成 groupId_userId 的形式 sseEmitterConnectionService.groupSendMessage("project-42", "需求 #128 状态已更新");组播这块有个约定要心里有数:groupSendMessage是靠 key 前缀匹配的,sessionId设计时最好带上组信息,比如project-42_user_10086,不然组播筛不出来。
回调接口:把业务钩子留出来
SseEmitterCallback定义了六个钩子,覆盖连接的完整生命周期:
public interface SseEmitterCallback { void onConnect(String sessionId); // 连接成功 void onDisconnect(String sessionId); // 主动断开 void onError(String sessionId, Throwable throwable); void onTimeout(String sessionId); void onCompletion(String sessionId); // 连接正常结束 void onHeartbeat(String sessionId); // 心跳任务启动时 }这个接口需要你自己实现并注册成 Spring Bean,框架不会替你写默认实现——毕竟有人要在onConnect里记在线状态,有人要在onDisconnect里清缓存,业务差异太大。
一个比较典型的写法:
@Component public class MySseCallback implements SseEmitterCallback { @Override public void onConnect(String sessionId) { // 记入在线表、打日志、更新 Redis 都行 } @Override public void onDisconnect(String sessionId) { // 清理在线状态 } @Override public void onError(String sessionId, Throwable throwable) { // 告警或记错误日志 } @Override public void onTimeout(String sessionId) { // 超时也当断开处理 } @Override public void onCompletion(String sessionId) { // 客户端正常关闭 } @Override public void onHeartbeat(String sessionId) { // 一般不用干啥,需要的话可以用来续期 } }业务接入:从 Controller 到前端
服务端入口就是一个返回SseEmitter的接口,把sessionId传进去即可。sessionId通常用当前登录用户 ID,或者用户 ID + 业务场景拼出来。
@RestController @RequestMapping("/api/sse") @RequiredArgsConstructor public class NotifyController { private final SseEmitterConnectionService sseService; @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter connect(@RequestParam String sessionId) { return sseService.connect(sessionId); } @PostMapping("/push") public void push(@RequestParam String sessionId, @RequestParam String message) { sseService.sendMessage(sessionId, message); } }前端用原生EventSource就能接,不用额外引库:
const sessionId = 'user_10086'; const source = new EventSource(`/api/sse/connect?sessionId=${sessionId}`); // 连接成功时服务端会推 id=success 的事件 source.addEventListener('message', (e) => { if (e.lastEventId === 'success') { console.log('SSE 已连通:', e.data); return; } console.log('收到消息:', e.data); }); // 心跳事件,保持连接用,一般忽略即可 source.addEventListener('heartbeat', () => {}); source.onerror = () => { console.warn('连接异常,浏览器会自动重连'); };如果走 Nginx 反向代理,记得关掉缓冲,不然 SSE 会被攒着不发:
location /api/sse/ { proxy_pass http://backend; proxy_buffering off; proxy_cache off; proxy_read_timeout 3600s; }几个实用 API 顺带提一下
除了连和发,服务还暴露了这几个辅助方法,排查问题时挺好用:
// 这个用户还在线吗? boolean online = sseEmitterConnectionService.isConnected(sessionId); // 当前一共多少条连接? int count = sseEmitterConnectionService.getConnectionCount(); // 踢掉某个会话 sseEmitterConnectionService.disconnect(sessionId); // 全部关掉(比如发版前清场) sseEmitterConnectionService.disconnectAll();和 WebSocket 怎么选
简单说:
只要服务端往客户端推,客户端偶尔发请求走普通 HTTP —— 用 SSE,省心;
要双向实时通信(聊天、协同编辑、游戏)—— 还是 WebSocket;
要兼容老浏览器或极简场景—— SSE 基于 HTTP,穿透代理通常更顺。
Wiki-Framework 里wiki-sse和wiki-web-socket是并列模块,不冲突,按场景选一个或混用都行。
小结
wiki-sse在 1.2.0 里是全新模块,不是什么花哨封装,就是把 SSE 长连接最容易踩坑的几件事处理好了:
连接复用与替换
25 秒心跳保活
单播 / 广播 / 组播
生命周期回调
服务关停时的资源回收
业务侧接入路径也很直接:引依赖 → 实现SseEmitterCallback→ Controller 返回connect(sessionId)→ 需要推送时调sendMessage。如果你已经在用 Wiki-Framework 1.2.0,又刚好有「服务端主动通知前端」的需求,这个模块值得试一下。
关于维基框架
维基框架(Wiki Framework)是一套面向复杂业务场景的轻量级开发框架,支持多语言、多协议、多部署形态。适用于企业级应用开发、微服务架构、云原生部署等场景。
官网:framewiki.com
Gitee:gitee.com/wiki-framework
GitHub:github.com/wiki-framework
示例项目:gitee.com/cdkjframework/framewiki-example 📄 许可证:MulanPSL-2.0(木兰宽松许可证,第2版)