CyclicBarrier:循环栅栏
想象组队打副本的场景:
5人副本,需要所有人到齐才能开打。 有人迟到,其他人等着。 打完一波,下一波继续等所有人到齐。
CyclicBarrier 就是这个「组队机制」——让一组线程相互等待,所有人都到齐后一起继续。
核心概念
CyclicBarrier 核心机制:
- 初始化时设置 parties = N(参与线程数)
- 每个线程调用 await() 表示「我到了」
- 当 N 个线程都调用了 await(),栅栏打开
- 栅栏可重置,继续下一轮(cyclic = 循环)1
2
3
4
5
2
3
4
5
基本用法
场景:多线程分步计算
java
public class 分步计算 {
private final CyclicBarrier barrier;
private final double[] data = new double[1000];
public 分步计算() {
// 4个线程参与
barrier = new CyclicBarrier(4, () -> {
// 栅栏打开时执行(可选)
System.out.println("所有线程完成,准备下一步");
});
}
public void process() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
final int threadId = i;
executor.submit(() -> {
try {
// 第一步:加载数据
loadData(threadId);
barrier.await(); // 等待所有人
// 第二步:计算
calculate(threadId);
barrier.await(); // 等待所有人
// 第三步:汇总
summarize(threadId);
barrier.await(); // 等待所有人
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
private void loadData(int id) throws InterruptedException {
System.out.println("线程" + id + " 加载数据");
Thread.sleep(100);
}
private void calculate(int id) throws InterruptedException {
System.out.println("线程" + id + " 计算");
Thread.sleep(100);
}
private void summarize(int id) throws InterruptedException {
System.out.println("线程" + id + " 汇总");
Thread.sleep(100);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
输出示例:
线程0 加载数据
线程1 加载数据
线程2 加载数据
线程3 加载数据
所有线程完成,准备下一步
线程2 计算
线程1 计算
线程3 计算
线程0 计算
所有线程完成,准备下一步
线程0 汇总
...1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
与 CountDownLatch 的区别
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 用途 | 一个线程等N个线程 | N个线程相互等 |
| 计数方向 | 只减 | 可重置(循环) |
| 能否重置 | 不能 | 能 |
| 等待者 | 主线程等待 | 所有线程互相等 |
| 典型场景 | 火箭发射 | 组队打副本 |
java
// CountDownLatch:等火箭准备好
new CountDownLatch(1).await();
// CyclicBarrier:等人到齐
new CyclicBarrier(5).await();1
2
3
4
5
2
3
4
5
实战:多线程排序
java
public class 并行排序 {
private final int[] array;
private final CyclicBarrier barrier;
public 并行排序(int[] array, int threadCount) {
this.array = array;
this.barrier = new CyclicBarrier(threadCount, this::onComplete);
}
public void sort() throws InterruptedException {
int segmentSize = array.length / threadCount;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int start = i * segmentSize;
final int end = (i == threadCount - 1) ? array.length : start + segmentSize;
executor.submit(() -> Arrays.sort(array, start, end));
}
barrier.await(); // 等待所有段排序完成
executor.shutdown();
// 合并各段有序数组
mergeSort();
}
private void onComplete() {
System.out.println("所有段排序完成,开始合并");
}
private void mergeSort() {
// 实际合并逻辑
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
原理分析
内部结构
java
// 简化版 CyclicBarrier 实现
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties; // 参与线程数
private int count; // 剩余等待数
private Generation generation = new Generation();
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public int await() throws InterruptedException, BrokenBarrierException {
lock.lock();
try {
int index = --count;
if (index == 0) {
// 所有线程到达,执行 barrierCommand
nextGeneration();
return 0;
}
// 否则等待
for (;;) {
try {
trip.await();
return index;
} catch (InterruptedException e) {
if (generation.broken) throw e;
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
trip.signalAll(); // 唤醒所有等待线程
count = parties; // 重置计数
generation = new Generation();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
BrokenBarrierException
如果某个线程在等待期间被中断,或者栅栏被重置,会抛出 BrokenBarrierException:
java
public class 异常处理 {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3);
Thread t1 = new Thread(() -> {
try {
barrier.await();
} catch (Exception e) {
System.out.println("线程1发现栅栏坏了");
}
});
Thread t2 = new Thread(() -> {
try {
barrier.await();
} catch (Exception e) {
System.out.println("线程2发现栅栏坏了");
}
});
t1.start();
t2.start();
// 模拟超时或重置
try { Thread.sleep(100); } catch (InterruptedException e) {}
barrier.reset(); // 重置会触发 BrokenBarrierException
t1.join();
t2.join();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
reset() 方法
java
CyclicBarrier barrier = new CyclicBarrier(3);
barrier.reset(); // 重置栅栏
// 会导致所有正在 await() 的线程收到 BrokenBarrierException1
2
3
4
2
3
4
什么时候用 reset():
- 定时任务超时
- 某个任务失败,需要重试
- 超时后重新开始下一轮
常见陷阱
陷阱1:线程数不匹配
java
CyclicBarrier barrier = new CyclicBarrier(5);
// 只有4个线程来
// 会一直等待,永远无法通过1
2
3
4
2
3
4
陷阱2:在 barrierAction 中抛异常
java
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
throw new RuntimeException("初始化失败");
});
// barrierAction 失败会破坏栅栏
// 后续 await() 都会失败1
2
3
4
5
6
2
3
4
5
6
陷阱3:忘记异常处理
java
// 错误
barrier.await();
// 正确
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// 处理
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
适用场景
适合
- 多线程分步计算:每步需要所有人完成
- 组队任务:如游戏副本、批量处理
- 压力测试:模拟同时发起请求
- 定期同步点:如定期数据刷新
不适合
- 只需要等待 N 个任务完成(用 CountDownLatch)
- 需要单个线程控制(用信号量)
- 需要超时退出(用 CompletableFuture)
面试追问方向
CyclicBarrier 为什么叫「循环」? 因为 reset() 后可以重复使用。每次等满 N 个线程后,会自动重置计数。
CyclicBarrier 和 CountDownLatch 的核心区别? CountDownLatch:主线程等子线程,一次性。 CyclicBarrier:子线程互相等,可循环。
BrokenBarrierException 什么时候发生?
- 另一个线程调用 reset()
- 另一个线程等待时被中断
- barrierAction 抛异常
CyclicBarrier 的 barrierAction 有什么用? 所有线程到达后执行,常用于汇总计算结果、记录日志等。
CyclicBarrier 能否只等待部分线程? 不能。必须所有 parties 都到达才会通过。如果需要部分等待,用 Semaphore。
