0%

同步方式

Java 并发同步工具:从信号量到交换器的全面解析

在多线程编程中,除了 synchronizedLock 等基础同步机制,JUC(java.util.concurrent)包还提供了多种高级同步工具,用于解决复杂的线程协作问题。本文将详细介绍 信号量(Semaphore)、闭锁(CountDownLatch)、屏障(CyclicBarrier)、移相器(Phaser)和交换器(Exchanger) 的原理、用法及适用场景。

信号量(Semaphore):控制并发访问数量

Semaphore 用于控制同时访问某一资源的线程数量,通过维护一个许可计数器实现。线程需要先获取许可(acquire()),访问完成后释放许可(release())。

核心原理

  • 许可计数器:初始化时指定许可数量(如 new Semaphore(5) 允许 5 个线程同时访问);
  • 获取与释放acquire() 减少许可数(无许可时阻塞),release() 增加许可数;
  • 公平性:支持公平模式(按请求顺序分配许可)和非公平模式(默认,允许插队)。

源码关键方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 获取 1 个许可(可中断)  
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 释放 1 个许可
public void release() {
sync.releaseShared(1);
}

// 尝试获取许可(非阻塞)
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

典型应用:限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SemaphoreDemo {  
private static final Semaphore semaphore = new Semaphore(3); // 允许 3 个并发

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 开始访问");
Thread.sleep(1000); // 模拟业务操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " 结束访问");
}
}, "Thread-" + i).start();
}
}
}

输出:最多 3 个线程同时执行,其余线程等待许可。

闭锁(CountDownLatch):等待一组操作完成

CountDownLatch 允许一个或多个线程等待其他线程完成一组操作后再继续执行。其核心是一个递减计数器,计数器归零时,所有等待线程被唤醒。

核心原理

  • 计数器初始化new CountDownLatch(n) 表示需要等待 n 个操作完成;
  • 计数递减countDown() 方法将计数器减 1;
  • 等待唤醒await() 方法阻塞线程,直到计数器为 0。

源码关键方法

1
2
3
4
5
6
7
8
9
// 等待计数器归 0(可中断)  
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 计数器减 1
public void countDown() {
sync.releaseShared(1);
}

典型应用:并发测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CountDownLatchDemo {  
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);

long start = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
Thread.sleep(1000); // 模拟任务
System.out.println(Thread.currentThread().getName() + " 完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 任务完成,计数器减 1
}
}).start();
}

latch.await(); // 等待所有任务完成
System.out.println("总耗时:" + (System.currentTimeMillis() - start) + "ms");
}
}

输出:所有线程完成后,打印总耗时(约 1000ms)。

屏障(CyclicBarrier):多线程互相等待

CyclicBarrier 用于让一组线程到达某个屏障点后互相等待,直到所有线程都到达后再共同继续执行。与 CountDownLatch 不同,它可重复使用

核心原理

  • 参与线程数new CyclicBarrier(n) 表示需要 n 个线程到达屏障;
  • 屏障动作:可选的 Runnable 任务,在所有线程到达后执行;
  • 重置性:所有线程通过屏障后,计数器自动重置,可再次使用。

源码关键方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 到达屏障并等待其他线程  
public int await() throws InterruptedException, BrokenBarrierException {
return dowait(false, 0L);
}

// 重置屏障
public void reset() {
lock.lock();
try {
breakBarrier();
nextGeneration();
} finally {
lock.unlock();
}
}

典型应用:阶段任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CyclicBarrierDemo {  
public static void main(String[] args) {
int threadCount = 3;
// 3 个线程到达后,执行屏障动作
CyclicBarrier barrier = new CyclicBarrier(threadCount, () ->
System.out.println("所有线程到达屏障,执行汇总操作")
);

for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 到达屏障");
barrier.await(); // 等待其他线程
System.out.println(Thread.currentThread().getName() + " 继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
}
}

输出:所有线程到达后执行屏障动作,再继续各自任务。

移相器(Phaser):多阶段同步工具

Phaser 是 CountDownLatch 和 CyclicBarrier 的增强版,支持多阶段同步,每个阶段完成后自动进入下一阶段,适用于分阶段执行的任务。

核心特性

  • 动态调整参与者:通过 register() 增加参与者,arriveAndDeregister() 减少;
  • 阶段管理:每个阶段完成后自动递增阶段号,可通过 getPhase() 获取当前阶段;
  • 终止机制:重写 onAdvance() 方法,返回 true 时终止同步。

关键方法

  • arriveAndAwaitAdvance():到达当前阶段并等待其他线程;
  • register():注册新参与者;
  • arriveAndDeregister():当前线程完成并退出。

示例:多阶段任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class PhaserDemo {  
static class Task implements Runnable {
private Phaser phaser;

Task(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
try {
// 阶段 0:初始化
System.out.println(Thread.currentThread().getName() + " 完成阶段 0");
phaser.arriveAndAwaitAdvance();

// 阶段 1:处理数据
System.out.println(Thread.currentThread().getName() + " 完成阶段 1");
phaser.arriveAndAwaitAdvance();

// 阶段 2:清理资源
System.out.println(Thread.currentThread().getName() + " 完成阶段 2");
phaser.arriveAndDeregister(); // 完成后退出
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 3 个参与者
for (int i = 0; i < 3; i++) {
new Thread(new Task(phaser), "Task-" + i).start();
}
}
}

输出:所有线程同步完成每个阶段后,进入下一阶段。

交换器(Exchanger):两线程数据交换

Exchanger 用于两个线程在同步点交换数据,适用于生产者 - 消费者模式中直接传递数据。

核心原理

  • 双向交换:线程 A 调用 exchange(data) 时阻塞,直到线程 B 也调用 exchange(data),此时两者交换数据;
  • 超时机制:支持 exchange(data, timeout, unit) 避免无限等待。

示例:数据交换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ExchangerDemo {  
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();

// 线程 A 发送数据
new Thread(() -> {
try {
String data = "来自线程 A 的数据";
System.out.println("线程 A 发送:" + data);
String received = exchanger.exchange(data); // 等待交换
System.out.println("线程 A 收到:" + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

// 线程 B 发送数据
new Thread(() -> {
try {
String data = "来自线程 B 的数据";
System.out.println("线程 B 发送:" + data);
String received = exchanger.exchange(data); // 等待交换
System.out.println("线程 B 收到:" + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

输出:两线程交换数据并打印结果。

同步工具对比与选择

工具 核心功能 特点 适用场景
Semaphore 控制并发访问数量 基于许可计数器,可动态调整许可数 限流、资源池控制
CountDownLatch 等待一组操作完成 计数器不可逆,一次性使用 并发测试、依赖服务启动
CyclicBarrier 多线程互相等待 可重复使用,支持屏障动作 阶段任务、并行计算
Phaser 多阶段同步 动态调整参与者,支持阶段管理 复杂分阶段任务
Exchanger 两线程数据交换 双向同步,仅支持两个线程 生产者 - 消费者数据传递

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10