Server-Sent Events基础

本文最后更新于:8 天前

引言

在现代 Web 应用中,实时能力已经从「可选」演进为「必需」。浏览器与服务器之间不仅要“快”,还要“流畅”,既能及时把后台事件推送到前端,又能让用户在等待 AI 或大数据任务时看到源源不断的结果。Server-Sent Events(SSE)凭借 单向、轻量、浏览器原生、自动重连 等特性,成为“只需服务器推流、不必客户端回写”的理想方案。本文章系统梳理了 SSE 的协议细节、与长轮询 / WebSocket / gRPC 的横向对比,以及在 Spring MVC、Spring WebFlux 两种编程模型中的落地姿势;同时给出了生产级别的缓冲关闭、心跳、监控、分布式广播以及流式 AI 输出的完整示例,旨在帮助开发者从 “为什么选 SSE”“如何优雅地在生产环境跑起来” 形成一条清晰的技术脉络。

基础概念

协议定义

SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP协议的单向通信技术,允许服务器主动向浏览器推送数据。与传统的轮询不同,SSE 建立的是一个长连接,服务器可以在连接存续期间持续不断地将事件消息发送给客户端。浏览器端通过原生的 EventSource API 即可轻松接入,不需要额外库,使用简单、资源开销低。SSE 适用于如实时消息推送、系统通知、进度更新等场景。它相比 WebSocket 实现更轻量,尤其在不需要客户端反向通信的场景下,是一种稳定、高效的实时通信方案。

协议对比

SSE(Server-Sent Events)

  • 通信方式:单向(服务器 → 客户端)
  • 协议层:基于 HTTP/1.1,MIME 类型为 text/event-stream
  • 使用场景:实时推送数据,如:股票行情、系统通知、直播评论等
  • 优点
    • 使用简单:前端只需 EventSource,后端返回特定格式数据即可
    • 支持自动重连
    • 浏览器原生支持(不需要额外库)
  • 缺点
    • 只能单向通信,客户端不能主动发消息
    • 不适合二进制数据传输

长轮询(Long Polling)

  • 通信方式:模拟实时,通过客户端不断轮询实现近似实时
  • 原理:客户端发出请求,服务器挂起请求直到有新数据或超时返回,然后客户端再重新请求
  • 缺点
    • 资源浪费(频繁建连)
    • 实现复杂(需处理超时、队列、资源回收等)

WebSocket

  • 通信方式:双向(客户端 ↔ 服务器)
  • 协议层:独立协议,基于 HTTP 完成握手后升级为 ws://wss://
  • 使用场景:聊天、协同编辑、游戏、音视频信令等
  • 优点
    • 真正的全双工通信,延迟极低
    • 支持自定义协议(可定义数据结构如 JSON、Protobuf)
  • 缺点
    • 实现复杂度略高(需要手动管理连接和心跳)

gRPC

  • 通信方式
    • 支持一元调用(单次请求响应)
    • 服务器流(类似 SSE)
    • 客户端流、双向流(类似 WebSocket)
  • 协议层:基于 HTTP/2 的远程调用框架,默认使用 Protobuf 编码
  • 使用场景:微服务间通信、高性能数据交换
  • 优点
    • 高性能、强类型、支持多语言
    • 内建双向流和连接复用
  • 缺点
    • 学习成本高
    • 不适合浏览器直接调用(需要 gRPC-Web)

MIME 类型与持久连接

text/event-stream

  • SSE 的 MIME 类型。

  • 表示这个 HTTP 响应是一个流(stream),并不会立即结束。

  • 示例响应头:

    1
    2
    3
    Content-Type: text/event-stream
    Cache-Control: no-cache
    Connection: keep-alive

持久连接(HTTP keep-alive)

  • SSE 利用 HTTP/1.1 的持久连接特性,使得连接不被关闭。
  • 服务器不断地向客户端写入事件消息,不结束响应体。

响应不会关闭

  • 与传统请求不同,SSE 的响应体会持续输出,不会 Content-Length 限定大小。
  • 必须刷新缓冲区 flush() 以确保消息即时传递。

事件格式

SSE 每条事件数据的格式如下,每条事件之间用两个换行分隔:

1
2
3
4
5
event: customEventName
id: 1234
retry: 5000
data: first line of message
data: second line of message

字段说明:

  • event:自定义事件名,对应浏览器中监听的事件eventSource.addEventListener('customEventName', ...)

  • id:消息 ID,浏览器断线重连时会通过 Last-Event-ID 请求头携带该值,服务端可以据此避免重复推送。

  • retry:告诉浏览器在连接断开后等待多少毫秒再重连(非必需)。

  • data:消息体,每个 data: 行会被合并为一个消息文本,支持多行。

  • 示例合并规则:

    1
    2
    data: line1
    data: line2

    最终事件内容为:

    1
    2
    line1
    line2

EventSource API

创建连接

1
const eventSource = new EventSource('http://localhost:8080/sse');

监听消息

