Server-Sent Events基础
本文最后更新于:8 天前
引言
在现代 Web 应用中,实时能力已经从「可选」演进为「必需」。浏览器与服务器之间不仅要“快”,还要“流畅”,既能及时把后台事件推送到前端,又能让用户在等待 AI 或大数据任务时看到源源不断的结果。Server-Sent Events(SSE)凭借 单向、轻量、浏览器原生、自动重连 等特性,成为“只需服务器推流、不必客户端回写”的理想方案。本文章系统梳理了 SSE 的协议细节、与长轮询 / WebSocket / gRPC 的横向对比,以及在 Spring MVC、Spring WebFlux 两种编程模型中的落地姿势;同时给出了生产级别的缓冲关闭、心跳、监控、分布式广播以及流式 AI 输出的完整示例,旨在帮助开发者从 “为什么选 SSE” 到 “如何优雅地在生产环境跑起来” 形成一条清晰的技术脉络。
基础概念
协议定义
SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP协议的单向通信技术,允许服务器主动向浏览器推送数据。与传统的轮询不同,SSE 建立的是一个长连接,服务器可以在连接存续期间持续不断地将事件消息发送给客户端。浏览器端通过原生的 EventSource
API 即可轻松接入,不需要额外库,使用简单、资源开销低。SSE 适用于如实时消息推送、系统通知、进度更新等场景。它相比 WebSocket 实现更轻量,尤其在不需要客户端反向通信的场景下,是一种稳定、高效的实时通信方案。
协议对比
SSE(Server-Sent Events)
- 通信方式:单向(服务器 → 客户端)
- 协议层:基于 HTTP/1.1,MIME 类型为
text/event-stream
- 使用场景:实时推送数据,如:股票行情、系统通知、直播评论等
- 优点:
- 使用简单:前端只需
EventSource
,后端返回特定格式数据即可 - 支持自动重连
- 浏览器原生支持(不需要额外库)
- 使用简单:前端只需
- 缺点:
- 只能单向通信,客户端不能主动发消息
- 不适合二进制数据传输
长轮询(Long Polling)
- 通信方式:模拟实时,通过客户端不断轮询实现近似实时
- 原理:客户端发出请求,服务器挂起请求直到有新数据或超时返回,然后客户端再重新请求
- 缺点:
- 资源浪费(频繁建连)
- 实现复杂(需处理超时、队列、资源回收等)
WebSocket
- 通信方式:双向(客户端 ↔ 服务器)
- 协议层:独立协议,基于 HTTP 完成握手后升级为
ws://
或wss://
- 使用场景:聊天、协同编辑、游戏、音视频信令等
- 优点:
- 真正的全双工通信,延迟极低
- 支持自定义协议(可定义数据结构如 JSON、Protobuf)
- 缺点:
- 实现复杂度略高(需要手动管理连接和心跳)
gRPC
- 通信方式:
- 支持一元调用(单次请求响应)
- 服务器流(类似 SSE)
- 客户端流、双向流(类似 WebSocket)
- 协议层:基于 HTTP/2 的远程调用框架,默认使用 Protobuf 编码
- 使用场景:微服务间通信、高性能数据交换
- 优点:
- 高性能、强类型、支持多语言
- 内建双向流和连接复用
- 缺点:
- 学习成本高
- 不适合浏览器直接调用(需要 gRPC-Web)
MIME 类型与持久连接
text/event-stream
SSE 的 MIME 类型。
表示这个 HTTP 响应是一个流(stream),并不会立即结束。
示例响应头:
1
2
3Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
持久连接(HTTP keep-alive)
- SSE 利用 HTTP/1.1 的持久连接特性,使得连接不被关闭。
- 服务器不断地向客户端写入事件消息,不结束响应体。
响应不会关闭
- 与传统请求不同,SSE 的响应体会持续输出,不会
Content-Length
限定大小。 - 必须刷新缓冲区
flush()
以确保消息即时传递。
事件格式
SSE 每条事件数据的格式如下,每条事件之间用两个换行分隔:
1 |
|
字段说明:
event
:自定义事件名,对应浏览器中监听的事件eventSource.addEventListener('customEventName', ...)
。id
:消息 ID,浏览器断线重连时会通过Last-Event-ID
请求头携带该值,服务端可以据此避免重复推送。retry
:告诉浏览器在连接断开后等待多少毫秒再重连(非必需)。data
:消息体,每个data:
行会被合并为一个消息文本,支持多行。示例合并规则:
1
2data: line1
data: line2最终事件内容为:
1
2line1
line2
EventSource API
创建连接
1 |
|
监听消息
1 |
|
错误处理
1 |
|
关闭连接
1 |
|
自动重连特性
- 默认自动重连机制,若服务端响应中指定了
retry:
行,浏览器会按其值重试连接。 - 客户端自动附带
Last-Event-ID
请求头。
Spring MVC 中的 SSE 实现
SseEmitter 核心类与生命周期
类概述
- 包路径:
org.springframework.web.servlet.mvc.method.annotation.SseEmitter
- 功能:基于 Servlet 的长连接机制,封装了向客户端推送 Server-Sent Events 的能力。
构造与超时
构造器:
1
2public SseEmitter(); // 默认超时:30 000ms
public SseEmitter(Long timeout); // 自定义超时,传 0L 表示永不超时- 超时处理:若在指定时间内没有任何消息发送,容器会触发超时回调并关闭连接。
主要方法
send(Object)
:向客户端发送一条事件(Content-Type 自动设为 text/event-stream)。send(SseEventBuilder)
:更灵活地构造事件:可指定id
、event
、retry
、data
。complete()
:正常结束,向客户端发送终止符并关闭连接。completeWithError(Throwable)
:带错误结束,发送错误并关闭连接。onCompletion(Runnable)
:注册「正常完成时」的回调。onTimeout(Runnable)
:注册「超时」的回调。onError(Consumer<Throwable>)
:注册「发送异常」时的回调。
生命周期流程
- 实例化
- Controller 中创建
new SseEmitter(timeout)
,返回给 DispatcherServlet。
- Controller 中创建
- 挂起请求
- Servlet 容器保留该请求,不立即返回响应结束。
- 数据推送
- 在异步线程中反复调用
emitter.send(...)
,每次都会 flush 到客户端。
- 在异步线程中反复调用
- 完成或超时
- 正常完成:调用
complete()
; - 超时:超过
timeout
未发送数据,容器自动调用 registeredonTimeout
回调并关闭; - 异常:
send()
抛出IOException
,可在onError
回调中处理,并调用completeWithError
。
- 正常完成:调用
- 回调触发
- 调用完
complete()
或超时后,会触发对应的回调,可以做资源清理、日志记录等。
- 调用完
普通Controller示例
下面示例展示如何在 Spring MVC/Boot 的普通 Controller 中使用 SseEmitter
。
1 |
|
关键点说明:
永不超时
- 用
new SseEmitter(0L)
保证连接常驻,适合频繁推送场景; - 若希望自动中断,请设定非 0 的超时时间(单位 ms)。
- 用
异步推送
- Controller 方法立即返回
SseEmitter
,Servlet 容器挂起响应,由另一个线程不断调用send()
。
- Controller 方法立即返回
事件构造
使用
SseEmitter.event()
构造器可链式设置:id(...)
:消息 ID,用于断线重连时续流;name(...)
:事件名,前端可通过addEventListener
精确监听;data(...)
:实际推送内容;reconnectTime(...)
:重连间隔,覆盖客户端默认值。
关闭与异常
- 正常流程结束后必须调用
complete()
关闭连接; - 若发送过程中发生 IO 异常,应调用
completeWithError()
,前端会收到错误并关闭; - 可通过
onCompletion
、onTimeout
、onError
注册回调做清理或日志。
- 正常流程结束后必须调用
阻塞和异步
阻塞式处理
默认情况下,Spring MVC 每个请求占用一个 Servlet 线程。如果在同一个线程里循环调用 SseEmitter.send()
并 sleep()
,就会一直占用该线程,导致服务器吞吐量急剧下降。
1 |
|
问题:每个客户端连接都占用 1 个线程,连接数量多时容易导致线程耗尽。
异步方案一:@Async
- 在业务层使用 Spring 的异步方法释放请求线程;
- Controller 立即返回
SseEmitter
,真正的推送逻辑在异步线程中进行。
1 |
|
异步方案二:DeferredResult
- 使用
DeferredResult<SseEmitter>
,在子线程里构造并返回SseEmitter
; - 同样释放 Servlet 请求线程,提高吞吐量。
1 |
|
简单心跳与重连
服务端定时发送心跳注释
SSE 规范支持“注释行”(以 :
开头),客户端收到注释也会重置重连计时器,但不会触发 message
事件。
1 |
|
客户端自动重连与事件监听
浏览器原生 EventSource
会在网络断开后自动重连,默认等待 3 000 ms,可由服务器通过 retry:
指定:
1 |
|
说明
- 注释行不会进入
onmessage
,但可保持连接;- 服务器发送
.reconnectTime(ms)
会覆盖客户端重连时长;- 一旦连接失败,浏览器自动重连并带上
Last-Event-ID
,由服务端决定是否重发漏掉的消息。
常见配置
应用服务器层面
线程与连接池
- 长连接数:SSE 连接会长期占用 HTTP 线程或 NIO 事件线程,需要适当增大
server.tomcat.max-threads
(或 Jetty/Undertow 的对应参数),以承载并发的持久连接。 - 连接超时:默认的
server.tomcat.connection-timeout
(如 20s)要调大,或者对于 SSE 路径单独设置为 0(永不超时)。
1
2
3
4
5server:
port: 8080
tomcat:
max-threads: 2000 # 支撑更多并发长连接
connection-timeout: 600000 # 600秒- 长连接数:SSE 连接会长期占用 HTTP 线程或 NIO 事件线程,需要适当增大
响应缓冲与分块
禁用 HttpServletResponse 层面的缓冲:禁用 sse 路径的 HttpServletResponse 缓冲,同时告知代理层关闭缓冲。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26@Configuration
public class SseBufferConfig {
/**
* 注册一个 Servlet 过滤器,对 /sse/* 路径禁用 HttpServletResponse 层面的缓冲,
* 并增加 Nginx 友好的 no-buffering 头部。
*/
@Bean
public FilterRegistrationBean<Filter> disableBufferingFilter() {
FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<>();
bean.setFilter((request, response, chain) -> {
if (response instanceof HttpServletResponse) {
HttpServletResponse resp = (HttpServletResponse) response;
// 禁用 servlet 输出缓冲
resp.setBufferSize(0);
// 如果前端有 Nginx/Envoy,可用此头告知关闭代理缓冲
resp.setHeader("X-Accel-Buffering", "no");
}
chain.doFilter(request, response);
});
bean.addUrlPatterns("/sse/*");
bean.setName("disableBufferingFilter");
bean.setOrder(0);
return bean;
}
}Chunked Transfer:HTTP1.1 下 SSE 默认走分块传输(chunked),确保代理层也支持。
反向代理 / 负载均衡
大多数生产环境会在前面放 Nginx、HAProxy、Envoy 等,需要针对 SSE 调优:
1 |
|
proxy_buffering off
:保证服务器写入的每次data:
都能实时下发给客户端。proxy_read_timeout
/send_timeout
:调大(根据业务响应时长确定),防止代理主动断开空闲连接。
心跳与重连策略
- 后台心跳:定时发送注释行或空事件(例如每 15–30s 一次),用于穿透各种中间网络设备的空闲超时。
- 客户端重连:在服务端通过
SseEmitter.event().reconnectTime(…)
设定合理重连间隔,例如5000ms
。
安全与监控
- CORS 与认证
- SSE 的跨域和普通 REST 一样,可在
WebMvcConfigurer
或@CrossOrigin
上单独为/sse/**
路径放行。 - Token 鉴权时,要考虑长连接的认证续期和失效通知。
- SSE 的跨域和普通 REST 一样,可在
- 连接监控
- 统计当前打开的 SSE 连接数,以防资源泄漏。可以在
onCompletion
、onError
、onTimeout
回调里维护并发计数。 - 在 Prometheus 或其他 APM 中暴露指标:
sse_open_connections
、sse_messages_sent_total
等。
- 统计当前打开的 SSE 连接数,以防资源泄漏。可以在
对比 REST 接口
配置项/特性 | 普通 REST 接口 | SSE 接口 |
---|---|---|
连接模型 | 短连接,完成即关闭 | 长连接,响应持续不断地输出 |
响应超时 | 20–60s 常见 | 通常设置为 0(永不超时)或数分钟以上 |
线程/资源占用 | 请求处理完即释放 | 连接存续期间持续占用线程/NIO 事件槽 |
代理/缓存 | 可开启缓冲、缓存 | 必须关闭缓冲(proxy_buffering off ),实时下发 |
心跳与断线重连 | — | 需显式心跳注释行;客户端自动重连与 Last-Event-ID 续流 |
场景
分布式消息推送
链路
使用 SseEmitter 建立浏览器长连接,结合 JWT 鉴权 + Redis Pub/Sub 完成跨实例广播。整体链路同样可拆为三段:
阶段 | 关键动作 |
---|---|
握手 | 浏览器 GET /sse/connect?token=xxx → JwtAuthInterceptor 校验 → Controller 返回 SseEmitter |
本机路由 | SseSessionManager 维护 uid ↔ SseEmitter 映射;NotificationService 负责发送 |
跨实例广播 | 业务代码/任务调度 → RedisPublisher 发布 SseEvent → RedisSubscriber 订阅并分派给本机用户 |
❗ 与 WebSocket 不同:浏览器 只收不发,业务侧想推消息时自行调用
NotificationService
(或 REST API、MQ 监听任务)。
核心类及职责
类 | 作用 | 关键点 |
---|---|---|
JwtUtil |
解析 / 校验 JWT | parseToken() → 返回 userId |
JwtAuthInterceptor |
HandlerInterceptor |
从 token 取 uid,放入 request.setAttribute("uid", …) |
SseController |
建立 SSE 连接 | connect() 创建 SseEmitter 并注册到 SseSessionManager |
SseEvent |
统一消息模型 | uid 、event 、data 、broadcast 四字段 |
SseSessionManager |
本机 Emitter 池 | register / remove / sendTo / broadcast |
NotificationService |
业务入口 | 调用 RedisPublisher 或直接 sessionManager.xxx() |
RedisPublisher |
发布事件到 Redis Channel | convertAndSend("sse-event", json) |
RedisSubscriber |
订阅 sse-event Channel |
解析 SseEvent → 定向 / 广播 给本机 SseEmitter |
Sseutil |
定向推送或广播事件 | 通过 SsePublisher 向 Redis 频道发布事件,多实例订阅 Redis 频道,判断是否需要向本机管理的 SseEmitter 发送事件,实现跨实例 |
时序说明
sequenceDiagram
participant B as Browser
participant G as Nginx / LB
participant A as Pod-A
participant R as Redis
participant B2 as Pod-B
participant S as 业务Service
%% 握手阶段
B->>G: GET /sse/connect?token=JWT
G->>A: HTTP (粘性或随机)
A->>A: JwtAuthInterceptor:解析 token → uid
A-->>B: 200 + text/event-stream(SseEmitter 建立)
%% 服务端推送
S->>A: NotificationService.push(uid,msg)
A->>R: Redis PUBLISH sse-event
R-->>A: SUBSCRIBE
R-->>B2: SUBSCRIBE
A->>A: SseSessionManager.sendTo(uid)
B2->>B2: SseSessionManager.sendTo(uid)
%% 广播
Note over S: broadcast=true\nSubscriber 调用\nbroadcast()
- 握手:拦截器校验 JWT,将
uid
写入属性;Controller 生成SseEmitter
并登记。 - 本机会话注册:
SseSessionManager.register(uid, emitter)
;在onCompletion/onTimeout/onError
中注销。 - 消息入口:业务代码或定时任务调用
NotificationService
;所有事件先写 Redis,保证分布式一致性。 - 跨实例分发:各实例的
RedisSubscriber
订阅sse-event
;按broadcast
标记选择sendTo
或broadcast
。 - 客户端渲染:前端用
EventSource
监听不同eventName
做弹窗 / 列表追加等 UI 处理。
代码示例
JWT 工具类,用于解析 JWT 令牌
1
2
3
4
5
6
7
8
9
10
11
12
13
14@Component
public class JwtUtil {
@Value("${jwt.secret}")
private String secret;
public String parseToken(String token) {
return Jwts.parser()
.setSigningKey(secret.getBytes(StandardCharsets.UTF_8))
.parseClaimsJws(token.replace("Bearer ", ""))
.getBody()
.getSubject();
}
}JWT 认证拦截器,用于验证请求中的 JWT 令牌
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17@Component
@RequiredArgsConstructor
public class JwtAuthInterceptor implements HandlerInterceptor {
private final JwtUtil jwtUtil;
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String token = request.getParameter("token");
try {
req.setAttribute("uid", jwt.parseToken(token));
return true;
} catch (JwtException e) {
response.setStatus(401);
return false;
}
}
}Web 配置,注册拦截器以及设置跨域
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20@Configuration
@RequiredArgsConstructor
public class WebConfig implements WebMvcConfigurer {
private final JwtAuthInterceptor jwtAuthInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(jwtAuthInterceptor)
.addPathPatterns("/sse/**");
}
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/sse/**")
.allowedOrigins("*")
.allowedMethods("GET");
}
}SSE 事件数据传输对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class SseEvent {
/**
* 目标用户,可空
*/
private String uid;
/**
* 事件名,如 HEARTBEAT、NOTICE
*/
private String event;
/**
* 数据:任意 JSON
*/
private Object data;
/**
* 是否广播,默认为 false
*/
private boolean broadcast;
}会话管理器,用于管理和发送 SSE 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103@Slf4j
@Component
public class SseSessionManager {
private final static Map<String, Set<SseEmitter>> ALL_EMITTERS = new ConcurrentHashMap<>();
/**
* 注册一个新的 SSE emitter
* @param uid 用户唯一标识符,用于区分不同用户的连接
* @return 返回一个新的 SseEmitter 实例
*/
public SseEmitter register(String uid) {
SseEmitter emitter = new SseEmitter(0L); // 永不超时
ALL_EMITTERS.computeIfAbsent(uid, k -> ConcurrentHashMap.newKeySet()).add(emitter);
log.info("SSE emitter registered for UID: {}", uid);
Runnable clear = () -> ALL_EMITTERS.computeIfPresent(uid, (k, emitters) -> {
emitters.remove(emitter);
log.info("SSE emitter removed for UID: {}", uid);
return emitters.isEmpty() ? null : emitters;
});
emitter.onCompletion(clear);
emitter.onTimeout(clear);
emitter.onError(t -> clear.run());
// 关键:立刻发送首帧,强制 flush header
try {
emitter.send(SseEmitter.event()
.name("INIT")
.data("CONNECTED")
.reconnectTime(3000)); // 可选:告诉客户端重连间隔
} catch (IOException e) {
emitter.completeWithError(e);
}
return emitter;
}
/**
* 定时任务,每30秒发送心跳事件到所有连接
* 根据发送结果,自动清理失效的 emitter
*/
@Scheduled(fixedRate = 30000)
public void heartbeat() {
// 定时向所有的连接发送心跳事件
ALL_EMITTERS.forEach((uid, emitters) -> {
SseEvent heartbeatEvent = new SseEvent(uid, "HEARTBEAT", "PING", false);
emitters.forEach(e -> {
try {
e.send(SseEmitter.event().name(heartbeatEvent.getEvent()).data(heartbeatEvent.getData()));
} catch (Exception ex) {
log.error("Error sending heartbeat to UID: {}", uid, ex);
e.completeWithError(ex);
emitters.remove(e);
}
});
});
}
/**
* 发送指定事件到特定用户的所有连接
* @param uid 用户唯一标识符
* @param sseEvent 要发送的 SSE 事件
*/
public void sendTo(String uid, SseEvent sseEvent) {
Set<SseEmitter> sendEmitters = ALL_EMITTERS.get(uid);
if (sendEmitters != null) {
sendEmitters.forEach(e -> send(sseEvent, e));
}
}
/**
* 广播事件到所有连接
* @param sseEvent 要广播的 SSE 事件
*/
public void broadcast(SseEvent sseEvent) {
ALL_EMITTERS.values().forEach(emitters -> {
if (emitters != null) {
emitters.forEach(e -> send(sseEvent, e));
}
});
}
/**
* 发送事件到指定的 SSE emitter
* @param sseEvent 要发送的 SSE 事件
*/
private void send(SseEvent sseEvent, SseEmitter sseEmitter) {
if (sseEmitter == null) {
return;
}
try {
sseEmitter.send(SseEmitter.event().name(sseEvent.getEvent()).data(sseEvent.getData()));
} catch (Exception ex) {
sseEmitter.completeWithError(ex);
// 如果发送失败,移除该 emitter
Set<SseEmitter> removeEmitters = ALL_EMITTERS.get(sseEvent.getUid());
if (removeEmitters != null) {
removeEmitters.remove(sseEmitter);
if (removeEmitters.isEmpty()) {
ALL_EMITTERS.remove(sseEvent.getUid());
}
}
}
}
}Redis 发布者,用于发布事件到 SSE 频道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16@Component
@RequiredArgsConstructor
public class SsePublisher {
public static final String CHANNEL = "sse-event";
private final StringRedisTemplate stringRedisTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
public void publish(SseEvent sseEvent) {
try {
stringRedisTemplate.convertAndSend(CHANNEL, objectMapper.writeValueAsString(sseEvent));
} catch (JsonProcessingException ignored) {
}
}
}Redis 订阅者,用于从 SSE 频道接收事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26@Component
@RequiredArgsConstructor
public class SseSubscriber implements MessageListener {
private final SseSessionManager sseSessionManager;
private final ObjectMapper objectMapper = new ObjectMapper();
private final StringRedisTemplate stringRedisTemplate;
@PostConstruct
public void init() {
Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()).getConnection()
.subscribe(this, SsePublisher.CHANNEL.getBytes());
}
public void onMessage(Message rawMessage, byte[] pattern) {
try {
SseEvent sseEvent = objectMapper.readValue(rawMessage.getBody(), SseEvent.class);
if (sseEvent.isBroadcast()) {
sseSessionManager.broadcast(sseEvent);
} else {
sseSessionManager.sendTo(sseEvent.getUid(), sseEvent);
}
} catch (IOException ignored) {
}
}
}SSE 连接接口
1
2
3
4
5
6
7
8
9
10
11
12
13@RestController
@RequestMapping("/sse")
@RequiredArgsConstructor
public class SseController {
private final SseSessionManager sessionManager;
@GetMapping(path = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(HttpServletRequest req) {
String uid = (String) req.getAttribute("uid");
return sessionManager.register(uid);
}
}SSE 工具类,用于推送事件到特定用户或全站广播(通过 Redis 实现跨实例的关键)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21@Component
@RequiredArgsConstructor
public class SseUtil {
private final SsePublisher pub;
/**
* 定向推送
*/
public void pushToUser(String uid, String msg, String event) {
pub.publish(new SseEvent(uid, event, msg, false));
}
/**
* 全站广播
*/
public void broadcast(String msg, String event) {
pub.publish(new SseEvent(null, event, msg, true));
}
}测试接口
1
2
3
4
5
6
7
8
9
10
11
12
13@Resource
private SseUtil sseUtil;
@GetMapping("/send/{uid}/{message}")
public void sendMessage(@PathVariable("uid") String uid,
@PathVariable("message") String message) {
sseUtil.pushToUser(uid, message, "NOTICE");
}
@GetMapping("/broadcast/{message}")
public void broadcastMessage(@PathVariable("message") String message) {
sseUtil.broadcast(message, "NOTICE");
}测试 HTML 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>SSE 测试页面</title>
<style>
body { font-family: sans-serif; padding: 20px; }
#log { margin-top: 20px; white-space: pre-wrap; border: 1px solid #ccc; padding: 10px; height: 200px; overflow-y: auto; }
input, button { font-size: 1rem; padding: 5px; margin-right: 5px; }
</style>
</head>
<body>
<h1>SSE 连接测试</h1>
<label>
Token:
<input type="text" id="tokenInput" placeholder="输入 token">
</label>
<button id="connectBtn">连接 SSE</button>
<button id="closeBtn" disabled>断开连接</button>
<div id="log">[日志输出区]</div>
<script>
const btnConnect = document.getElementById('connectBtn');
const btnClose = document.getElementById('closeBtn');
const tokenInput = document.getElementById('tokenInput');
const logEl = document.getElementById('log');
let es;
function log(msg) {
const time = new Date().toLocaleTimeString();
logEl.textContent += `\n[${time}] ${msg}`;
logEl.scrollTop = logEl.scrollHeight;
}
btnConnect.addEventListener('click', () => {
const token = tokenInput.value.trim();
if (!token) {
alert('请先输入 token');
return;
}
// 如果已有连接,先关闭旧连接
if (es) {
es.close();
log('已关闭旧连接');
}
const url = `http://localhost:8000/sse/connect?token=${encodeURIComponent(token)}`;
log(`尝试连接:${url}`);
es = new EventSource(url);
es.onopen = () => {
log('SSE 通道已打开,等待服务器推送...');
btnClose.disabled = false;
};
es.onerror = (err) => {
log('连接出错或已关闭');
console.error(err);
btnClose.disabled = true;
};
es.onmessage = (evt) => {
log(`收到消息:${evt.data}`);
};
es.addEventListener('NOTICE', (evt) => {
log(`自定义事件 NOTICE:${evt.data}`);
});
});
btnClose.addEventListener('click', () => {
if (es) {
es.close();
log('已主动关闭 SSE 连接');
btnClose.disabled = true;
}
});
</script>
</body>
</html>
流式响应
作用
- AI 问答/文案生成:用户提交一个问题或提示(prompt),后端调用大模型逐 token 生成答案,浏览器边渲染边展示。
- Markdown 实时渲染:用户在编辑器输入内容,服务端将解析后的 HTML 分段推送到客户端,实时预览效果。
- 代码编译/日志输出:提交编译任务后,后端实时推送编译日志或进度,让前端滚动展示。
- 数据处理进度:异步跑批任务时,通过 SSE 分段推送处理进度或结果,实时反馈给用户。
这些场景都需要在同一次 HTTP 请求中,边处理边推送,无需等待整个结果完成后才返回。
代码示例
后端接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41@RestController
@RequestMapping("/sse/ai")
public class AiStreamController {
// 可复用线程池
private final ExecutorService executor = Executors.newSingleThreadExecutor();
@GetMapping("/stream")
public SseEmitter streamAiResponse(@RequestParam(required = false) String prompt) {
// 永不超时,直到 complete()/completeWithError()
SseEmitter emitter = new SseEmitter(0L);
executor.execute(() -> {
try {
// 模拟 AI 按 token 生成输出
String[] tokens = {
"这", "是", "一", "段", "模", "拟", "流", "式", "响", "应", "。"
};
for (String token : tokens) {
// 每个 token 单独发一次
emitter.send(SseEmitter.event()
.name("TOKEN")
.data(token)
);
Thread.sleep(150); // 模拟生成延迟
}
// 通知前端结束
emitter.send(SseEmitter.event()
.name("END")
.data("[DONE]")
);
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
}测试 HTML 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>AI 流式响应示例</title>
<style>
body { font-family: sans-serif; padding: 20px; }
#output { white-space: pre-wrap; border: 1px solid #ccc; padding: 10px; height: 200px; overflow-y: auto; }
#startBtn { padding: 10px 20px; font-size: 16px; }
</style>
</head>
<body>
<button id="startBtn">开始 AI 流式响应</button>
<div id="output"></div>
<script>
document.getElementById('startBtn').addEventListener('click', () => {
// 清空上次输出
const out = document.getElementById('output');
out.textContent = '';
// 建立 SSE 连接
const es = new EventSource('http://localhost:8000/sse/ai/stream');
// 接收每个 TOKEN 事件
es.addEventListener('TOKEN', e => {
out.textContent += e.data;
out.scrollTop = out.scrollHeight;
});
// 接收结束事件
es.addEventListener('END', () => {
out.textContent += '\n[流式响应结束]';
es.close();
});
// 错误处理
es.onerror = err => {
out.textContent += '\n[连接出错或已断开]';
es.close();
};
});
</script>
</body>
</html>
WebFlux 中的 SSE
基本原理
非阻塞响应式
Spring WebFlux 底层基于 Reactor 和 Netty(或 Undertow),采用事件循环(Event Loop)模型,能够用极少的线程处理大量并发连接。SSE 在 WebFlux 中就是把 Flux<T> 当作一个“无限流”,按需把每个元素序列化成
text/event-stream
的 chunk 发送给客户端。Media Type
使用
MediaType.TEXT_EVENT_STREAM_VALUE
(即text/event-stream
)告诉框架和客户端这是一个 Server-Sent Events 流。HTTP1.1 下会自动启用 chunked transfer,不需要手动 flush;流结束时 Flux 完成,HTTP 响应关闭。Backpressure 与取消
客户端断开(如调用
EventSource.close()
或网络断开)时,Flux 会收到取消信号(cancel),可在.doOnCancel()
中做资源清理。Reactor 自动处理下游背压,保证上游不会无限制生产。
使用 Flux 创建 SSE
注解方式
1
2
3
4
5
6
7
8
9
10
11@RestController
public class ReactiveSseController {
@GetMapping(value = "/sse/reactive", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamPlainText() {
// 每秒一个字符串
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> "tick: " + seq);
}
}以上代码会把每个
String
自动包成:1
2
3data: tick: 0
data: tick: 1
…ServerSentEvent 构造:若要自定义事件名、ID 或重连时间,可返回
Flux<ServerSentEvent<T>>
:1
2
3
4
5
6
7
8
9
10
11@GetMapping(value = "/sse/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<String>builder()
.id(String.valueOf(seq))
.event("heartbeat")
.data("心跳 " + seq)
.retry(Duration.ofSeconds(5))
.build()
);
}
扩展示例:集成业务流
假设有一个业务服务不断推送消息给特定用户,使用 Reactive Kafka/Redis 也很简单:
1 |
|
notificationFlux
可以是从 Kafka、Redis Pub/Sub、数据库 Tailable Cursor 等得到的Flux<T>
。.doOnCancel()
用于客户端断开后的清理逻辑。
前端接入示例
1 |
|
小结
- WebFlux SSE:把响应式流
Flux<T>
直接映射为 SSE,不再依赖SseEmitter
; - 高效非阻塞:少量线程即可支撑海量并发长连接;
- 灵活扩展:任意来源的
Flux
(消息队列、定时器)都可无缝推送; - 事件定制:借助
ServerSentEvent
构建 ID、event、retry 等字段。
总结
- 协议层面:SSE 基于 HTTP/1.1 的
text/event-stream
长连接,天然支持浏览器自动重连与Last-Event-ID
断点续传。 - 实现层面:
- 在 Spring MVC 中使用
SseEmitter
;合理设置超时、异步线程和输出缓冲即可支撑高并发。 - 在 Spring WebFlux 中直接返回
Flux
或ServerSentEvent
,利用 Reactor 的非阻塞背压模型稳定承载海量连接。
- 在 Spring MVC 中使用
- 生产运维:必须关闭应用服务器与 Nginx 的响应缓冲、配置心跳穿透代理超时、监控连接数与推送指标,并结合 JWT + Redis Pub/Sub 完成多实例广播。
- 应用场景:从系统通知、进度条、日志流,到 AI Token-by-Token 输出,凡是“服务端单向快速推流”皆可用 SSE 简洁高效地实现。
掌握本文提供的最佳实践和代码骨架后,开发者可在单机或分布式环境中快速落地稳定、可扩展的 SSE 实时推送能力,让前端页面“活”起来,同时保持架构的简洁与可维护性。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!