在Java多线程编程中,经常会需要我们控制并发流程,等其他线程执行完毕,或者分阶段执行。Java在1.5的juc中引入了CountDownLatch
和CyclicBarrier
,1.7中又引入了Phaser
。
CountDownLatch
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
一个或多个线程等待其他线程完成一系列操作后才执行。
流程图:
基本使用:
使用两个 countdown latches的示例。
第一个是开始信号,在发出执行命令前,阻止线程开始执行。
第二个是完成信号,直到所有线程执行完毕,主线程再开始执行。
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish }}class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... }}}
CyclicBarrier
A synchronizati on aid that allows a set of threads to all wait for each other to reach a common barrier point.
多个线程互相等待,直到到达同一个同步点,再继续一起执行。CyclicBarrier适用于多个线程有固定的多步需要执行,线程间互相等待,当都执行完了,在一起执行下一步。
流程图:
基本使用:
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } static class A implements Runnable { @Override public void run() { System.out.println(3); } }}
以上的例子利用了CyclicBarrier提供的一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
Phaser
A reusable synchronization barrier, similar in functionality to* {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and* {@link java.util.concurrent.CountDownLatch CountDownLatch}* but supporting more flexible usage.
Phaser
拥有与CyclicBarrier
和CountDownLatch
类似的功劳.但是这个类提供了更加灵活的应用。它支持任务在多个点都进行同步,支持动态调整注册任务的数量。当然你也可以使用CountDownLatch,但你必须创建多个CountDownLatch对象,每一阶段都需要一个CountDownLatch对象。
流程图:
基本使用:
public class Match { // 模拟了100米赛跑,10名选手,只等裁判一声令下。当所有人都到达终点时,比赛结束。 public static void main(String[] args) throws InterruptedException { final Phaser phaser=new Phaser(1) ; // 十名选手 for (int index = 0; index < 10; index++) { phaser.register(); new Thread(new player(phaser),"player"+index).start(); } System.out.println("Game Start"); //注销当前线程,比赛开始 phaser.arriveAndDeregister(); //是否非终止态一直等待 while(!phaser.isTerminated()){ } System.out.println("Game Over"); }}class player implements Runnable{ private final Phaser phaser ; player(Phaser phaser){ this.phaser=phaser; } @Override public void run() { try { // 第一阶段——等待创建好所有线程再开始 phaser.arriveAndAwaitAdvance(); // 第二阶段——等待所有选手准备好再开始 Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " ready"); phaser.arriveAndAwaitAdvance(); // 第三阶段——等待所有选手准备好到达,到达后,该线程从phaser中注销,不在进行下面的阶段。 Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " arrived"); phaser.arriveAndDeregister(); } catch (InterruptedException e) { e.printStackTrace(); } }}