1
2
3
4
5
6
7
8
9
// 默认消息(event字段未指定)
eventSource.onmessage = function(event) {
console.log('Message:', event.data);
};

// 监听自定义事件
eventSource.addEventListener('customEvent', function(event) {
console.log('Custom Event:', event.data);
});

错误处理

1
2
3
eventSource.onerror = function(err) {
console.error('EventSource failed:', err);
};

关闭连接

1
eventSource.close();

自动重连特性

  • 默认自动重连机制,若服务端响应中指定了 retry: 行,浏览器会按其值重试连接。
  • 客户端自动附带 Last-Event-ID 请求头。

Spring MVC 中的 SSE 实现

SseEmitter 核心类与生命周期

类概述

  • 包路径org.springframework.web.servlet.mvc.method.annotation.SseEmitter
  • 功能:基于 Servlet 的长连接机制,封装了向客户端推送 Server-Sent Events 的能力。

构造与超时

  • 构造器

    1
    2
    public SseEmitter();               // 默认超时:30 000ms
    public SseEmitter(Long timeout); // 自定义超时,传 0L 表示永不超时
    • 超时处理:若在指定时间内没有任何消息发送,容器会触发超时回调并关闭连接。

主要方法

  • send(Object):向客户端发送一条事件(Content-Type 自动设为 text/event-stream)。
  • send(SseEventBuilder):更灵活地构造事件:可指定 ideventretrydata
  • complete():正常结束,向客户端发送终止符并关闭连接。
  • completeWithError(Throwable):带错误结束,发送错误并关闭连接。
  • onCompletion(Runnable):注册「正常完成时」的回调。
  • onTimeout(Runnable):注册「超时」的回调。
  • onError(Consumer<Throwable>):注册「发送异常」时的回调。

生命周期流程

  1. 实例化
    • Controller 中创建 new SseEmitter(timeout),返回给 DispatcherServlet。
  2. 挂起请求
    • Servlet 容器保留该请求,不立即返回响应结束。
  3. 数据推送
    • 在异步线程中反复调用 emitter.send(...),每次都会 flush 到客户端。
  4. 完成或超时
    • 正常完成:调用 complete()
    • 超时:超过 timeout 未发送数据,容器自动调用 registered onTimeout 回调并关闭;
    • 异常send() 抛出 IOException,可在 onError 回调中处理,并调用 completeWithError
  5. 回调触发
    • 调用完 complete() 或超时后,会触发对应的回调,可以做资源清理、日志记录等。

普通Controller示例

下面示例展示如何在 Spring MVC/Boot 的普通 Controller 中使用 SseEmitter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@RestController
public class SseController {

// 可复用线程池
private final ExecutorService executor = Executors.newSingleThreadExecutor();

@GetMapping("/sse/stream")
public SseEmitter streamEvents() {
// 创建一个永不超时的 emitter
SseEmitter emitter = new SseEmitter(0L);

// 注册超时和完成回调
emitter.onTimeout(() -> {
System.out.println("SSE 连接超时,进行清理");
emitter.complete();
});
emitter.onCompletion(() -> System.out.println("SSE 连接已关闭"));

// 异步推送示例数据
executor.execute(() -> {
try {
for (int i = 1; i <= 5; i++) {
// 构造一个带事件名和 ID 的 SSE 消息
emitter.send(
SseEmitter.event()
.id(String.valueOf(i))
.name("heartbeat")
.data("服务器心跳 " + i)
.reconnectTime(3000)
);
// 模拟业务处理间隔
TimeUnit.SECONDS.sleep(2);
}
// 发送完成,关闭连接
emitter.complete();
} catch (IOException | InterruptedException ex) {
// 出现异常时带错误结束
emitter.completeWithError(ex);
}
});

return emitter;
}
}

关键点说明

  1. 永不超时

    • new SseEmitter(0L) 保证连接常驻,适合频繁推送场景;
    • 若希望自动中断,请设定非 0 的超时时间(单位 ms)。
  2. 异步推送

    • Controller 方法立即返回 SseEmitter,Servlet 容器挂起响应,由另一个线程不断调用 send()
  3. 事件构造

    • 使用 SseEmitter.event() 构造器可链式设置:

      • id(...):消息 ID,用于断线重连时续流;

      • name(...):事件名,前端可通过 addEventListener 精确监听;

      • data(...):实际推送内容;

      • reconnectTime(...):重连间隔,覆盖客户端默认值。

  4. 关闭与异常

    • 正常流程结束后必须调用 complete() 关闭连接;
    • 若发送过程中发生 IO 异常,应调用 completeWithError(),前端会收到错误并关闭;
    • 可通过 onCompletiononTimeoutonError 注册回调做清理或日志。

阻塞和异步

阻塞式处理

