Java并发编程——管程Monitor

本文最后更新于:4 年前

引言

并发编程的本质是如何安全、高效地管理多个线程对共享资源的访问,而管程切好能够帮助我们在多线程环境中实现对共享资源的互斥访问与条件同步。Java 语言内建了管程机制,通过关键字 synchronized 以及 wait() / notify() 等方法,让开发者能够自然地在代码层面封装共享数据的访问方式、保护临界区,降低并发编程的复杂度。本篇文章将基于管程这一概念,深入探讨其在 JVM 层级的实现细节与常见应用场景,同时结合示例剖析多线程间的经典问题及应对思路。

概述

管程(Monitor)是一种高级并发控制机制,用于管理多个线程对共享资源的访问,保证线程之间的同步,避免数据竞争和状态不一致问题。Java语言通过内置锁和 Object 类的方法(如 wait()notify()notifyAll())实现了管程的机制。

优点

  1. 简化同步问题:管程封装了对共享资源的访问,使得线程之间的协调变得更加简单。使用 synchronized 或其他同步机制时,开发者无需显式地管理锁和线程的状态。
  2. 自动互斥:管程自动处理线程间的互斥,确保同一时刻只有一个线程可以执行管程内的方法,从而避免了竞态条件。
  3. 线程协调:管程通过条件变量(如 wait()notify() 等)提供了线程间的协调机制,生产者和消费者等模式能够更加自然地实现。
  4. 高层次抽象:管程提供了对共享资源和同步机制的高层次抽象,开发者只需关注如何定义和操作共享资源,不需要关注底层的锁管理。

缺点

  1. 性能开销:由于管程的锁和同步机制,每次进入管程时都需要获取锁,操作完成后释放锁。这会带来一定的性能开销,尤其是当管程内的操作较为频繁时,性能可能受到影响。
  2. 缺乏灵活性:管程的实现较为抽象和封闭,可能不适用于所有场景。在一些复杂的并发控制情况下,可能需要更灵活的机制,如显式的锁(ReentrantLock)或其他并发控制策略。
  3. 容易出现假唤醒:在使用条件变量时,如果没有正确地检查条件,可能会遇到假唤醒的问题,导致线程被错误地唤醒并继续执行。这需要额外的逻辑判断来防止此类问题。
  4. 死锁风险:如果管程设计不当,仍然可能导致死锁问题,尤其是当多个管程或多个锁的资源互相依赖时。
  5. 复杂性管理:对于一些复杂的多线程任务,使用管程可能导致程序的可维护性和可理解性下降。尤其是在多个管程相互协作时,代码可能变得难以调试和扩展。

工作流程

  1. 锁的获取:当一个线程需要进入管程执行某些操作时,它首先需要获取该管程的锁。锁的获取是排他性的,这意味着同一时刻只有一个线程能够获得该锁并进入管程。
  2. 条件判断和等待:如果该线程进入管程后发现某个条件不满足(例如,生产者发现缓冲区已满,消费者发现缓冲区为空),它就会调用 wait() 进入等待队列,直到条件满足。
  3. 唤醒其他线程:当其他线程修改了共享资源的状态,可能会唤醒等待队列中的某些线程(调用 notify()notifyAll())。这些线程会重新竞争管程的锁,并继续执行。
  4. 退出和释放锁:当线程完成了它在管程中的工作后,它会释放锁,使得其他线程可以进入管程。释放锁后,线程会从管程退出。

以“生产者-消费者模型”对管程的工作流程展开说明:

sequenceDiagram
    participant P as 生产者
    participant C as 消费者
    participant B as 缓冲区(管程)

    %% 生产者执行过程
    P->>B: 调用 produce(item)
    B->>B: 检查缓冲区是否已满
    alt 缓冲区满
        B->>P: 调用 wait() 等待
    end
    B->>B: 将 item 添加到缓冲区
    B->>C: 调用 notifyAll() 通知消费者
    P->>P: 返回

    %% 消费者执行过程
    C->>B: 调用 consume()
    B->>B: 检查缓冲区是否为空
    alt 缓冲区空
        B->>C: 调用 wait() 等待
    end
    B->>B: 从缓冲区取出 item
    B->>P: 调用 notifyAll() 通知生产者
    C->>C: 返回

基本原理

管程的基本原理可以从以下几个方面进行理解:

组成部分

  • 互斥锁:确保同一时刻只有一个线程可以访问共享资源。
  • 条件变量:用于线程之间的通信和协作,允许线程在特定条件下进入等待状态或被唤醒。
  • 同步队列:线程在等待条件变量时会被加入到一个同步队列中,唤醒时会从队列中取出线程。

