Java并发编程——其他工具类补充

本文最后更新于:3 年前

引言

在 Java 并发编程中,JDK 除了提供锁、线程池、并发容器等常见核心内容,还包含了一些用于特殊场景的工具类,以满足多线程协同工作的多种需求。LockSupportCyclicBarrierPhaserExchanger 等便是其中典型代表。它们在底层同步机制上各有巧妙设计,在不同应用场景下能发挥巨大作用。本文将带你系统认识这些工具类的工作原理、使用场景与常见示例,为日后编写更高效更易维护的并发程序提供指引。

LockSupport

简介

LockSupport 提供了对线程阻塞和唤醒的底层支持,是构建高级同步工具(如 LockSemaphoreCountDownLatch 等)的基础,类似于操作系统中的线程调度机制。它通过维护每个线程的许可(permit)来控制线程的阻塞和唤醒,而不依赖于传统的监视器锁(如 synchronized 关键字)。

特点

  • 许可机制:每个线程拥有一个许可,初始状态为无许可。unpark 方法会发放一个许可,park 方法会消耗一个许可。
  • 非阻塞的许可获取:即使在调用 park 之前调用了 unpark,也不会阻塞线程,因为许可已经存在。
  • 灵活性高:不像 Object.waitObject.notify 依赖于监视器锁,LockSupport 的阻塞和唤醒操作是独立的,更加灵活。

使用流程

  1. 阻塞线程:调用 LockSupport.park() 或其变种方法,使当前线程进入阻塞状态,等待被唤醒。
  2. 唤醒线程:其他线程调用 LockSupport.unpark(Thread thread),为目标线程发放一个许可,允许其继续执行。
  3. 许可管理:每个线程的许可是独立管理的,多次调用 unpark 只会增加一个许可,而不会累积。

原理

LockSupport 基于操作系统的底层线程调度机制,通过 JVM 提供的本地方法实现线程的阻塞和唤醒。它维护了每个线程的许可状态:

  • park:如果许可存在,消耗许可并返回;否则,将线程阻塞,等待许可的到来。
  • unpark:为指定线程增加一个许可,如果线程已经阻塞,则唤醒它;否则,许可会被保存,后续的 park 调用会立即返回。

这种许可机制避免了传统的基于监视器锁的阻塞方法(如 wait/notify)可能出现的竞态条件和死锁问题。

常用 API

阻塞相关方法

  • **public static void park()**:阻塞当前线程,直到被唤醒或被中断。
  • **public static void park(Object blocker)**:阻塞当前线程,并关联一个阻塞器对象,便于调试和监控。
  • **public static void parkNanos(long nanos)**:阻塞当前线程,最多阻塞指定的纳秒时间。
  • **public static void parkUntil(long deadline)**:阻塞当前线程,直到指定的绝对时间(毫秒时间戳)。

唤醒相关方法

  • **public static void unpark(Thread thread)**:唤醒指定线程,发放一个许可。

许可相关方法

  • **public static boolean isBlocked(Thread thread)**:检查指定线程是否被阻塞(Java 9 引入)。
  • **public static Object getBlocker(Thread thread)**:获取阻塞线程的阻塞器对象(Java 9 引入)。

应用场景

  • 构建锁(Lock):如 ReentrantLock 的内部实现依赖于 LockSupport 来阻塞和唤醒线程。
  • 实现线程池:线程池中的工作线程可能需要被阻塞,等待任务到来,然后被唤醒执行任务。
  • 构建信号量和屏障:如 SemaphoreCyclicBarrier 等同步器的实现。
  • 自定义阻塞队列:在实现“生产者-消费者模型”时,可以使用 LockSupport 控制生产者和消费者的阻塞与唤醒。

代码示例

使用 LockSupport 阻塞和唤醒线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
public class LockSupportExample {

public static void main(String[] args) {
Thread blockerThread = new Thread(() -> {
log.info("当前线程调用 park()");
LockSupport.park();
if (Thread.interrupted()) {
log.info("线程检测到中断状态");
}
log.info("线程恢复");
});

blockerThread.start();

try {
// 确保 blockerThread 已经调用 park
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

log.info("主线程调用 unpark 唤醒 blockerThread");
LockSupport.unpark(blockerThread);
}
}

运行程序,控制台输出如下

1
2
3
15:01:08.998 [Thread-1] INFO  s.yangtao.other.LockSupportExample - 当前线程调用 park()
15:01:09.996 [main] INFO s.yangtao.other.LockSupportExample - 主线程调用 unpark 唤醒 blockerThread
15:01:09.996 [Thread-1] INFO s.yangtao.other.LockSupportExample - 线程恢复

注意事项