默认情况下,Spring MVC 每个请求占用一个 Servlet 线程。如果在同一个线程里循环调用 SseEmitter.send()sleep(),就会一直占用该线程,导致服务器吞吐量急剧下降。

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/sse/blocking")
public SseEmitter blockingStream() throws InterruptedException, IOException {
SseEmitter emitter = new SseEmitter(0L);
// 这是阻塞式:在当前线程里循环发送并 sleep
for (int i = 1; i <= 5; i++) {
emitter.send("message " + i);
Thread.sleep(2000);
}
emitter.complete();
return emitter;
}

问题:每个客户端连接都占用 1 个线程,连接数量多时容易导致线程耗尽。

异步方案一:@Async

  1. 在业务层使用 Spring 的异步方法释放请求线程;
  2. Controller 立即返回 SseEmitter,真正的推送逻辑在异步线程中进行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Service
public class SseService {

@Async
public void pushEvents(SseEmitter emitter) {
try {
for (int i = 1; i <= 5; i++) {
emitter.send(SseEmitter.event()
.id(String.valueOf(i))
.data("Async message " + i));
Thread.sleep(2000);
}
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
}

}

@RestController
@RequiredArgsConstructor
public class SseController {

private SseService sseService;

@GetMapping("/sse/async")
public SseEmitter asyncStream() {
SseEmitter emitter = new SseEmitter(0L);
sseService.pushEvents(emitter); // 异步执行
return emitter; // 立即释放请求线程
}

}

异步方案二:DeferredResult

  1. 使用 DeferredResult<SseEmitter>,在子线程里构造并返回 SseEmitter
  2. 同样释放 Servlet 请求线程,提高吞吐量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@GetMapping("/sse/deferred")
public DeferredResult<SseEmitter> deferredStream() {
DeferredResult<SseEmitter> result = new DeferredResult<>();
executor.execute(() -> {
SseEmitter emitter = new SseEmitter(0L);
try {
for (int i = 1; i <= 5; i++) {
emitter.send("Deferred message " + i);
Thread.sleep(2000);
}
emitter.complete();
result.setResult(emitter); // 设置返回值
} catch (Exception ex) {
emitter.completeWithError(ex);
result.setErrorResult(ex);
}
});
return result; // 立即释放请求线程
}

简单心跳与重连

服务端定时发送心跳注释

SSE 规范支持“注释行”(以 : 开头),客户端收到注释也会重置重连计时器,但不会触发 message 事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@GetMapping("/sse/heartbeat")
public SseEmitter heartbeatStream() {
SseEmitter emitter = new SseEmitter(0L);
// 定时发送注释做心跳,间隔 15 秒
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
try {
// 发送注释行,客户端不触发 onmessage
emitter.send(SseEmitter.event()
.comment("ping"));
} catch (IOException ex) {
emitter.completeWithError(ex);
}
}, 0, 15, TimeUnit.SECONDS);

// 连接关闭时取消心跳任务
emitter.onCompletion(() -> future.cancel(true));
emitter.onTimeout(() -> {
future.cancel(true);
emitter.complete();
});
return emitter;
}

客户端自动重连与事件监听

浏览器原生 EventSource 会在网络断开后自动重连,默认等待 3 000 ms,可由服务器通过 retry: 指定:

1
2
3
4
5
6
7
8
9
10
11
12
13
const es = new EventSource('/sse/heartbeat');

// 监听连接打开
es.onopen = () => console.log('SSE 已连接');

// 监听默认消息(此例仅有注释心跳,不会触发)
es.onmessage = e => console.log('收到消息:', e.data);

// 监听错误(会在断线后自动重连)
es.onerror = err => console.warn('连接错误,正在重连...', err);

// 如果需要自定义重连时间,可在服务器端发送:
// SseEmitter.event().reconnectTime(5000)

说明

  • 注释行不会进入 onmessage,但可保持连接;
  • 服务器发送 .reconnectTime(ms) 会覆盖客户端重连时长;
  • 一旦连接失败,浏览器自动重连并带上 Last-Event-ID,由服务端决定是否重发漏掉的消息。

常见配置

应用服务器层面

  1. 线程与连接池

    • 长连接数:SSE 连接会长期占用 HTTP 线程或 NIO 事件线程,需要适当增大 server.tomcat.max-threads(或 Jetty/Undertow 的对应参数),以承载并发的持久连接。
    • 连接超时:默认的 server.tomcat.connection-timeout(如 20s)要调大,或者对于 SSE 路径单独设置为 0(永不超时)。
    1
    2
    3
    4
    5
    server:
    port: 8080
    tomcat:
    max-threads: 2000 # 支撑更多并发长连接
    connection-timeout: 600000 # 600秒
  2. 响应缓冲与分块

    • 禁用 HttpServletResponse 层面的缓冲:禁用 sse 路径的 HttpServletResponse 缓冲,同时告知代理层关闭缓冲。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      @Configuration
      public class SseBufferConfig {

      /**
      * 注册一个 Servlet 过滤器,对 /sse/* 路径禁用 HttpServletResponse 层面的缓冲,
      * 并增加 Nginx 友好的 no-buffering 头部。
      */
      @Bean
      public FilterRegistrationBean<Filter> disableBufferingFilter() {
      FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<>();
      bean.setFilter((request, response, chain) -> {
      if (response instanceof HttpServletResponse) {
      HttpServletResponse resp = (HttpServletResponse) response;
      // 禁用 servlet 输出缓冲
      resp.setBufferSize(0);
      // 如果前端有 Nginx/Envoy,可用此头告知关闭代理缓冲
      resp.setHeader("X-Accel-Buffering", "no");
      }
      chain.doFilter(request, response);
      });
      bean.addUrlPatterns("/sse/*");
      bean.setName("disableBufferingFilter");
      bean.setOrder(0);
      return bean;
      }
      }
    • Chunked Transfer:HTTP1.1 下 SSE 默认走分块传输(chunked),确保代理层也支持。