JVM 层级

对象头结构

Java 对象在内存中的布局包括对象头、实例数据和对齐填充。对象头中的 Mark Word 存储了锁的信息,包括:

  • 偏向锁:记录线程 ID、偏向时间等。
  • 轻量级锁:记录锁记录指针(Lock Record)。
  • 重量级锁:记录指向 Monitor 对象的指针。

这也就意味着每个 Java 对象在 JVM 中都具备 Monitor 锁机制。

同步队列

  • Monitor 依赖于底层的操作系统互斥量实现。
  • 每个线程请求锁时会尝试进入 Monitor,如果失败,则会进入等待队列。

对象监视器锁

synchronized 的底层实现依赖于 Monitor,而 Monitor 依托于对象头中的 Mark Word 实现。

  • 每个对象都对应一个监视器锁,当线程试图访问被 synchronized 修饰的代码块时,需要先获得对象的监视器锁。
  • 获得锁的线程可以执行同步代码,而其他线程会进入阻塞或等待队列。
  • 当锁被释放时,其他等待的线程会竞争锁,并根据一定策略(如线程优先级)重新获得执行权。

虚拟机指令

synchronized 关键字在 Java 字节码中会被编译为以下指令:

  1. monitorenter
    • 用于尝试获取对象的监视器锁。
    • 进入同步代码块时执行。
  2. monitorexit
    • 用于释放对象的监视器锁。
    • 退出同步代码块时执行。

字节码示例:

1
2
3
4
5
public void syncMethod() {
synchronized (this) {
System.out.println("Hello synchronized");
}
}

编译后的字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
0: aload_0         // 加载当前对象引用
1: dup
2: monitorenter // 尝试获取监视器锁
3: getstatic // 获取 System.out
6: ldc // 加载字符串 "Hello synchronized"
8: invokevirtual // 调用 println 方法
11: aload_0 // 加载当前对象引用
12: monitorexit // 释放监视器锁
13: goto 21 // 跳转到结束
16: astore_1 // 捕获异常
17: aload_0 // 加载当前对象引用
18: monitorexit // 释放监视器锁(异常处理)
19: aload_1 // 抛出异常
20: athrow

锁的状态

synchronized 锁的状态主要有以下几种,随着锁的竞争激烈程度逐步升级:

  1. 无锁(No Lock):
    • 没有竞争时,线程直接执行。
    • 适用于单线程场景。
  2. 偏向锁(Biased Locking):
    • 锁被偏向于第一个获得它的线程。
    • 后续该线程再次获取锁时,无需加锁解锁操作。
    • 偏向锁适用于锁竞争极低的场景,提升性能。
  3. 轻量级锁(Lightweight Locking):
    • 通过 CAS(Compare-And-Swap) 操作尝试加锁。
    • 适用于有少量竞争的场景,减少上下文切换开销。
  4. 重量级锁(Heavyweight Locking):
    • 线程竞争激烈时,升级为重量级锁,线程被阻塞,进入操作系统的调度机制。
    • 适用于高竞争场景,但开销较大。

锁优化机制

为了提升锁的性能,JVM 实现了多种优化机制:

  • 锁消除:在 JIT 编译时,如果发现某段代码中的锁是线程不可能共享的,会直接消除该锁。
  • 锁膨胀:当轻量级锁竞争激烈时,会膨胀为重量级锁。
  • 锁偏向撤销:如果偏向锁的线程发生变化,会撤销偏向锁并升级为轻量级锁。
  • 自适应自旋锁:在锁竞争时,线程会进行一定次数的自旋操作,避免频繁上下文切换。

互斥

管程的核心特点之一是互斥,即每次只有一个线程能够进入管程执行代码。这是通过给管程中的操作加锁来实现的。在 Java 中,synchronized 关键字或者 ReentrantLock 都可以用于实现互斥访问。每个管程都只有一个锁(也可以理解为一个监视器),同一时刻只有一个线程能够持有这个锁,其他线程必须等待。

  • 在 Java 中,类的 synchronized 方法或同步块(synchronized block)都隐式地使用了这个锁。
  • 该锁保护了管程内的共享资源,防止多个线程同时访问这些资源,避免了竞态条件的发生。

条件变量

