0%

BlockingQueue阻塞队列

BlockingQueue 阻塞队列:线程同步与并发协作的核心组件

BlockingQueue(阻塞队列)是 Java 并发包(JUC)中用于线程间安全通信的核心组件,其核心特性是当队列满时阻塞生产者线程,当队列空时阻塞消费者线程,完美适配生产者 - 消费者模型。本文将系统解析 BlockingQueue 接口及其主要实现类,深入探讨其设计原理、核心方法及适用场景。

BlockingQueue 接口核心定义

BlockingQueue 继承自 Queue 接口,新增了阻塞式插入 / 移除方法,确保线程安全的同时简化了并发协作。其核心方法可分为三类:非阻塞方法阻塞方法超时方法

核心方法分类

操作类型 非阻塞方法(满 / 空时抛异常) 非阻塞方法(满 / 空时返回特殊值) 阻塞方法(满 / 空时阻塞) 超时方法(满 / 空时等待超时)
插入 add(E e) offer(E e) put(E e) offer(E e, long timeout, TimeUnit unit)
移除 remove() poll() take() poll(long timeout, TimeUnit unit)
查看 element() peek() - -

关键特性

  • 不允许插入 null 元素(会抛出 NullPointerException);
  • 支持线程中断(阻塞方法声明了 throws InterruptedException);
  • 提供批量操作(如 drainTo(Collection) 一次性取出所有元素)。

主要实现类详解

阻塞队列

BlockingQueue 有多个实现类,适用于不同场景。以下是最常用的 5 种实现:

ArrayBlockingQueue:基于数组的有界阻塞队列

核心特性
  • 底层结构:数组(容量固定,创建时需指定);
  • 排序方式:FIFO(先进先出);
  • 并发控制:单把全局锁(ReentrantLock)+ 两个条件变量(notEmpty/notFull);
  • 公平性:支持公平锁(按线程等待顺序访问)和非公平锁(默认)。
源码解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
implements BlockingQueue<E>, java.io.Serializable {

// 存储元素的数组(容量固定)
final Object[] items;

// 下一个待取出元素的索引
int takeIndex;

// 下一个待插入元素的索引
int putIndex;

// 元素数量
int count;

// 全局锁(入队和出队共享同一把锁)
final ReentrantLock lock;

// 消费者条件变量(队列空时阻塞)
private final Condition notEmpty;

// 生产者条件变量(队列满时阻塞)
private final Condition notFull;

// 构造函数:指定容量和公平性
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) throw new IllegalArgumentException();
this.items = new Object[capacity];
this.lock = new ReentrantLock(fair);
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}
}

put 方法(阻塞插入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void put(E e) throws InterruptedException {  
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 可中断锁
try {
// 队列满时,阻塞并释放锁
while (count == items.length)
notFull.await();
enqueue(e); // 插入元素
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0; // 循环数组
count++;
notEmpty.signal(); // 唤醒一个消费者线程
}

take 方法(阻塞移除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public E take() throws InterruptedException {  
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列空时,阻塞并释放锁
while (count == 0)
notEmpty.await();
return dequeue(); // 移除并返回元素
} finally {
lock.unlock();
}
}

private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; // 清空位置
if (++takeIndex == items.length) takeIndex = 0; // 循环数组
count--;
notFull.signal(); // 唤醒一个生产者线程
return x;
}

特点

  • 单锁设计导致入队和出队无法并行,高并发下性能可能受限;
  • 适合对容量有严格限制的场景(如日志缓冲、资源池)。

LinkedBlockingQueue:基于链表的有界 / 无界阻塞队列

核心特性
  • 底层结构:单向链表(默认容量为 Integer.MAX_VALUE,可视为无界);
  • 排序方式:FIFO;
  • 并发控制:两把独立锁(putLock 控制入队,takeLock 控制出队),支持入队和出队并行;
  • 适用场景:线程池(如 newFixedThreadPool 默认使用此队列)。
源码解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
implements BlockingQueue<E>, java.io.Serializable {

// 节点结构
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}

// 容量(默认 Integer.MAX_VALUE)
private final int capacity;

// 元素数量(原子类保证线程安全)
private final AtomicInteger count = new AtomicInteger();

// 头节点(item 为 null)
transient Node<E> head;

// 尾节点
private transient Node<E> last;

// 入队锁
private final ReentrantLock putLock = new ReentrantLock();

// 生产者条件变量(队列满时阻塞)
private final Condition notFull = putLock.newCondition();

// 出队锁
private final ReentrantLock takeLock = new ReentrantLock();

// 消费者条件变量(队列空时阻塞)
private final Condition notEmpty = takeLock.newCondition();
}

put 方法(阻塞插入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void put(E e) throws InterruptedException {  
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队列满时阻塞
while (count.get() == capacity)
notFull.await();
enqueue(node); // 插入队尾
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); // 队列仍有空间,唤醒其他生产者
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty(); // 插入前队列为空,唤醒消费者
}

