Java并发编程——无锁

本文最后更新于:4 年前

引言

在高并发应用中,传统的锁(synchronizedReentrantLock 等)容易成为性能瓶颈,不仅会造成线程阻塞,也可能引入死锁、优先级反转等问题。无锁编程正是针对这一痛点而生,通过底层的原子操作(尤其是 CAS)实现线程安全,尽可能减少阻塞和上下文切换,使系统在多核环境下保持较高的可扩展性和吞吐量。

概述

定义

无锁编程是一种在多线程环境中实现线程安全的方法,它通过原子操作来保证数据一致性,而不依赖传统的锁机制(如 ReentrantLocksynchronized)。

特点

  • 高效性:避免了线程因锁竞争而进入阻塞状态,减少上下文切换的开销。
  • 无锁安全性:无需担心死锁、锁饥饿或优先级反转等传统锁机制中的问题。
  • 乐观并发控制:假设大部分操作不会产生冲突,只有在冲突时才通过 CAS 重试解决。
  • 自旋等待:在发生冲突时,通过自旋重试(而不是阻塞线程)完成操作,这可能导致一定的 CPU 开销。

原子操作

定义

原子操作是指不可分割的操作,在执行过程中不会被中断或干扰。即使在多线程环境中,原子操作也能保证操作的完整性和一致性。

特点

  • 不可分割性:整个操作要么完全执行成功,要么完全失败,不会出现中间状态。
  • 线程安全性:多线程同时执行原子操作时,不需要额外的同步措施,操作本身已经是线程安全的。
  • 硬件支持:通常依赖底层硬件(CPU)指令实现,如 cmpxchg(比较并交换)或 lock 指令。

CAS

定义

CAS(Compare And Swap/Set) 是一种原子操作,用于在多线程环境下进行无锁同步。其核心思想是:

  1. 对某个内存地址(变量)读取当前值 V
  2. 与预期值 A 进行比较。
  3. 如果 VA 相等,则将该内存地址的值更新为 B;否则,不进行更新。

简要流程可表示为:

1
2
3
4
5
6
7
boolean compareAndSet(A, B):
if (value == A) {
value = B;
return true;
} else {
return false;
}

在Java中,AtomicIntegerAtomicLong 等原子类就依赖底层的 CAS 指令来实现无锁并发。

问题引入

100 个线程实现对库存的扣减,使库存从 100 减少到 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class StockTestClient {

private static int stock = 100;

/**
* 减库存
*/
private void decreaseStock() {
stock--;
}

/**
* 获取库存
*/
private int getStock() {
return stock;
}

public static void main(String[] args) {
StockTestClient stock = new StockTestClient();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Thread t = new Thread(stock::decreaseStock);
threads.add(t);
t.start();
}
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("stock = " + stock.getStock());
}

}

这个程序中,由于缺乏同步机制,因此多个线程同时读取和修改 stock 时,无法保证减库存操作的安全,产生了竞态条件,导致更新操作覆盖或丢失,最终导致输出的结果是不确定的。

synchronized 解决

可以用 synchronized 保证同一时刻只有一个线程可以执行减库存操作,如

1
2
3
private synchronized void decreaseStock() {
stock--;
}

CAS解决

根据前面提到的 CAS 概念,修改程序的库存字段和减库存方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static AtomicInteger stock = new AtomicInteger(100);

/**
* 减库存(无锁方式)
*/
private void decreaseStock() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
while (true) {
int current = stock.get();
if (stock.compareAndSet(current, current - 1)) {
break;
}
}
// 等价于 stock.getAndDecrement();
}

/**
* 获取库存
*/
private int getStock() {
return stock.get();
}

代码解析

  1. AtomicInteger 的使用:stock 被声明为 AtomicInteger,它内部维护了一个 volatile 整型变量,确保内存可见性。
  2. 在一个循环体中,获取当前库存,使用 AtomicIntegercompareAndSet 方法保证自减操作的原子性,将库存值减 1,成功则退出循环,否则继续。

ABA 问题

ABA 问题,即在最终比较前变量从 A 变为 B 再变回 A,最后比较的时候结果是一致的,但中间某个时间的比较可能是不一致的,CAS 无法检测到这个过程。

为了更好地解决 CAS 可能带来的 ABA 问题,可以使用 AtomicStampedReferenceAtomicMarkableReference,通过引入版本号或标记来增强 CAS 操作的可靠性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static AtomicStampedReference<Integer> stock = new AtomicStampedReference<>(100, 0);