除了互斥,管程还使用条件变量来控制线程的执行顺序。当某个线程需要等待某些条件成立时,它可以将自己挂起,并在条件成立时被唤醒。条件变量通常包括 wait()notify()notifyAll() 方法(在 Java 中是 Object 类的方法),用于实现线程之间的协作和通信。

  • wait():使当前线程进入等待队列,直到被唤醒。
  • notify():唤醒一个等待队列中的线程。
  • notifyAll():唤醒所有等待队列中的线程。

通过条件变量,线程可以在满足某些条件时再继续执行,从而避免了不必要的资源竞争。

入口和出口

管程的访问通常是通过特定的方法(称为“入口”)来进行的。每次线程访问管程时,都需要通过这些方法来进入,线程在进入管程之前必须获得锁。在完成操作后,线程可以通过“出口”释放锁,使得其他线程可以进入管程。

每个管程可以包含多个“入口”方法(例如,增加、减少、检查状态等),但同一时刻只有一个线程能执行这些方法。

管程的封装

管程通过封装来隐藏共享资源的细节和同步机制,确保外部线程无法直接访问共享数据。所有对共享资源的操作都必须通过管程提供的方法来进行,从而保证了资源的一致性和线程安全。

在 Java 中,这通常通过将共享资源和对其操作的实现封装在类内部,避免外部代码直接修改。

应用场景

互斥访问

管程提供了一种机制,确保同一时刻只有一个线程可以访问共享资源,从而实现线程间的互斥。

  • 数据库连接池管理:管理数据库连接,确保连接创建、使用和回收的互斥。
  • 文件系统操作:在操作系统中,文件的读写需要互斥控制,防止数据损坏。
  • 硬件设备管理:控制对打印机、扫描仪等共享硬件的访问,确保操作不会相互干扰。

条件同步

管程可以基于条件变量控制线程的执行顺序,允许线程在特定条件下等待或被唤醒。

  • 生产者-消费者问题:同步生产者和消费者线程,确保消费者在有产品可消费时才运行。
  • 任务调度系统:在任务依赖特定条件(如时间、资源可用性)时启动。
  • 事件驱动系统:如 GUI 应用,等待用户操作事件。

资源共享

管程使多个线程能够安全地共享资源,如数据结构、内存等。

  • 内存管理系统:操作系统中的内存分配与回收。
  • 有界队列(Bounded Queue):多线程环境中用于任务或消息传递的队列。
  • 缓冲区管理:如多媒体应用中的音视频数据缓冲。

线程协作

管程支持线程间的协作,使得多个线程可以按照某种逻辑顺序执行,相互之间通过管程进行通信。

  • 读写锁实现:允许多个读线程同时访问资源,写线程独占访问。
  • 信号量管理:使用管程来实现信号量的功能,控制对某些资源的并发访问数量。
  • 多线程游戏:比如在线游戏中,同步多个玩家的动作。

应用示例

生产者-消费者模型

描述

“生产者-消费者模型”是多线程同步中的经典应用,涉及两个类型的线程:

  • 生产者(Producer):生成数据并将其放入共享缓冲区。
  • 消费者(Consumer):从共享缓冲区中取出数据并进行处理。

目标是确保生产者和消费者在访问共享缓冲区时不会发生数据竞争,避免缓冲区溢出或空缺。

思路

  • 互斥访问:通过 synchronized 关键字确保同一时刻只有一个线程访问缓冲区。
  • 条件同步:使用 wait()notifyAll() 方法协调生产者和消费者的等待与唤醒。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class ProducerConsumerExample {
private final Queue<Integer> buffer = new LinkedList<>();
private final int capacity = 5;

// 生产者方法
public void produce(int value) throws InterruptedException {
synchronized (buffer) {
// 当缓冲区满时,生产者等待
while (buffer.size() == capacity) {
buffer.wait();
}
buffer.add(value);
System.out.println("Produced: " + value);
// 唤醒所有等待的线程(消费者)
buffer.notifyAll();
}
}

// 消费者方法
public int consume() throws InterruptedException {
synchronized (buffer) {
// 当缓冲区为空时,消费者等待
while (buffer.isEmpty()) {
buffer.wait();
}
int value = buffer.poll();
System.out.println("Consumed: " + value);
// 唤醒所有等待的线程(生产者)
buffer.notifyAll();
return value;
}
}

public static void main(String[] args) throws InterruptedException {
ProducerConsumerExample pc = new ProducerConsumerExample();

// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
pc.produce(i);
// 模拟生产时间
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

// 消费者线程
Thread consumer = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
pc.consume();
// 模拟消费时间
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

producer.start();
consumer.start();

producer.join();
consumer.join();

System.out.println("Final buffer size: " + pc.buffer.size());
}
}

