0%

java并发06_并发工具

CountDownLatch

CountDownLatch底层是使用AQS实现的。

使用AQS的状态变量来存放计数器的值。首先在初始化CountDownLatch时设置状态值(计数器值),当多个线程调用countdown方法时实际是原子性递减AQS的状态值。

当线程调用await方法后当前线程会被放入AQS的阻塞队列等待计数器为0再返回。

其他线程调用countdown方法让计数器值递减1,当计数器值变为0时,当前线程还要调用AQS的doReleaseShared方法来激活由于调用await()方法而被阻塞的线程。

CyclicBarrier

CyclicBarrier,其通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。

JDK开发组提供了CyclicBarrier类,从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。

创建CyclicBarrier的时候,可以传入barrierAction,当所有线程到达屏障以后,最后一个线程会执行barrierAction中的操作,执行完毕后会唤醒其他线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
//获取到锁的线程去执行barrierCommand
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

Semaphore

Semaphore信号量也是Java中的一个同步器,它内部的计数器是递增的,并且在初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(0);

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(() -> {
System.out.println(Thread.currentThread() + "over");
semaphore.release();
});
executorService.submit(() -> {
System.out.println(Thread.currentThread() + "over");
semaphore.release();
});
semaphore.acquire(2);
System.out.println("all child thread over");
executorService.shutdown();
}
}

=======
Thread[pool-1-thread-1,5,main]over
Thread[pool-1-thread-2,5,main]over
all child thread over

Semaphore也是使用AQS实现的,并且获取信号量时有公平策略和非公平策略之分。

Exchanger

Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(..)函数,

Phaser

从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。

1.用Phaser替代CountDownLatch

1个主线程要等10个worker线程完成之后,才能做接下来的事情,也可以用Phaser来实现此功能。在CountDownLatch中,主要是2个函数:await()和countDown(),在Phaser中,与之相对应的函数是awaitAdance(int n)和arrive()。

  1. 用Phaser替代CyclicBarrier