/**
* 减库存(无锁,解决ABA问题)
*/
private void decreaseStock() {
int[] stampHolder = new int[1];
while (true) {
Integer currentStock = stock.get(stampHolder);
int currentStamp = stampHolder[0];
if (currentStock <= 0) {
System.out.println("库存不足!");
break;
}
boolean updated = stock.compareAndSet(currentStock, currentStock - 1, currentStamp, currentStamp + 1);
if (updated) {
break;
}
}
}

/**
* 获取库存
*/
private int getStock() {
return stock.getReference();
}

代码解析

  1. AtomicStampedReference:除了存储引用,还维护了一个版本号(stamp),在 CAS 操作时,不仅比较当前值,还比较版本号,避免 ABA 问题。
  2. compareAndSet 方法:只有在值和版本号都匹配时,才会进行更新操作。

无锁类

Atomic 类

简介

Java 的 java.util.concurrent.atomic 包提供了一系列的原子类(Atomic Classes),用于执行原子性操作。这些类基于底层的 CAS(Compare-And-Swap) 操作,提供了一种高效的无锁编程方式,避免了显式的同步和锁机制。

常用API

AtomicInteger
  • **int get()**:获取当前值。
  • **void set(int newValue)**:设置新值。
  • **int getAndIncrement()**:获取当前值并自增。
  • **int incrementAndGet()**:自增后获取新值。
  • **boolean compareAndSet(int expect, int update)**:如果当前值等于预期值,则更新为新值。
AtomicLong
  • 类似于 AtomicInteger,用于 long 类型。
AtomicReference<V>
  • **V get()**:获取当前引用。
  • **void set(V newValue)**:设置新引用。
  • **boolean compareAndSet(V expect, V update)**:如果当前引用等于预期值,则更新为新引用。
AtomicStampedReference<V>

可解决 ABA 问题,通过附加的版本号进行比较和更新。

  • **V getReference()**:获取当前引用。
  • **int getStamp()**:获取当前版本号。
  • **boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)**:比较并设置新值及版本号。

实现原理

Atomic 类通过 Unsafe 类提供的底层 CAS 操作实现原子性。每个 Atomic 类内部维护一个 volatile 变量,并利用 CAS 操作确保在多线程环境下对该变量的更新是安全的。例如,AtomicInteger 内部使用 UnsafecompareAndSwapInt 方法来实现 compareAndSet 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class AtomicInteger extends Number implements Serializable {
private volatile int value;
private static final Unsafe unsafe;
private static final long valueOffset;

static {
try {
unsafe = Unsafe.getUnsafe();
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// 其他方法省略
}

无锁队列

简介

无锁队列是一种线程安全的队列实现,允许多个线程并发地插入和移除元素,而无需使用锁。Java 标准库中的 ConcurrentLinkedQueue 就是一个典型的无锁队列。

常用API

  • **boolean add(E e)**:将指定元素插入队列的尾部。
  • **boolean remove(Object o)**:删除队列中第一个匹配的元素。
  • **boolean offer(E e)**:将指定元素插入队列的尾部。
  • **E poll()**:移除并返回队列头部的元素,队列为空时返回 null
  • **E peek()**:返回队列头部的元素,但不移除它,队列为空时返回 null
  • **boolean isEmpty()**:判断队列是否为空。
  • **int size()**:返回队列的元素个数(非常量时间操作)。
  • **boolean contains(Object o)**:判断链表中是否存在指定元素。
  • **Iterator<E> iterator()**:返回链表的迭代器。

实现原理

ConcurrentLinkedQueue 基于 Michael & Scott Queue 算法实现,这是一个典型的无锁队列算法。该算法使用单向链表和 CAS 操作来维护队列的头部和尾部指针。

  • 节点结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
    this.item = item;
    this.next = null;
    }
    }
  • 插入(Enqueue)

    1. 创建一个新节点。
    2. 通过 CAS 操作将当前尾节点的 next 指针指向新节点。
    3. 更新尾指针指向新节点。
  • 移除(Dequeue)

    1. 读取当前头节点和其下一个节点。
    2. 通过 CAS 操作更新头指针指向下一个节点。
    3. 返回下一个节点的元素。

这种设计确保了多个线程可以同时进行插入和移除操作,而不需要锁。

无锁双端队列

简介

无锁双端队列是一种线程安全的双端队列实现,允许多个线程并发地插入和移除元素,而无需使用锁,可用于实现队列和栈结构。Java标准库中的 ConcurrentLinkedDeque 就是一个典型的无锁双端队列。

