BlockingQueue 阻塞队列:线程同步与并发协作的核心组件
BlockingQueue(阻塞队列)是 Java 并发包(JUC)中用于线程间安全通信的核心组件,其核心特性是当队列满时阻塞生产者线程,当队列空时阻塞消费者线程,完美适配生产者 - 消费者模型。本文将系统解析 BlockingQueue 接口及其主要实现类,深入探讨其设计原理、核心方法及适用场景。
BlockingQueue 接口核心定义
BlockingQueue 继承自 Queue 接口,新增了阻塞式插入 / 移除方法,确保线程安全的同时简化了并发协作。其核心方法可分为三类:非阻塞方法、阻塞方法和超时方法。
核心方法分类
| 操作类型 |
非阻塞方法(满 / 空时抛异常) |
非阻塞方法(满 / 空时返回特殊值) |
阻塞方法(满 / 空时阻塞) |
超时方法(满 / 空时等待超时) |
| 插入 |
add(E e) |
offer(E e) |
put(E e) |
offer(E e, long timeout, TimeUnit unit) |
| 移除 |
remove() |
poll() |
take() |
poll(long timeout, TimeUnit unit) |
| 查看 |
element() |
peek() |
- |
- |
关键特性:
- 不允许插入
null 元素(会抛出 NullPointerException);
- 支持线程中断(阻塞方法声明了
throws InterruptedException);
- 提供批量操作(如
drainTo(Collection) 一次性取出所有元素)。
主要实现类详解
![阻塞队列]()
BlockingQueue 有多个实现类,适用于不同场景。以下是最常用的 5 种实现:
ArrayBlockingQueue:基于数组的有界阻塞队列
核心特性
- 底层结构:数组(容量固定,创建时需指定);
- 排序方式:FIFO(先进先出);
- 并发控制:单把全局锁(
ReentrantLock)+ 两个条件变量(notEmpty/notFull);
- 公平性:支持公平锁(按线程等待顺序访问)和非公平锁(默认)。
源码解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; this.lock = new ReentrantLock(fair); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); } }
|
put 方法(阻塞插入):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
|
take 方法(阻塞移除):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return x; }
|
特点:
- 单锁设计导致入队和出队无法并行,高并发下性能可能受限;
- 适合对容量有严格限制的场景(如日志缓冲、资源池)。
LinkedBlockingQueue:基于链表的有界 / 无界阻塞队列
核心特性
- 底层结构:单向链表(默认容量为
Integer.MAX_VALUE,可视为无界);
- 排序方式:FIFO;
- 并发控制:两把独立锁(
putLock 控制入队,takeLock 控制出队),支持入队和出队并行;
- 适用场景:线程池(如
newFixedThreadPool 默认使用此队列)。
源码解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition(); }
|
put 方法(阻塞插入):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) notFull.await(); enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
|
take 方法(阻塞移除):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) notEmpty.await(); x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
特点:
- 双锁设计支持入队和出队并行,高并发性能优于
ArrayBlockingQueue;
- 无界模式下需注意内存溢出风险(如无限制生产元素)。
PriorityBlockingQueue:支持优先级的无界阻塞队列
核心特性
- 底层结构:平衡二叉堆(数组实现);
- 排序方式:自然排序或自定义
Comparator(元素需实现 Comparable 或传入比较器);
- 并发控制:单锁(
ReentrantLock)+ 条件变量 notEmpty;
- 无界性:自动扩容(最大容量
Integer.MAX_VALUE - 8)。
适用场景
- 任务调度(按优先级执行任务);
- 延迟处理(结合时间优先级)。
核心方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void put(E e) { offer(e); }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E result; while ((result = dequeue()) == null) notEmpty.await(); return result; } finally { lock.unlock(); } }
|
DelayQueue:延迟阻塞队列
核心特性
- 元素要求:必须实现
Delayed 接口(重写 getDelay(TimeUnit) 方法);
- 触发条件:仅当元素的延迟时间 <= 0 时,才能被取出;
- 底层依赖:
PriorityQueue 维护元素优先级,ReentrantLock 保证线程安全;
- 适用场景:缓存过期清理、定时任务调度(如
ScheduledThreadPoolExecutor)。
源码解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null; private final Condition available = lock.newCondition();
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } }
|
SynchronousQueue:无缓冲的同步队列
核心特性
- 无存储能力:元素直接从生产者传递给消费者,不存储任何元素;
- 阻塞机制:生产者插入元素后会阻塞,直到消费者取出;消费者取出元素前会阻塞,直到生产者插入;
- 适用场景:线程间直接通信(如
newCachedThreadPool 用此队列实现任务提交与执行的同步)。
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> { try { System.out.println("生产者发送:A"); queue.put("A"); System.out.println("生产者发送:B"); queue.put("B"); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { Thread.sleep(1000); System.out.println("消费者接收:" + queue.take()); Thread.sleep(1000); System.out.println("消费者接收:" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
|
输出:
1 2 3 4 5 6
| 生产者发送:A (1秒后) 消费者接收:A 生产者发送:B (1秒后) 消费者接收:B
|
阻塞队列的典型应用场景
生产者 - 消费者模型
通过阻塞队列实现生产者和消费者的解耦与并发协作:
- 生产者线程通过
put 插入数据;
- 消费者线程通过
take 取出数据;
- 队列自动平衡生产和消费速度(满时阻塞生产者,空时阻塞消费者)。
线程池任务队列
线程池(如 ThreadPoolExecutor)使用阻塞队列存储待执行任务:
newFixedThreadPool 使用 LinkedBlockingQueue(无界,任务可无限堆积);
newCachedThreadPool 使用 SynchronousQueue(任务直接提交给线程,无缓冲);
newScheduledThreadPool 使用延迟队列实现定时任务。
异步日志框架
如 Logback 的异步日志器,使用 ArrayBlockingQueue 缓存日志事件:
- 应用线程快速写入队列(非阻塞);
- 后台线程从队列取出并输出日志(阻塞等待新事件)。
各实现类对比与选择建议
| 实现类 |
底层结构 |
容量特性 |
并发性能 |
适用场景 |
| ArrayBlockingQueue |
数组 |
有界 |
中 |
固定容量场景(如资源池) |
| LinkedBlockingQueue |
链表 |
可配置 |
高 |
线程池、高并发通信 |
| PriorityBlockingQueue |
二叉堆 |
无界 |
中 |
优先级任务调度 |
| DelayQueue |
优先级队列 |
无界 |
中 |
延迟任务(缓存清理、定时任务) |
| SynchronousQueue |
无存储 |
无界(逻辑) |
极高 |
线程间直接通信 |