CountDownLatch CountDownLatch底层是使用AQS实现的。
使用AQS的状态变量来存放计数器的值。首先在初始化CountDownLatch时设置状态值(计数器值),当多个线程调用countdown方法时实际是原子性递减AQS的状态值。
当线程调用await方法后当前线程会被放入AQS的阻塞队列等待计数器为0再返回。
其他线程调用countdown方法让计数器值递减1,当计数器值变为0时,当前线程还要调用AQS的doReleaseShared方法来激活由于调用await()方法而被阻塞的线程。
CyclicBarrier CyclicBarrier,其通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。
JDK开发组提供了CyclicBarrier类,从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用 。之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。
创建CyclicBarrier的时候,可以传入barrierAction,当所有线程到达屏障以后,最后一个线程会执行barrierAction中的操作,执行完毕后会唤醒其他线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException (); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException (); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException (); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException (); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException (); } } } finally { lock.unlock(); } }
Semaphore Semaphore信号量也是Java中的一个同步器,它内部的计数器是递增的,并且在初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class SemaphoreTest { private static Semaphore semaphore = new Semaphore (0 ); public static void main (String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2 ); executorService.submit(() -> { System.out.println(Thread.currentThread() + "over" ); semaphore.release(); }); executorService.submit(() -> { System.out.println(Thread.currentThread() + "over" ); semaphore.release(); }); semaphore.acquire(2 ); System.out.println("all child thread over" ); executorService.shutdown(); } } ======= Thread[pool-1 -thread-1 ,5 ,main]over Thread[pool-1 -thread-2 ,5 ,main]over all child thread over
Semaphore也是使用AQS实现的,并且获取信号量时有公平策略和非公平策略之分。
Exchanger Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(..)函数,
Phaser 从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。
1.用Phaser替代CountDownLatch
1个主线程要等10个worker线程完成之后,才能做接下来的事情,也可以用Phaser来实现此功能。在CountDownLatch中,主要是2个函数:await()和countDown(),在Phaser中,与之相对应的函数是awaitAdance(int n)和arrive()。
用Phaser替代CyclicBarrier