常用API

  • **void addFirst(E e)**:将指定元素插入队列头部。
  • **void addLast(E e)**:将指定元素插入队列尾部。
  • **E removeFirst()**:移除并返回队列头部的元素,队列为空时返回 null。
  • **E removeLast()**:移除并返回队列尾部的元素,队列为空时返回 null。
  • **boolean offerFirst()**:将指定元素插入队列的头部。
  • **boolean offerLast()**:将指定元素插入队列的尾部。
  • **E pollFirst()**:移除队并返回队列头部的元素,队列为空时返回 null。
  • **E pollLast()**:移除队并返回队列尾部的元素,队列为空时返回 null。
  • **E peekFirst()**:返回队列头部的元素,但不移除它,队列为空时返回 null。
  • **E peekLast()**:返回队列尾部的元素,但不移除它,队列为空时返回 null。
  • 无锁队列的所有 API。

实现原理

ConcurrentLinkedDeque 基于双向链表和 CAS 操作实现,允许高效的并发插入和移除操作。其核心思想如下:

  • 节点结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private static class Node<E> {
    volatile E item;
    volatile Node<E> prev;
    volatile Node<E> next;

    Node(E item) {
    this.item = item;
    }
    }
  • 双端操作

    • 插入(AddFirst/AddLast)
      1. 创建一个新节点。
      2. 通过CAS操作将新节点链接到队列的头部或尾部。
      3. 更新头指针或尾指针指向新节点。
    • 移除(PollFirst/PollLast)
      1. 读取当前头节点或尾节点。
      2. 通过CAS操作更新头指针或尾指针指向下一个或上一个节点。
      3. 返回被移除节点的元素。
  • 无锁设计

    • 通过原子操作确保多个线程可以安全地并发修改队列的两端,而不需要锁。
    • 使用 标记删除 技术处理节点的逻辑删除,避免竞态条件。

无锁哈希表

简介

ConcurrentHashMap 是 Java 标准库中用于高并发场景下的线程安全哈希表实现。虽然 ConcurrentHashMap 采用了“分段锁”(在 JDK 7 之前)或“ CAS + 局部同步”相结合(在 JDK 8 及以后)的方式、并非完全无锁,但它但在大部分读操作时无锁,对写操作采用细粒度或局部锁,并行度依然较高。

常用API

  • **V put(K key, V value)**:将指定键值对插入映射,如果键已存在则更新其值。
  • **V get(Object key)**:获取指定键对应的值。
  • **V remove(Object key)**:移除指定键对应的键值对。
  • **boolean containsKey(Object key)**:判断映射中是否存在指定键。
  • **Set<K> keySet()**:返回映射中的所有键。
  • **V putIfAbsent(K key, V value)**:只有在键不存在时才插入键值对。

实现原理

JDK 8 及以后,ConcurrentHashMap 采用了基于节点的锁分离和 CAS 操作 的设计,具体包括:

  • 节点类型
    • 普通节点(TreeNode):用于处理哈希冲突,基于红黑树实现高效的查找。
    • ForwardingNode:用于扩容过程中指向新表。
  • 插入和更新
    • 使用 synchronizedCAS 操作对桶中的链表或红黑树进行修改,确保线程安全。
  • 扩容
    • 当负载因子达到阈值时,进行表的扩容。使用 synchronized 保证扩容过程中的线程安全。
  • 无锁读操作
    • 大多数读操作(如 get)不需要锁,通过 volatile 关键字保证内存可见性。

无锁集合

简介

ConcurrentSkipListSet 是 Java 标准库中基于跳表(Skip List)实现的线程安全且有序的集合(Set)类。跳表是一种多层链表结构,支持高效的查找、插入和删除操作。ConcurrentSkipListSet 通过无锁算法实现高并发下的操作,并且保证元素的自然顺序或使用指定的比较器顺序。

常用API

  • **boolean add(E e)**:向集合中添加指定元素,如果元素已存在则不添加。
  • **boolean remove(Object o)**:移除集合中指定的元素。
  • **boolean contains(Object o)**:判断集合中是否存在指定元素。
  • **Iterator<E> iterator()**:返回集合的迭代器,支持弱一致性(weakly consistent)。
  • **int size()**:返回集合中元素的数量。
  • **boolean isEmpty()**:判断集合是否为空。
  • **void clear()**:移除集合中的所有元素。
  • **E first()**:返回集合中第一个(最小)元素。
  • **E last()**:返回集合中最后一个(最大)元素。

实现原理