  • 并发编程中的许多阻塞方法如 wait()sleep()join() 在线程被中断时会抛出InterruptedException,但 parkunpark 不会,对已终止或不存在的线程调用 unpark 也不会报错,park 则会响应中断状态并且程序会重新从阻塞的地方开始执行。
  • 使用 LockSupport 需要确保对同一线程调用的 unpark 不会丢失许可,即线程 park 之后一定要有其他线程调用 unpark 给线程分发许可,避免线程永久阻塞。

CyclicBarrier

简介

CyclicBarrier 是Java提供的一个同步工具类,位于 java.util.concurrent 包中。它允许一组线程在某个同步点等待,直到所有线程都到达该同步点后,所有线程才能继续执行。这种机制非常适用于需要多线程协作完成某些阶段性任务的场景。

特点

  • 可重用性CyclicBarrier 可以在所有线程通过屏障后重置,允许它被多次使用。
  • 屏障动作:在所有线程到达屏障点后,可以执行一个预定义的任务(屏障动作),该任务由一个 Runnable 对象定义。
  • 等待方式:线程可以选择阻塞等待所有其他线程到达屏障,或在等待超时时放弃等待。
  • 灵活性:适用于需要分阶段执行的并发任务,如并行计算、分布式任务协调等。

使用流程

  1. 创建 CyclicBarrier 实例:指定参与屏障的线程数量,并可选地提供一个屏障动作。
  2. 线程到达屏障点:每个线程在需要等待的地方调用 await() 方法,等待其他线程到达屏障。
  3. 所有线程到达屏障:当所有参与的线程都调用了 await() 方法后,屏障被触发,所有线程继续执行后续操作,同时执行屏障动作(如果有)。
  4. 重置屏障CyclicBarrier 可以被重用,适用于多个循环或阶段。

原理

CyclicBarrier 的内部机制主要基于一个计数器和一个等待队列。当创建 CyclicBarrier 时,指定了需要等待的线程数量(参与者数量)。每当一个线程调用 await() 方法时,计数器减 1。如果计数器仍然大于 0,线程将被阻塞,直到其他线程也调用了 await()。当计数器达到 0 时,所有被阻塞的线程将被唤醒,同时执行屏障动作(如果定义了)。

关键点

  • 计数器管理:每调用一次 await(),计数器减 1。当计数器达到 0 时,重置计数器,允许CyclicBarrier再次被使用。
  • 屏障动作CyclicBarrier 允许在所有线程到达屏障时执行一个额外的任务(屏障动作),该任务由一个 Runnable 对象定义。
  • 重用性:在所有线程通过屏障后,CyclicBarrier 会重置计数器,允许其再次被用于下一个循环或阶段。
  • 错误处理:如果在等待过程中,有线程中断或其他异常导致屏障破裂(BrokenBarrierException),所有等待的线程将抛出异常,确保不会永久阻塞。

常用API

构造方法

  • **CyclicBarrier(int parties)**:创建一个 CyclicBarrier,指定参与屏障的线程数量(parties)。
  • **CyclicBarrier(int parties, Runnable barrierAction)**:创建一个 CyclicBarrier,指定参与屏障的线程数量,并定义一个屏障动作(barrierAction),当所有线程到达屏障点时执行。

主要方法

  • **int await()**:使当前线程等待,直到所有参与线程都调用了 await() 方法。成功通过屏障时,返回当前线程的屏障索引。
  • **int await(long timeout, TimeUnit unit)**:使当前线程等待,直到所有参与线程都调用了 await() 方法,或等待超时。如果超时,抛出 TimeoutException
  • **boolean isBroken()**:查询屏障是否已经破裂。
  • **int getNumberWaiting()**:返回当前等待屏障的线程数量。
  • **int getParties()**:返回屏障的参与线程数量。

其他方法

  • **void reset()**:重置屏障,丢弃所有等待线程。所有等待的线程将抛出 BrokenBarrierException

应用场景

CyclicBarrier 适用于需要多个线程协同完成某些阶段性任务的场景,以下是一些典型的应用场景:

