WebSocket实战
本文最后更新于:1 分钟前
前言
在移动互联网与 IoT 场景下,“毫秒级、双向实时通信” 已成为业务标配:在线聊天、实时协作、行情推送、游戏对战……传统 HTTP 轮询或长轮询不仅延迟高,而且资源浪费严重。
WebSocket 自 2011 年纳入 RFC 6455,提供了浏览器原生支持的 全双工、持久化 通道,是解决上述痛点的行业通用方案。本文先从协议原理出发,梳理握手、帧格式、心跳机制等基础知识;随后给出 Java 8 原生实现,帮助读者快速上手;最后落地到 Spring Boot 2.7.18 + JWT + Redis 的生产级示例,展示如何在分布式环境下实现鉴权、点对点私聊与全局广播。读完本文,你将具备从 0 到 1 搭建企业级 WebSocket 服务的完整思路与代码骨架。
理论
缘起与定位
- 痛点:HTTP 天生是请求–响应模式,服务端无法主动推送,实时性差。
- 方案:WebSocket(RFC 6455,2011 发布)在浏览器与服务器之间建立一条全双工、持久化的 TCP 连接,允许双方随时发送数据。
握手流程(HTTP Upgrade)
客户端发起 HTTP/1.1 请求
1
2
3
4
5
6
7
8GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: https://example.com # 可选,防跨站
Sec-WebSocket-Protocol: json, chat # 可选,子协议服务器返回 101 Switching Protocols
1
2
3
4
5HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat # 若协商成功Sec-WebSocket-Accept = BASE64(SHA-1(Sec-WebSocket-Key + GUID))
- 握手成功后,连接切换到 WebSocket 帧语义,结束所有 HTTP 语义。
数据帧结构
字段 | 长度 | 说明 |
---|---|---|
FIN | 1 bit | 是否为消息最后一帧 |
RSV1-3 | 1 bit×3 | 扩展使用,正常为 0 |
Opcode | 4 bit | 0x1 文本,0x2 二进制,0x8 关闭,0x9 Ping,0xA Pong… |
MASK | 1 bit | 客户端→服务器必须为 1;服务器→客户端通常为 0 |
Payload Len | 7/7+16/7+64 bit | 0–125,126,127 长度指示 |
Masking-Key | 0/32 bit | 仅当 MASK=1 时存在 |
Payload Data | 可变 | 真正的数据内容 |
- 掩码(Masking):浏览器必须对出站数据用随机 32 bit Key 异或;防止代理缓存与协议混淆攻击。
- 分片:大消息可拆成多帧(FIN=0)。
- 控制帧最大 125 bytes,必须独立(Ping/Pong/Close)。
连接保活与关闭
场景 | 机制 |
---|---|
保活 | 任一端可 Ping→Pong 确认活跃;无强制间隔,常配合业务心跳定制 |
关闭 | 发送 Close 帧(Code+Reason),收到方应回复 Close 并断 TCP(半握手式关闭) |
常见关闭码:1000 正常;1001 终端离开;1006 保留给异常断线(仅本地可见)。
子协议与扩展
- 子协议 (
Sec-WebSocket-Protocol
):同一连接内约定更高层语义,如graphql-ws
、mqtt
、jsonrpc
。 - 扩展 (
Sec-WebSocket-Extensions
):帧级能力,如permessage-deflate
压缩、分片重排扩展。需双方协商一致。
安全考量
风险 | 对策 |
---|---|
明文窃听/篡改 | 使用 **wss://**(TLS) |
跨站 WebSocket 劫持 (CSWSH) | 校验 Origin / token / Cookie SameSite |
反射型 DOS | 服务器应限制 Ping / Payload 大小 |
代理缓存混淆 | 客户端强制 Mask,服务端验证 Upgrade 头完整性 |
与 HTTP/HTTP-2/Server-Sent Events 对比
特性 | WebSocket | SSE | HTTP/2+push |
---|---|---|---|
双向通信 | √ | ×(服务端单向) | 理论可推送,但浏览器已弃用 |
传输层 | TCP(或 TLS) | 同 | 同 |
报文开销 | 极低(帧头 2–14 字节) | 较低 | 需 HTTP 头 |
代理兼容 | 需显式支持 Upgrade | 高 | 高 |
典型场景 | IM、实时协作、游戏 | 股票行情、日志流 | 资源预加载 |
优缺点与适用场景
优点
- 真·全双工&低延迟
- 单一持久连接降低握手开销
- 广泛浏览器支持(IE 10+)
局限
- 需专门服务器或网关(Nginx、Spring WebFlux、Socket.IO、Vert.x…)
- 中间件(反向代理、负载均衡、防火墙)须显式开启 Upgrade 支持
- 无自带断线重连与心跳,需要应用层实现
典型应用:在线聊天、实时协同编辑、行情推送、多人游戏、IoT 即时控制、直播弹幕、客服系统等。
实现要点(后端)
- 握手与协议栈
- Java:
javax.websocket
、Spring BootWebSocketHandler
- Node.js:
ws
、Socket.IO - Go:
gorilla/websocket
- Java:
- 会话管理:连接分组、广播、点对点、身份校验。
- 心跳&重连策略:Ping/Pong + 客户端指数退避重连。
- 流量治理:限连接数、限消息频率、防止大包。
- 压缩:开启
permessage-deflate
时评估 CPU 开销。 - 负载均衡:粘性会话或共享状态(如 Redis 发布订阅、消息队列)。
未来展望
- WebTransport / WebRTC DataChannel:更现代的多路复用、UDP 支持,可能在部分场景替代 WebSocket。
- **HTTP/3 (QUIC)**:在 TLS + UDP 上的多路复用,将与 WebSocket (over HTTP/3) 协同进化。
Java 原生实现
在追求极简的场景(学习、PoC、微服务里自带 WebSocket 服务)下,我们可以抛开 Spring、Netty 等重量级框架,直接使用 Java EE 7 引入的 javax.websocket
API。Java 8 自带该包(从 JDK 1.8u20 开始),配合轻量容器或 Tyrus Standalone,即可快速起 WebSocket 服务。
环境准备
工具 版本 说明 JDK 1.8 (UTF-8 编码) 原生 API Maven 3.6+ 打包 demo 运行方式 A Tomcat 8+ / Jetty 9+ 已内置 WebSocket 容器 运行方式 B Tyrus Standalone 纯 Java SE 启动,最轻量 下文同时给出容器部署与嵌入式启动两种方式。
Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>space.yangtao</groupId>
<artifactId>java-ws</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 仅在“嵌入式 Tyrus”时需要 -->
<dependencies>
<!-- https://mvnrepository.com/artifact/org.glassfish.tyrus/tyrus-server -->
<dependency>
<groupId>org.glassfish.tyrus</groupId>
<artifactId>tyrus-server</artifactId>
<version>1.20</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打可执行 JAR(嵌入式模式) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>space.yangtao.Bootstrap</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>若选择 Tomcat 部署,可省略依赖,直接打 WAR。
服务端实现
EchoEndpoint(UTF-8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30@ServerEndpoint("/echo") // ws://host:port/echo
public class EchoEndpoint {
private static final AtomicInteger ONLINE = new AtomicInteger(0);
@OnOpen
public void onOpen(Session session) {
int cnt = ONLINE.incrementAndGet();
System.out.println("WebSocket opened, sid=" + session.getId() + ", online=" + cnt);
}
@OnMessage
public String onMessage(String msg, Session session) {
String reply = "【" + LocalDateTime.now() + "】echo: " + msg;
System.out.println("sid=" + session.getId() + " -> " + msg);
return reply; // 直接返回 => 自动包装文本帧
}
@OnClose
public void onClose(Session session, CloseReason reason) {
int cnt = ONLINE.decrementAndGet();
System.out.println("WebSocket closed, sid=" + session.getId() +
", reason=" + reason.getReasonPhrase() + ", online=" + cnt);
}
@OnError
public void onError(Session session, Throwable thr) {
System.err.println("sid=" + session.getId() + " error: " + thr.getMessage());
}
}Bootstrap(嵌入式 Tyrus 可选)
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class Bootstrap {
public static void main(String[] args) throws Exception {
Server server = new Server("0.0.0.0", 8080, "/", null, EchoEndpoint.class);
try {
server.start();
System.out.println("WebSocket server started at ws://localhost:8080/echo");
// 阻塞主线程
Thread.currentThread().join();
} finally {
server.stop();
}
}
}若用 Tomcat/Jetty,只需把 EchoEndpoint 打包成 WAR 部署,容器会自动扫描
@ServerEndpoint
。运行与测试
模式 步骤 容器 mvn clean package -DskipTests
→ 将 WAR 部署到 Tomcatwebapps/
→ 浏览器连接ws://localhost:8080/<context>/echo
嵌入式 mvn clean package
→java -jar target/websocket-echo-1.0-SNAPSHOT.jar
→ 终端或浏览器连接ws://localhost:8080/echo
常见问题 & 小结
症状 排查要点 404 / 500 容器未识别 @ServerEndpoint
:确认javax.websocket-api
冲突、WEB-INF/web.xml
无手动配置冲突客户端握手失败 浏览器控制台看 Request/Response
,是否返回 101;若是 WSS 需证书多节点广播 WebSocket 连接有粘性,需借助 MQ / Redis 发布订阅做分发 心跳 建议业务层定时 ping
/自定义消息,客户端断线重连至此,已经完成了一个纯原生 Java 8 WebSocket 的完整闭环:服务端、客户端、部署与测试。后续可基于此扩展身份校验、群聊、压缩、限流等生产级特性。
Spring WebSocket
链路
使用 Spring-WebSocket 处理浏览器连接,同步集成 JWT 鉴权 与 Redis Pub/Sub 实现分布式广播。完整链路分三段:
阶段 | 关键动作 |
---|---|
握手 | 浏览器 ws://…/ws?token=xxx → JwtHandshakeInterceptor 校验 → 建立会话 |
本机路由 | WsHandler 收发消息;WsSessionManager 维护 uid ↔ Session 映射 |
跨实例广播 | RedisPublisher 将 WsMessage 发布到 Redis Channel;RedisSubscriber 订阅并分派给本机在线用户 |
核心类及职责
类 | 作用 | 关键点 |
---|---|---|
JwtUtil |
解析 / 校验 JWT | parseToken() → 返回 userId |
JwtHandshakeInterceptor |
握手前拦截 | 从 URL 查询参数提取 token → 鉴权成功把 uid 放入 attributes |
WebSocketConfig |
注册端点 | registry.addHandler(wsHandler, "/ws") 并挂上拦截器 |
WsHandler |
核心业务 Handler | afterConnectionEstablished() 注册会话;handleTextMessage() 反序列化 WsMessage 并调用 RedisPublisher |
WsMessage |
统一消息模型 | uid 、content 、broadcast 三字段 |
WsSessionManager |
本机 Session 池 | register / unregister / sendTo / broadcast |
RedisPublisher |
发布消息到 Redis | convertAndSend("ws-msg", json) |
RedisSubscriber |
订阅 ws-msg Channel |
解析 WsMessage → 定向 / 广播 给本机 Session |
RedisListenerConfig |
官方写法订阅 | 用 RedisMessageListenerContainer 注册 RedisSubscriber (可选) |
生产环境还需 TLS、限流、心跳等配套,但与上表类职责正交。
时序说明
sequenceDiagram
participant B as Browser
participant G as Nginx / LB
participant A as Pod-A
participant R as Redis
participant B2 as Pod-B
%% 握手阶段
B->>G: ws://example.com/ws?token=JWT
G->>A: ws (粘性连接)
A->>A: JwtHandshakeInterceptor\n解析 token → uid
A-->>B: 101 Switching Protocols
%% 私聊消息
B->>A: {"uid":"u2","content":"hi","broadcast":false}
A->>A: WsHandler 生成 WsMessage
A->>R: Redis PUBLISH ws-msg
R-->>A: SUBSCRIBE\n(本机收到)
R-->>B2: SUBSCRIBE\n(其他实例收到)
A->>A: WsSessionManager.sendTo(u2)
B2->>B2: WsSessionManager.sendTo(u2)
%% 广播消息
Note over B: 若 broadcast=true\nRedisSubscriber 调用\nsessionManager.broadcast()
- 握手阶段
- 请求透过负载均衡到某节点;
JwtHandshakeInterceptor
校验成功 →uid
存进WebSocketSession.attributes
。
- 请求透过负载均衡到某节点;
- 会话注册
WsHandler.afterConnectionEstablished
把(uid, session)
注册到 **WsSessionManager
**(可一人多端登录)。
- 消息入站
- 浏览器发送任意符合
WsMessage
结构的 JSON。 WsHandler.handleTextMessage
反序列化后统一调用RedisPublisher.publish(msg)
——所有消息先入 Redis,保证分布式一致。
- 浏览器发送任意符合
- 跨实例分发
- 每个实例的
RedisSubscriber
订阅ws-msg
:broadcast=true
⇒sessionManager.broadcast(json)
。- 否则
sessionManager.sendTo(msg.getUser(), json)
。
- 每个实例的
- 本机推送
WsSessionManager
遍历目标 Session;sendMessage()
仅在isOpen()
时发送,避免脏连接。
代码示例
新建 Spring Boot 项目,引入Maven依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Web & WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- JWT -->
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
<version>0.11.5</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-impl</artifactId>
<version>0.11.5</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
<version>0.11.5</version>
<scope>runtime</scope>
</dependency>
<!-- Redis (Lettuce 默认) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>添加 JWT 工具类(简化版)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15@Component
public class JwtUtil {
@Value("${jwt.secret}")
private String secret;
// 解析并校验,成功返回 userId,失败抛异常
public String parseToken(String token) {
return Jwts.parser()
.setSigningKey(secret.getBytes(StandardCharsets.UTF_8))
.parseClaimsJws(token.replace("Bearer ", ""))
.getBody()
.getSubject(); // userId
}
}定义握手拦截器,用于处理鉴权以及绑定会话
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36@Component
public class JwtHandshakeInterceptor implements HandshakeInterceptor {
private final JwtUtil jwtUtil;
public JwtHandshakeInterceptor(JwtUtil jwtUtil) {
this.jwtUtil = jwtUtil;
}
@Override
public boolean beforeHandshake(ServerHttpRequest req,
ServerHttpResponse resp,
WebSocketHandler handler,
Map<String, Object> attrs) {
URI uri = req.getURI();
String query = uri.getQuery(); // e.g. token=xxx.yyy.zzz
if (query != null && query.contains("token=")) {
String token = Arrays.stream(query.split("&"))
.filter(p -> p.startsWith("token="))
.map(p -> p.substring("token=".length()))
.findFirst().orElse(null);
try {
// 解析并校验,成功返回 userId,失败抛异常
String userId = jwtUtil.parseToken(token);
attrs.put("uid", userId);
return true;
} catch (JwtException e) {
resp.setStatusCode(HttpStatus.UNAUTHORIZED);
}
}
return false;
}
@Override public void afterHandshake(ServerHttpRequest r, ServerHttpResponse s,
WebSocketHandler h, Exception e) { }
}定义 WebSocket消息体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19@Data
@NoArgsConstructor
@AllArgsConstructor
public class WsMessage {
/**
* 目标用户
*/
private String uid;
/**
* 文本内容
* */
private String content;
/**
* 是否广播标记;true 时忽略 user
*/
private boolean broadcast;
}WebSocket 会话管理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30@Component
public class WsSessionManager {
// uid → 会话集合
private final ConcurrentHashMap<String, Set<WebSocketSession>> pool = new ConcurrentHashMap<>();
public void register(String uid, WebSocketSession s) {
pool.computeIfAbsent(uid, k -> ConcurrentHashMap.newKeySet()).add(s);
}
public void unregister(WebSocketSession s) {
pool.entrySet().removeIf(entry -> entry.getValue().remove(s));
}
/**
* 精准投递
*/
public void sendTo(String uid, String json) {
pool.getOrDefault(uid, new HashSet<>()).forEach(s -> safeSend(s, json));
}
/**
* 全局广播
*/
public void broadcast(String json) {
pool.values().stream().flatMap(Set::stream).forEach(s -> safeSend(s, json));
}
private void safeSend(WebSocketSession s, String text){
if (s.isOpen()) try { s.sendMessage(new TextMessage(text)); } catch (IOException ignored) {}
}
}Redis发布/订阅组件
1
2
3
4
5
6
7
8
9
10
11
12@Component
@RequiredArgsConstructor
public class RedisPublisher {
private final StringRedisTemplate template;
private final ObjectMapper mapper;
public void publish(WsMessage msg){
try {
template.convertAndSend("ws-msg", mapper.writeValueAsString(msg));
} catch (JsonProcessingException ignored) {}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27@Component
@RequiredArgsConstructor
public class RedisSubscriber implements MessageListener {
private final StringRedisTemplate stringRedisTemplate;
private final WsSessionManager sessionManager;
private final ObjectMapper mapper = new ObjectMapper();
@PostConstruct
public void init() {
Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()).getConnection()
.subscribe(this, "ws-msg".getBytes());
}
@Override
public void onMessage(Message raw, byte[] pattern) {
try {
WsMessage msg = mapper.readValue(raw.getBody(), WsMessage.class);
if (msg.isBroadcast()) {
sessionManager.broadcast(msg.getContent());
} else {
sessionManager.sendTo(msg.getUid(), msg.getContent());
}
} catch (IOException ignored) {
}
}
}WebSocket处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37@Component
@RequiredArgsConstructor
public class WsHandler extends TextWebSocketHandler {
private final WsSessionManager sessionManager;
private final RedisPublisher publisher;
private final ObjectMapper mapper = new ObjectMapper();
@Override
public void afterConnectionEstablished(WebSocketSession s) {
String uid = (String) s.getAttributes().get("uid");
sessionManager.register(uid, s);
}
@Override
protected void handleTextMessage(WebSocketSession s, TextMessage tm) {
try {
if (tm.getPayload().equalsIgnoreCase("PING")) {
s.sendMessage(new TextMessage("PONG"));
return;
}
// 客户端直接发送符合 WsMessage 结构的 JSON
WsMessage msg = mapper.readValue(tm.getPayload(), WsMessage.class);
// 服务端可补充 from/时间戳等信息
// msg.setFrom(uid)...
publisher.publish(msg); // 一律走 Redis,实现跨实例广播 / 定向
} catch (IOException ignored) {
}
}
@Override
public void afterConnectionClosed(WebSocketSession s, CloseStatus cs) {
sessionManager.unregister(s);
}
}配置文件
1
2
3
4
5
6spring.application.name=spring-ws
spring.redis.host=127.0.0.1
spring.redis.port=6379
jwt.secret=abc123
总结
- 协议层:一次 HTTP Upgrade 握手即可切换到轻量帧语义,帧头最小仅 2 Bytes,依靠 Ping/Pong 保活。
- 开发层:Java 提供
javax.websocket
(Tyrus)满足纯 JDK 场景;Spring-WebSocket 则无缝融入 MVC 生态,可配合拦截器完成 JWT 鉴权。 - 分布式:使用 Redis Pub/Sub 解耦实例,实现 “写入一次,集群即达”;统一的
WsMessage
数据结构简化了本机与跨节点路由。 - 生产层:心跳、限流、TLS、Origin 校验、日志监控缺一不可;推荐用
RedisMessageListenerContainer
、Prometheus 指标与灰度发布完善运维链路。 - 演进方向:随着 HTTP/3 与 WebTransport 普及,未来可在 QUIC/UDP 上获得更低延迟与多路复用能力,但短期内 WebSocket 仍是浏览器实时通信的主力军。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!