ConcurrentSkipListSet 基于 ConcurrentSkipListMap 实现,其底层结构为跳表。跳表通过多层链表的结构实现高效的有序存取,具体原理如下:

  • 节点结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private static class Node<E> {
    final E key;
    final int topLevel;
    final AtomicReferenceArray<Node<E>> next;

    Node(E key, int height) {
    this.key = key;
    this.topLevel = height;
    this.next = new AtomicReferenceArray<>(height + 1);
    }
    }
  • 跳表结构

    • 多层链表:每个节点在不同的层上有不同数量的指针,允许快速跳跃式搜索。
    • 随机层级:新节点的层级通过随机算法决定,保证跳表的平均性能。
  • 并发操作

    • 插入(Add)
      1. 在每一层找到插入位置。
      2. 使用CAS操作插入新节点的指针,确保线程安全。
    • 移除(Remove)
      1. 在每一层找到要移除的节点。
      2. 使用CAS操作更新前驱节点的指针,逻辑删除节点。
    • 查找(Contains)
      1. 从最高层开始,逐层向下搜索。
      2. 使用无锁的方式遍历链表,确保高效的并发访问。

应用场景

无锁编程适用于多种高并发和性能敏感的场景,尤其在以下领域表现突出:

高并发计数器与统计

应用示例

  • 网站访问计数器:在高流量的网站中,记录页面访问次数需要高效的原子操作。
  • 实时统计系统:如在线游戏中的实时玩家统计、金融系统中的交易计数等。

无锁优势

  • 高效的原子增减操作,如使用 AtomicIntegerLongAdder,避免了锁的竞争,提升吞吐量。

生产者-消费者模型

应用示例

  • 消息队列:如 KafkaRabbitMQ 中的内部消息存储。
  • 任务调度系统:线程池中的任务队列。

无锁优势

  • 使用无锁队列(如 ConcurrentLinkedQueueConcurrentLinkedDeque),支持高效的并发插入和移除操作,减少锁带来的阻塞和上下文切换开销。

实时系统与低延迟应用

应用示例

  • 高频交易系统:金融交易系统对延迟极其敏感,需要快速响应。
  • 实时数据处理:如流处理框架中的数据管道。

无锁优势

  • 无锁数据结构和算法能够提供更低的延迟和更高的吞吐量,满足实时性要求。

高性能缓存系统

应用示例

  • 分布式缓存:如 RedisMemcached 内部的数据存储。
  • 本地缓存:如 Guava CacheCaffeine 中的高速缓存实现。

无锁优势

  • 无锁哈希表(如 ConcurrentHashMap)支持高效的并发访问和更新,减少缓存命中和写入的延迟。

多核并行计算

应用示例

  • 并行算法:如并行排序、并行图算法。
  • 大规模数据处理:如 MapReduceSpark 中的数据并行处理。

无锁优势

  • 通过无锁数据结构和原子操作,充分利用多核处理器的并行能力,提升计算性能。

分布式系统中的共享数据结构

应用示例

  • 分布式锁服务:如 ZookeeperEtcd 中的分布式锁实现。
  • 共享状态管理:如分布式缓存、一致性哈希环等。

无锁优势

  • 无锁算法在分布式环境中可以减少锁的争用和网络延迟,提高系统的可扩展性和可靠性。

性能分析

无锁编程通过避免传统锁机制,能够在特定场景下显著提升性能。然而,其实际效果受多种因素影响,以下是详细的性能分析:

优势

1. 避免锁的阻塞和上下文切换

  • 传统锁:在高并发场景下,多个线程竞争同一个锁,导致部分线程被阻塞,频繁的上下文切换增加了系统开销。
  • 无锁机制:通过原子操作(如 CAS)实现同步,避免了线程阻塞和上下文切换,提升并发性能。

2. 提高吞吐量

  • 无锁数据结构:如无锁队列、无锁栈,允许多个线程同时进行插入和移除操作,显著提升吞吐量。

3. 减少死锁和优先级反转的风险

  • 传统锁:可能导致死锁、锁饥饿和优先级反转等问题,影响系统稳定性。
  • 无锁机制:避免了显式锁的使用,自然规避了这些问题,提升系统可靠性。

劣势

1. CAS操作的重试开销

  • 高冲突场景:在竞争激烈的情况下,CAS 操作可能频繁失败并重试,导致大量自旋,消耗 CPU 资源,影响性能。
  • 自旋消耗:无锁操作依赖于自旋等待,长时间的自旋会浪费大量 CPU 时间。

