Java并发编程——AQS

本文最后更新于:3 年前

引言

Java 并发包(java.util.concurrent)为开发者提供了种类丰富的同步工具与并发数据结构,极大简化了多线程编程的难度。其中,AQSAbstractQueuedSynchronizer)作为核心框架,为各种锁与同步器提供了统一的基础。理解 AQS 的设计理念与工作机制,有助于深入掌握如 ReentrantLockSemaphoreCountDownLatchBlockingQueue 等重要类的内部实现与使用技巧。
本文将依次介绍 AQS 的原理、常见子类的应用场景和关键 API,并通过示例演示如何借助这些同步工具来构建健壮、高效的并发程序。

AQS

简介

AQSAbstractQueuedSynchronizer),是 java.util.concurrent.locks 包中的一个抽象类,设计用于构建依赖于一个或多个状态的同步器,为实现阻塞锁和其他同步器(如信号量、事件等)提供了基础。它通过一个整型的状态(int state)和一个 FIFO 的等待队列(CLH 队列)来管理线程的获取和释放资源的过程。即:AQS 通过一个共享的整型 stateFIFO 等待队列,为独占或共享场景下的线程获取和释放资源提供了通用框架。

特点

  • 基于状态的同步AQS 使用一个整型的状态变量来表示同步状态。例如,ReentrantLock使用该状态表示锁是否被占用及重入次数。

  • FIFO 等待队列:当线程无法获取到同步状态时,会被封装成一个节点并加入到等待队列中,按照 FIFO 的顺序排队等待。

  • 独占与共享模式AQS 支持两种模式:

  • 独占模式:一次只能有一个线程获取同步状态(如独占锁)。
  • 共享模式:多个线程可以同时获取同步状态(如信号量)。

核心组件

AQS 的核心在于通过 CAS(Compare-And-Swap) 操作来确保对状态变量的原子更新,并通过等待队列来管理线程的等待与唤醒。

获取同步状态

当线程尝试获取同步状态时,会调用 acquire 方法。具体流程如下:

  1. 尝试获取:调用 tryAcquire 方法(需要子类实现),如果成功,线程获得同步状态,方法返回。
  2. 加入队列:如果获取失败,线程会被封装成节点并加入等待队列。
  3. 阻塞等待:线程会被阻塞,直到它被唤醒。

释放同步状态

当线程释放同步状态时,会调用 release 方法。具体流程如下:

  1. 尝试释放:调用 tryRelease 方法(需要子类实现),如果成功,状态被释放。
  2. 唤醒下一个:检查等待队列中是否有等待的线程,如果有,根据独占或共享模式唤醒相应的线程。

独占与共享模式

  • 独占模式:线程在独占模式下获取同步状态,如果失败则进入等待队列。释放时唤醒下一个独占的线程。
  • 共享模式:多个线程可以同时获取同步状态,适用于信号量等场景。释放时可能需要唤醒多个线程。

内部机制

CAS操作

AQS 依赖于 Unsafe 类的 compareAndSwapInt 方法来实现对状态变量的原子操作,确保线程安全。

自旋与阻塞

当线程尝试获取同步状态失败时,会通过自旋或阻塞的方式等待被唤醒。AQS 通过 LockSupport 类提供的 parkunpark 方法来实现线程的阻塞与唤醒。

节点(Node)状态

等待队列中的每个节点都包含了线程信息以及其在队列中的状态(如是否取消、是否在独占或共享模式下等待等)。

唤醒机制

当同步状态被释放时,AQS 会根据队列的顺序唤醒等待中的线程。在独占模式下,通常只唤醒队列中的第一个线程;在共享模式下,可能会唤醒多个线程。

重要方法

以下是 AQS 中一些关键的方法及其作用:

  • **acquire(int arg)**:独占模式下获取同步状态,如果获取失败则将线程加入等待队列并阻塞。
  • **release(int arg)**:独占模式下释放同步状态,并唤醒等待队列中的下一个线程。
  • **acquireShared(int arg)**:共享模式下获取同步状态,如果获取失败则将线程加入等待队列并阻塞。
  • **releaseShared(int arg)**:共享模式下释放同步状态,并唤醒等待队列中的所有适合的线程。
  • **tryAcquire(int arg)**:尝试以独占模式获取同步状态,子类需要实现此方法。
  • **tryRelease(int arg)**:尝试以独占模式释放同步状态,子类需要实现此方法。
  • **tryAcquireShared(int arg)**:尝试以共享模式获取同步状态,子类需要实现此方法。
  • **tryReleaseShared(int arg)**:尝试以共享模式释放同步状态,子类需要实现此方法。

常见子类

Java 标准库中有多个同步器是基于 AQS 实现的,以下是一些常见的例子:

ReentrantLock

ReentrantLock 是基于AQS的一个可重入锁。它通过AQS的独占模式实现锁的获取与释放,并支持公平锁和非公平锁两种模式。

Semaphore

Semaphore(信号量)是基于 AQS 的一个共享模式同步器。它通过 AQS 的共享模式管理许可的获取与释放,允许多个线程同时获取许可。

CountDownLatch

