Java并发编程——其他工具类补充
本文最后更新于:3 年前
引言
在 Java 并发编程中,JDK 除了提供锁、线程池、并发容器等常见核心内容,还包含了一些用于特殊场景的工具类,以满足多线程协同工作的多种需求。LockSupport
、CyclicBarrier
、Phaser
与 Exchanger
等便是其中典型代表。它们在底层同步机制上各有巧妙设计,在不同应用场景下能发挥巨大作用。本文将带你系统认识这些工具类的工作原理、使用场景与常见示例,为日后编写更高效更易维护的并发程序提供指引。
LockSupport
简介
LockSupport
提供了对线程阻塞和唤醒的底层支持,是构建高级同步工具(如 Lock
、Semaphore
、CountDownLatch
等)的基础,类似于操作系统中的线程调度机制。它通过维护每个线程的许可(permit
)来控制线程的阻塞和唤醒,而不依赖于传统的监视器锁(如 synchronized
关键字)。
特点
- 许可机制:每个线程拥有一个许可,初始状态为无许可。
unpark
方法会发放一个许可,park
方法会消耗一个许可。 - 非阻塞的许可获取:即使在调用
park
之前调用了unpark
,也不会阻塞线程,因为许可已经存在。 - 灵活性高:不像
Object.wait
和Object.notify
依赖于监视器锁,LockSupport
的阻塞和唤醒操作是独立的,更加灵活。
使用流程
- 阻塞线程:调用
LockSupport.park()
或其变种方法,使当前线程进入阻塞状态,等待被唤醒。 - 唤醒线程:其他线程调用
LockSupport.unpark(Thread thread)
,为目标线程发放一个许可,允许其继续执行。 - 许可管理:每个线程的许可是独立管理的,多次调用
unpark
只会增加一个许可,而不会累积。
原理
LockSupport
基于操作系统的底层线程调度机制,通过 JVM 提供的本地方法实现线程的阻塞和唤醒。它维护了每个线程的许可状态:
- park:如果许可存在,消耗许可并返回;否则,将线程阻塞,等待许可的到来。
- unpark:为指定线程增加一个许可,如果线程已经阻塞,则唤醒它;否则,许可会被保存,后续的
park
调用会立即返回。
这种许可机制避免了传统的基于监视器锁的阻塞方法(如 wait/notify
)可能出现的竞态条件和死锁问题。
常用 API
阻塞相关方法
- **public static void park()**:阻塞当前线程,直到被唤醒或被中断。
- **public static void park(Object blocker)**:阻塞当前线程,并关联一个阻塞器对象,便于调试和监控。
- **public static void parkNanos(long nanos)**:阻塞当前线程,最多阻塞指定的纳秒时间。
- **public static void parkUntil(long deadline)**:阻塞当前线程,直到指定的绝对时间(毫秒时间戳)。
唤醒相关方法
- **public static void unpark(Thread thread)**:唤醒指定线程,发放一个许可。
许可相关方法
- **public static boolean isBlocked(Thread thread)**:检查指定线程是否被阻塞(Java 9 引入)。
- **public static Object getBlocker(Thread thread)**:获取阻塞线程的阻塞器对象(Java 9 引入)。
应用场景
- 构建锁(Lock):如
ReentrantLock
的内部实现依赖于LockSupport
来阻塞和唤醒线程。 - 实现线程池:线程池中的工作线程可能需要被阻塞,等待任务到来,然后被唤醒执行任务。
- 构建信号量和屏障:如
Semaphore
和CyclicBarrier
等同步器的实现。 - 自定义阻塞队列:在实现“生产者-消费者模型”时,可以使用
LockSupport
控制生产者和消费者的阻塞与唤醒。
代码示例
使用 LockSupport
阻塞和唤醒线程。
1 |
|
运行程序,控制台输出如下
1 |
|
注意事项
- 并发编程中的许多阻塞方法如
wait()
、sleep()
、join()
在线程被中断时会抛出InterruptedException
,但park
和unpark
不会,对已终止或不存在的线程调用unpark
也不会报错,park
则会响应中断状态并且程序会重新从阻塞的地方开始执行。 - 使用
LockSupport
需要确保对同一线程调用的unpark
不会丢失许可,即线程park
之后一定要有其他线程调用unpark
给线程分发许可,避免线程永久阻塞。
CyclicBarrier
简介
CyclicBarrier
是Java提供的一个同步工具类,位于 java.util.concurrent
包中。它允许一组线程在某个同步点等待,直到所有线程都到达该同步点后,所有线程才能继续执行。这种机制非常适用于需要多线程协作完成某些阶段性任务的场景。
特点
- 可重用性:
CyclicBarrier
可以在所有线程通过屏障后重置,允许它被多次使用。 - 屏障动作:在所有线程到达屏障点后,可以执行一个预定义的任务(屏障动作),该任务由一个
Runnable
对象定义。 - 等待方式:线程可以选择阻塞等待所有其他线程到达屏障,或在等待超时时放弃等待。
- 灵活性:适用于需要分阶段执行的并发任务,如并行计算、分布式任务协调等。
使用流程
- 创建 CyclicBarrier 实例:指定参与屏障的线程数量,并可选地提供一个屏障动作。
- 线程到达屏障点:每个线程在需要等待的地方调用
await()
方法,等待其他线程到达屏障。 - 所有线程到达屏障:当所有参与的线程都调用了
await()
方法后,屏障被触发,所有线程继续执行后续操作,同时执行屏障动作(如果有)。 - 重置屏障:
CyclicBarrier
可以被重用,适用于多个循环或阶段。
原理
CyclicBarrier
的内部机制主要基于一个计数器和一个等待队列。当创建 CyclicBarrier
时,指定了需要等待的线程数量(参与者数量)。每当一个线程调用 await()
方法时,计数器减 1。如果计数器仍然大于 0,线程将被阻塞,直到其他线程也调用了 await()
。当计数器达到 0 时,所有被阻塞的线程将被唤醒,同时执行屏障动作(如果定义了)。
关键点:
- 计数器管理:每调用一次
await()
,计数器减 1。当计数器达到 0 时,重置计数器,允许CyclicBarrier
再次被使用。 - 屏障动作:
CyclicBarrier
允许在所有线程到达屏障时执行一个额外的任务(屏障动作),该任务由一个Runnable
对象定义。 - 重用性:在所有线程通过屏障后,
CyclicBarrier
会重置计数器,允许其再次被用于下一个循环或阶段。 - 错误处理:如果在等待过程中,有线程中断或其他异常导致屏障破裂(
BrokenBarrierException
),所有等待的线程将抛出异常,确保不会永久阻塞。
常用API
构造方法
- **CyclicBarrier(int parties)**:创建一个
CyclicBarrier
,指定参与屏障的线程数量(parties
)。 - **CyclicBarrier(int parties, Runnable barrierAction)**:创建一个
CyclicBarrier
,指定参与屏障的线程数量,并定义一个屏障动作(barrierAction
),当所有线程到达屏障点时执行。
主要方法
- **int await()**:使当前线程等待,直到所有参与线程都调用了
await()
方法。成功通过屏障时,返回当前线程的屏障索引。 - **int await(long timeout, TimeUnit unit)**:使当前线程等待,直到所有参与线程都调用了
await()
方法,或等待超时。如果超时,抛出TimeoutException
。 - **boolean isBroken()**:查询屏障是否已经破裂。
- **int getNumberWaiting()**:返回当前等待屏障的线程数量。
- **int getParties()**:返回屏障的参与线程数量。
其他方法
- **void reset()**:重置屏障,丢弃所有等待线程。所有等待的线程将抛出
BrokenBarrierException
。
应用场景
CyclicBarrier
适用于需要多个线程协同完成某些阶段性任务的场景,以下是一些典型的应用场景:
- 分阶段计算:在并行计算中,数据被分成多个部分,由不同的线程处理。每个线程完成一部分计算后,需要等待其他线程完成,然后进行下一阶段的计算。
- 模拟并发测试:在测试环境中,使用
CyclicBarrier
可以让多个线程同时开始执行某个任务,以模拟高并发场景,评估系统性能。 - 多线程游戏开发:在游戏开发中,可能需要多个线程协同完成某些任务,如渲染、物理计算、AI决策等。
CyclicBarrier
可以确保各个线程在每一帧同步更新。 - 多线程数据处理:在数据处理系统中,可能需要多个线程并行处理数据的不同部分,完成后再汇总结果。
CyclicBarrier
可以协调这些线程在每个处理阶段同步。
代码示例
模拟多线程分阶段计算,四个工作线程需要完成三个阶段的计算。每完成一个阶段,所有线程都必须到达屏障点,屏障动作被执行,然后进入下一阶段。
1 |
|
运行程序,控制台输出如下
1 |
|
注意事项
- 线程中断:如果在等待过程中,某个线程被中断,
CyclicBarrier
会破裂(isBroken()
返回true
),其他等待的线程将抛出BrokenBarrierException
。需要处理好异常,避免程序崩溃。 - 确保所有线程都到达屏障:如果某个线程永远不会调用
await()
,其他线程将永久等待,导致死锁。需要确保所有参与线程都能正常到达屏障点。 - 屏障动作的执行:屏障动作由一个独立的线程执行,通常是最后一个到达屏障的线程。如果屏障动作抛出异常,会导致
CyclicBarrier
破裂,影响其他等待线程。 - 重用性:
CyclicBarrier
是可重用的,可以在多次循环中使用,但需要确保每次使用时所有参与线程都能正确调用await()
。
Phaser
简介
Phaser
是一种可重用的同步器,支持多阶段(phase
)的任务执行,并允许动态地注册和注销参与的线程。它主要用于协调一组线程在多个阶段上的同步点,确保所有参与线程在每个阶段完成后再进入下一个阶段。
特点
- 多阶段同步:支持多个同步点(阶段),每个阶段结束后进入下一个阶段。
- 动态参与:可以在运行时动态地增加或减少参与的线程数。
- 可重复使用:类似于
CyclicBarrier
,Phaser
可以在多个阶段重复使用。 - 灵活性高:提供了更多的控制方法,如提前终止、获取当前阶段等。
使用流程
- 创建 Phaser 实例:指定初始参与者数量,或者使用默认构造方法后动态注册。
- 注册参与者:通过构造时指定或在运行时调用
register()
方法增加参与者。 - 执行阶段任务:每个参与者在完成当前阶段的任务后,调用
arriveAndAwaitAdvance()
等方法,等待其他参与者到达同步点。 - 进入下一阶段:所有参与者到达同步点后,
Phaser
进入下一阶段,参与者继续执行下一阶段的任务。 - 注销参与者:任务完成后,参与者可以调用
arriveAndDeregister()
方法注销,减少参与者数量。 - 终止 Phaser:当所有阶段任务完成或需要提前终止时,可以调用
forceTermination()
或其他终止方法。
原理
Phaser
的内部实现基于可变的参与者数量和阶段计数。它维护一个阶段计数器(phase
),以及当前阶段需要到达的参与者数量。当参与者调用同步方法(如 arriveAndAwaitAdvance()
)时,Phaser
会记录到达的参与者数量,并在所有参与者到达后,递增阶段计数器,释放所有等待的线程,进入下一阶段。
主要机制:
- 阶段(Phase):表示同步的当前阶段,每完成一个阶段后递增。
- 参与者(Parties):需要同步的线程数量,可以动态增加或减少。
- 到达(Arrive):参与者完成当前阶段的任务,通知
Phaser
。 - 等待(Await):参与者等待其他参与者到达同步点。
- 注销(Deregister):参与者完成所有任务后,从
Phaser
中注销,减少参与者数量。
常用 API
构造方法
- **Phaser()**:创建一个未注册任何参与者的
Phaser
。 - **Phaser(int parties)**:创建一个并注册指定数量参与者的
Phaser
。
注册和注销
- **int register()**:动态注册一个参与者,返回当前阶段。
- **int bulkRegister(int parties)**:动态注册多个参与者。
- **int arriveAndDeregister()**:到达同步点并注销当前参与者。
- **int deregister()**:仅注销当前参与者,不到达同步点。
阶段到达与等待
- **int arrive()**:到达同步点,不等待其他参与者。
- **int arriveAndAwaitAdvance()**:到达同步点,并等待其他参与者到达后进入下一阶段。
- **int arriveAndDeregister()**:到达同步点,并注销当前参与者。
- **onAdvance(int phase, int registeredParties)**:在每次到达同步点时自定义额外的行为或者决定是否终止 phaser。
查询状态
- **int getPhase()**:获取当前阶段数。
- **int getRegisteredParties()**:获取当前注册的参与者数量。
- **int getArrivedParties()**:获取当前阶段已到达的参与者数量。
- **boolean isTerminated()**:检查
Phaser
是否已终止。
终止控制
- **boolean forceTermination()**:强制终止
Phaser
,所有等待的线程会被释放。
应用场景
Phaser
适用于以下场景:
- 多阶段任务协调:如分阶段的计算任务,每个阶段需要所有参与者完成后再进入下一阶段。
- 动态参与者管理:参与者数量在运行时可能会变化,例如任务池中动态增加或减少工作线程。
- 复杂的同步需求:需要多个同步点或条件,
Phaser
提供了更高的灵活性。 - 分布式系统中的同步:在分布式计算中,不同节点可能需要在不同阶段进行同步。
代码示例
动态的注册参与者。
1 |
|
运行程序,控制台输出如下
1 |
|
注意事项
- 避免死锁:确保所有注册的参与者都能到达同步点,否则会导致永久等待。
- 动态调整参与者:在多线程环境下动态注册和注销参与者时,需要小心管理,避免竞态条件。
- 中断处理:
Phaser
的同步方法不会抛出InterruptedException
,需要手动检查中断状态。 - 资源管理:使用完
Phaser
后,确保所有参与者已注销,避免资源泄漏。
Exchanger
简介
Exchanger<V>
是一个用于两个线程之间交换数据的同步点。它允许两个线程在某个同步点相遇,并交换彼此持有的数据。每个线程在调用 exchange
方法时,会阻塞直到另一个线程也到达该同步点,随后两者交换数据并继续执行。
特点
- 双线程交换:
Exchanger
设计用于两个线程之间的数据交换,不适用于多于两个线程的场景。 - 同步点:两个线程必须在同一个同步点到达,才能进行数据交换。
- 阻塞与超时:线程在等待交换时会阻塞,可以选择设置超时时间以避免无限等待。
- 可重复使用:
Exchanger
可以在多次交换中重复使用,不需要为每次交换创建新的实例。
使用流程
- 创建 Exchanger 实例:指定交换的数据类型。
- 准备数据:每个线程准备好要交换的数据。
- 调用 exchange 方法:线程调用
exchange
方法,传入要交换的数据,并接收对方线程传回的数据。 - 处理交换结果:线程获取到对方线程传回的数据后,继续执行后续操作。
原理
Exchanger
内部维护了一个同步点,当两个线程调用 exchange
方法时:
- 第一个到达的线程会在同步点等待,直到第二个线程也到达。
- 当第二个线程到达时,
Exchanger
会将两个线程传入的数据进行交换。 - 两个线程各自接收对方传入的数据,然后继续执行。
Exchanger
的实现基于 AbstractQueuedSynchronizer
(AQS),利用其状态管理和队列机制来控制线程的阻塞和唤醒。
常用 API
构造方法
- **Exchanger()**:创建一个新的
Exchanger
实例,用于交换指定类型的数据。
交换方法
- V exchange(V x) throws InterruptedException:在同步点等待另一个线程到达,并交换数据。该方法会阻塞直到另一个线程也调用
exchange
。 - V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException:在指定的等待时间内等待另一个线程到达并交换数据。如果超时未交换成功,则抛出
TimeoutException
。
其他方法
- **int getParties()**:返回参与交换的线程数,通常为2。
- **int getMaximumWaitTime()**:返回最大等待时间。
应用场景
- 生产者-消费者模型:两个线程在生产和消费数据时,可以使用
Exchanger
实现数据的高效交换。 - 双缓冲技术:在需要两个缓冲区交替使用的场景,如图像处理、音视频流处理等,可以使用
Exchanger
实现缓冲区的交换。 - 数据处理流水线:在多阶段的数据处理流水线中,两个相邻阶段的线程可以通过
Exchanger
进行数据交换。 - 协同计算:两个线程需要在某些计算步骤上进行协同工作,并交换中间结果。
代码示例
基本的双线程数据交换。
1 |
|
运行程序,控制台输出如下
1 |
|
注意事项
- 仅适用于两个线程之间的数据交换:
Exchanger
设计用于两个线程之间的数据交换,多于两个线程使用时会引发不可预测的行为。 - exchange 方法死锁风险:
exchange()
方法会让当前线程阻塞,直到另一个线程到达并交换数据。如果配对线程未到达,则线程会一直阻塞,可能导致死锁。 - exchange 方法会被中断:如果线程在
exchange()
方法中被中断,会抛出InterruptedException
,导致交换失败。 - 仅交换对象引用:
Exchanger
仅交换对象的引用,而不会复制对象,确保交换的数据在多线程环境下的可变性可控,尤其是List
、Map
等可变对象,应特别注意并发修改问题。
总结
在多线程协作的复杂场景中,使用合理的并发工具能大幅简化代码、减少错误风险:
LockSupport
以灵活的许可方式实现阻塞/唤醒,突破了传统wait/notify
的局限,是众多并发组件的底层基石;CyclicBarrier
注重“多线程并发到达同一个关口”,适合分阶段执行的同步;Phaser
则是对Barrier
的进一步扩展,支持可变线程数与更多交互方法,适合多阶段且参与者动态变化的场景;Exchanger
专注于“双线程交换数据”这一需求,让两条线程能够在同一同步点互相交换对象。
通过充分了解它们各自的特性与适用情境,开发者能更从容地编写高质量多线程应用,既保证正确性,又在可维护性与性能之间取得最佳平衡。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!