0%

ConcurrentLinkedQueue

ConcurrentLinkedQueue源码深度解析:高性能无锁队列的实现原理

ConcurrentLinkedQueue 是 Java 并发包(JUC)中用于高并发场景的无锁队列,基于单向链表结构实现。它通过 CAS(Compare-And-Swap)操作替代传统的锁机制,在保证线程安全的同时显著提升了并发性能。本文将从源码角度深入剖析其设计思想、核心实现及性能优化策略。

核心数据结构与初始化

底层结构:单向链表节点

ConcurrentLinkedQueue 使用内部类 Node 表示链表节点,每个节点包含数据域 item 和指针域 next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private static class Node<E> {  
volatile E item; // 节点存储的数据(volatile 保证可见性)
volatile Node<E> next; // 指向下一个节点的引用(volatile 保证可见性)

Node(E item) {
UNSAFE.putObject(this, itemOffset, item); // 使用 Unsafe 直接写入内存
}

// CAS 操作:更新 item 域
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

// 延迟设置 next 域(不保证立即对其他线程可见,减少内存屏障开销)
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

// CAS 操作:更新 next 域
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

// Unsafe 机制初始化(通过反射获取内存偏移量)
private static final sun.misc.Unsafe UNSAFE;
// 偏移量
private static final long itemOffset;
// 下一个元素的偏移量
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

关键点

  • volatile 修饰:确保多线程间的内存可见性,避免指令重排序;
  • CAS 操作:通过 Unsafe 类的原子操作保证线程安全,避免使用锁;
  • lazySetNext:延迟写操作,减少内存屏障,提升性能(适用于非关键路径)。

队列初始化

队列创建时,head 和 tail 指向同一个哨兵节点(item 为 null):

1
2
3
4
5
public ConcurrentLinkedQueue() {  
// 默认头节点、尾节点是Node中为null的哨兵节点
// 初始时,head、tail 都指向同一个 item 为 null 的节点
head = tail = new Node<E>(null);
}

初始状态:

1
head → [null] ← tail  

设计意图

  • 哨兵节点简化边界处理,避免 head/tail 为 null 的复杂判断;
  • 初始状态下队列为空,但 head 和 tail 已初始化,减少空指针检查。

核心方法源码解析

入队操作:offer (E e)

将元素添加到队列尾部,通过 CAS 实现无锁并发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean offer(E e) {  
checkNotNull(e); // 不允许 null 元素
final Node<E> newNode = new Node<E>(e);

// 循环直到 CAS 成功(自旋锁)
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;

// 情况1:p 是尾节点,尝试插入新节点
if (q == null) {
// 使用CAS操作设置p节点的next节点,但是没有更新尾节点
// 如果有多线程操作,会导致第一次CAS操作失败,再次执行for循环
if (p.casNext(null, newNode)) {
// 插入成功后,若 p 不是 tail,则更新 tail(减少 CAS 次数)
if (p != t)
// 设置当前尾节点为新插入的节点
casTail(t, newNode); // 允许失败,不影响正确性
return true;
}
// CAS 失败,说明其他线程已修改 p.next,重新循环
}

// 情况2:遇到自引用(表示 head 已被其他线程移除),重置 p
// 多线程操作时,由于poll操作移除元素后可能会把head变成自引用(环形链表),此时head的next节点也是head,所以需要重新找到新的head
else if (p == q)
p = (t != (t = tail)) ? t : head;

// 情况3:p 不是尾节点,继续向后查找尾节点
else
p = (p != t && t != (t = tail)) ? t : q;
}
}

执行流程

  1. 寻找尾节点:从 tail 开始遍历,找到真正的尾节点(next 为 null);
  2. CAS 插入:使用 CAS 将新节点设置为尾节点的 next;
  3. 更新 tail:仅当 tail 落后时才更新(每间隔一次更新一次 tail),减少 CAS 开销。

出队操作:poll ()

移除并返回队列头部元素,同样使用 CAS 实现无锁并发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public E poll() {  
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 保存当前节点
E item = p.item;

// 情况1:当前节点有值,CAS 将其设为 null(标记为已删除)
if (item != null && p.casItem(item, null)) {
// 若 p 不是 head,更新 head(每间隔一次更新一次 head)
// CAS操作成功,则标记当前节点从链表中移除
// 只有多线程操作时,使得第一次p!=h时才会设置头节点为p
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}

// 情况2:队列为空(p.next 为 null)
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}

// 情况3:遇到自引用,重置 p
// 多线程同时操作时才会出现该情况,当前节点自引用,需要重新寻找新的队列头节点
else if (p == q)
continue restartFromHead;