代码解析

  1. 共享缓冲区:使用 Queue<Integer> 作为生产者和消费者共享的缓冲区,容量为5。
  2. **生产者方法 (produce)**:
    • 使用 synchronized 块锁定缓冲区对象,确保互斥访问。
    • 当缓冲区满时,调用 buffer.wait() 使生产者线程进入等待状态。
    • 生产一个值后,将其添加到缓冲区,并调用 buffer.notifyAll() 唤醒所有等待的线程(主要是消费者)。
  3. **消费者方法 (consume)**:
    • 同样使用 synchronized 块锁定缓冲区对象。
    • 当缓冲区为空时,调用 buffer.wait() 使消费者线程进入等待状态。
    • 消费一个值后,从缓冲区中取出,并调用 buffer.notifyAll() 唤醒所有等待的线程(主要是生产者)。
  4. 主方法
    • 创建并启动生产者和消费者线程。
    • 使用 join() 确保主线程等待子线程完成。
    • 最终打印缓冲区的大小,理想情况下应为0。

注意事项

  • **使用 while 而非 if**:在等待条件时,应该使用 while 循环检查条件,以防止虚假唤醒(Spurious Wakeup)。
  • 同步块的锁对象:在此示例中,锁对象为缓冲区 buffer。选择合适的锁对象对于同步至关重要。
  • 避免过度唤醒:尽量使用 notify() 而非 notifyAll(),除非确实需要唤醒所有等待线程,以提高性能。

线程池

描述

线程池(Thread Pool)是一种管理和复用多个线程的机制,旨在减少创建和销毁线程的开销,提高系统性能和资源利用率。Java通过 java.util.concurrent 包提供了强大的线程池实现,但在自定义线程池时,也可以使用管程机制来管理线程和任务队列。

思路

  • 任务队列的管理:生产者(任务提交者)向任务队列添加任务,消费者(线程池中的工作线程)从任务队列中获取并执行任务。
  • 线程的同步与协调:确保任务队列的线程安全,控制任务的添加和获取,管理线程的等待与唤醒。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class SimpleThreadPool {

private final Queue<Runnable> taskQueue = new LinkedList<>();
private final WorkerThread[] workers;

/**
* 构造方法
*/
public SimpleThreadPool(int poolSize) {
workers = new WorkerThread[poolSize];
// 初始化并启动工作线程
for (int i = 0; i < poolSize; i++) {
workers[i] = new WorkerThread("Worker-" + i);
workers[i].start();
}
}

/**
* 提交任务到线程池
* @param task 任务
*/
public void submit(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task);
// 唤醒等待的工作线程
taskQueue.notify();
}
}

/**
* 关闭线程池
*/
public void shutdown() {
for (WorkerThread worker : workers) {
worker.interrupt();
}
}

/**
* 工作线程内部类
*/
private class WorkerThread extends Thread {
public WorkerThread(String name) {
super(name);
}

public void run() {
Runnable task;

while (!isInterrupted()) {
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
// 等待任务
taskQueue.wait();
} catch (InterruptedException e) {
// 接收到中断信号,退出线程
Thread.currentThread().interrupt();
break;
}
}

// 获取任务
task = taskQueue.poll();
}

// 执行任务
if (task != null) {
try {
System.out.println(Thread.currentThread().getName() + " executing task.");
task.run();
} catch (RuntimeException e) {
// 捕获并处理任务执行中的异常
System.err.println("Task execution failed: " + e.getMessage());
}
}
}

System.out.println(Thread.currentThread().getName() + " shutting down.");
}
}

}

