WebSocket实战

本文最后更新于:1 分钟前

前言

在移动互联网与 IoT 场景下,“毫秒级、双向实时通信” 已成为业务标配:在线聊天、实时协作、行情推送、游戏对战……传统 HTTP 轮询或长轮询不仅延迟高,而且资源浪费严重。

WebSocket 自 2011 年纳入 RFC 6455,提供了浏览器原生支持的 全双工、持久化 通道,是解决上述痛点的行业通用方案。本文先从协议原理出发,梳理握手、帧格式、心跳机制等基础知识;随后给出 Java 8 原生实现,帮助读者快速上手;最后落地到 Spring Boot 2.7.18 + JWT + Redis 的生产级示例,展示如何在分布式环境下实现鉴权、点对点私聊与全局广播。读完本文,你将具备从 0 到 1 搭建企业级 WebSocket 服务的完整思路与代码骨架。

理论

缘起与定位

  • 痛点:HTTP 天生是请求–响应模式,服务端无法主动推送,实时性差。
  • 方案:WebSocket(RFC 6455,2011 发布)在浏览器与服务器之间建立一条全双工持久化的 TCP 连接,允许双方随时发送数据。

握手流程(HTTP Upgrade)

  1. 客户端发起 HTTP/1.1 请求

    1
    2
    3
    4
    5
    6
    7
    8
    GET /chat HTTP/1.1
    Host: example.com
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
    Sec-WebSocket-Version: 13
    Origin: https://example.com # 可选,防跨站
    Sec-WebSocket-Protocol: json, chat # 可选,子协议
  2. 服务器返回 101 Switching Protocols

    1
    2
    3
    4
    5
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    Sec-WebSocket-Protocol: chat # 若协商成功
    • Sec-WebSocket-Accept = BASE64(SHA-1(Sec-WebSocket-Key + GUID))
    • 握手成功后,连接切换到 WebSocket 帧语义,结束所有 HTTP 语义。

数据帧结构

字段 长度 说明
FIN 1 bit 是否为消息最后一帧
RSV1-3 1 bit×3 扩展使用,正常为 0
Opcode 4 bit 0x1 文本,0x2 二进制,0x8 关闭,0x9 Ping,0xA Pong…
MASK 1 bit 客户端→服务器必须为 1;服务器→客户端通常为 0
Payload Len 7/7+16/7+64 bit 0–125,126,127 长度指示
Masking-Key 0/32 bit 仅当 MASK=1 时存在
Payload Data 可变 真正的数据内容
  • 掩码(Masking):浏览器必须对出站数据用随机 32 bit Key 异或;防止代理缓存与协议混淆攻击。
  • 分片:大消息可拆成多帧(FIN=0)。
  • 控制帧最大 125 bytes,必须独立(Ping/Pong/Close)。

连接保活与关闭

场景 机制
保活 任一端可 Ping→Pong 确认活跃;无强制间隔,常配合业务心跳定制
关闭 发送 Close 帧(Code+Reason),收到方应回复 Close 并断 TCP(半握手式关闭)

常见关闭码:1000 正常;1001 终端离开;1006 保留给异常断线(仅本地可见)。

子协议与扩展

  • 子协议 (Sec-WebSocket-Protocol):同一连接内约定更高层语义,如 graphql-wsmqttjsonrpc
  • 扩展 (Sec-WebSocket-Extensions):帧级能力,如 permessage-deflate 压缩、分片重排扩展。需双方协商一致。

安全考量

风险 对策
明文窃听/篡改 使用 **wss://**(TLS)
跨站 WebSocket 劫持 (CSWSH) 校验 Origin / token / Cookie SameSite
反射型 DOS 服务器应限制 Ping / Payload 大小
代理缓存混淆 客户端强制 Mask,服务端验证 Upgrade 头完整性

与 HTTP/HTTP-2/Server-Sent Events 对比

特性 WebSocket SSE HTTP/2+push
双向通信 ×(服务端单向) 理论可推送,但浏览器已弃用
传输层 TCP(或 TLS)
报文开销 极低(帧头 2–14 字节) 较低 需 HTTP 头
代理兼容 需显式支持 Upgrade
典型场景 IM、实时协作、游戏 股票行情、日志流 资源预加载

