控制并发流程

什么是控制并发流程

控制并发流程的工具类,作用就是帮助程序员更容易让线程之间进行合作,让线程之间相互配合,来满足业务需求,比如让线程A等待线程B执行完毕后再执行等合作策略

image-20200225092105118

CountDownLatch倒计时门闩

CountDownLatch类的作用

并发流程控制的工具

倒数门闩,例子:购物拼团,大巴(游乐园坐过山车排队),人满发车

流程:倒数结束之前,一直处于等待状态,直到倒计时结束了,此程序才继续工作

image-20200225093115937

类的主要方法介绍

CountDownLatch(int count)

仅有一个构造函数,参数count为需要倒数的数值

await()

调用await()方法的线程会被挂起,他会等待直到count值为0才继续执行

countDown()

将count值减1,直到为0时,等待的线程会被唤醒

图解await和countDown方法

image-20200225094810249

两种典型用法

用法一

一个线程等待多个线程都执行完毕,在继续执行自己的工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 描述: 工厂中,质检,5个工人检查,所有人都认为通过,才通过
*/
public class CountDownLatchDemo1 {

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {

@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "完成了检查。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
};
service.submit(runnable);
}
System.out.println("等待5个人检查完.....");
latch.await();
System.out.println("所有人都完成了工作,进入下一个环节。");
}
}

输出结果

1
2
3
4
5
6
7
等待5个人检查完.....
No.5完成了检查。
No.3完成了检查。
No.2完成了检查。
No.1完成了检查。
No.4完成了检查。
所有人都完成了工作,进入下一个环节。

用法二

多个线程等待某一个线程的信号,同时开始执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。
*/
public class CountDownLatchDemo2 {

public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "准备完毕,等待发令枪");
try {
begin.await();
System.out.println("No." + no + "开始跑步了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.submit(runnable);
}
//裁判员检查发令枪...
Thread.sleep(5000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
No.1准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.5准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.5开始跑步了
No.4开始跑步了
No.3开始跑步了
No.2开始跑步了

一等多与多等一结合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。
*/
public class CountDownLatchDemo1And2 {

public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);

CountDownLatch end = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "准备完毕,等待发令枪");
try {
begin.await();
System.out.println("No." + no + "开始跑步了");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "跑到终点了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
};
service.submit(runnable);
}
//裁判员检查发令枪...
Thread.sleep(5000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();

end.await();
System.out.println("所有人到达终点,比赛结束");
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
No.1准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.5准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.2开始跑步了
No.3开始跑步了
No.5开始跑步了
No.4开始跑步了
No.3跑到终点了
No.4跑到终点了
No.1跑到终点了
No.2跑到终点了
No.s
所有人到达终点,比赛结束

注意点

扩展用法:多个线程等多个线程完成执行后,再同时执行

CountDownLatch是不能够重用的,如果需要重新计算,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例

总结

两个典型用法:一等多和多等一

CountDownLatch类在创建实例的时候,需要传递倒数次数,倒数到0的时候,之前等待的线程会继续执行

CountDownLatch不能回滚重置

Semaphore信号量

Semaphore可以用来限制或管理数量有限的资源的使用情况

例子:工厂污水排放,污染不能太多,污染许可证只能发3张

信号量的作用是维护一个“许可证”的计数,线程可以获取”许可证“,那信号量剩余的许可证就减一,线程也可以释放一个”许可证“,那信号量剩余的许可证就加一,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证

image-20200225103525920

Semaphore应用实例

image-20200225103946103

没有使用信号量

image-20200225104104867

使用信号量

正常情况下获取许可证

线程1

image-20200225104243810 image-20200225104317178

线程2

image-20200225104357416

线程3

image-20200225104708377

线程4

线程4就被挡住了

image-20200225104846249

直到线程1归还许可证

image-20200225105016379

更多的线程进来

image-20200225105211821

总结

image-20200225105250007

信号量使用流程

  1. 初始化Semaphore,并指定许可证的数量
  2. 在需要被执行的代码前加acquire()或者acquireUninterruptibly()方法,以acquire()为主
  3. 在任务执行结束后,调用release()来释放许可证

信号量主要方法介绍

new Semaphore(int permits,boolean fair)

这里可以设置是否要使用公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证时,可以分发给之前等了最长时间的线程

accquire()

获取许可

accquireUninterruptibly()

tryAcquire()

看现在有没有空闲的许可证,如果有的话就去获取,如果没有也不必进入阻塞,可以去做别的事,过一段时间再来查看许可证的空闲情况,返回的boolean类型

tryAcquire(timeout)

和tryAcquire()一样,但是多了一个超时时间,如在3秒内获取不到许可证,就去做别的事

release()

归还许可证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 描述: 演示Semaphore用法
*/
public class SemaphoreDemo {

static Semaphore semaphore = new Semaphore(5, true);

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
service.submit(new Task());
}
service.shutdown();
}