// 情况4:继续向后查找有效节点
//多线程操作时,会导致第一次判断时item为null,且此时已经有了新插入的节点了,需要重新指定头节点
else
p = q;
}
}
}

// 更新 head 节点(将 h 设为 p,并将 h.next 指向自身形成自引用)
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h); // 原 head 自引用,帮助 GC
}

执行流程

  1. 从头节点开始遍历:找到第一个有值的节点(item 不为 null);
  2. CAS 删除:使用 CAS 将该节点的 item 设为 null(标记为已删除);
  3. 更新 head:仅当 head 落后时才更新(每间隔一次更新一次 head),减少 CAS 开销;
  4. 自引用处理:将原 head 节点的 next 指向自身,帮助 GC 回收不可达节点。

查看队首元素:peek ()

与 poll () 类似,但不移除元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 与poll方法类似,只是少了cas操作来清空头节点的值
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p); // 确保 head 指向第一个有值的节点或尾节点
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

删除指定元素:remove (Object o)

遍历队列,使用 CAS 删除第一个匹配的元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean remove(Object o) {  
if (o != null) {
Node<E> pred = null; // 前驱节点
for (Node<E> p = first(); p != null; pred = p, p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item) && p.casItem(item, null)) {
// 移除节点(将前驱节点的 next 指向当前节点的 next)
unlink(p, pred);
return true;
}
}
}
return false;
}

// 获取第一个有效节点(item 不为 null)
Node<E> first() {
for (;;) {
Node<E> h = head;
Node<E> p = h;
Node<E> q;
// 跳过已删除节点(item 为 null)
while ((q = p.next) != null && p.item == null)
p = q;
E item = p.item;
if (item != null)
return p;
else if (p == q) // 遇到自引用,重置
return null;
}
}

head 和 tail 的延迟更新策略

ConcurrentLinkedQueue 的性能优化核心在于 head 和 tail 的延迟更新机制

更新时机

  • tail 更新:每插入两个节点才更新一次 tail(即插入第 1、3、5… 个节点时更新);
  • head 更新:每删除两个节点才更新一次 head(即删除第 1、3、5… 个节点时更新)。

设计目的

减少 CAS 操作次数,降低内存屏障开销。每次 CAS 操作都需要通过内存屏障保证可见性,频繁操作会影响性能。

图示说明

初始状态

1
head → [null] ← tail  

插入 3 个元素后

1
head → [null] → [A] → [B] → [C] ← tail  

此时 tail 指向 C,但实际尾节点是 C。当插入第 4 个元素时,才会更新 tail 指向新节点。

线程安全与性能分析

线程安全保障

  • CAS 操作:所有关键操作(如入队、出队)均使用 CAS 原子操作,确保原子性;
  • volatile 修饰:item 和 next 字段使用 volatile 修饰,保证内存可见性;
  • 延迟更新:head 和 tail 的延迟更新不影响队列正确性,仅优化性能。

性能优势

  • 无锁设计:相比 BlockingQueue(如 LinkedBlockingQueue),避免了锁的竞争和线程上下文切换;
  • 减少 CAS 次数:通过延迟更新 head/tail,降低了 CAS 操作频率;
  • 读写分离:入队和出队操作可并发执行,提高吞吐量。

适用场景

  • 高并发读 / 写:适用于生产者 - 消费者模型,尤其是读多写少的场景;
  • 非阻塞操作:不支持等待(如 BlockingQueue 的 take ()),适合不需要阻塞的场景;
  • 无界队列:理论上无容量限制,但需注意内存使用。

总结与最佳实践

核心特性

  • 无锁实现:基于 CAS 操作,避免锁的开销;
  • 单向链表:使用哨兵节点简化边界处理;
  • 延迟更新:减少 CAS 操作,提升性能;
  • 不允许 null:插入 null 会抛出 NullPointerException。

使用建议

  • 优先使用 offer/poll:相比 add/remove,更明确地处理队列满 / 空的情况;
  • 避免频繁调用 size ():size () 方法需要遍历整个队列,时间复杂度为 O (n);
  • 适用于非阻塞场景:若需要阻塞等待,建议使用 LinkedBlockingQueue。

源码启示

ConcurrentLinkedQueue 的设计展示了高性能并发数据结构的关键技术:

  • 通过 CAS 替代锁实现线程安全;
  • 使用延迟更新减少原子操作次数;
  • 利用 volatile 和内存屏障保证可见性;
  • 通过自引用帮助 GC 回收无效节点。

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10