优缺点与适用场景

优点

  • 真·全双工&低延迟
  • 单一持久连接降低握手开销
  • 广泛浏览器支持(IE 10+)

局限

  • 需专门服务器或网关(Nginx、Spring WebFlux、Socket.IO、Vert.x…)
  • 中间件(反向代理、负载均衡、防火墙)须显式开启 Upgrade 支持
  • 无自带断线重连与心跳,需要应用层实现

典型应用:在线聊天、实时协同编辑、行情推送、多人游戏、IoT 即时控制、直播弹幕、客服系统等。

实现要点(后端)

  1. 握手与协议栈
    • Java:javax.websocket、Spring Boot WebSocketHandler
    • Node.js:ws、Socket.IO
    • Go:gorilla/websocket
  2. 会话管理:连接分组、广播、点对点、身份校验。
  3. 心跳&重连策略:Ping/Pong + 客户端指数退避重连。
  4. 流量治理:限连接数、限消息频率、防止大包。
  5. 压缩:开启 permessage-deflate 时评估 CPU 开销。
  6. 负载均衡:粘性会话或共享状态(如 Redis 发布订阅、消息队列)。

未来展望

  • WebTransport / WebRTC DataChannel:更现代的多路复用、UDP 支持,可能在部分场景替代 WebSocket。
  • **HTTP/3 (QUIC)**:在 TLS + UDP 上的多路复用,将与 WebSocket (over HTTP/3) 协同进化。

Java 原生实现

在追求极简的场景(学习、PoC、微服务里自带 WebSocket 服务)下,我们可以抛开 Spring、Netty 等重量级框架,直接使用 Java EE 7 引入的 javax.websocket API。Java 8 自带该包(从 JDK 1.8u20 开始),配合轻量容器或 Tyrus Standalone,即可快速起 WebSocket 服务。

  1. 环境准备

    工具 版本 说明
    JDK 1.8 (UTF-8 编码) 原生 API
    Maven 3.6+ 打包 demo
    运行方式 A Tomcat 8+ / Jetty 9+ 已内置 WebSocket 容器
    运行方式 B Tyrus Standalone 纯 Java SE 启动,最轻量

    下文同时给出容器部署与嵌入式启动两种方式。

  2. Maven 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>space.yangtao</groupId>
    <artifactId>java-ws</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 仅在“嵌入式 Tyrus”时需要 -->
    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.glassfish.tyrus/tyrus-server -->
    <dependency>
    <groupId>org.glassfish.tyrus</groupId>
    <artifactId>tyrus-server</artifactId>
    <version>1.20</version>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <!-- 打可执行 JAR(嵌入式模式) -->
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.4.1</version>
    <executions>
    <execution>
    <phase>package</phase>
    <goals>
    <goal>shade</goal>
    </goals>
    <configuration>
    <transformers>
    <transformer
    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>space.yangtao.Bootstrap</mainClass>
    </transformer>
    </transformers>
    </configuration>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>

    </project>

    若选择 Tomcat 部署,可省略依赖,直接打 WAR。

  3. 服务端实现

    EchoEndpoint(UTF-8)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @ServerEndpoint("/echo")   // ws://host:port/echo
    public class EchoEndpoint {

    private static final AtomicInteger ONLINE = new AtomicInteger(0);

    @OnOpen
    public void onOpen(Session session) {
    int cnt = ONLINE.incrementAndGet();
    System.out.println("WebSocket opened, sid=" + session.getId() + ", online=" + cnt);
    }

    @OnMessage
    public String onMessage(String msg, Session session) {
    String reply = "【" + LocalDateTime.now() + "】echo: " + msg;
    System.out.println("sid=" + session.getId() + " -> " + msg);
    return reply; // 直接返回 => 自动包装文本帧
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
    int cnt = ONLINE.decrementAndGet();
    System.out.println("WebSocket closed, sid=" + session.getId() +
    ", reason=" + reason.getReasonPhrase() + ", online=" + cnt);
    }

    @OnError
    public void onError(Session session, Throwable thr) {
    System.err.println("sid=" + session.getId() + " error: " + thr.getMessage());
    }
    }

    Bootstrap(嵌入式 Tyrus 可选)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class Bootstrap {

    public static void main(String[] args) throws Exception {
    Server server = new Server("0.0.0.0", 8080, "/", null, EchoEndpoint.class);
    try {
    server.start();
    System.out.println("WebSocket server started at ws://localhost:8080/echo");
    // 阻塞主线程
    Thread.currentThread().join();
    } finally {
    server.stop();
    }
    }
    }

    若用 Tomcat/Jetty,只需把 EchoEndpoint 打包成 WAR 部署,容器会自动扫描 @ServerEndpoint

  4. 运行与测试

    模式 步骤
    容器 mvn clean package -DskipTests → 将 WAR 部署到 Tomcat webapps/ → 浏览器连接 ws://localhost:8080/<context>/echo
    嵌入式 mvn clean packagejava -jar target/websocket-echo-1.0-SNAPSHOT.jar → 终端或浏览器连接 ws://localhost:8080/echo
  5. 常见问题 & 小结

    症状 排查要点
    404 / 500 容器未识别 @ServerEndpoint:确认 javax.websocket-api 冲突、WEB-INF/web.xml 无手动配置冲突
    客户端握手失败 浏览器控制台看 Request/Response,是否返回 101;若是 WSS 需证书
    多节点广播 WebSocket 连接有粘性,需借助 MQ / Redis 发布订阅做分发
    心跳 建议业务层定时 ping/自定义消息,客户端断线重连

    至此,已经完成了一个纯原生 Java 8 WebSocket 的完整闭环:服务端、客户端、部署与测试。后续可基于此扩展身份校验、群聊、压缩、限流等生产级特性。

