ConcurrentLinkedQueue源码深度解析:高性能无锁队列的实现原理
ConcurrentLinkedQueue 是 Java 并发包(JUC)中用于高并发场景的无锁队列,基于单向链表结构实现。它通过 CAS(Compare-And-Swap)操作替代传统的锁机制,在保证线程安全的同时显著提升了并发性能。本文将从源码角度深入剖析其设计思想、核心实现及性能优化策略。
核心数据结构与初始化
底层结构:单向链表节点
ConcurrentLinkedQueue 使用内部类 Node 表示链表节点,每个节点包含数据域 item 和指针域 next:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| private static class Node<E> { volatile E item; volatile Node<E> next;
Node(E item) { UNSAFE.putObject(this, itemOffset, item); }
boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); }
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset;
static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
|
关键点:
- volatile 修饰:确保多线程间的内存可见性,避免指令重排序;
- CAS 操作:通过 Unsafe 类的原子操作保证线程安全,避免使用锁;
- lazySetNext:延迟写操作,减少内存屏障,提升性能(适用于非关键路径)。
队列初始化
队列创建时,head 和 tail 指向同一个哨兵节点(item 为 null):
1 2 3 4 5
| public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
|
初始状态:
设计意图:
- 哨兵节点简化边界处理,避免 head/tail 为 null 的复杂判断;
- 初始状态下队列为空,但 head 和 tail 已初始化,减少空指针检查。
核心方法源码解析
入队操作:offer (E e)
将元素添加到队列尾部,通过 CAS 实现无锁并发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { Node<E> q = p.next;
if (q == null) { if (p.casNext(null, newNode)) { if (p != t) casTail(t, newNode); return true; } }
else if (p == q) p = (t != (t = tail)) ? t : head;
else p = (p != t && t != (t = tail)) ? t : q; } }
|
执行流程:
- 寻找尾节点:从 tail 开始遍历,找到真正的尾节点(next 为 null);
- CAS 插入:使用 CAS 将新节点设置为尾节点的 next;
- 更新 tail:仅当 tail 落后时才更新(每间隔一次更新一次 tail),减少 CAS 开销。
出队操作:poll ()
移除并返回队列头部元素,同样使用 CAS 实现无锁并发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item;
if (item != null && p.casItem(item, null)) { if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; }
else if ((q = p.next) == null) { updateHead(h, p); return null; }
else if (p == q) continue restartFromHead;
else p = q; } } }
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
|
执行流程:
- 从头节点开始遍历:找到第一个有值的节点(item 不为 null);
- CAS 删除:使用 CAS 将该节点的 item 设为 null(标记为已删除);
- 更新 head:仅当 head 落后时才更新(每间隔一次更新一次 head),减少 CAS 开销;
- 自引用处理:将原 head 节点的 next 指向自身,帮助 GC 回收不可达节点。
查看队首元素:peek ()
与 poll () 类似,但不移除元素:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
|
删除指定元素:remove (Object o)
遍历队列,使用 CAS 删除第一个匹配的元素:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public boolean remove(Object o) { if (o != null) { Node<E> pred = null; for (Node<E> p = first(); p != null; pred = p, p = succ(p)) { E item = p.item; if (item != null && o.equals(item) && p.casItem(item, null)) { unlink(p, pred); return true; } } } return false; }
Node<E> first() { for (;;) { Node<E> h = head; Node<E> p = h; Node<E> q; while ((q = p.next) != null && p.item == null) p = q; E item = p.item; if (item != null) return p; else if (p == q) return null; } }
|
head 和 tail 的延迟更新策略
ConcurrentLinkedQueue 的性能优化核心在于 head 和 tail 的延迟更新机制:
更新时机
- tail 更新:每插入两个节点才更新一次 tail(即插入第 1、3、5… 个节点时更新);
- head 更新:每删除两个节点才更新一次 head(即删除第 1、3、5… 个节点时更新)。
设计目的
减少 CAS 操作次数,降低内存屏障开销。每次 CAS 操作都需要通过内存屏障保证可见性,频繁操作会影响性能。
图示说明
初始状态:
插入 3 个元素后:
1
| head → [null] → [A] → [B] → [C] ← tail
|
此时 tail 指向 C,但实际尾节点是 C。当插入第 4 个元素时,才会更新 tail 指向新节点。
线程安全与性能分析
线程安全保障
- CAS 操作:所有关键操作(如入队、出队)均使用 CAS 原子操作,确保原子性;
- volatile 修饰:item 和 next 字段使用 volatile 修饰,保证内存可见性;
- 延迟更新:head 和 tail 的延迟更新不影响队列正确性,仅优化性能。
性能优势
- 无锁设计:相比 BlockingQueue(如 LinkedBlockingQueue),避免了锁的竞争和线程上下文切换;
- 减少 CAS 次数:通过延迟更新 head/tail,降低了 CAS 操作频率;
- 读写分离:入队和出队操作可并发执行,提高吞吐量。
适用场景
- 高并发读 / 写:适用于生产者 - 消费者模型,尤其是读多写少的场景;
- 非阻塞操作:不支持等待(如 BlockingQueue 的 take ()),适合不需要阻塞的场景;
- 无界队列:理论上无容量限制,但需注意内存使用。
总结与最佳实践
核心特性
- 无锁实现:基于 CAS 操作,避免锁的开销;
- 单向链表:使用哨兵节点简化边界处理;
- 延迟更新:减少 CAS 操作,提升性能;
- 不允许 null:插入 null 会抛出 NullPointerException。
使用建议
- 优先使用 offer/poll:相比 add/remove,更明确地处理队列满 / 空的情况;
- 避免频繁调用 size ():size () 方法需要遍历整个队列,时间复杂度为 O (n);
- 适用于非阻塞场景:若需要阻塞等待,建议使用 LinkedBlockingQueue。
源码启示
ConcurrentLinkedQueue 的设计展示了高性能并发数据结构的关键技术:
- 通过 CAS 替代锁实现线程安全;
- 使用延迟更新减少原子操作次数;
- 利用 volatile 和内存屏障保证可见性;
- 通过自引用帮助 GC 回收无效节点。