反向代理 / 负载均衡

大多数生产环境会在前面放 Nginx、HAProxy、Envoy 等,需要针对 SSE 调优:

1
2
3
4
5
6
7
8
location /sse/ {
proxy_pass http://app_servers;
proxy_http_version 1.1;
proxy_set_header Connection ""; # 保持底层长连接
proxy_buffering off; # 关掉响应缓冲
proxy_read_timeout 600s; # 读超时要比 SSE 最大空闲心跳间隔高
proxy_send_timeout 600s;
}
  • proxy_buffering off:保证服务器写入的每次 data: 都能实时下发给客户端。
  • proxy_read_timeout/send_timeout:调大(根据业务响应时长确定),防止代理主动断开空闲连接。

心跳与重连策略

  • 后台心跳:定时发送注释行或空事件(例如每 15–30s 一次),用于穿透各种中间网络设备的空闲超时。
  • 客户端重连:在服务端通过 SseEmitter.event().reconnectTime(…) 设定合理重连间隔,例如 5000ms

安全与监控

  1. CORS 与认证
    • SSE 的跨域和普通 REST 一样,可在 WebMvcConfigurer@CrossOrigin 上单独为 /sse/** 路径放行。
    • Token 鉴权时,要考虑长连接的认证续期和失效通知。
  2. 连接监控
    • 统计当前打开的 SSE 连接数,以防资源泄漏。可以在 onCompletiononErroronTimeout 回调里维护并发计数。
    • 在 Prometheus 或其他 APM 中暴露指标:sse_open_connectionssse_messages_sent_total 等。

对比 REST 接口

配置项/特性 普通 REST 接口 SSE 接口
连接模型 短连接,完成即关闭 长连接,响应持续不断地输出
响应超时 20–60s 常见 通常设置为 0(永不超时)或数分钟以上
线程/资源占用 请求处理完即释放 连接存续期间持续占用线程/NIO 事件槽
代理/缓存 可开启缓冲、缓存 必须关闭缓冲(proxy_buffering off),实时下发
心跳与断线重连 需显式心跳注释行;客户端自动重连与 Last-Event-ID 续流

场景

分布式消息推送

链路

使用 SseEmitter 建立浏览器长连接,结合 JWT 鉴权 + Redis Pub/Sub 完成跨实例广播。整体链路同样可拆为三段:

阶段 关键动作
握手 浏览器 GET /sse/connect?token=xxx
JwtAuthInterceptor 校验
→ Controller 返回 SseEmitter
本机路由 SseSessionManager 维护 uid ↔ SseEmitter 映射;
NotificationService 负责发送
跨实例广播 业务代码/任务调度
RedisPublisher 发布 SseEvent
RedisSubscriber 订阅并分派给本机用户

❗ 与 WebSocket 不同:浏览器 只收不发,业务侧想推消息时自行调用 NotificationService(或 REST API、MQ 监听任务)。

核心类及职责

作用 关键点
JwtUtil 解析 / 校验 JWT parseToken() → 返回 userId
JwtAuthInterceptor HandlerInterceptor token 取 uid,放入 request.setAttribute("uid", …)
SseController 建立 SSE 连接 connect() 创建 SseEmitter 并注册到 SseSessionManager
SseEvent 统一消息模型 uideventdatabroadcast 四字段
SseSessionManager 本机 Emitter 池 register / remove / sendTo / broadcast
NotificationService 业务入口 调用 RedisPublisher 或直接 sessionManager.xxx()
RedisPublisher 发布事件到 Redis Channel convertAndSend("sse-event", json)
RedisSubscriber 订阅 sse-event Channel 解析 SseEvent → 定向 / 广播 给本机 SseEmitter
Sseutil 定向推送或广播事件 通过 SsePublisher 向 Redis 频道发布事件,多实例订阅 Redis 频道,判断是否需要向本机管理的 SseEmitter 发送事件,实现跨实例

时序说明

sequenceDiagram
    participant B as Browser
    participant G as Nginx / LB
    participant A as Pod-A
    participant R as Redis
    participant B2 as Pod-B
    participant S as 业务Service

%% 握手阶段
    B->>G: GET /sse/connect?token=JWT
    G->>A: HTTP (粘性或随机)
    A->>A: JwtAuthInterceptor:解析 token → uid
    A-->>B: 200 + text/event-stream(SseEmitter 建立)

%% 服务端推送
    S->>A: NotificationService.push(uid,msg)
    A->>R: Redis PUBLISH sse-event
    R-->>A: SUBSCRIBE
    R-->>B2: SUBSCRIBE
    A->>A: SseSessionManager.sendTo(uid)
    B2->>B2: SseSessionManager.sendTo(uid)

%% 广播
    Note over S: broadcast=true\nSubscriber 调用\nbroadcast()
  1. 握手:拦截器校验 JWT,将 uid 写入属性;Controller 生成 SseEmitter 并登记。
  2. 本机会话注册SseSessionManager.register(uid, emitter);在 onCompletion/onTimeout/onError 中注销。
  3. 消息入口:业务代码或定时任务调用 NotificationService;所有事件先写 Redis,保证分布式一致性。
  4. 跨实例分发:各实例的 RedisSubscriber 订阅 sse-event;按 broadcast 标记选择 sendTobroadcast
  5. 客户端渲染:前端用 EventSource 监听不同 eventName 做弹窗 / 列表追加等 UI 处理。

代码示例

  1. JWT 工具类,用于解析 JWT 令牌

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Component
    public class JwtUtil {

    @Value("${jwt.secret}")
    private String secret;

    public String parseToken(String token) {
    return Jwts.parser()
    .setSigningKey(secret.getBytes(StandardCharsets.UTF_8))
    .parseClaimsJws(token.replace("Bearer ", ""))
    .getBody()
    .getSubject();
    }
    }

    JWT 认证拦截器,用于验证请求中的 JWT 令牌

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Component
    @RequiredArgsConstructor
    public class JwtAuthInterceptor implements HandlerInterceptor {

    private final JwtUtil jwtUtil;

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    String token = request.getParameter("token");
    try {
    req.setAttribute("uid", jwt.parseToken(token));
    return true;
    } catch (JwtException e) {
    response.setStatus(401);
    return false;
    }
    }
    }

    Web 配置,注册拦截器以及设置跨域

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Configuration
    @RequiredArgsConstructor
    public class WebConfig implements WebMvcConfigurer {

    private final JwtAuthInterceptor jwtAuthInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(jwtAuthInterceptor)
    .addPathPatterns("/sse/**");
    }

    @Override
    public void addCorsMappings(CorsRegistry registry) {
    registry.addMapping("/sse/**")
    .allowedOrigins("*")
    .allowedMethods("GET");
    }

    }
  2. SSE 事件数据传输对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Accessors(chain = true)
    public class SseEvent {

    /**
    * 目标用户,可空
    */
    private String uid;

    /**
    * 事件名,如 HEARTBEAT、NOTICE
    */
    private String event;

    /**
    * 数据:任意 JSON
    */
    private Object data;

    /**
    * 是否广播,默认为 false
    */
    private boolean broadcast;

    }

    会话管理器,用于管理和发送 SSE 事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    @Slf4j
    @Component
    public class SseSessionManager {

    private final static Map<String, Set<SseEmitter>> ALL_EMITTERS = new ConcurrentHashMap<>();

    /**
    * 注册一个新的 SSE emitter
    * @param uid 用户唯一标识符,用于区分不同用户的连接
    * @return 返回一个新的 SseEmitter 实例
    */
    public SseEmitter register(String uid) {
    SseEmitter emitter = new SseEmitter(0L); // 永不超时
    ALL_EMITTERS.computeIfAbsent(uid, k -> ConcurrentHashMap.newKeySet()).add(emitter);
    log.info("SSE emitter registered for UID: {}", uid);
    Runnable clear = () -> ALL_EMITTERS.computeIfPresent(uid, (k, emitters) -> {
    emitters.remove(emitter);
    log.info("SSE emitter removed for UID: {}", uid);
    return emitters.isEmpty() ? null : emitters;
    });
    emitter.onCompletion(clear);
    emitter.onTimeout(clear);
    emitter.onError(t -> clear.run());
    // 关键:立刻发送首帧,强制 flush header
    try {
    emitter.send(SseEmitter.event()
    .name("INIT")
    .data("CONNECTED")
    .reconnectTime(3000)); // 可选:告诉客户端重连间隔
    } catch (IOException e) {
    emitter.completeWithError(e);
    }
    return emitter;
    }

    /**
    * 定时任务,每30秒发送心跳事件到所有连接
    * 根据发送结果,自动清理失效的 emitter
    */
    @Scheduled(fixedRate = 30000)
    public void heartbeat() {
    // 定时向所有的连接发送心跳事件
    ALL_EMITTERS.forEach((uid, emitters) -> {
    SseEvent heartbeatEvent = new SseEvent(uid, "HEARTBEAT", "PING", false);
    emitters.forEach(e -> {
    try {
    e.send(SseEmitter.event().name(heartbeatEvent.getEvent()).data(heartbeatEvent.getData()));
    } catch (Exception ex) {
    log.error("Error sending heartbeat to UID: {}", uid, ex);
    e.completeWithError(ex);
    emitters.remove(e);
    }
    });
    });
    }

    /**
    * 发送指定事件到特定用户的所有连接
    * @param uid 用户唯一标识符
    * @param sseEvent 要发送的 SSE 事件
    */
    public void sendTo(String uid, SseEvent sseEvent) {
    Set<SseEmitter> sendEmitters = ALL_EMITTERS.get(uid);
    if (sendEmitters != null) {
    sendEmitters.forEach(e -> send(sseEvent, e));
    }
    }

    /**
    * 广播事件到所有连接
    * @param sseEvent 要广播的 SSE 事件
    */
    public void broadcast(SseEvent sseEvent) {
    ALL_EMITTERS.values().forEach(emitters -> {
    if (emitters != null) {
    emitters.forEach(e -> send(sseEvent, e));
    }
    });
    }

    /**
    * 发送事件到指定的 SSE emitter
    * @param sseEvent 要发送的 SSE 事件
    */
    private void send(SseEvent sseEvent, SseEmitter sseEmitter) {
    if (sseEmitter == null) {
    return;
    }
    try {
    sseEmitter.send(SseEmitter.event().name(sseEvent.getEvent()).data(sseEvent.getData()));
    } catch (Exception ex) {
    sseEmitter.completeWithError(ex);
    // 如果发送失败,移除该 emitter
    Set<SseEmitter> removeEmitters = ALL_EMITTERS.get(sseEvent.getUid());
    if (removeEmitters != null) {
    removeEmitters.remove(sseEmitter);
    if (removeEmitters.isEmpty()) {
    ALL_EMITTERS.remove(sseEvent.getUid());
    }
    }
    }
    }
    }
  3. Redis 发布者,用于发布事件到 SSE 频道

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Component
    @RequiredArgsConstructor
    public class SsePublisher {

    public static final String CHANNEL = "sse-event";

    private final StringRedisTemplate stringRedisTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public void publish(SseEvent sseEvent) {
    try {
    stringRedisTemplate.convertAndSend(CHANNEL, objectMapper.writeValueAsString(sseEvent));
    } catch (JsonProcessingException ignored) {
    }
    }
    }

    Redis 订阅者,用于从 SSE 频道接收事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Component
    @RequiredArgsConstructor
    public class SseSubscriber implements MessageListener {

    private final SseSessionManager sseSessionManager;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final StringRedisTemplate stringRedisTemplate;

    @PostConstruct
    public void init() {
    Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()).getConnection()
    .subscribe(this, SsePublisher.CHANNEL.getBytes());
    }

    public void onMessage(Message rawMessage, byte[] pattern) {
    try {
    SseEvent sseEvent = objectMapper.readValue(rawMessage.getBody(), SseEvent.class);
    if (sseEvent.isBroadcast()) {
    sseSessionManager.broadcast(sseEvent);
    } else {
    sseSessionManager.sendTo(sseEvent.getUid(), sseEvent);
    }
    } catch (IOException ignored) {
    }
    }
    }
  4. SSE 连接接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @RestController
    @RequestMapping("/sse")
    @RequiredArgsConstructor
    public class SseController {

    private final SseSessionManager sessionManager;

    @GetMapping(path = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(HttpServletRequest req) {
    String uid = (String) req.getAttribute("uid");
    return sessionManager.register(uid);
    }
    }
  5. SSE 工具类,用于推送事件到特定用户或全站广播(通过 Redis 实现跨实例的关键)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Component
    @RequiredArgsConstructor
    public class SseUtil {

    private final SsePublisher pub;

    /**
    * 定向推送
    */
    public void pushToUser(String uid, String msg, String event) {
    pub.publish(new SseEvent(uid, event, msg, false));
    }

    /**
    * 全站广播
    */
    public void broadcast(String msg, String event) {
    pub.publish(new SseEvent(null, event, msg, true));
    }

    }
  6. 测试接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Resource
    private SseUtil sseUtil;

    @GetMapping("/send/{uid}/{message}")
    public void sendMessage(@PathVariable("uid") String uid,
    @PathVariable("message") String message) {
    sseUtil.pushToUser(uid, message, "NOTICE");
    }

    @GetMapping("/broadcast/{message}")
    public void broadcastMessage(@PathVariable("message") String message) {
    sseUtil.broadcast(message, "NOTICE");
    }

    测试 HTML 文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    <!DOCTYPE html>
    <html lang="zh-CN">
    <head>
    <meta charset="UTF-8">
    <title>SSE 测试页面</title>
    <style>
    body { font-family: sans-serif; padding: 20px; }
    #log { margin-top: 20px; white-space: pre-wrap; border: 1px solid #ccc; padding: 10px; height: 200px; overflow-y: auto; }
    input, button { font-size: 1rem; padding: 5px; margin-right: 5px; }
    </style>
    </head>
    <body>
    <h1>SSE 连接测试</h1>
    <label>
    Token:
    <input type="text" id="tokenInput" placeholder="输入 token">
    </label>
    <button id="connectBtn">连接 SSE</button>
    <button id="closeBtn" disabled>断开连接</button>
    <div id="log">[日志输出区]</div>

    <script>
    const btnConnect = document.getElementById('connectBtn');
    const btnClose = document.getElementById('closeBtn');
    const tokenInput = document.getElementById('tokenInput');
    const logEl = document.getElementById('log');
    let es;

    function log(msg) {
    const time = new Date().toLocaleTimeString();
    logEl.textContent += `\n[${time}] ${msg}`;
    logEl.scrollTop = logEl.scrollHeight;
    }

    btnConnect.addEventListener('click', () => {
    const token = tokenInput.value.trim();
    if (!token) {
    alert('请先输入 token');
    return;
    }
    // 如果已有连接,先关闭旧连接
    if (es) {
    es.close();
    log('已关闭旧连接');
    }
    const url = `http://localhost:8000/sse/connect?token=${encodeURIComponent(token)}`;
    log(`尝试连接:${url}`);
    es = new EventSource(url);

    es.onopen = () => {
    log('SSE 通道已打开,等待服务器推送...');
    btnClose.disabled = false;
    };
    es.onerror = (err) => {
    log('连接出错或已关闭');
    console.error(err);
    btnClose.disabled = true;
    };
    es.onmessage = (evt) => {
    log(`收到消息:${evt.data}`);
    };
    es.addEventListener('NOTICE', (evt) => {
    log(`自定义事件 NOTICE:${evt.data}`);
    });
    });

    btnClose.addEventListener('click', () => {
    if (es) {
    es.close();
    log('已主动关闭 SSE 连接');
    btnClose.disabled = true;
    }
    });
    </script>
    </body>
    </html>

流式响应

作用

  • AI 问答/文案生成:用户提交一个问题或提示(prompt),后端调用大模型逐 token 生成答案,浏览器边渲染边展示。
  • Markdown 实时渲染:用户在编辑器输入内容,服务端将解析后的 HTML 分段推送到客户端,实时预览效果。
  • 代码编译/日志输出:提交编译任务后,后端实时推送编译日志或进度,让前端滚动展示。
  • 数据处理进度:异步跑批任务时,通过 SSE 分段推送处理进度或结果,实时反馈给用户。

这些场景都需要在同一次 HTTP 请求中,边处理边推送,无需等待整个结果完成后才返回。

代码示例

  1. 后端接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    @RestController
    @RequestMapping("/sse/ai")
    public class AiStreamController {

    // 可复用线程池
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @GetMapping("/stream")
    public SseEmitter streamAiResponse(@RequestParam(required = false) String prompt) {
    // 永不超时,直到 complete()/completeWithError()
    SseEmitter emitter = new SseEmitter(0L);

    executor.execute(() -> {
    try {
    // 模拟 AI 按 token 生成输出
    String[] tokens = {
    "这", "是", "一", "段", "模", "拟", "流", "式", "响", "应", "。"
    };
    for (String token : tokens) {
    // 每个 token 单独发一次
    emitter.send(SseEmitter.event()
    .name("TOKEN")
    .data(token)
    );
    Thread.sleep(150); // 模拟生成延迟
    }
    // 通知前端结束
    emitter.send(SseEmitter.event()
    .name("END")
    .data("[DONE]")
    );
    emitter.complete();
    } catch (Exception ex) {
    emitter.completeWithError(ex);
    }
    });

    return emitter;
    }

    }
  2. 测试 HTML 文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    <!DOCTYPE html>
    <html lang="zh-CN">
    <head>
    <meta charset="UTF-8">
    <title>AI 流式响应示例</title>
    <style>
    body { font-family: sans-serif; padding: 20px; }
    #output { white-space: pre-wrap; border: 1px solid #ccc; padding: 10px; height: 200px; overflow-y: auto; }
    #startBtn { padding: 10px 20px; font-size: 16px; }
    </style>
    </head>
    <body>

    <button id="startBtn">开始 AI 流式响应</button>
    <div id="output"></div>

    <script>
    document.getElementById('startBtn').addEventListener('click', () => {
    // 清空上次输出
    const out = document.getElementById('output');
    out.textContent = '';

    // 建立 SSE 连接
    const es = new EventSource('http://localhost:8000/sse/ai/stream');

    // 接收每个 TOKEN 事件
    es.addEventListener('TOKEN', e => {
    out.textContent += e.data;
    out.scrollTop = out.scrollHeight;
    });

    // 接收结束事件
    es.addEventListener('END', () => {
    out.textContent += '\n[流式响应结束]';
    es.close();
    });

    // 错误处理
    es.onerror = err => {
    out.textContent += '\n[连接出错或已断开]';
    es.close();
    };
    });
    </script>

    </body>
    </html>

WebFlux 中的 SSE

基本原理

  • 非阻塞响应式

    Spring WebFlux 底层基于 Reactor 和 Netty(或 Undertow),采用事件循环(Event Loop)模型,能够用极少的线程处理大量并发连接。SSE 在 WebFlux 中就是把 Flux<T> 当作一个“无限流”,按需把每个元素序列化成 text/event-stream 的 chunk 发送给客户端。

  • Media Type

    使用 MediaType.TEXT_EVENT_STREAM_VALUE(即 text/event-stream)告诉框架和客户端这是一个 Server-Sent Events 流。HTTP1.1 下会自动启用 chunked transfer,不需要手动 flush;流结束时 Flux 完成,HTTP 响应关闭。

  • Backpressure 与取消

    客户端断开(如调用 EventSource.close() 或网络断开)时,Flux 会收到取消信号(cancel),可在 .doOnCancel() 中做资源清理。Reactor 自动处理下游背压,保证上游不会无限制生产。

使用 Flux 创建 SSE

  • 注解方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @RestController
    public class ReactiveSseController {

    @GetMapping(value = "/sse/reactive", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamPlainText() {
    // 每秒一个字符串
    return Flux.interval(Duration.ofSeconds(1))
    .map(seq -> "tick: " + seq);
    }

    }

    以上代码会把每个 String 自动包成:

    1
    2
    3
    data: tick: 0
    data: tick: 1

  • ServerSentEvent 构造:若要自定义事件名、ID 或重连时间,可返回 Flux<ServerSentEvent<T>>

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @GetMapping(value = "/sse/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
    .map(seq -> ServerSentEvent.<String>builder()
    .id(String.valueOf(seq))
    .event("heartbeat")
    .data("心跳 " + seq)
    .retry(Duration.ofSeconds(5))
    .build()
    );
    }

扩展示例:集成业务流

假设有一个业务服务不断推送消息给特定用户,使用 Reactive Kafka/Redis 也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Service
public class NotificationService {
// 假设这是一个不断从消息队列来的 Flux<SseEvent>
public Flux<SseEvent> notificationFlux(String userId) {
return kafkaReceiver.receive()
.filter(msg -> msg.getUserId().equals(userId))
.map(msg -> new SseEvent(msg.getUserId(), "notify", msg.getContent()));
}
}

@RestController
@RequiredArgsConstructor
@RequestMapping("/sse")
public class ReactiveSseController {
private final NotificationService notificationService;

@GetMapping(value = "/notify/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> notifyUser(@PathVariable String userId) {
return notificationService.notificationFlux(userId)
.map(ev -> ServerSentEvent.builder(ev.getData())
.id(ev.getUserId() + "-" + UUID.randomUUID())
.event(ev.getEvent())
.build()
)
.doOnCancel(() -> System.out.println("用户 " + userId + " 取消订阅"));
}
}
  • notificationFlux 可以是从 Kafka、Redis Pub/Sub、数据库 Tailable Cursor 等得到的 Flux<T>
  • .doOnCancel() 用于客户端断开后的清理逻辑。

前端接入示例

1
2
3
4
5
6
7
8
9
10
11
12
13
<script>
const userId = "u123";
const es = new EventSource(`/sse/notify/${userId}`);

es.addEventListener("notify", e => {
console.log("新通知:", e.data);
});

es.onerror = () => {
console.warn("连接异常或已关闭");
es.close();
};
</script>

小结

  • WebFlux SSE:把响应式流 Flux<T> 直接映射为 SSE,不再依赖 SseEmitter
  • 高效非阻塞:少量线程即可支撑海量并发长连接;
  • 灵活扩展:任意来源的 Flux(消息队列、定时器)都可无缝推送;
  • 事件定制:借助 ServerSentEvent 构建 ID、event、retry 等字段。

总结

  • 协议层面:SSE 基于 HTTP/1.1 的 text/event-stream 长连接,天然支持浏览器自动重连与 Last-Event-ID 断点续传。
  • 实现层面
    • Spring MVC 中使用 SseEmitter;合理设置超时、异步线程和输出缓冲即可支撑高并发。
    • Spring WebFlux 中直接返回 FluxServerSentEvent,利用 Reactor 的非阻塞背压模型稳定承载海量连接。
  • 生产运维:必须关闭应用服务器与 Nginx 的响应缓冲、配置心跳穿透代理超时、监控连接数与推送指标,并结合 JWT + Redis Pub/Sub 完成多实例广播。
  • 应用场景:从系统通知、进度条、日志流,到 AI Token-by-Token 输出,凡是“服务端单向快速推流”皆可用 SSE 简洁高效地实现。

掌握本文提供的最佳实践和代码骨架后,开发者可在单机或分布式环境中快速落地稳定、可扩展的 SSE 实时推送能力,让前端页面“活”起来,同时保持架构的简洁与可维护性。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!