static class Task implements Runnable {

@Override
public void run() {
try {
semaphore.acquire(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到了许可证");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "释放了许可证");
semaphore.release(3);
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pool-1-thread-1拿到了许可证
pool-1-thread-1释放了许可证
pool-1-thread-2拿到了许可证
pool-1-thread-2释放了许可证
pool-1-thread-3拿到了许可证
pool-1-thread-3释放了许可证
pool-1-thread-4拿到了许可证
pool-1-thread-4释放了许可证
pool-1-thread-5拿到了许可证
pool-1-thread-5释放了许可证
pool-1-thread-6拿到了许可证
pool-1-thread-6释放了许可证
pool-1-thread-7拿到了许可证
pool-1-thread-7释放了许可证
pool-1-thread-8拿到了许可证
pool-1-thread-8释放了许可证
pool-1-thread-9拿到了许可证
pool-1-thread-9释放了许可证
pool-1-thread-10拿到了许可证
pool-1-thread-10释放了许可证
pool-1-thread-11拿到了许可证
...

信号量的特殊用法

一次性获取或释放多个许可证

比如TaskA会调用很消耗资源的method1(),而TaskB()调用的是不太消耗资源的method2(),假设一共有5个许可证,那么就要求TaskA获取5个许可证才能执行,而TaskB只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,用户可以根据自己的需求合理分配资源

获取和释放的许可证数量必须一致,如果每次都获取2个但是只释放一个甚至不释放,随着时间的推移,到最后许可证的数量不够用,就会使程序卡死

虽然信号量类并不对是否和获取数量做规范,但是这是程序员的编程规范,否则容易报错

注意点

  1. 在初始化Semaphore的时候设置公平性,一般设置为true会更合理

  2. 并不是必须由获取许可证的线程释放那个许可证,事实上,获取和释放许可证对新安村并无要求,也许是A获取了,然后由B释放,只要逻辑合理即可

  3. 信号量的作用,除了处理临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作后才开始工作,那么就线程1acquire(),而线程2完成任务后release(),这样的话,相当于轻量级的CountDownLatch

Condition接口

又称条件对象,是绑定在锁上的

作用

当线程1需要等待某个条件的时候,就去执行condition.await()方法,一旦执行了await()方法,线程1就会进入阻塞状态

这时通常会有另一个线程,假设是线程2,线程2去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这时JVM就会从被阻塞的线程里寻找等待该condition的线程,当线程1收到可执行的信号的时候,线程1的线程状态就会变成Runnable状态

image-20200225151524469

signalAll()和signal()区别

signalAll()会唤醒所有的正在等待的线程

signal()是公平的,只会唤醒等待时间最长的线程

代码演示

普通用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 描述: 演示Condition的基本用法
*/
public class ConditionDemo1 {
private ReentrantLock lock = new ReentrantLock();
//利用锁来新建Condition
private Condition condition = lock.newCondition();

void method1() throws InterruptedException {
lock.lock();
try{
System.out.println("条件不满足,开始await");
condition.await();
System.out.println("条件满足了,开始执行后续的任务");
}finally {
lock.unlock();
}
}

void method2() {
lock.lock();
try{
System.out.println("准备工作完成,唤醒其他的线程");
condition.signal();
}finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
ConditionDemo1 conditionDemo1 = new ConditionDemo1();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
conditionDemo1.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
conditionDemo1.method1();
}
}

输出结果

1
2
3
条件不满足,开始await
准备工作完成,唤醒其他的线程
条件满足了,开始执行后续的任务

用Condition实现生产者消费者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* 描述: 演示用Condition实现生产者消费者模式
*/
public class ConditionDemo2 {

private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();

public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}

class Consumer extends Thread {

@Override
public void run() {
consume();
}

private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
} finally {
lock.unlock();
}
}
}
}

class Producer extends Thread {

@Override
public void run() {
produce();
}

private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满,等待有空余");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}

}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
向队列插入了一个元素,队列剩余空间9
向队列插入了一个元素,队列剩余空间8
向队列插入了一个元素,队列剩余空间7
向队列插入了一个元素,队列剩余空间6
向队列插入了一个元素,队列剩余空间5
向队列插入了一个元素,队列剩余空间4
向队列插入了一个元素,队列剩余空间3
向队列插入了一个元素,队列剩余空间2
向队列插入了一个元素,队列剩余空间1
向队列插入了一个元素,队列剩余空间0
队列满,等待有空余
从队列里取走了一个数据,队列剩余9个元素
从队列里取走了一个数据,队列剩余8个元素
从队列里取走了一个数据,队列剩余7个元素
从队列里取走了一个数据,队列剩余6个元素
从队列里取走了一个数据,队列剩余5个元素
从队列里取走了一个数据,队列剩余4个元素
从队列里取走了一个数据,队列剩余3个元素
从队列里取走了一个数据,队列剩余2个元素
从队列里取走了一个数据,队列剩余1个元素
从队列里取走了一个数据,队列剩余0个元素
队列空,等待数据
向队列插入了一个元素,队列剩余空间9
向队列插入了一个元素,队列剩余空间8
向队列插入了一个元素,队列剩余空间7
向队列插入了一个元素,队列剩余空间6
向队列插入了一个元素,队列剩余空间5
向队列插入了一个元素,队列剩余空间4
向队列插入了一个元素,队列剩余空间3
向队列插入了一个元素,队列剩余空间2
向队列插入了一个元素,队列剩余空间1
向队列插入了一个元素,队列剩余空间0
队列满,等待有空余
从队列里取走了一个数据,队列剩余9个元素
...

注意点

实际上,如果说Lock用来代替synchronized,那么Condition就是用来代替相应的Object.wait/notify的,所以在用法和性质上,几乎都是一样的

await()方法会自动释放持有的Lock锁,和Object.wait()一样,不需要自己动手先释放锁

调用await()的时候,必须持有锁,否则会抛出异常,和Object.wait()一样

CyclicBarrier循环栅栏

CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程

当有大量线程相互配合的时候,分别计算不同任务,并且需要最后统一汇总的时候,可以使用CycliBarrier

CycliBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 描述: 演示CyclicBarrier
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到场了, 大家统一出发!");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new Task(i, cyclicBarrier)).start();
}
}