take 方法(阻塞移除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {  
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列空时阻塞
while (count.get() == 0)
notEmpty.await();
x = dequeue(); // 移除队头
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); // 队列仍有元素,唤醒其他消费者
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); // 移除前队列满,唤醒生产者
return x;
}

特点

  • 双锁设计支持入队和出队并行,高并发性能优于 ArrayBlockingQueue
  • 无界模式下需注意内存溢出风险(如无限制生产元素)。

PriorityBlockingQueue:支持优先级的无界阻塞队列

核心特性
  • 底层结构:平衡二叉堆(数组实现);
  • 排序方式:自然排序或自定义 Comparator(元素需实现 Comparable 或传入比较器);
  • 并发控制:单锁(ReentrantLock)+ 条件变量 notEmpty
  • 无界性:自动扩容(最大容量 Integer.MAX_VALUE - 8)。
适用场景
  • 任务调度(按优先级执行任务);
  • 延迟处理(结合时间优先级)。
核心方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 插入元素(无界,不会阻塞)  
public void put(E e) {
offer(e); // 直接调用 offer,因无界不会阻塞
}

// 取出优先级最高的元素(队列为空时阻塞)
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E result;
while ((result = dequeue()) == null)
notEmpty.await();
return result;
} finally {
lock.unlock();
}
}

DelayQueue:延迟阻塞队列

核心特性
  • 元素要求:必须实现 Delayed 接口(重写 getDelay(TimeUnit) 方法);
  • 触发条件:仅当元素的延迟时间 <= 0 时,才能被取出;
  • 底层依赖PriorityQueue 维护元素优先级,ReentrantLock 保证线程安全;
  • 适用场景:缓存过期清理、定时任务调度(如 ScheduledThreadPoolExecutor)。
源码解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>  
implements BlockingQueue<E> {

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>(); // 优先级队列
private Thread leader = null; // 用于减少无效等待的线程
private final Condition available = lock.newCondition();

// 取出延迟到期的元素(阻塞)
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await(); // 队列为空,阻塞
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll(); // 延迟到期,取出元素
first = null; // 释放引用,避免内存泄漏
if (leader != null)
available.await(); // 已有线程等待,当前线程阻塞
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 等待剩余延迟时间
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 唤醒其他线程
lock.unlock();
}
}
}

SynchronousQueue:无缓冲的同步队列

核心特性
  • 无存储能力:元素直接从生产者传递给消费者,不存储任何元素;
  • 阻塞机制:生产者插入元素后会阻塞,直到消费者取出;消费者取出元素前会阻塞,直到生产者插入;
  • 适用场景:线程间直接通信(如 newCachedThreadPool 用此队列实现任务提交与执行的同步)。
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) throws InterruptedException {  
BlockingQueue<String> queue = new SynchronousQueue<>();

// 生产者线程
new Thread(() -> {
try {
System.out.println("生产者发送:A");
queue.put("A"); // 阻塞,直到消费者取出
System.out.println("生产者发送:B");
queue.put("B");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

// 消费者线程
new Thread(() -> {
try {
Thread.sleep(1000); // 延迟消费
System.out.println("消费者接收:" + queue.take());
Thread.sleep(1000);
System.out.println("消费者接收:" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

输出

1
2
3
4
5
6
生产者发送:A  
(1秒后)
消费者接收:A
生产者发送:B
(1秒后)
消费者接收:B

阻塞队列的典型应用场景

生产者 - 消费者模型

通过阻塞队列实现生产者和消费者的解耦与并发协作:

  • 生产者线程通过 put 插入数据;
  • 消费者线程通过 take 取出数据;
  • 队列自动平衡生产和消费速度(满时阻塞生产者,空时阻塞消费者)。

线程池任务队列

线程池(如 ThreadPoolExecutor)使用阻塞队列存储待执行任务:

  • newFixedThreadPool 使用 LinkedBlockingQueue(无界,任务可无限堆积);
  • newCachedThreadPool 使用 SynchronousQueue(任务直接提交给线程,无缓冲);
  • newScheduledThreadPool 使用延迟队列实现定时任务。

异步日志框架

如 Logback 的异步日志器,使用 ArrayBlockingQueue 缓存日志事件:

  • 应用线程快速写入队列(非阻塞);
  • 后台线程从队列取出并输出日志(阻塞等待新事件)。

各实现类对比与选择建议

实现类 底层结构 容量特性 并发性能 适用场景
ArrayBlockingQueue 数组 有界 固定容量场景(如资源池)
LinkedBlockingQueue 链表 可配置 线程池、高并发通信
PriorityBlockingQueue 二叉堆 无界 优先级任务调度
DelayQueue 优先级队列 无界 延迟任务(缓存清理、定时任务)
SynchronousQueue 无存储 无界(逻辑) 极高 线程间直接通信

欢迎关注我的其它发布渠道