Spring WebSocket

链路

使用 Spring-WebSocket 处理浏览器连接,同步集成 JWT 鉴权Redis Pub/Sub 实现分布式广播。完整链路分三段:

阶段 关键动作
握手 浏览器 ws://…/ws?token=xxxJwtHandshakeInterceptor 校验 → 建立会话
本机路由 WsHandler 收发消息;WsSessionManager 维护 uid ↔ Session 映射
跨实例广播 RedisPublisherWsMessage 发布到 Redis ChannelRedisSubscriber 订阅并分派给本机在线用户

核心类及职责

作用 关键点
JwtUtil 解析 / 校验 JWT parseToken() → 返回 userId
JwtHandshakeInterceptor 握手前拦截 从 URL 查询参数提取 token → 鉴权成功把 uid 放入 attributes
WebSocketConfig 注册端点 registry.addHandler(wsHandler, "/ws") 并挂上拦截器
WsHandler 核心业务 Handler afterConnectionEstablished() 注册会话;handleTextMessage() 反序列化 WsMessage 并调用 RedisPublisher
WsMessage 统一消息模型 uidcontentbroadcast 三字段
WsSessionManager 本机 Session 池 register / unregister / sendTo / broadcast
RedisPublisher 发布消息到 Redis convertAndSend("ws-msg", json)
RedisSubscriber 订阅 ws-msg Channel 解析 WsMessage → 定向 / 广播 给本机 Session
RedisListenerConfig 官方写法订阅 RedisMessageListenerContainer 注册 RedisSubscriber (可选)

生产环境还需 TLS、限流、心跳等配套,但与上表类职责正交。

时序说明

sequenceDiagram
    participant B as Browser
    participant G as Nginx / LB
    participant A as Pod-A
    participant R as Redis
    participant B2 as Pod-B

%% 握手阶段
    B->>G: ws://example.com/ws?token=JWT
    G->>A: ws (粘性连接)
    A->>A: JwtHandshakeInterceptor\n解析 token → uid
    A-->>B: 101 Switching Protocols

%% 私聊消息
    B->>A: {"uid":"u2","content":"hi","broadcast":false}
    A->>A: WsHandler 生成 WsMessage
    A->>R: Redis PUBLISH ws-msg
    R-->>A: SUBSCRIBE\n(本机收到)
    R-->>B2: SUBSCRIBE\n(其他实例收到)
    A->>A: WsSessionManager.sendTo(u2)
    B2->>B2: WsSessionManager.sendTo(u2)