class SimpleThreadPoolTestClient {
public static void main(String[] args) throws InterruptedException {
SimpleThreadPool threadPool = new SimpleThreadPool(3);

// 提交10个任务
for (int i = 1; i <= 10; i++) {
final int taskId = i;
threadPool.submit(() -> {
try {
Thread.sleep(500); // 模拟任务执行时间
System.out.println("Task " + taskId + " completed.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// 等待所有任务完成
Thread.sleep(6000);
threadPool.shutdown();
}
}

代码解析

  1. 任务队列:使用 Queue<Runnable> 作为任务队列,存储待执行的任务。
  2. 工作线程WorkerThread 类继承自 Thread,负责从任务队列中获取任务并执行。
  3. **提交任务 (submit)**:
    • 使用 synchronized 块锁定任务队列,确保线程安全地添加任务。
    • 调用 taskQueue.notify() 唤醒等待的工作线程。
  4. **工作线程运行 (run)**:
    • 使用 synchronized 块锁定任务队列。
    • 当任务队列为空时,调用 taskQueue.wait() 使线程进入等待状态。
    • 当有任务时,从队列中取出任务并执行。
    • 捕获并处理任务执行中的异常,避免工作线程因异常终止。
  5. **关闭线程池 (shutdown)**:
    • 调用工作线程的 interrupt() 方法,发送中断信号,促使工作线程退出循环并终止。
  6. 主方法
    • 创建一个拥有3个工作线程的线程池。
    • 提交10个模拟任务,每个任务睡眠500毫秒后打印完成信息。
    • 主线程等待6秒以确保所有任务完成,然后关闭线程池。

注意事项

  • 任务队列的同步:所有对任务队列的访问必须在 synchronized 块内进行,确保线程安全。
  • 工作线程的中断处理:在工作线程中捕获 InterruptedException,并正确响应中断信号以优雅地终止线程。
  • 避免死锁:确保锁的获取顺序一致,避免多个锁的嵌套使用导致死锁。
  • 任务异常处理:在工作线程中捕获任务执行中的异常,防止异常导致线程终止。

读写锁

描述

读写锁(Read-Write Lock)是一种同步机制,允许多个线程同时读取共享资源,但在写入时必须排他。即:

  • 读锁(Read Lock):多个线程可以同时持有,只要没有线程持有写锁。
  • 写锁(Write Lock):独占锁,只有一个线程可以持有,且期间不允许其他线程持有读锁或写锁。

这种机制适用于读多写少的场景,能够提高并发性能。

思路

  • 维护读者和写者的计数:跟踪当前持有读锁和写锁的线程数。
  • 协调读写访问:确保写线程在没有读线程时才能写入,读线程在没有写线程时才能读取。
  • 避免饥饿:合理调度读者和写者,防止某一方长时间等待。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public class ReadWriteLock {

private int readers = 0;
private int writers = 0;
private int writeRequests = 0;

/**
* 获取读锁
*/
public synchronized void lockRead() throws InterruptedException {
while (writers > 0 || writeRequests > 0) {
wait();
}
readers++;
}

/**
* 释放读锁
*/
public synchronized void unlockRead() {
readers--;
notifyAll();
}

/**
* 获取写锁
*/
public synchronized void lockWrite() throws InterruptedException {
writeRequests++;
while (readers > 0 || writers > 0) {
wait();
}
writeRequests--;
writers++;
}

/**
* 释放写锁
*/
public synchronized void unlockWrite() {
writers--;
notifyAll();
}

}

class ReadWriteLockExample {
private final ReadWriteLock rwLock = new ReadWriteLock();
private int sharedData = 0;

// 读操作
public int readData() throws InterruptedException {
rwLock.lockRead();
try {
// 模拟读操作时间
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + " read data: " + sharedData);
return sharedData;
} finally {
rwLock.unlockRead();
}
}

// 写操作
public void writeData(int value) throws InterruptedException {
rwLock.lockWrite();
try {
// 模拟写操作时间
Thread.sleep(150);
sharedData = value;
System.out.println(Thread.currentThread().getName() + " wrote data: " + sharedData);
} finally {
rwLock.unlockWrite();
}
}

public static void main(String[] args) {
ReadWriteLockExample example = new ReadWriteLockExample();

// 读者线程
Runnable reader = () -> {
try {
for (int i = 0; i < 5; i++) {
example.readData();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};

// 写者线程
Runnable writer = () -> {
try {
for (int i = 1; i <= 5; i++) {
example.writeData(i * 10);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};

// 启动多个读者和写者线程
Thread writerThread = new Thread(writer, "Writer-1");
Thread readerThread1 = new Thread(reader, "Reader-1");
Thread readerThread2 = new Thread(reader, "Reader-2");

writerThread.start();
readerThread1.start();
readerThread2.start();

try {
writerThread.join();
readerThread1.join();
readerThread2.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

System.out.println("Final shared data: " + example.sharedData);
}
}

代码解析

  1. **读写锁 (ReadWriteLock 类)**:
    • 变量
      • readers:当前持有读锁的线程数。
      • writers:当前持有写锁的线程数(0或1)。
      • writeRequests:等待获取写锁的线程数。
    • 方法
      • **lockRead()**:获取读锁。若有写线程持有锁或有写线程等待,则读线程进入等待。
      • **unlockRead()**:释放读锁,并通知所有等待线程。
      • **lockWrite()**:获取写锁。增加写请求计数,若有读锁或写锁被持有,则写线程进入等待。
      • **unlockWrite()**:释放写锁,并通知所有等待线程。
  2. **共享资源 (sharedData)**:一个整型变量,由多个读者线程读取和写者线程修改。
  3. **读者方法 (readData)**:
    • 调用 rwLock.lockRead() 获取读锁。
    • 读取 sharedData 并打印。
    • 调用 rwLock.unlockRead() 释放读锁。
  4. **写者方法 (writeData)**:
    • 调用 rwLock.lockWrite() 获取写锁。
    • 修改 sharedData 并打印。
    • 调用 rwLock.unlockWrite() 释放写锁。
  5. 主方法
    • 创建并启动一个写者线程和两个读者线程。
    • 使用 join() 确保主线程等待子线程完成。
    • 最终打印共享数据的值。

注意事项

  • 公平性:上述实现未考虑公平性,即无法保证写线程不会长期被读线程饥饿。可通过引入队列或其他机制实现公平性。
  • 性能优化:使用 java.util.concurrent.locks.ReentrantReadWriteLock 提供了更高效和功能丰富的读写锁实现,建议在实际项目中优先使用标准库提供的锁。
  • 避免死锁:确保锁的获取和释放顺序一致,避免多锁嵌套引发死锁。
  • 锁的粒度:合理设计锁的范围,避免过度锁定导致性能瓶颈。

常见问题

死锁(Deadlock)

概念

死锁是指两个或多个线程由于互相等待对方持有的资源而陷入无限等待的状态,导致这些线程无法继续执行。死锁通常发生在以下四个必要条件同时满足时:

  1. 互斥条件(Mutual Exclusion):至少有一个资源是非共享的,即同一时间只能被一个线程占用。
  2. 持有并等待(Hold and Wait):一个线程持有至少一个资源,并且正在等待获取其他被其他线程持有的资源。
  3. 非抢占条件(No Preemption):已经分配给线程的资源,不能被强行夺取,必须由线程自行释放。
  4. 循环等待(Circular Wait):存在一种线程等待资源的环形链,每个线程都在等待下一个线程持有的资源。

预防

  1. 破坏互斥条件:尽量将资源设计为可共享的,允许多个线程同时访问。

  2. 破坏持有并等待条件:要求线程在请求资源时,先释放它已持有的所有资源,或者一次性请求所有需要的资源。

    1
    2
    3
    4
    5
    6
    // 示例:一次性请求所有资源
    synchronized (resource1) {
    synchronized (resource2) {
    // 操作
    }
    }
  3. 破坏非抢占条件:如果线程已经持有某些资源且请求新的资源被拒绝,则强制释放当前持有的资源。虽然 Java 中无法直接强制抢占锁,但可以设计可抢占的资源管理策略。

  4. 破坏循环等待条件:为所有资源按一定顺序编号,并要求线程按编号顺序请求资源,避免形成循环等待。

    1
    2
    3
    4
    5
    6
    // 资源按顺序编号
    synchronized (resource1) {
    synchronized (resource2) {
    // 操作
    }
    }

检测

在 Java 中,死锁的检测可以借助 ThreadMXBean 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DeadlockDetector {
public static void main(String[] args) {
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
long[] threadIds = bean.findDeadlockedThreads();

if (threadIds != null) {
ThreadInfo[] infos = bean.getThreadInfo(threadIds);
System.out.println("发现死锁线程:");
for (ThreadInfo info : infos) {
System.out.println(info.getThreadName());
}
} else {
System.out.println("未发现死锁。");
}
}
}

哲学家就餐问题

五位哲学家围坐在一张圆形餐桌旁,做以下两件事情之一:吃饭、思考。他们在吃东西的时候停止思考,思考的时候也停止吃东西。餐桌上有五碗意大利面,每位哲学家之间各有一只餐叉,这里哲学家必须用两只餐叉吃东西,且只能使用自己左右手边的那两只餐叉。

这个问题不考虑意大利面有多少,也不考虑哲学家的胃有多大。假设两者都是无限大。

问题在于如何设计一套规则,使得在哲学家们在完全不交谈,也就是无法知道其他人可能在什么时候要吃饭或者思考的情况下,可以在这两种状态下永远交替下去。

可制定的规则如下:

  1. 资源分配的顺序策略:让编号为奇数的哲学家先拿起左边的叉子,再拿起右边的叉子。通过规定哲学家获取叉子的顺序,可以打破循环等待条件,从而避免死锁。
  2. 限制同时就餐的哲学家数量:最多 N-1 个哲学家同时就餐(N 是哲学家的总数),确保至少有一只叉子是空闲的,避免所有哲学家同时拿起一只叉子而导致死锁。
  3. 未能拿到两只叉子时释放已拿到的叉子:如果无法同时拿到两只叉子,就释放已拿到的叉子,即通过避免持有单只叉子而等待另一只叉子,打破了死锁的循环等待条件。
  4. 使用仲裁者(Arbitrator):引入一个仲裁者(如一个专门的管理线程)来管理叉子的分配,哲学家在需要叉子时,向仲裁者请求,当仲裁者认为可以安全分配时,才允许哲学家拿起叉子。

活锁(Livelock)

概念

活锁是指两个或多个线程不断改变自身状态以响应彼此的动作,但实际上没有任何线程能够继续执行有用的工作。与死锁不同,活锁中的线程不会被永久阻塞,它们仍然处于活动状态,但无法完成任务。

预防

  1. 限制重试次数:为线程的重试操作设置最大次数,超过后放弃或采取其他措施。

    1
    2
    3
    4
    5
    6
    7
    int retries = 0;
    while (retries < MAX_RETRIES) {
    if (tryAcquire()) {
    break;
    }
    retries++;
    }
  2. 优先级调整:调整线程优先级,确保某些线程具有获取资源的优势。

  3. 设计更好的协议:确保线程在资源获取时遵循一致的协议,避免无限循环的资源释放与重试。

检测

活锁的检测较为困难,因为线程仍然在运行,不像死锁那样完全停止。通常通过监控线程的状态和行为来识别活锁:

  1. 监控线程活动:监控线程的执行频率和状态,识别是否存在频繁的资源获取和释放。
  2. 日志分析:通过日志记录线程的操作步骤,分析是否存在循环的资源操作。

饥饿(Starvation)

概念

饥饿是指某些线程长期无法获取所需资源,导致它们无法执行。这通常是由于其他高优先级线程持续占用资源,或由于资源分配策略不公平,某些线程得不到足够的执行机会。

预防

  1. 使用公平锁:采用公平锁(如 ReentrantLock 的公平性选项),确保锁的获取按照线程请求的顺序进行。

    1
    ReentrantLock lock = new ReentrantLock(true); // 公平锁
  2. 合理设置线程优先级:避免过高或过低的线程优先级设置,确保所有线程都有合理的执行机会。

  3. 避免长时间持有锁:尽量减少锁的持有时间,避免高优先级线程长时间占用资源。

  4. 任务分配均衡:通过合理的任务分配策略,确保所有线程都能获得执行机会。

检测

饥饿的检测通常依赖于监控系统的行为和性能指标:

  1. 线程执行时间监控:监控线程的执行时间,识别某些线程是否长时间处于等待状态。
  2. 性能分析工具:使用性能分析工具(如 Java VisualVMJProfiler)检测线程的状态和资源获取情况。
  3. 日志和指标:通过日志记录和指标监控,识别线程的等待和执行模式,发现饥饿现象。

虚假唤醒(Spurious Wakeup)

概念

虚假唤醒指的是线程在没有收到明确的通知(notify()notifyAll())情况下,被唤醒的现象。这可能导致线程在错误的时间继续执行,导致程序逻辑错误。

预防

使用循环检查条件:在调用 wait() 后,使用 while 循环检查条件,而不是 if 语句,确保线程在条件真正满足时才继续执行。

1
2
3
4
5
6
synchronized (lock) {
while (!condition) {
lock.wait();
}
// 执行操作
}

线上死锁处理

定位

代码检测

代码中添加检测死锁程序,检测到以后输出死锁位置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class DeadLockTestClient {
public static void main(String[] args) {
// 检测死锁的线程
Thread detector = new Thread(() -> {
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
while (true) {
long[] deadlockedThreadIds = bean.findDeadlockedThreads();
if (deadlockedThreadIds != null) {
ThreadInfo[] infos = bean.getThreadInfo(deadlockedThreadIds);
System.out.println("检测到死锁线程:");
for (ThreadInfo info : infos) {
System.out.println(info.getThreadId() + " " + info.getThreadName() + " " + info.getLockName() + " " + info.getBlockedTime());
}
}
try {
Thread.sleep(5000); // 每5秒检测一次
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
// 设置为守护线程
detector.setDaemon(true);
detector.start();

// 死锁代码
Object lock1 = new Object();
Object lock2 = new Object();
Thread t1 = new Thread(() -> {
synchronized (lock1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
synchronized (lock2) {
System.out.println("t1持有lock1,尝试获取lock2");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
synchronized (lock1) {
System.out.println("t2持有lock2,尝试获取lock1");
}
}
});

t1.start();
t2.start();
}
}

以上代码运行后,会打印出死锁信息,如

1
2
3
检测到死锁线程:
22 Thread-2 java.lang.Object@6c1553f -1
21 Thread-1 java.lang.Object@806ab97 -1

软件检测

使用 Java VisualVMJConsole 连接 JVM 实例检测死锁。

JConsole 连接到 JVM 实例后,在线程页检测死锁:

JConsole-线程-死锁

分析堆栈跟踪

先通过 jps 获取 Java 进程的 PID

1
2
3
4
5
C:\Users\yangtao>jps -l
25136 space.yangtao.monitor.deadlock.DeadLockTestClient
16164 org.jetbrains.jps.cmdline.Launcher
11352 sun.tools.jps.Jps
12728 com.intellij.idea.Main

使用 jstack 生成线程堆栈跟踪并输出到指定的文件中

1
C:\Users\yangtao>jstack 25136 > t25136.txt

在生成的文件中如果有如下死锁检测报告,则说明进程中含有死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Found one Java-level deadlock:
=============================
"Thread-1":
waiting to lock monitor 0x00000237f914ec08 (object 0x000000066c17ca60, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x00000237fcd6d108 (object 0x000000066c17ca70, a java.lang.Object),
which is held by "Thread-1"

Java stack information for the threads listed above:
===================================================
"Thread-1":
at space.yangtao.monitor.deadlock.DeadLockTestClient.lambda$main$1(DeadLockTestClient.java:64)
- waiting to lock <0x000000066c17ca60> (a java.lang.Object)
- locked <0x000000066c17ca70> (a java.lang.Object)
at space.yangtao.monitor.deadlock.DeadLockTestClient$$Lambda$2/1480010240.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"Thread-0":
at space.yangtao.monitor.deadlock.DeadLockTestClient.lambda$main$0(DeadLockTestClient.java:51)
- waiting to lock <0x000000066c17ca70> (a java.lang.Object)
- locked <0x000000066c17ca60> (a java.lang.Object)
at space.yangtao.monitor.deadlock.DeadLockTestClient$$Lambda$1/999966131.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.

应对

在 Java 中,没有提供一个安全且直接的方式来“杀死”一个正在运行的线程,因为这样做通常会引起一系列严重的问题,如状态不一致、资源泄露等。Java 的设计哲学强调了在多线程环境下保持程序的健壮性和安全性,因此提倡使用合作机制来管理线程的生命周期,而不是强制终止线程。

如果问题严重到需要强制结束导致死锁的 Java 进程,可以通过杀死整个 Java 进程来间接“杀死”内部的所有线程,包括那些死锁的线程。这通常是在其他所有解决方案失败后的最后手段,因为这样做可能会导致数据丢失或其他副作用。

Windows

  • 通过任务管理器结束进程

  • 使用 taskkill 命令

    1
    taskkill /F /PID <pid_number>

    1
    2
    C:\Users\yangtao>taskkill /F /PID 25136
    成功: 已终止 PID 为 25136 的进程。

Linux / macOS

  • 使用 kill 命令

    1
    kill -9 <pid_number>

    1
    kill -9 25136

总结

管程在 Java 并发编程中扮演了“内置锁与条件变量统一体”的角色,为开发者提供了高层次的锁管理与线程协调机制。通过 synchronizedwait()notify()notifyAll() 等基础手段,Java 屏蔽了底层操作系统的锁细节,使得多线程对共享资源的访问更具可读性与安全性。在实际开发中,我们常以管程思想为基础来解决各种并发场景:从生产者-消费者队列管理到线程池线程的调度,再到读写锁的并发读写优化。此外,管程也并非万能,死锁、活锁、饥饿、假唤醒等问题仍需根据具体场景在设计层面或实现层面予以规避和处理。要想写出高质量、高并发又稳定的 Java 程序,需要在理解管程原理的基础上,结合更高级的并发工具(ReentrantLockConditionReentrantReadWriteLock 等)和良好的设计策略,从而最大化地利用多线程优势并保证程序正确性。