static class Task implements Runnable{
private int id;
private CyclicBarrier cyclicBarrier;

public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + id + "现在前往集合地点");
try {
Thread.sleep((long) (Math.random()*10000));
System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
cyclicBarrier.await();
System.out.println("线程"+id+"出发了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
线程1现在前往集合地点
线程0现在前往集合地点
线程2现在前往集合地点
线程3现在前往集合地点
线程4现在前往集合地点
线程6现在前往集合地点
线程5现在前往集合地点
线程7现在前往集合地点
线程8现在前往集合地点
线程9现在前往集合地点
线程5到了集合地点,开始等待其他人到达
线程0到了集合地点,开始等待其他人到达
线程9到了集合地点,开始等待其他人到达
线程1到了集合地点,开始等待其他人到达
线程7到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程7出发了
线程5出发了
线程9出发了
线程0出发了
线程1出发了
线程4到了集合地点,开始等待其他人到达
线程2到了集合地点,开始等待其他人到达
线程6到了集合地点,开始等待其他人到达
线程3到了集合地点,开始等待其他人到达
线程8到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程8出发了
线程3出发了
线程6出发了
线程2出发了
线程4出发了

CyclicBarrier和CountDownLatch的区别

作用不同

CyclicBarrier要等固定数量的线程都达到类栅栏的位置才能继续执行,额CountDownLatch只需要等待数字为0,也就是说,CountDownLatch用于事件,但是CyclicBarrier用于线程

可重用性不同

CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例,而CyclicBarrier可以重复使用

文章目录
  1. 1. 控制并发流程
    1. 1.1. 什么是控制并发流程
    2. 1.2. CountDownLatch倒计时门闩
      1. 1.2.1. CountDownLatch类的作用
      2. 1.2.2. 类的主要方法介绍
      3. 1.2.3. 图解await和countDown方法
      4. 1.2.4. 两种典型用法
        1. 1.2.4.1. 用法一
        2. 1.2.4.2. 用法二
      5. 1.2.5. 总结
    3. 1.3. Semaphore信号量
      1. 1.3.1. Semaphore应用实例
      2. 1.3.2. 信号量使用流程
      3. 1.3.3. 信号量主要方法介绍
      4. 1.3.4. 信号量的特殊用法
      5. 1.3.5. 注意点
    4. 1.4. Condition接口
      1. 1.4.1. 作用
      2. 1.4.2. signalAll()和signal()区别
      3. 1.4.3. 代码演示
      4. 1.4.4. 注意点
    5. 1.5. CyclicBarrier循环栅栏
      1. 1.5.1. 代码演示
      2. 1.5.2. CyclicBarrier和CountDownLatch的区别
        1. 1.5.2.1. 作用不同
        2. 1.5.2.2. 可重用性不同
|