Java 并发同步工具:从信号量到交换器的全面解析
在多线程编程中,除了 synchronized
和 Lock
等基础同步机制,JUC(java.util.concurrent)包还提供了多种高级同步工具,用于解决复杂的线程协作问题。本文将详细介绍 信号量(Semaphore)、闭锁(CountDownLatch)、屏障(CyclicBarrier)、移相器(Phaser)和交换器(Exchanger) 的原理、用法及适用场景。
信号量(Semaphore):控制并发访问数量
Semaphore 用于控制同时访问某一资源的线程数量,通过维护一个许可计数器实现。线程需要先获取许可(acquire()
),访问完成后释放许可(release()
)。
核心原理
- 许可计数器:初始化时指定许可数量(如
new Semaphore(5)
允许 5 个线程同时访问);
- 获取与释放:
acquire()
减少许可数(无许可时阻塞),release()
增加许可数;
- 公平性:支持公平模式(按请求顺序分配许可)和非公平模式(默认,允许插队)。
源码关键方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public void release() { sync.releaseShared(1); }
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }
|
典型应用:限流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class SemaphoreDemo { private static final Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 开始访问"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println(Thread.currentThread().getName() + " 结束访问"); } }, "Thread-" + i).start(); } } }
|
输出:最多 3 个线程同时执行,其余线程等待许可。
闭锁(CountDownLatch):等待一组操作完成
CountDownLatch 允许一个或多个线程等待其他线程完成一组操作后再继续执行。其核心是一个递减计数器,计数器归零时,所有等待线程被唤醒。
核心原理
- 计数器初始化:
new CountDownLatch(n)
表示需要等待 n 个操作完成;
- 计数递减:
countDown()
方法将计数器减 1;
- 等待唤醒:
await()
方法阻塞线程,直到计数器为 0。
源码关键方法
1 2 3 4 5 6 7 8 9
| public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public void countDown() { sync.releaseShared(1); }
|
典型应用:并发测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { int threadCount = 5; CountDownLatch latch = new CountDownLatch(threadCount);
long start = System.currentTimeMillis(); for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 完成"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } }).start(); }
latch.await(); System.out.println("总耗时:" + (System.currentTimeMillis() - start) + "ms"); } }
|
输出:所有线程完成后,打印总耗时(约 1000ms)。
屏障(CyclicBarrier):多线程互相等待
CyclicBarrier 用于让一组线程到达某个屏障点后互相等待,直到所有线程都到达后再共同继续执行。与 CountDownLatch 不同,它可重复使用。
核心原理
- 参与线程数:
new CyclicBarrier(n)
表示需要 n 个线程到达屏障;
- 屏障动作:可选的
Runnable
任务,在所有线程到达后执行;
- 重置性:所有线程通过屏障后,计数器自动重置,可再次使用。
源码关键方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public int await() throws InterruptedException, BrokenBarrierException { return dowait(false, 0L); }
public void reset() { lock.lock(); try { breakBarrier(); nextGeneration(); } finally { lock.unlock(); } }
|
典型应用:阶段任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class CyclicBarrierDemo { public static void main(String[] args) { int threadCount = 3; CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> System.out.println("所有线程到达屏障,执行汇总操作") );
for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 到达屏障"); barrier.await(); System.out.println(Thread.currentThread().getName() + " 继续执行"); } catch (Exception e) { e.printStackTrace(); } }, "Thread-" + i).start(); } } }
|
输出:所有线程到达后执行屏障动作,再继续各自任务。
移相器(Phaser):多阶段同步工具
Phaser 是 CountDownLatch 和 CyclicBarrier 的增强版,支持多阶段同步,每个阶段完成后自动进入下一阶段,适用于分阶段执行的任务。
核心特性
- 动态调整参与者:通过
register()
增加参与者,arriveAndDeregister()
减少;
- 阶段管理:每个阶段完成后自动递增阶段号,可通过
getPhase()
获取当前阶段;
- 终止机制:重写
onAdvance()
方法,返回 true
时终止同步。
关键方法
arriveAndAwaitAdvance()
:到达当前阶段并等待其他线程;
register()
:注册新参与者;
arriveAndDeregister()
:当前线程完成并退出。
示例:多阶段任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class PhaserDemo { static class Task implements Runnable { private Phaser phaser;
Task(Phaser phaser) { this.phaser = phaser; }
@Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 完成阶段 0"); phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 完成阶段 1"); phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 完成阶段 2"); phaser.arriveAndDeregister(); } catch (Exception e) { e.printStackTrace(); } } }
public static void main(String[] args) { Phaser phaser = new Phaser(3); for (int i = 0; i < 3; i++) { new Thread(new Task(phaser), "Task-" + i).start(); } } }
|
输出:所有线程同步完成每个阶段后,进入下一阶段。
交换器(Exchanger):两线程数据交换
Exchanger 用于两个线程在同步点交换数据,适用于生产者 - 消费者模式中直接传递数据。
核心原理
- 双向交换:线程 A 调用
exchange(data)
时阻塞,直到线程 B 也调用 exchange(data)
,此时两者交换数据;
- 超时机制:支持
exchange(data, timeout, unit)
避免无限等待。
示例:数据交换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class ExchangerDemo { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> { try { String data = "来自线程 A 的数据"; System.out.println("线程 A 发送:" + data); String received = exchanger.exchange(data); System.out.println("线程 A 收到:" + received); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { String data = "来自线程 B 的数据"; System.out.println("线程 B 发送:" + data); String received = exchanger.exchange(data); System.out.println("线程 B 收到:" + received); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
|
输出:两线程交换数据并打印结果。
同步工具对比与选择
工具 |
核心功能 |
特点 |
适用场景 |
Semaphore |
控制并发访问数量 |
基于许可计数器,可动态调整许可数 |
限流、资源池控制 |
CountDownLatch |
等待一组操作完成 |
计数器不可逆,一次性使用 |
并发测试、依赖服务启动 |
CyclicBarrier |
多线程互相等待 |
可重复使用,支持屏障动作 |
阶段任务、并行计算 |
Phaser |
多阶段同步 |
动态调整参与者,支持阶段管理 |
复杂分阶段任务 |
Exchanger |
两线程数据交换 |
双向同步,仅支持两个线程 |
生产者 - 消费者数据传递 |
v1.3.10