CountDownLatch 是基于 AQS 的一个同步工具,它允许一个或多个线程等待直到一组操作完成。它使用 AQS 的共享模式来管理等待线程的唤醒。

ReentrantReadWriteLock

ReentrantReadWriteLock 是基于 AQS 实现的一个读写锁。它使用 AQS 的共享模式来管理多个读锁和独占的写锁。

ReentrantLock

简介

ReentrantLock(可重入锁)是Java提供的一种可重入的互斥锁,允许同一个线程多次获取同一把锁而不会发生死锁。它提供了与 synchronized 关键字类似的基本行为,但具有更高的灵活性和更丰富的功能,如可中断锁获取、公平锁、定时锁获取等。

特点

  • 可重入性:同一线程可以多次获取同一把锁,锁的持有计数会自动递增。
  • 公平性:支持公平锁和非公平锁两种模式。
  • 可中断性:可以响应中断,允许线程在等待锁时被中断。
  • 锁获取的尝试:支持尝试获取锁的机制,如 tryLock
  • Condition 支持:提供 Condition 接口,支持更灵活的线程协调。

使用流程

  1. 创建 ReentrantLock 实例:可以选择创建公平锁或非公平锁。
  2. 获取锁:在需要保护的代码块之前调用 lock() 方法获取锁。
  3. 执行同步代码:在获取锁后执行需要同步的代码。
  4. 释放锁:在同步代码执行完毕后,通过 unlock() 方法释放锁。通常在 finally 块中释放锁,以确保锁的释放。

原理

ReentrantLock 是基于 AbstractQueuedSynchronizer 实现的。AQS 提供了一个基于 FIFO 队列的同步器框架,ReentrantLock 通过 AQS 的独占模式实现锁的获取与释放。以下是其内部工作原理的关键点:

可重入性

ReentrantLock 允许同一个线程多次获取锁,每次获取锁时,内部的 holdCount(持有计数)会递增。当线程释放锁时,持有计数会递减,直到为零时,锁才真正被释放,其他等待的线程才能获取锁。

公平与非公平锁

  • 非公平锁:默认情况下,ReentrantLock 是非公平的。线程在获取锁时,不考虑等待队列中的线程,可能会导致“插队”现象,提高吞吐量但可能导致某些线程长时间等待。
  • 公平锁:通过构造方法可以创建公平锁。公平锁按照线程请求锁的顺序来分配锁,避免了“插队”,但可能会略微降低吞吐量。

锁的获取与释放

  • 获取锁:线程调用 lock() 方法时,尝试通过 AQStryAcquire 方法获取锁。如果锁可用(state 为 0),则成功获取并将 state 设置为 1。如果锁不可用,则线程被加入到等待队列中,等待锁的释放。
  • 释放锁:线程调用 unlock() 方法时,通过AQStryRelease 方法释放锁。如果当前线程是锁的持有者,持有计数减一。当持有计数为 0 时,锁被释放,AQS 唤醒等待队列中的下一个线程。

常用API

锁的获取与释放

  • **lock()**:获取锁,如果锁不可用,则等待。
  • **lockInterruptibly()**:获取锁,允许线程在等待锁时被中断。
  • **tryLock()**:尝试获取锁,立即返回,成功则返回 true,否则返回 false
  • **tryLock(long timeout, TimeUnit unit)**:尝试获取锁,在指定的时间内获取,成功则返回 true,超时则返回 false
  • **unlock()**:释放锁。

锁的公平性

  • **new ReentrantLock(boolean fair)**:构造方法,fairtrue 创建公平锁,false 创建非公平锁。

Condition支持

  • **newCondition()**:返回一个与锁绑定的 Condition 实例,用于实现线程间的协调等待与通知。

查询方法

  • **isLocked()**:查询锁是否被任何线程持有。
  • **isHeldByCurrentThread()**:查询当前线程是否持有锁。
  • **getHoldCount()**:获取当前线程持有锁的次数。

应用场景

  • 需要可中断锁获取:使用 lockInterruptibly() 方法允许线程在等待锁时响应中断,这是 synchronized 关键字无法实现的。
  • 需要公平锁:在需要严格按照线程请求锁的顺序进行锁分配的场景中,可以使用公平锁,避免线程饥饿。
  • 需要尝试锁获取:通过 tryLock() 方法,可以尝试获取锁而不会无限期等待,适用于需要在一定条件下放弃操作的场景。
  • 需要多个 ConditionReentrantLock 支持创建多个 Condition 对象,用于实现更复杂的线程协调,比 Objectwait/notify 更灵活。
  • 高度竞争的场景:在高度竞争的多线程环境中,ReentrantLock 可以提供更好的性能和更细粒度的锁控制。

代码示例

