CyclicBarrier
可重用的线程屏障,当启动cyclicBarrier时,只有当指定数量的线程都到达了该屏障后,这些线程才能够继续往下执行,适用于大任务拆分成多个子任务的然后需要在子任务执行完成后汇总执行结果的场景
我们从以下几个方面来分析CyclicBarrier
- 属性
- 构造方法
- await方法
属性
首先来看一下CyclicBarrier有哪些属性
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** cyclicBarrier的参与线程数量 */
private final int parties;
/* 创建Barrier时指定的runnable,可选 */
private final Runnable barrierCommand;
/** 当前分代,每次线程冲破Barrier时,该分代就会更换为新的分代 */
private Generation generation = new Generation();
/**
* 还在Barrier前等待的线程数量,每次线程冲破Barrier,会重置count为parties
* Generation和count的重置就是Barrier可以被重复使用的原因
*/
private int count;
构造方法
接着看一下CyclicBarrier的构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier有两个被显式定义的有参构造方法,参数中parties是必须给定的,barrierAction是可选的
await方法
该方法是CyclicBarrier的核心方法,它有两个重载的形式
不带超时时间的await方法
public int await() throws InterruptedException, BrokenBarrierException;
带超时时间的await方法
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;
有两个异常,需要关注一下,BrokenBarrierException和TimeoutException
BrokenBarrierException
该异常抛出的条件为:
- 当线程在等待的时,其它线程被中断或者超时
- CyclicBarrier的barrierAction执行出错抛出异常时
- CyclicBarrier被broken
- CyclicBarrier被重置
TimeoutException
如果CyclicBarrier超时了,则会抛出该异常,此时CyclicBarrier进入broken状态
两个重载的await方法都会调用CyclicBarrier#dowait方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取到当前generation
final Generation g = generation;
if (g.broken)
// 如果当前CyclicBarrier已经进入broken状态了,则直接抛出异常
throw new BrokenBarrierException();
if (Thread.interrupted()) {
// 如果线程被中断了,抛出InterruptedException
breakBarrier();
throw new InterruptedException();
}
// 通过加锁的方式保证了--count不会出现线程安全问题
int index = --count;
if (index == 0) { // tripped
// 如果index等于0,表示当前所有的parties都到达了CyclicBarrier,此时可以冲破屏障了
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 如果指定了barrierAction,则在这里执行
command.run();
ranAction = true;
// 重置当前generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
//
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 如果--count不等于0,表示还有线程没有到达CyclicBarrier,则执行下面的逻辑
// 主要就是让线程进入等待状态
for (;;) {
try {
if (!timed)
// 如果没有设置超时,则直接等待
trip.await();
else if (nanos > 0L)
// 如果设置了超时,并且超时时间大于0,则执行带超时的await方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 异常检测
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
CyclicBarrier#breakBarrier
private void breakBarrier() {
// 设置broken状态为true
generation.broken = true;
// 重置count
count = parties;
// 唤醒正在等待的其他线程
trip.signalAll();
}
CyclicBarrier#nextGeneration
private void nextGeneration() {
// 唤醒所有正在等待的线程
trip.signalAll();
// 重置count
count = parties;
// 更换新的generation
generation = new Generation();
}
总结
CyclicBarrier与CountDownLatch的区别
- CountDownLatch适用于一个线程下开启多个子线程的情况,而CyclicBarrier适用于多个同级别线程(非父子线程)之间的情况
- CountDownLatch的计数不可重置,CyclicBarrier的计数可以重置