2. ABA问题带来的复杂性

  • ABA问题解决方案复杂CAS 带来的 ABA 问题需要使用带版本号的引用(如 AtomicStampedReference),增加了实现的复杂性。

3. 只能针对单一变量的原子操作

  • 多变量更新:无锁编程通常只能保证单一变量的原子性,对于需要同时更新多个变量的操作,需要更复杂的设计,如使用锁或其他同步机制。

3. 可维护性和可读性较差

  • 代码复杂度:无锁算法逻辑相对复杂,代码难以理解和维护,增加了开发和调试的难度。

性能对比

以下通过简单的实验对比无锁和传统锁在高并发场景下的性能表现。

实验场景

  • 100 个线程同时对同一个计数器进行自增操作。

传统锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class LockCounter {
private int count = 0;

public synchronized void increment() {
// 模拟耗时操作
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
count++;
}

public synchronized int getCount() {
return count;
}

public static void main(String[] args) throws InterruptedException {
LockCounter counter = new LockCounter();
List<Thread> threads = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
Thread t = new Thread(() -> counter.increment());
threads.add(t);
t.start();
}
for (Thread t : threads) {
t.join();
}
long end = System.nanoTime();
System.out.println("Final count (Lock): " + counter.getCount());
System.out.println("Time taken: " + (end - start) / 1_000_000 + " ms");
}
}

无锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);

public void increment() {
// 模拟耗时操作
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
count.incrementAndGet();
}

public int getCount() {
return count.get();
}

public static void main(String[] args) throws InterruptedException {
AtomicCounter counter = new AtomicCounter();
List<Thread> threads = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
Thread t = new Thread(() -> counter.increment());
threads.add(t);
t.start();
}
for (Thread t : threads) {
t.join();
}
long end = System.nanoTime();
System.out.println("Final count (Atomic): " + counter.getCount());
System.out.println("Time taken: " + (end - start) / 1_000_000 + " ms");
}
}

实验结果

1
2
3
4
5
Final count (Lock): 1000
Time taken: 1444 ms

Final count (Atomic): 1000
Time taken: 91 ms
  • 传统锁:较高的时间消耗,随着线程数量的增加,时间显著上升。
  • 无锁(Atomic):较低的时间消耗,在高并发下表现更优。

注意:实际性能还受具体硬件、JVM 实现和其他环境因素影响,但在大多数情况下,无锁编程在高并发场景下具有显著的性能优势。

与传统锁比较

无锁编程和传统锁(如 synchronizedReentrantLock)在并发控制上各有优缺点。以下从多个维度进行对比:

无锁编程(Lock-Free) 传统锁(Locking)
并发模型 乐观并发,假设大多数情况下不会发生冲突 悲观并发,假设资源竞争频繁
阻塞与非阻塞 非阻塞,线程不会因等待锁而被阻塞 阻塞,线程可能因等待锁而进入阻塞状态
死锁风险 无死锁风险 存在死锁、锁饥饿、优先级反转等问题
性能表现 高并发下性能优异,避免上下文切换和锁竞争带来的开销 在低并发下性能较好,但高并发下性能可能下降
实现复杂度 高,实现和调试复杂,需要处理 ABA 问题,多变量的更新实现更为复杂 低,实现简单,易于理解和维护
适用场景 高并发、低延迟、无需要跨多个变量的原子操作 需要强一致性、多步骤操作或对复杂数据结构的同步
可扩展性 良好,适用于多核处理器和大规模并发环境 限制较多,锁的粒度和数量可能影响可扩展性
公平性 不保证线程获取资源的顺序 可以通过锁的策略(如公平锁)保证一定的公平性
内存消耗 通常较低,只需维护少量原子变量和版本号 可能较高,尤其是复杂锁机制或大量锁对象的情况下

总结

无锁编程与传统锁机制相比,具有非阻塞、高并发、无死锁等重要优势。在 Java 中,这种思路体现在 Atomic 原子类(AtomicIntegerAtomicReference 等)、无锁队列(ConcurrentLinkedQueueConcurrentLinkedDeque)、部分无锁哈希结构(ConcurrentHashMap)以及基于跳表的有序集合(ConcurrentSkipListSet)等多种实用组件中。

当然,无锁并不适合所有场景:对多变量的原子更新操作或复杂的同步逻辑,往往仍需使用更重量级的锁或其他同步工具。总体而言,是否引入无锁编程需要结合业务逻辑、并发冲突频率及对系统延迟的要求来综合考量,才能在性能与实现难度之间取得平衡。