实现“生产者-消费者模型”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ProducerConsumer {
private final Queue<Integer> queue = new LinkedList<>();
private final int MAX_CAPACITY = 5;
// 锁对象
private final ReentrantLock lock = new ReentrantLock();
// 生产者条件变量
private final Condition notFull = lock.newCondition();
// 消费者条件变量
private final Condition notEmpty = lock.newCondition();

public void produce(int value) throws InterruptedException {
lock.lockInterruptibly(); // 支持可中断
try {
while (queue.size() == MAX_CAPACITY) {
notFull.await(); // 等待队列有空位
}
queue.offer(value);
System.out.println("Produced: " + value);
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}

public int consume() throws InterruptedException {
lock.lockInterruptibly(); // 支持可中断
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待队列有数据
}
int value = queue.poll();
System.out.println("Consumed: " + value);
notFull.signal(); // 通知生产者
return value;
} finally {
lock.unlock();
}
}

}

与 Synchronized 比较

虽然 ReentrantLocksynchronized 都用于实现互斥锁,但它们在功能和使用上有一些显著的区别:

特性 ReentrantLock synchronized
可重入性 支持,默认可重入 支持,默认可重入
公平性 支持公平锁和非公平锁 不支持公平性
可中断性 支持 lockInterruptibly() 不支持,可被中断
尝试锁获取 支持 tryLock() 和带超时的 tryLock 不支持
多个 Condition 支持 支持多个 Condition 实例 每个对象只有一个隐式条件(wait/notify
性能 在低竞争下可能略慢于 synchronized 在低竞争下性能较好
代码灵活性 需要显式获取和释放锁,代码更灵活 隐式获取和释放锁,代码较简洁

选择使用

  • 如果需要更灵活的锁机制,如公平性、可中断锁获取、多个条件变量等,建议使用 ReentrantLock
  • 如果只是需要简单的互斥锁,并且不需要额外的功能,synchronized 更加简洁且易于使用。

ReentrantReadWriteLock

ReentrantReadWriteLockReentrantLock 的基础上增加了读锁、写锁的区分:

  • 读锁(Read Lock):多个线程可以同时持有读锁,只要没有线程持有写锁。适用于不修改共享资源的操作,可通过 ReentrantReadWriteLock.ReadLock readLock() 获取。
  • 写锁(Write Lock):一次只能有一个线程持有写锁,并且在持有写锁时,所有的读锁和写锁都被阻塞。适用于修改共享资源的操作,可通过 ReentrantReadWriteLock.WriteLock writeLock() 获取。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ReadWriteLockExample {
// 读写锁对象
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读锁
private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
// 写锁
private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
private int sharedData = 0;

public void readData() {
readLock.lock(); // 获取读锁
try {
Thread.sleep(1000);
System.out.println("Reading data: " + sharedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
readLock.unlock(); // 释放读锁
}
}

public void writeData(int value) {
writeLock.lock(); // 获取写锁
try {
Thread.sleep(2000);
sharedData = value;
System.out.println("Writing data: " + sharedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock(); // 释放写锁
}
}
}

运行程序,会发现所有的读操作一起运行。

ReentrantReadWriteLock 是 Java 并发编程中一个强大且灵活的锁机制,特别适用于读操作远多于写操作的场景。

StampedLock

简介

StampedLock 是 Java 8 引入的一种新的锁机制,位于 java.util.concurrent.locks 包中。与 ReentrantReadWriteLock 相比,StampedLock 提供了一种基于能力的锁管理方式,其中锁的获取和释放通过使用票据(stamps)来控制,此票据可用于释放锁或检查锁是否有效。

模式

StampedLock 支持三种模式的锁:

  • 写锁:独占锁,阻止其他读锁和写锁。
  • 读锁:共享锁,允许多个线程同时获取读锁,但不允许写锁。
  • 乐观读:一种尝试不阻塞写锁获取的读锁,需要通过检查印记后确认数据有效性。

使用流程

  1. 获取锁:根据需要的锁类型(读锁、写锁或乐观读),使用相应的方法获取锁,并返回一个印记 stamp
  2. 操作共享数据:在持有锁的保护下操作数据。
  3. 释放锁:使用印记来释放锁。

原理

StampedLock 的设计主要是为了优化读多写少的场景,通过提供一种不总是需要阻塞读操作的机制(乐观读)。其原理包括:

  • 印记管理:每次锁获取或释放都伴随一个唯一的印记,这有助于管理锁的状态和验证操作的有效性。
  • 锁模式:支持三种锁模式,允许在不同情况下选择最适合的锁。
  • 乐观读锁:允许线程假定没有写操作,进行读取,后续需要通过印记来验证读取的数据是否在读取过程中被修改。

常用API

写锁

  • writeLock():获取写锁。
  • unlockWrite(long stamp):释放写锁。
  • tryConvertToWriteLock(long stamp):尝试升级为写锁。

读锁

  • readLock():获取读锁。
  • unlockRead(long stamp):释放读锁。
  • tryConvertToReadLock(long stamp):尝试降级为读锁,失败返回 0

乐观读锁

  • tryOptimisticRead():尝试获取乐观读锁。
  • validate(long stamp):检查在乐观读期间是否有写锁被获取。

应用场景

  • 大量读操作,少量写操作的数据结构:如缓存、配置信息的读取。
  • 需要降低锁竞争的场景:提高系统的并发能力。
  • 替代 ReadWriteLock 的场景:当需要非阻塞的读取操作时,StampedLock 提供了更好的性能。

锁降级/升级

锁降级(Write → Read)

适用场景

  • 需要短暂写入,但后续是长期读取,避免写锁阻塞其他读操作。
  • 数据修改后不希望立即释放锁,确保后续的读操作读取的是一致的数据。
  • 减少写锁持有时间,提高并发能力。

执行流程

  1. 线程先获取写锁,修改数据。
  2. 尝试降级为读锁(避免长期持有写锁)。
  3. 如果降级失败,释放写锁后重新获取读锁。
  4. 继续执行读取操作,不会阻塞其他读线程。

代码示例:配置缓存更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class LockDowngradeExample {
private final StampedLock lock = new StampedLock();
private int sharedData = 0;

public void updateAndRead() {
long stamp = lock.writeLock(); // 1. 获取写锁
try {
sharedData++; // 2. 修改数据
System.out.println("Updated sharedData to: " + sharedData);

// 3. 尝试降级为读锁,减少对其他线程的影响
stamp = lock.tryConvertToReadLock(stamp);
if (stamp == 0) { // 降级失败,重新获取读锁
lock.unlockWrite(stamp);
stamp = lock.readLock();
}

System.out.println("Reading after update: " + sharedData);
} finally {
lock.unlock(stamp);
}
}

public static void main(String[] args) {
LockDowngradeExample example = new LockDowngradeExample();
example.updateAndRead();
}
}

锁升级(Read → Write)

适用场景

  • 需要在读取数据后进行修改(如乐观读发现数据变更)。
  • 不能直接持有写锁,否则影响并发性能。
  • 需要确保数据一致性,防止并发更新问题。

执行流程

  1. 先获取乐观读锁(tryOptimisticRead()),非阻塞提高性能。
  2. 读取数据后,验证是否被其他线程修改。
  3. 如果数据未变更,直接返回。
  4. 如果数据已变更:
    • 先退化为悲观读锁,确保数据正确性。
    • 尝试升级到写锁,如果成功,则直接修改数据。
    • 如果升级失败,释放读锁并重新获取写锁(防止死锁)。

代码示例:基于 StampedLock 的乐观读升级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class LockUpgradeExample {
private final StampedLock lock = new StampedLock();
private int sharedData = 0;

public void readThenMaybeWrite() {
long stamp = lock.tryOptimisticRead(); // 1. 获取乐观读锁
int data = sharedData;

// 2. 验证读锁是否仍然有效
if (!lock.validate(stamp)) {
// 3. 退化为悲观读锁,确保数据一致性
stamp = lock.readLock();
try {
data = sharedData;
// 4. 可能需要升级为写锁
long writeStamp = lock.tryConvertToWriteLock(stamp);
if (writeStamp != 0L) { // 5. 成功升级
stamp = writeStamp;
sharedData = data + 1;
System.out.println("Upgraded to write lock. Updated sharedData to: " + sharedData);
} else { // 6. 失败,释放读锁后重新获取写锁
lock.unlockRead(stamp);
stamp = lock.writeLock();
sharedData = data + 1;
System.out.println("Acquired write lock separately. Updated sharedData to: " + sharedData);
}
} finally {
lock.unlock(stamp);
}
}
System.out.println("Final sharedData: " + sharedData);
}

public static void main(String[] args) {
LockUpgradeExample example = new LockUpgradeExample();
example.readThenMaybeWrite();
}
}

注意事项

  • StampedLock 不可重入,一个线程不能重复获取同一类型的锁,否则会死锁。
  • 乐观读锁可能在执行过程中失效,必须通过 validate() 检测,并可能需要回退到悲观读锁或升级为写锁。
  • 锁降级是允许的(write → read),但锁升级不被支持(read → write 必须释放后重新获取)。
  • 在复杂的并发环境下,可以结合降级(减少锁竞争)和升级(确保数据一致性)优化性能。

Semaphore

概念

Semaphore(信号量)是一种用于控制多个线程对共享资源访问的计数器。信号量维护了一个内部计数器,表示可用的许可(permits)数量,确保系统资源的有效利用和防止资源过载。线程在访问资源前需要获取一个许可,访问完成后释放许可。信号量可以用来实现互斥锁(当许可数量为 1 时)或限制资源的并发访问数量(许可数量大于 1 时)。

特点

  • 许可机制:信号量通过许可数量来控制并发访问的线程数量。
  • 阻塞与非阻塞:当许可不足时,线程可以选择阻塞等待或立即返回。
  • 公平性:信号量可以设置为公平(FIFO)或非公平,决定线程获取许可的顺序。
  • 计数器:内部维护一个计数器,表示当前可用的许可数量。

使用流程

  1. 创建 Semaphore 实例:指定初始许可数量和可选的公平性参数。
  2. 获取许可:在访问受限资源前,调用 acquire() 方法获取一个许可。如果许可不足,线程将被阻塞,直到有许可可用。
  3. 执行同步操作:在获取许可后,执行需要受限访问的代码。
  4. 释放许可:操作完成后,调用 release() 方法释放许可,允许其他等待的线程获取许可。

原理

Semaphore 的核心原理基于许可的获取与释放机制。它通过一个内部计数器来跟踪可用的许可数量:

  • 获取许可(acquire):当线程调用 acquire() 方法时,信号量的计数器减 1。如果计数器为负,表示当前没有足够的许可,线程将被阻塞,直到有许可可用。
  • 释放许可(release):当线程调用 release() 方法时,信号量的计数器加 1。如果有等待的线程被阻塞,信号量会唤醒其中一个线程,使其可以获取许可继续执行。

Semaphore 可以设置为公平或非公平:

  • 公平信号量:按照线程请求许可的顺序(FIFO)来分配许可,避免线程饥饿。
  • 非公平信号量:线程获取许可时不考虑顺序,可能会导致某些线程长期等待,但吞吐量较高。

常用API

构造方法

  • **Semaphore(int permits)**:创建一个具有指定许可数量的非公平信号量。
  • **Semaphore(int permits, boolean fair)**:创建一个具有指定许可数量和公平性策略的信号量。

获取与释放许可

  • **void acquire()**:获取一个许可,如果许可不可用,则线程被阻塞,直到有许可可用。
  • **void acquire(int permits)**:获取指定数量的许可。
  • **boolean tryAcquire()**:尝试获取一个许可,立即返回,成功则返回 true,否则返回 false
  • **boolean tryAcquire(long timeout, TimeUnit unit)**:在指定的等待时间内尝试获取一个许可,成功则返回 true,否则返回 false
  • **void release()**:释放一个许可。
  • **void release(int permits)**:释放指定数量的许可。

其他方法

  • **int availablePermits()**:返回当前可用的许可数量。
  • **int getQueueLength()**:返回等待获取许可的线程数量。
  • **boolean hasQueuedThreads()**:判断是否有线程在等待许可。
  • **boolean isFair()**:判断信号量是否为公平信号量。

应用场景

  • 限制并发访问数量:例如,限制同时访问数据库连接池的线程数量,防止过多线程导致资源耗尽。
  • 实现互斥锁:当Semaphore的许可数量设置为1时,可以用来实现互斥锁,确保同一时间只有一个线程访问资源。
  • 控制流量:在网络编程中,使用信号量来控制并发请求的数量,防止服务器过载。

代码示例

限制并发访问数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SemaphoreExample {
private final Semaphore semaphore = new Semaphore(2); // 2个许可

public void accessResource() {
try {
if (semaphore.tryAcquire(500, TimeUnit.MILLISECONDS)) { // 尝试在500ms内获取许可
try {
System.out.println(Thread.currentThread().getName() + " acquired a permit.");
Thread.sleep(1000); // 模拟资源访问
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " released a permit.");
}
} else {
System.out.println(Thread.currentThread().getName() + " could not acquire a permit.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public static void main(String[] args) {
SemaphoreExample example = new SemaphoreExample();

// 创建3个线程尝试获取许可
for (int i = 0; i < 3; i++) {
new Thread(example::accessResource, "Thread-" + i).start();
}
}
}

在这个示例中,第三个线程尝试在 500 毫秒内获取许可,但由于前两个线程持有许可,第三个线程在超时后未能获取许可并返回。

CountDownLatch

简介

CountDownLatch 是一种同步工具,用于协调多个线程之间的执行顺序。它基于一个计数器,初始化时设置为某个数值,每当一个线程完成其任务后,就将计数器减一。当计数器的值达到零时,所有等待的线程被唤醒,继续执行后续操作。

特点

  • 单向同步CountDownLatch 是单向的,计数器一旦达到零,不能重置或增加。
  • 不可重用:一旦计数器达到零,CountDownLatch 不能再次使用。如果需要可重用的同步工具,可以考虑 CyclicBarrier
  • 等待与通知机制:一个或多个线程等待,直到其他线程完成各自的操作。
  • 线程安全CountDownLatch 是线程安全的,适用于多线程环境。

使用流程

  1. 创建CountDownLatch实例:在初始化时设置计数器的值(即需要等待的事件数量)。
  2. **等待线程调用 await()**:一个或多个线程调用 await() 方法,进入等待状态,直到计数器达到 0。
  3. **计数线程调用 countDown()**:其他线程在完成各自的任务后,调用 countDown() 方法将计数器减 1。
  4. 释放等待线程:当计数器的值达到零时,所有等待的线程被唤醒,继续执行后续操作。

原理

CountDownLatch 的核心原理基于一个内部的计数器,该计数器初始化时设定为需要等待的事件数量。主要工作流程包括:

  • 计数器管理:计数器的值表示还需要多少个事件完成。当某个事件完成时,调用 countDown() 方法将计数器减一。
  • 等待机制:调用 await() 方法的线程会被阻塞,直到计数器的值达到零。
  • 释放机制:当计数器达到零时,所有等待的线程被唤醒,继续执行后续代码。

关键点

  • 线程协调CountDownLatch 主要用于协调线程之间的执行顺序,确保某些操作在其他操作完成后才开始。
  • 不可重用:一旦计数器达到 0,CountDownLatch 不能再次使用。如果需要多次使用同步工具,可以考虑 CyclicBarrier
  • 无依赖性:等待线程和计数线程可以在不同的时间启动和结束,CountDownLatch 不强制它们的执行顺序。

常用API

构造方法

  • **CountDownLatch(int count)**:创建一个 CountDownLatch,计数器初始化为指定的 count 值。count 必须大于零。

主要方法

  • **void await()**:使当前线程等待,直到计数器的值达到 0。
  • **void await(long timeout, TimeUnit unit)**:使当前线程等待,直到计数器的值达到 0,或等待时间超过指定的 timeout
  • **boolean await(long timeout, TimeUnit unit)**:类似于 await(long, TimeUnit),但在等待超时后返回一个布尔值,表示是否成功等待到计数器为 0。
  • **void countDown()**:将计数器的值减 1。如果计数器的值达到 0,则释放所有等待的线程。
  • **long getCount()**:返回当前计数器的值。

应用场景

  • 等待多个线程完成:在主线程需要等待多个工作线程完成任务后再继续执行的场景。例如,启动多个服务后,等待所有服务启动完成再进行后续操作。
  • 事件触发机制:在一个事件触发前,其他线程需要等待某些条件的满足。例如,加载配置文件完成后,其他线程才能开始处理请求。
  • 测试并发性能:在测试环境中,使用 CountDownLatch 来启动多个线程同时执行某个操作,以测试系统的并发性能。
  • 线程依赖关系:在复杂的多线程任务中,某些线程的执行依赖于其他线程的完成情况,使用 CountDownLatch 来管理这些依赖关系。

代码示例

主线程启动多个工作线程,并等待所有工作线程完成后继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CountDownLatchExample {
private static final int NUM_WORKERS = 3;
private final CountDownLatch latch = new CountDownLatch(NUM_WORKERS);

public void workerTask(int workerId) {
System.out.println("Worker " + workerId + " started.");
try {
// 模拟工作任务
Thread.sleep((long) (Math.random() * 2000));
System.out.println("Worker " + workerId + " finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 完成任务后,计数器减一
}
}

public void startTasks() throws InterruptedException {
for (int i = 1; i <= NUM_WORKERS; i++) {
final int workerId = i;
new Thread(() -> workerTask(workerId)).start();
}

System.out.println("Main thread waiting for workers to finish.");
latch.await(); // 等待计数器达到零
System.out.println("All workers finished. Main thread proceeding.");
}

public static void main(String[] args) throws InterruptedException {
CountDownLatchExample example = new CountDownLatchExample();
example.startTasks();
}
}

注意事项

  • 一次性使用CountDownLatch 是一次性的,计数器一旦达到 0,无法重置或重新使用。如果需要可重用的同步工具,可以考虑 CyclicBarrier
  • 确保 countDown 调用:在使用 CountDownLatch 时,必须确保所有需要调用 countDown() 的线程都能够正常执行,否则可能导致等待线程永久阻塞。
  • 避免死锁:如果 await() 方法被调用后,相关线程无法调用 countDown(),将导致死锁。因此,在设计使用 CountDownLatch 的同步逻辑时,需要确保计数器能够正确地递减到 0。

BlockingQueue

简介

BlockingQueue 是一个支持两个附加操作的 Queue 接口:

  1. 在队列为空时等待获取元素:当线程尝试从一个空的队列中获取元素时,该线程会被阻塞,直到队列中有元素可供获取。
  2. 在队列满时等待插入元素:当线程尝试向一个有容量限制且已满的队列中插入元素时,该线程会被阻塞,直到队列中有空间可用。

BlockingQueue 的这些特性使其在多线程环境下能够有效地协调生产者和消费者之间的工作,避免资源竞争和数据不一致的问题。

特点

  • 线程安全:所有的 BlockingQueue 实现都是线程安全的,内部通过锁机制或其他同步技术保证并发操作的正确性。
  • 阻塞操作:提供了在无法立即完成操作时阻塞线程的方法,简化了线程间的协调。
  • 容量限制:可以选择有界队列(有固定容量)或无界队列(容量无限制)。
  • 多种实现:提供了多种不同特性的实现类,以适应不同的应用场景。

使用流程

  1. 选择合适的 BlockingQueue 实现类:根据具体需求选择适当的实现类,如 ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue 等。
  2. 创建 BlockingQueue 实例:根据需要设置队列的容量和其他参数。
  3. 生产者线程插入元素:生产者线程向队列中插入元素,可能会因为队列满而被阻塞。
  4. 消费者线程获取元素:消费者线程从队列中获取元素,可能会因为队列为空而被阻塞。
  5. 处理元素:消费者线程处理获取到的元素。
  6. 重复上述过程:生产者和消费者线程持续执行插入和获取操作,直到完成任务。

原理

内部同步机制

  • 锁机制:大多数 BlockingQueue 实现类使用内部锁(如 ReentrantLock)来保护队列的状态,确保在插入和获取元素时的线程安全。
  • 条件变量:通过条件变量(Condition)实现线程的等待和通知机制。当队列满时,生产者线程等待;当队列为空时,消费者线程等待。

阻塞策略

  • 生产者插入元素时的阻塞:当队列已满,生产者线程调用 put() 方法插入元素时会被阻塞,直到有空间可用。
  • 消费者获取元素时的阻塞:当队列为空,消费者线程调用 take() 方法获取元素时会被阻塞,直到有元素可用。

公平性

一些 BlockingQueue 实现类支持公平性选项,决定线程获取锁的顺序。公平性有助于避免线程饥饿,但可能会稍微降低吞吐量。

常用API

BlockingQueue 接口继承自 Queue,并增加了一些用于阻塞操作的方法。以下是一些常用的方法:

插入元素

  • **boolean add(E e)**:向队列中添加元素,如果队列已满则抛出 IllegalStateException
  • **boolean offer(E e)**:尝试向队列中添加元素,如果队列已满则返回 false
  • **void put(E e)**:向队列中添加元素,如果队列已满则阻塞,直到有空间可用。
  • **boolean offer(E e, long timeout, TimeUnit unit)**:尝试在指定时间内向队列中添加元素,如果队列已满则在超时后返回 false

获取元素

  • **E remove()**:从队列中移除并返回头部元素,如果队列为空则抛出 NoSuchElementException
  • **E poll()**:从队列中移除并返回头部元素,如果队列为空则返回 null
  • **E take()**:从队列中移除并返回头部元素,如果队列为空则阻塞,直到有元素可用。
  • **E poll(long timeout, TimeUnit unit)**:尝试在指定时间内从队列中移除并返回头部元素,如果队列为空则在超时后返回 null

检查队列状态

  • **int size()**:返回队列中元素的数量。
  • **boolean isEmpty()**:检查队列是否为空。
  • **boolean contains(Object o)**:检查队列是否包含指定的元素。
  • **int remainingCapacity()**:返回队列剩余的容量。

其他方法

  • **Iterator<E> iterator()**:返回队列的迭代器。
  • **void clear()**:清空队列中的所有元素。

应用场景

  • 生产者-消费者模型BlockingQueue 是实现“生产者-消费者模型”的理想选择。生产者线程将数据放入队列,消费者线程从队列中获取数据进行处理。
  • 任务调度:在任务调度系统中,可以使用 BlockingQueue 来存储待执行的任务,多个工作线程从队列中获取任务并执行。
  • 数据流处理:在数据流处理系统中,BlockingQueue 可用于在不同处理阶段之间传递数据,确保数据的有序和线程安全的传递。
  • 线程池中的任务队列:Java 的线程池实现(如 ThreadPoolExecutor)内部使用 BlockingQueue 来存储待执行的任务,线程池中的工作线程从队列中获取任务并执行。

流量控制

在高并发系统中,BlockingQueue 可以用于控制并发流量,限制同时处理的请求数量,防止系统过载。

代码示例

生产者-消费者模型

使用 ArrayBlockingQueueLinkedBlockingQueue 实现“生产者-消费者模型”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ProducerConsumerExample {
// 有界队列(ArrayBlockingQueue实现),容量为5
// private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

// 无界队列(LinkedBlockingQueue实现,也可以限制容量,推荐限制容量)
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

/**
* 生产者方法
*/
public void producer(int i) {
try {
queue.put(i); // 插入元素,如果队列满则阻塞
System.out.println("Produced: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* 消费者方法
*/
public void consumer() {
while (true) {
try {
int value = queue.take(); // 获取元素,如果队列为空则阻塞
System.out.println("Consumed: " + value);
Thread.sleep(500); // 模拟处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public static void main(String[] args) {
ProducerConsumerExample example = new ProducerConsumerExample();

// 创建生产者线程
List<Thread> producerThreads = new ArrayList<>();
for (int i = 0; i < 50; i++) {
final int value = i;
Thread producerThread = new Thread(() -> example.producer(value), "Producer-" + i);
producerThreads.add(producerThread);
}
// 创建消费者线程
Thread consumerThread = new Thread(example::consumer, "Consumer");

// 启动线程
producerThreads.forEach(Thread::start);
consumerThread.start();
}
}

优先级任务

使用 PriorityBlockingQueue 实现优先级任务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class PriorityBlockingQueueExample {
// 定义任务类,实现Comparable接口以支持优先级排序
static class Task implements Comparable<Task> {
private final String name;
private final int priority;

public Task(String name, int priority) {
this.name = name;
this.priority = priority;
}

@Override
public int compareTo(Task other) {
return Integer.compare(other.priority, this.priority); // 高优先级先处理
}

@Override
public String toString() {
return "Task{name='" + name + "', priority=" + priority + '}';
}
}

// 任务队列
private final BlockingQueue<Task> queue = new PriorityBlockingQueue<>();

// 生产者方法
public void producer() {
Task[] tasks = {
new Task("Task1", 3),
new Task("Task2", 1),
new Task("Task3", 2),
new Task("Task4", 5),
new Task("Task5", 4)
};
try {
for (Task task : tasks) {
queue.put(task); // 插入任务
System.out.println("Produced: " + task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

// 消费者方法
public void consumer() {
try {
while (true) {
Task task = queue.take(); // 获取任务
System.out.println("Consumed: " + task);
Thread.sleep(400); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public static void main(String[] args) {
PriorityBlockingQueueExample example = new PriorityBlockingQueueExample();

// 创建生产者线程
Thread producerThread = new Thread(example::producer, "Producer");
// 创建消费者线程
Thread consumerThread = new Thread(example::consumer, "Consumer");

producerThread.start();
consumerThread.start();
}
}

延时任务

使用 DelayQueue 实现延时任务(只有在延迟时间到期时才能获取元素)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class DelayQueueExample {

private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

// 定义延迟任务类,实现Delayed接口
static class DelayedTask implements Delayed {
private final String name;
private final long startTime;

public DelayedTask(String name, long delay, TimeUnit unit) {
this.name = name;
this.startTime = System.currentTimeMillis() + unit.toMillis(delay);
}

@Override
public long getDelay(TimeUnit unit) {
long remaining = startTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed other) {
return Long.compare(this.startTime, ((DelayedTask) other).startTime);
}

@Override
public String toString() {
return "DelayedTask{name='" + name + "', startTime=" + startTime + '}';
}
}

public static void main(String[] args) {
BlockingQueue<DelayedTask> delayQueue = new DelayQueue<>();

// 生产者线程,插入延迟任务
Thread producer = new Thread(() -> {
try {
delayQueue.put(new DelayedTask("Task1", 3, TimeUnit.SECONDS));
System.out.println("Produced Task1 with 3 seconds delay.");
delayQueue.put(new DelayedTask("Task2", 1, TimeUnit.SECONDS));
System.out.println("Produced Task2 with 1 second delay.");
delayQueue.put(new DelayedTask("Task3", 5, TimeUnit.SECONDS));
System.out.println("Produced Task3 with 5 seconds delay.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");

// 消费者线程,获取延迟任务
Thread consumer = new Thread(() -> {
try {
while (true) {
DelayedTask task = delayQueue.take(); // 阻塞直到任务到期
System.out.println("Consumed: " + task.name + " at " + DATE_TIME_FORMATTER.format(LocalDateTime.now()));
if ("Task3".equals(task.name)) {
break; // 结束条件
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");

producer.start();
consumer.start();
}
}

注意事项

  • 选择合适的实现类:根据具体需求选择合适的 BlockingQueue 实现类。例如,ArrayBlockingQueue 适用于固定容量的场景,LinkedBlockingQueue 适用于需要动态扩展的场景,PriorityBlockingQueue 适用于需要按优先级处理元素的场景。
  • 避免死锁:确保生产者和消费者能够正常运行,避免由于队列满或空导致的永久阻塞。合理设置队列容量和处理速度。
  • 处理异常:在使用阻塞操作时,注意处理 InterruptedException,以确保线程能够响应中断信号。
  • 资源管理:对于有界队列,合理设置队列容量,避免内存溢出或资源浪费。
  • 线程协调:确保生产者和消费者的数量与队列容量相匹配,以实现最佳的并发性能。

队列选择

BlockingQueueQueue 接口的扩展,提供了在队列为空或满时的阻塞操作。以下是 BlockingQueue 与其他常用队列的比较:

特性 BlockingQueue LinkedList (Queue) PriorityQueue ArrayDeque (Queue)
阻塞操作 支持(通过 puttake 方法) 不支持 不支持 不支持
线程安全 大多数实现是线程安全的 非线程安全 非线程安全 非线程安全
有界/无界 支持有界和无界实现 通常无界 通常无界 通常无界
元素顺序 FIFO FIFO 按优先级排序 FIFO
常用实现类 ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueDelayQueue LinkedList PriorityQueue ArrayDeque

选择使用

  • BlockingQueue:在需要线程安全的阻塞操作时使用,如“生产者-消费者模型”、任务调度等。
  • LinkedList:在单线程环境下或通过外部同步机制保证线程安全时使用,适合需要频繁插入和删除的场景。
  • PriorityQueue:在需要按照优先级顺序处理元素时使用,但需注意其非线程安全性。
  • ArrayDeque:在需要高效的双端队列操作时使用,适合单线程环境或通过外部同步机制保证线程安全的场景。

总结

本篇围绕 AQS 及其衍生的多种并发工具(锁、信号量、倒计时门闩、读写锁、阻塞队列等)进行了系统性阐述:

  • AQS 通过一个共享的整型 stateFIFO 等待队列,为独占或共享场景下的线程获取和释放资源提供了通用框架。
  • 在此基础上,Java 并发库构建了多种实用的同步类,如 ReentrantLock(可重入互斥锁)、Semaphore(信号量)、CountDownLatch(一次性门闩)、ReentrantReadWriteLock(读写锁)等,用来应对不同的并发访问需求。
  • 同时,BlockingQueueStampedLock 等更高级数据结构和锁机制能满足更细粒度的需求,如延迟队列、优先级调度、乐观读等。

实际应用中,应根据业务特性(线程竞争程度、读写比、是否需要可中断或公平锁等)选用合适的并发组件,并充分理解它们所带来的性能和可维护性影响,从而在安全性与效率之间取得最佳平衡。