%% 广播消息
    Note over B: 若 broadcast=true\nRedisSubscriber 调用\nsessionManager.broadcast()
  1. 握手阶段
    • 请求透过负载均衡到某节点;JwtHandshakeInterceptor 校验成功 → uid 存进 WebSocketSession.attributes
  2. 会话注册
    • WsHandler.afterConnectionEstablished(uid, session) 注册到 **WsSessionManager**(可一人多端登录)。
  3. 消息入站
    • 浏览器发送任意符合 WsMessage 结构的 JSON。
    • WsHandler.handleTextMessage 反序列化后统一调用 RedisPublisher.publish(msg) ——所有消息先入 Redis,保证分布式一致。
  4. 跨实例分发
    • 每个实例的 RedisSubscriber 订阅 ws-msg
      • broadcast=truesessionManager.broadcast(json)
      • 否则 sessionManager.sendTo(msg.getUser(), json)
  5. 本机推送
    • WsSessionManager 遍历目标 Session;sendMessage() 仅在 isOpen() 时发送,避免脏连接。

代码示例

  1. 新建 Spring Boot 项目,引入Maven依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    <!-- Web & WebSocket -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- JWT -->
    <dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt-api</artifactId>
    <version>0.11.5</version>
    </dependency>
    <dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt-impl</artifactId>
    <version>0.11.5</version>
    <scope>runtime</scope>
    </dependency>
    <dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt-jackson</artifactId>
    <version>0.11.5</version>
    <scope>runtime</scope>
    </dependency>
    <!-- Redis (Lettuce 默认) -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    </dependencies>
  2. 添加 JWT 工具类(简化版)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Component
    public class JwtUtil {

    @Value("${jwt.secret}")
    private String secret;

    // 解析并校验,成功返回 userId,失败抛异常
    public String parseToken(String token) {
    return Jwts.parser()
    .setSigningKey(secret.getBytes(StandardCharsets.UTF_8))
    .parseClaimsJws(token.replace("Bearer ", ""))
    .getBody()
    .getSubject(); // userId
    }
    }
  3. 定义握手拦截器,用于处理鉴权以及绑定会话

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Component
    public class JwtHandshakeInterceptor implements HandshakeInterceptor {

    private final JwtUtil jwtUtil;

    public JwtHandshakeInterceptor(JwtUtil jwtUtil) {
    this.jwtUtil = jwtUtil;
    }

    @Override
    public boolean beforeHandshake(ServerHttpRequest req,
    ServerHttpResponse resp,
    WebSocketHandler handler,
    Map<String, Object> attrs) {
    URI uri = req.getURI();
    String query = uri.getQuery(); // e.g. token=xxx.yyy.zzz
    if (query != null && query.contains("token=")) {
    String token = Arrays.stream(query.split("&"))
    .filter(p -> p.startsWith("token="))
    .map(p -> p.substring("token=".length()))
    .findFirst().orElse(null);
    try {
    // 解析并校验,成功返回 userId,失败抛异常
    String userId = jwtUtil.parseToken(token);
    attrs.put("uid", userId);
    return true;
    } catch (JwtException e) {
    resp.setStatusCode(HttpStatus.UNAUTHORIZED);
    }
    }
    return false;
    }

    @Override public void afterHandshake(ServerHttpRequest r, ServerHttpResponse s,
    WebSocketHandler h, Exception e) { }
    }
  4. 定义 WebSocket消息体

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class WsMessage {
    /**
    * 目标用户
    */
    private String uid;

    /**
    * 文本内容
    * */
    private String content;

    /**
    * 是否广播标记;true 时忽略 user
    */
    private boolean broadcast;
    }
  5. WebSocket 会话管理器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Component
    public class WsSessionManager {

    // uid → 会话集合
    private final ConcurrentHashMap<String, Set<WebSocketSession>> pool = new ConcurrentHashMap<>();

    public void register(String uid, WebSocketSession s) {
    pool.computeIfAbsent(uid, k -> ConcurrentHashMap.newKeySet()).add(s);
    }

    public void unregister(WebSocketSession s) {
    pool.entrySet().removeIf(entry -> entry.getValue().remove(s));
    }

    /**
    * 精准投递
    */
    public void sendTo(String uid, String json) {
    pool.getOrDefault(uid, new HashSet<>()).forEach(s -> safeSend(s, json));
    }
    /**
    * 全局广播
    */
    public void broadcast(String json) {
    pool.values().stream().flatMap(Set::stream).forEach(s -> safeSend(s, json));
    }
    private void safeSend(WebSocketSession s, String text){
    if (s.isOpen()) try { s.sendMessage(new TextMessage(text)); } catch (IOException ignored) {}
    }
    }
  6. Redis发布/订阅组件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Component
    @RequiredArgsConstructor
    public class RedisPublisher {
    private final StringRedisTemplate template;
    private final ObjectMapper mapper;

    public void publish(WsMessage msg){
    try {
    template.convertAndSend("ws-msg", mapper.writeValueAsString(msg));
    } catch (JsonProcessingException ignored) {}
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Component
    @RequiredArgsConstructor
    public class RedisSubscriber implements MessageListener {

    private final StringRedisTemplate stringRedisTemplate;
    private final WsSessionManager sessionManager;
    private final ObjectMapper mapper = new ObjectMapper();

    @PostConstruct
    public void init() {
    Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()).getConnection()
    .subscribe(this, "ws-msg".getBytes());
    }

    @Override
    public void onMessage(Message raw, byte[] pattern) {
    try {
    WsMessage msg = mapper.readValue(raw.getBody(), WsMessage.class);
    if (msg.isBroadcast()) {
    sessionManager.broadcast(msg.getContent());
    } else {
    sessionManager.sendTo(msg.getUid(), msg.getContent());
    }
    } catch (IOException ignored) {
    }
    }
    }
  7. WebSocket处理器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Component
    @RequiredArgsConstructor
    public class WsHandler extends TextWebSocketHandler {

    private final WsSessionManager sessionManager;
    private final RedisPublisher publisher;
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void afterConnectionEstablished(WebSocketSession s) {
    String uid = (String) s.getAttributes().get("uid");
    sessionManager.register(uid, s);
    }

    @Override
    protected void handleTextMessage(WebSocketSession s, TextMessage tm) {
    try {
    if (tm.getPayload().equalsIgnoreCase("PING")) {
    s.sendMessage(new TextMessage("PONG"));
    return;
    }
    // 客户端直接发送符合 WsMessage 结构的 JSON
    WsMessage msg = mapper.readValue(tm.getPayload(), WsMessage.class);

    // 服务端可补充 from/时间戳等信息
    // msg.setFrom(uid)...

    publisher.publish(msg); // 一律走 Redis,实现跨实例广播 / 定向
    } catch (IOException ignored) {
    }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession s, CloseStatus cs) {
    sessionManager.unregister(s);
    }
    }
  8. 配置文件

    1
    2
    3
    4
    5
    6
    spring.application.name=spring-ws

    spring.redis.host=127.0.0.1
    spring.redis.port=6379

    jwt.secret=abc123

总结

  1. 协议层:一次 HTTP Upgrade 握手即可切换到轻量帧语义,帧头最小仅 2 Bytes,依靠 Ping/Pong 保活。
  2. 开发层:Java 提供 javax.websocket(Tyrus)满足纯 JDK 场景;Spring-WebSocket 则无缝融入 MVC 生态,可配合拦截器完成 JWT 鉴权。
  3. 分布式:使用 Redis Pub/Sub 解耦实例,实现 “写入一次,集群即达”;统一的 WsMessage 数据结构简化了本机与跨节点路由。
  4. 生产层:心跳、限流、TLS、Origin 校验、日志监控缺一不可;推荐用 RedisMessageListenerContainer、Prometheus 指标与灰度发布完善运维链路。
  5. 演进方向:随着 HTTP/3 与 WebTransport 普及,未来可在 QUIC/UDP 上获得更低延迟与多路复用能力,但短期内 WebSocket 仍是浏览器实时通信的主力军。

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!