  • 分阶段计算:在并行计算中,数据被分成多个部分,由不同的线程处理。每个线程完成一部分计算后,需要等待其他线程完成,然后进行下一阶段的计算。
  • 模拟并发测试:在测试环境中,使用 CyclicBarrier 可以让多个线程同时开始执行某个任务,以模拟高并发场景,评估系统性能。
  • 多线程游戏开发:在游戏开发中,可能需要多个线程协同完成某些任务,如渲染、物理计算、AI决策等。CyclicBarrier 可以确保各个线程在每一帧同步更新。
  • 多线程数据处理:在数据处理系统中,可能需要多个线程并行处理数据的不同部分,完成后再汇总结果。CyclicBarrier 可以协调这些线程在每个处理阶段同步。

代码示例

模拟多线程分阶段计算,四个工作线程需要完成三个阶段的计算。每完成一个阶段,所有线程都必须到达屏障点,屏障动作被执行,然后进入下一阶段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class CyclicBarrierExample {
private static final int NUM_THREADS = 4;
private final CyclicBarrier barrier;

public CyclicBarrierExample() {
// 创建CyclicBarrier实例,指定4个参与线程,并定义屏障动作
barrier = new CyclicBarrier(NUM_THREADS, () -> {
log.info("所有线程已到达屏障,开始执行屏障动作。");
});
}

public void workerTask(int workerId) {
try {
for (int phase = 1; phase <= 3; phase++) {
log.info("Worker " + workerId + " 正在执行阶段 " + phase + " 的任务...");
Thread.sleep((long) (Math.random() * 1000)); // 模拟工作时间
log.info("Worker " + workerId + " 已完成阶段 " + phase + " 的任务,等待其他线程...");
barrier.await(); // 等待所有线程完成当前阶段
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

public void startWorkers() {
for (int i = 1; i <= NUM_THREADS; i++) {
final int workerId = i;
new Thread(() -> workerTask(workerId), "Worker-" + workerId).start();
}
}

public static void main(String[] args) {
CyclicBarrierExample example = new CyclicBarrierExample();
example.startWorkers();
}
}

运行程序,控制台输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
15:29:00.328 [Worker-3] INFO  s.yangtao.aqs.CyclicBarrierExample - Worker 3 正在执行阶段 1 的任务...
15:29:00.328 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 2 正在执行阶段 1 的任务...
15:29:00.328 [Worker-1] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 1 正在执行阶段 1 的任务...
15:29:00.328 [Worker-4] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 4 正在执行阶段 1 的任务...
15:29:00.402 [Worker-4] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 4 已完成阶段 1 的任务,等待其他线程...
15:29:00.498 [Worker-3] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 3 已完成阶段 1 的任务,等待其他线程...
15:29:00.722 [Worker-1] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 1 已完成阶段 1 的任务,等待其他线程...
15:29:00.863 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 2 已完成阶段 1 的任务,等待其他线程...
15:29:00.863 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - 所有线程已到达屏障,开始执行屏障动作。
15:29:00.863 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 2 正在执行阶段 2 的任务...
15:29:00.863 [Worker-4] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 4 正在执行阶段 2 的任务...
15:29:00.863 [Worker-3] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 3 正在执行阶段 2 的任务...
15:29:00.863 [Worker-1] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 1 正在执行阶段 2 的任务...
15:29:00.998 [Worker-4] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 4 已完成阶段 2 的任务,等待其他线程...
15:29:01.492 [Worker-1] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 1 已完成阶段 2 的任务,等待其他线程...
15:29:01.768 [Worker-3] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 3 已完成阶段 2 的任务,等待其他线程...
15:29:01.807 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 2 已完成阶段 2 的任务,等待其他线程...
15:29:01.807 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - 所有线程已到达屏障,开始执行屏障动作。
15:29:01.807 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 2 正在执行阶段 3 的任务...
15:29:01.807 [Worker-4] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 4 正在执行阶段 3 的任务...
15:29:01.807 [Worker-1] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 1 正在执行阶段 3 的任务...
15:29:01.807 [Worker-3] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 3 正在执行阶段 3 的任务...
15:29:02.376 [Worker-4] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 4 已完成阶段 3 的任务,等待其他线程...
15:29:02.431 [Worker-2] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 2 已完成阶段 3 的任务,等待其他线程...
15:29:02.496 [Worker-1] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 1 已完成阶段 3 的任务,等待其他线程...
15:29:02.700 [Worker-3] INFO s.yangtao.aqs.CyclicBarrierExample - Worker 3 已完成阶段 3 的任务,等待其他线程...
15:29:02.700 [Worker-3] INFO s.yangtao.aqs.CyclicBarrierExample - 所有线程已到达屏障,开始执行屏障动作。

注意事项

  • 线程中断:如果在等待过程中,某个线程被中断,CyclicBarrier 会破裂(isBroken() 返回 true),其他等待的线程将抛出 BrokenBarrierException。需要处理好异常,避免程序崩溃。
  • 确保所有线程都到达屏障:如果某个线程永远不会调用 await(),其他线程将永久等待,导致死锁。需要确保所有参与线程都能正常到达屏障点。
  • 屏障动作的执行:屏障动作由一个独立的线程执行,通常是最后一个到达屏障的线程。如果屏障动作抛出异常,会导致 CyclicBarrier 破裂,影响其他等待线程。
  • 重用性CyclicBarrier 是可重用的,可以在多次循环中使用,但需要确保每次使用时所有参与线程都能正确调用await()

Phaser

简介

Phaser 是一种可重用的同步器,支持多阶段(phase)的任务执行,并允许动态地注册和注销参与的线程。它主要用于协调一组线程在多个阶段上的同步点,确保所有参与线程在每个阶段完成后再进入下一个阶段。

特点

  • 多阶段同步:支持多个同步点(阶段),每个阶段结束后进入下一个阶段。
  • 动态参与:可以在运行时动态地增加或减少参与的线程数。
  • 可重复使用:类似于 CyclicBarrierPhaser 可以在多个阶段重复使用。
  • 灵活性高:提供了更多的控制方法,如提前终止、获取当前阶段等。

使用流程

  1. 创建 Phaser 实例:指定初始参与者数量,或者使用默认构造方法后动态注册。
  2. 注册参与者:通过构造时指定或在运行时调用 register() 方法增加参与者。
  3. 执行阶段任务:每个参与者在完成当前阶段的任务后,调用 arriveAndAwaitAdvance() 等方法,等待其他参与者到达同步点。
  4. 进入下一阶段:所有参与者到达同步点后,Phaser 进入下一阶段,参与者继续执行下一阶段的任务。
  5. 注销参与者:任务完成后,参与者可以调用 arriveAndDeregister() 方法注销,减少参与者数量。
  6. 终止 Phaser:当所有阶段任务完成或需要提前终止时,可以调用 forceTermination() 或其他终止方法。

原理

Phaser 的内部实现基于可变的参与者数量和阶段计数。它维护一个阶段计数器(phase),以及当前阶段需要到达的参与者数量。当参与者调用同步方法(如 arriveAndAwaitAdvance())时,Phaser 会记录到达的参与者数量,并在所有参与者到达后,递增阶段计数器,释放所有等待的线程,进入下一阶段。

主要机制

  • 阶段(Phase):表示同步的当前阶段,每完成一个阶段后递增。
  • 参与者(Parties):需要同步的线程数量,可以动态增加或减少。
  • 到达(Arrive):参与者完成当前阶段的任务,通知 Phaser
  • 等待(Await):参与者等待其他参与者到达同步点。
  • 注销(Deregister):参与者完成所有任务后,从 Phaser 中注销,减少参与者数量。

常用 API

构造方法

  • **Phaser()**:创建一个未注册任何参与者的 Phaser
  • **Phaser(int parties)**:创建一个并注册指定数量参与者的 Phaser

注册和注销

  • **int register()**:动态注册一个参与者,返回当前阶段。
  • **int bulkRegister(int parties)**:动态注册多个参与者。
  • **int arriveAndDeregister()**:到达同步点并注销当前参与者。
  • **int deregister()**:仅注销当前参与者,不到达同步点。

阶段到达与等待

  • **int arrive()**:到达同步点,不等待其他参与者。
  • **int arriveAndAwaitAdvance()**:到达同步点,并等待其他参与者到达后进入下一阶段。
  • **int arriveAndDeregister()**:到达同步点,并注销当前参与者。
  • **onAdvance(int phase, int registeredParties)**:在每次到达同步点时自定义额外的行为或者决定是否终止 phaser。

查询状态

  • **int getPhase()**:获取当前阶段数。
  • **int getRegisteredParties()**:获取当前注册的参与者数量。
  • **int getArrivedParties()**:获取当前阶段已到达的参与者数量。
  • **boolean isTerminated()**:检查 Phaser 是否已终止。

终止控制

  • **boolean forceTermination()**:强制终止 Phaser,所有等待的线程会被释放。

应用场景

Phaser 适用于以下场景:

  • 多阶段任务协调:如分阶段的计算任务,每个阶段需要所有参与者完成后再进入下一阶段。
  • 动态参与者管理:参与者数量在运行时可能会变化,例如任务池中动态增加或减少工作线程。
  • 复杂的同步需求:需要多个同步点或条件,Phaser 提供了更高的灵活性。
  • 分布式系统中的同步:在分布式计算中,不同节点可能需要在不同阶段进行同步。

代码示例

动态的注册参与者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class PhaserExample {
public static void main(String[] args) {
Phaser phaser = new Phaser(1); // 主线程注册

// 主线程创建并启动2个子线程
for (int i = 1; i <= 2; i++) {
final int threadId = i;
phaser.register(); // 动态注册参与者
new Thread(() -> {
System.out.println("Thread " + threadId + " - 阶段1开始");
// 模拟阶段1的工作
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread " + threadId + " - 阶段1完成,等待其他线程");
phaser.arriveAndAwaitAdvance();

System.out.println("Thread " + threadId + " - 阶段2开始");
// 模拟阶段2的工作
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread " + threadId + " - 阶段2完成,等待其他线程");
phaser.arriveAndAwaitAdvance();

phaser.arriveAndDeregister(); // 到达并注销
}).start();
}

// 主线程也参与同步
System.out.println("Main thread - 阶段1等待");
phaser.arriveAndAwaitAdvance();

System.out.println("Main thread - 阶段2等待");
phaser.arriveAndAwaitAdvance();

phaser.arriveAndDeregister(); // 主线程注销

System.out.println("所有阶段完成,Phaser 终止");
}
}

运行程序,控制台输出如下

1
2
3
4
5
6
7
8
9
10
11
15:53:30.540 [Thread-2] INFO  space.yangtao.other.PhaserExample - Thread 2 - 阶段1开始
15:53:30.540 [Thread-1] INFO space.yangtao.other.PhaserExample - Thread 1 - 阶段1开始
15:53:30.540 [main] INFO space.yangtao.other.PhaserExample - Main thread - 阶段1等待
15:53:31.044 [Thread-1] INFO space.yangtao.other.PhaserExample - Thread 1 - 阶段1完成,等待其他线程
15:53:31.044 [Thread-2] INFO space.yangtao.other.PhaserExample - Thread 2 - 阶段1完成,等待其他线程
15:53:31.044 [Thread-2] INFO space.yangtao.other.PhaserExample - Thread 2 - 阶段2开始
15:53:31.044 [Thread-1] INFO space.yangtao.other.PhaserExample - Thread 1 - 阶段2开始
15:53:31.044 [main] INFO space.yangtao.other.PhaserExample - Main thread - 阶段2等待
15:53:31.557 [Thread-1] INFO space.yangtao.other.PhaserExample - Thread 1 - 阶段2完成,等待其他线程
15:53:31.557 [Thread-2] INFO space.yangtao.other.PhaserExample - Thread 2 - 阶段2完成,等待其他线程
15:53:31.557 [main] INFO space.yangtao.other.PhaserExample - 所有阶段完成,Phaser 终止

注意事项

  • 避免死锁:确保所有注册的参与者都能到达同步点,否则会导致永久等待。
  • 动态调整参与者:在多线程环境下动态注册和注销参与者时,需要小心管理,避免竞态条件。
  • 中断处理Phaser 的同步方法不会抛出 InterruptedException,需要手动检查中断状态。
  • 资源管理:使用完 Phaser 后,确保所有参与者已注销,避免资源泄漏。

Exchanger

简介

Exchanger<V> 是一个用于两个线程之间交换数据的同步点。它允许两个线程在某个同步点相遇,并交换彼此持有的数据。每个线程在调用 exchange 方法时,会阻塞直到另一个线程也到达该同步点,随后两者交换数据并继续执行。

特点

  • 双线程交换Exchanger 设计用于两个线程之间的数据交换,不适用于多于两个线程的场景。
  • 同步点:两个线程必须在同一个同步点到达,才能进行数据交换。
  • 阻塞与超时:线程在等待交换时会阻塞,可以选择设置超时时间以避免无限等待。
  • 可重复使用Exchanger 可以在多次交换中重复使用,不需要为每次交换创建新的实例。

使用流程

  1. 创建 Exchanger 实例:指定交换的数据类型。
  2. 准备数据:每个线程准备好要交换的数据。
  3. 调用 exchange 方法:线程调用 exchange 方法,传入要交换的数据,并接收对方线程传回的数据。
  4. 处理交换结果:线程获取到对方线程传回的数据后,继续执行后续操作。

原理

Exchanger 内部维护了一个同步点,当两个线程调用 exchange 方法时:

  1. 第一个到达的线程会在同步点等待,直到第二个线程也到达。
  2. 当第二个线程到达时,Exchanger 会将两个线程传入的数据进行交换。
  3. 两个线程各自接收对方传入的数据,然后继续执行。

Exchanger 的实现基于 AbstractQueuedSynchronizer(AQS),利用其状态管理和队列机制来控制线程的阻塞和唤醒。

常用 API

构造方法

  • **Exchanger()**:创建一个新的 Exchanger 实例,用于交换指定类型的数据。

交换方法

  • V exchange(V x) throws InterruptedException:在同步点等待另一个线程到达,并交换数据。该方法会阻塞直到另一个线程也调用 exchange
  • V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException:在指定的等待时间内等待另一个线程到达并交换数据。如果超时未交换成功,则抛出 TimeoutException

其他方法

  • **int getParties()**:返回参与交换的线程数,通常为2。
  • **int getMaximumWaitTime()**:返回最大等待时间。

应用场景

  • 生产者-消费者模型:两个线程在生产和消费数据时,可以使用 Exchanger 实现数据的高效交换。
  • 双缓冲技术:在需要两个缓冲区交替使用的场景,如图像处理、音视频流处理等,可以使用 Exchanger 实现缓冲区的交换。
  • 数据处理流水线:在多阶段的数据处理流水线中,两个相邻阶段的线程可以通过 Exchanger 进行数据交换。
  • 协同计算:两个线程需要在某些计算步骤上进行协同工作,并交换中间结果。

代码示例

基本的双线程数据交换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class ExchangerExample {
public static void main(String[] args) {
// 创建一个 Exchanger,用于交换 String 类型的数据
Exchanger<String> exchanger = new Exchanger<>();

// 线程A
Thread threadA = new Thread(() -> {
String data = "数据A";
try {
log.info("线程A准备交换的数据: " + data);
String receivedData = exchanger.exchange(data);
log.info("线程A接收到的数据: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("线程A被中断");
}
}, "线程A");

// 线程B
Thread threadB = new Thread(() -> {
String data = "数据B";
try {
log.info("线程B准备交换的数据: " + data);
String receivedData = exchanger.exchange(data);
log.info("线程B接收到的数据: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("线程B被中断");
}
}, "线程B");

threadA.start();
threadB.start();
}
}

运行程序,控制台输出如下

1
2
3
4
16:01:23.173 [线程A] INFO  space.yangtao.other.ExchangerExample - 线程A准备交换的数据: 数据A
16:01:23.173 [线程B] INFO space.yangtao.other.ExchangerExample - 线程B准备交换的数据: 数据B
16:01:23.175 [线程A] INFO space.yangtao.other.ExchangerExample - 线程A接收到的数据: 数据B
16:01:23.175 [线程B] INFO space.yangtao.other.ExchangerExample - 线程B接收到的数据: 数据A

注意事项

  • 仅适用于两个线程之间的数据交换Exchanger 设计用于两个线程之间的数据交换,多于两个线程使用时会引发不可预测的行为。
  • exchange 方法死锁风险exchange() 方法会让当前线程阻塞,直到另一个线程到达并交换数据。如果配对线程未到达,则线程会一直阻塞,可能导致死锁。
  • exchange 方法会被中断:如果线程在 exchange() 方法中被中断,会抛出 InterruptedException,导致交换失败。
  • 仅交换对象引用Exchanger 仅交换对象的引用,而不会复制对象,确保交换的数据在多线程环境下的可变性可控,尤其是 ListMap 等可变对象,应特别注意并发修改问题。

总结

在多线程协作的复杂场景中,使用合理的并发工具能大幅简化代码、减少错误风险:

  • LockSupport 以灵活的许可方式实现阻塞/唤醒,突破了传统 wait/notify 的局限,是众多并发组件的底层基石;
  • CyclicBarrier 注重“多线程并发到达同一个关口”,适合分阶段执行的同步;
  • Phaser 则是对 Barrier 的进一步扩展,支持可变线程数与更多交互方法,适合多阶段且参与者动态变化的场景;
  • Exchanger 专注于“双线程交换数据”这一需求,让两条线程能够在同一同步点互相交换对象。

通过充分了解它们各自的特性与适用情境,开发者能更从容地编写高质量多线程应用,既保证正确性,又在可维护性与性能之间取得最佳平衡。