多线程三之线程8大核心基础

线程8大核心基础

实现多线程的方法到底有1种还是2种还是4种

网上说法

1种的观点,2种的观点,4种的观点,其他观点

正确说法

Oracle官网的文档是如何写的?

正确答案是2种

1
There are two ways to create a new thread of execution. One is to declare a class to be a subclass of Thread. This subclass should override the run method of class Thread. An instance of the subclass can then be allocated and started.
1
The other way to create a thread is to declare a class that implements the Runnable interface. That class then implements the run method. An instance of the class can then be allocated, passed as an argument when creating Thread, and started.

治理线程的第二法宝

Future和Callable

Runnable的缺陷

不能返回一个返回值

不能抛出checked Exception异常

Callable接口

类似于Runnable,被其他线程执行的任务

实现call方法

有返回值

看源码

Future类

作用

Callable和Future的关系

可以用Future.get()来获取Callable接口返回的执行结果,可以通过Future.isDone()来判断任务是否已经执行完,以及取消这个任务,限时获取任务的结果等

在call()未执行完毕之前,调用get()的线程会阻塞,直到call()方法返回了结果后,此时Future.get()才会得到该结果,然后主线程才会切换到Runnable状态

Future是一个存储器,它存储了call()这个任务的结果,而这个任务的执行时间是无法提前确定的,因为这完全取决于call()方法执行的情况

Future的主要方法

一共5个

get()

获取结果,get()方法的行为取决于Callable任务的执行状态,只有以下这5种状况

  1. 任务正常完成:get()方法会立即返回结果
  2. 任务尚未完成(任务还没开始或进行中):get()将阻塞并直接完成任务
  3. 任务执行过程中抛出Exception:get()方法会抛出ExecutionException,这里的抛出异常,是call()执行时产生的那个异常,这个异常类型是java.util.concurrent.Exception,不论call()执行时抛出的异常类型是什么,最后get方法抛出的异常类型都是ExectionException
  4. 任务被取消:get()方法会抛出CancellationException
  5. 任务超时:get()方法有一个重载方法,是传入一个延时时间。如果时间到了还没有获得结果,get方法就会抛出TimeoutException

get(long timeout,TimeUnit unit)

有超时时间的获取

超时的需求很常见

get(long timeout,TimeUnit unit)方法时,如果call()在规定时间内完成了任务,那么就会正常获取到返回值,如果在指定时间内没有计算出结果,就会抛出TimeoutException

超时不获取则任务需要取消

cancel()

取消任务的执行

isDone()

判断线程是否执行完毕

isCancelled()

判断是否被取消

get()基本用法

线程池的submit方法返回Future对象

首先要给线程池提交任务,提交过后线程池会立即返回一个空的Future容器,当线程的任务执行完毕,也就是可以获取结果的时候,线程池便会把该结果填入之前的Future中,而不是创建一个新的Future,此时便可以从该Future中获得任务的执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 描述: 演示一个Future的使用方法
*/
public class OneFuture {

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
Future<Integer> future = service.submit(new CallableTask());
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
}

static class CallableTask implements Callable<Integer> {

@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt();
}
}

多个任务用Future数组来获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 描述: 演示批量提交任务时,用List来批量接收结果
*/
public class MultiFutures {

public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(20);
ArrayList<Future> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Future<Integer> future = service.submit(new CallableTask());
futures.add(future);
}
Thread.sleep(5000);
for (int i = 0; i < 20; i++) {
Future<Integer> future = futures.get(i);
try {
Integer integer = future.get();
System.out.println(integer);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

static class CallableTask implements Callable<Integer> {

@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt();
}
}
}

任务执行过程中抛出异常Exception和isDone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 描述: 演示get方法过程中抛出异常,for循环为了演示抛出Exception的时机:并不是说一产生异常就抛出,直到我们get执行时,才会抛出。
*/
public class GetException {

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(20);
Future<Integer> future = service.submit(new CallableTask());


try {
for (int i = 0; i < 5; i++) {
System.out.println(i);
Thread.sleep(500);
}
System.out.println(future.isDone());
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("InterruptedException异常");
} catch (ExecutionException e) {
e.printStackTrace();
System.out.println("ExecutionException异常");
}
}


static class CallableTask implements Callable<Integer> {

@Override
public Integer call() throws Exception {
throw new IllegalArgumentException("Callable抛出异常");
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
0
1
2
3
4
true
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at future.GetException.main(GetException.java:26)
Caused by: java.lang.IllegalArgumentException: Callable抛出异常
ExecutionException异常

获取任务超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 描述: 演示get的超时方法,需要注意超时后处理,调用future.cancel()。
* 演示cancel传入true和false的区别,future.cancel()代表是否中断正在执行的任务。
*/
public class Timeout {

private static final Ad DEFAULT_AD = new Ad("无网络时候的默认广告");
private static final ExecutorService exec = Executors.newFixedThreadPool(10);

static class Ad {

String name;

public Ad(String name) {
this.name = name;
}

@Override
public String toString() {
return "Ad{" +
"name='" + name + '\'' +
'}';
}
}


static class FetchAdTask implements Callable<Ad> {

@Override
public Ad call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("sleep期间被中断了");
return new Ad("被中断时候的默认广告");
}
return new Ad("旅游订票哪家强?找某程");
}
}


public void printAd() {
Future<Ad> f = exec.submit(new FetchAdTask());
Ad ad;
try {
ad = f.get(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
ad = new Ad("被中断时候的默认广告");
} catch (ExecutionException e) {
ad = new Ad("异常时候的默认广告");
} catch (TimeoutException e) {
ad = new Ad("超时时候的默认广告");
System.out.println("超时,未获取到广告");
boolean cancel = f.cancel(true);
System.out.println("cancel的结果:" + cancel);
}
exec.shutdown();
System.out.println(ad);
}

public static void main(String[] args) {
Timeout timeout = new Timeout();
timeout.printAd();
}
}

cancel()方法

取消任务的执行

  1. 如果这个任务还没有开始执行,任务会被正常取消,以后也不会被执行,方法返回true
  2. 如果任务已完成或者已取消,那么cancel()方法会执行失败,方法返回false
  3. 如果这个任务已经开始执行了,那么这个取消方法将不会直接取消该任务,而是会根据填的参数mayInterruptIfRunning做判断

Future.cancel(true)适用于

  1. 任务能够处理interrupt

Future.cancel(false)仅用于避免启动尚未启动的任务,适用于

  1. 未能处理interrupt的任务
  2. 不清楚任务是否支持取消
  3. 需要等待已经开始的任务执行完成

用FutureTasK来创建Future

用FutureTask来获取Future和任务的结果

FutureTask是一种包装器,可以把Callable转化成Future和Runnable,它同时实现二者的接口

image-20200227085312718

所以它即可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值

把Callable实例当做参数,生成FutureTask的对象,然后把这个对象当做一个Runnable对象,用线程池或另起一个线程去执行这个Runnable对象,最后通过FutureTask获取刚才执行的结果

注意点

当for循环批量获Future的结果时,容易发生一部分线程很慢的情况,get()方法调用时应该使用timeout限制

image-20200227090342711

Future的声明周期不能后退,生命周期只能前进,不能后退

控制并发流程

什么是控制并发流程

控制并发流程的工具类,作用就是帮助程序员更容易让线程之间进行合作,让线程之间相互配合,来满足业务需求,比如让线程A等待线程B执行完毕后再执行等合作策略

image-20200225092105118

CountDownLatch倒计时门闩

CountDownLatch类的作用

并发流程控制的工具

倒数门闩,例子:购物拼团,大巴(游乐园坐过山车排队),人满发车

流程:倒数结束之前,一直处于等待状态,直到倒计时结束了,此程序才继续工作

image-20200225093115937

类的主要方法介绍

CountDownLatch(int count)

仅有一个构造函数,参数count为需要倒数的数值

await()

调用await()方法的线程会被挂起,他会等待直到count值为0才继续执行

countDown()

将count值减1,直到为0时,等待的线程会被唤醒

图解await和countDown方法

image-20200225094810249

两种典型用法

用法一

一个线程等待多个线程都执行完毕,在继续执行自己的工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 描述: 工厂中,质检,5个工人检查,所有人都认为通过,才通过
*/
public class CountDownLatchDemo1 {

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {

@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "完成了检查。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
};
service.submit(runnable);
}
System.out.println("等待5个人检查完.....");
latch.await();
System.out.println("所有人都完成了工作,进入下一个环节。");
}
}

输出结果

1
2
3
4
5
6
7
等待5个人检查完.....
No.5完成了检查。
No.3完成了检查。
No.2完成了检查。
No.1完成了检查。
No.4完成了检查。
所有人都完成了工作,进入下一个环节。

用法二

多个线程等待某一个线程的信号,同时开始执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。
*/
public class CountDownLatchDemo2 {

public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "准备完毕,等待发令枪");
try {
begin.await();
System.out.println("No." + no + "开始跑步了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.submit(runnable);
}
//裁判员检查发令枪...
Thread.sleep(5000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
No.1准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.5准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.5开始跑步了
No.4开始跑步了
No.3开始跑步了
No.2开始跑步了

一等多与多等一结合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。
*/
public class CountDownLatchDemo1And2 {

public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);

CountDownLatch end = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "准备完毕,等待发令枪");
try {
begin.await();
System.out.println("No." + no + "开始跑步了");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "跑到终点了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
};
service.submit(runnable);
}
//裁判员检查发令枪...
Thread.sleep(5000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();

end.await();
System.out.println("所有人到达终点,比赛结束");
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
No.1准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.5准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.2开始跑步了
No.3开始跑步了
No.5开始跑步了
No.4开始跑步了
No.3跑到终点了
No.4跑到终点了
No.1跑到终点了
No.2跑到终点了
No.s
所有人到达终点,比赛结束

注意点

扩展用法:多个线程等多个线程完成执行后,再同时执行

CountDownLatch是不能够重用的,如果需要重新计算,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例

总结

两个典型用法:一等多和多等一

CountDownLatch类在创建实例的时候,需要传递倒数次数,倒数到0的时候,之前等待的线程会继续执行

CountDownLatch不能回滚重置

Semaphore信号量

Semaphore可以用来限制或管理数量有限的资源的使用情况

例子:工厂污水排放,污染不能太多,污染许可证只能发3张

信号量的作用是维护一个“许可证”的计数,线程可以获取”许可证“,那信号量剩余的许可证就减一,线程也可以释放一个”许可证“,那信号量剩余的许可证就加一,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证

image-20200225103525920

Semaphore应用实例

image-20200225103946103

没有使用信号量

image-20200225104104867

使用信号量

正常情况下获取许可证

线程1

image-20200225104243810 image-20200225104317178

线程2

image-20200225104357416

线程3

image-20200225104708377

线程4

线程4就被挡住了

image-20200225104846249

直到线程1归还许可证

image-20200225105016379

更多的线程进来

image-20200225105211821

总结

image-20200225105250007

信号量使用流程

  1. 初始化Semaphore,并指定许可证的数量
  2. 在需要被执行的代码前加acquire()或者acquireUninterruptibly()方法,以acquire()为主
  3. 在任务执行结束后,调用release()来释放许可证

信号量主要方法介绍

new Semaphore(int permits,boolean fair)

这里可以设置是否要使用公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证时,可以分发给之前等了最长时间的线程

accquire()

获取许可

accquireUninterruptibly()

tryAcquire()

看现在有没有空闲的许可证,如果有的话就去获取,如果没有也不必进入阻塞,可以去做别的事,过一段时间再来查看许可证的空闲情况,返回的boolean类型

tryAcquire(timeout)

和tryAcquire()一样,但是多了一个超时时间,如在3秒内获取不到许可证,就去做别的事

release()

归还许可证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 描述: 演示Semaphore用法
*/
public class SemaphoreDemo {

static Semaphore semaphore = new Semaphore(5, true);

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
service.submit(new Task());
}
service.shutdown();
}

static class Task implements Runnable {

@Override
public void run() {
try {
semaphore.acquire(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到了许可证");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "释放了许可证");
semaphore.release(3);
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pool-1-thread-1拿到了许可证
pool-1-thread-1释放了许可证
pool-1-thread-2拿到了许可证
pool-1-thread-2释放了许可证
pool-1-thread-3拿到了许可证
pool-1-thread-3释放了许可证
pool-1-thread-4拿到了许可证
pool-1-thread-4释放了许可证
pool-1-thread-5拿到了许可证
pool-1-thread-5释放了许可证
pool-1-thread-6拿到了许可证
pool-1-thread-6释放了许可证
pool-1-thread-7拿到了许可证
pool-1-thread-7释放了许可证
pool-1-thread-8拿到了许可证
pool-1-thread-8释放了许可证
pool-1-thread-9拿到了许可证
pool-1-thread-9释放了许可证
pool-1-thread-10拿到了许可证
pool-1-thread-10释放了许可证
pool-1-thread-11拿到了许可证
...

信号量的特殊用法

一次性获取或释放多个许可证

比如TaskA会调用很消耗资源的method1(),而TaskB()调用的是不太消耗资源的method2(),假设一共有5个许可证,那么就要求TaskA获取5个许可证才能执行,而TaskB只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,用户可以根据自己的需求合理分配资源

获取和释放的许可证数量必须一致,如果每次都获取2个但是只释放一个甚至不释放,随着时间的推移,到最后许可证的数量不够用,就会使程序卡死

虽然信号量类并不对是否和获取数量做规范,但是这是程序员的编程规范,否则容易报错

注意点

  1. 在初始化Semaphore的时候设置公平性,一般设置为true会更合理

  2. 并不是必须由获取许可证的线程释放那个许可证,事实上,获取和释放许可证对新安村并无要求,也许是A获取了,然后由B释放,只要逻辑合理即可

  3. 信号量的作用,除了处理临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作后才开始工作,那么就线程1acquire(),而线程2完成任务后release(),这样的话,相当于轻量级的CountDownLatch

Condition接口

又称条件对象,是绑定在锁上的

作用

当线程1需要等待某个条件的时候,就去执行condition.await()方法,一旦执行了await()方法,线程1就会进入阻塞状态

这时通常会有另一个线程,假设是线程2,线程2去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这时JVM就会从被阻塞的线程里寻找等待该condition的线程,当线程1收到可执行的信号的时候,线程1的线程状态就会变成Runnable状态

image-20200225151524469

signalAll()和signal()区别

signalAll()会唤醒所有的正在等待的线程

signal()是公平的,只会唤醒等待时间最长的线程

代码演示

普通用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 描述: 演示Condition的基本用法
*/
public class ConditionDemo1 {
private ReentrantLock lock = new ReentrantLock();
//利用锁来新建Condition
private Condition condition = lock.newCondition();

void method1() throws InterruptedException {
lock.lock();
try{
System.out.println("条件不满足,开始await");
condition.await();
System.out.println("条件满足了,开始执行后续的任务");
}finally {
lock.unlock();
}
}

void method2() {
lock.lock();
try{
System.out.println("准备工作完成,唤醒其他的线程");
condition.signal();
}finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
ConditionDemo1 conditionDemo1 = new ConditionDemo1();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
conditionDemo1.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
conditionDemo1.method1();
}
}

输出结果

1
2
3
条件不满足,开始await
准备工作完成,唤醒其他的线程
条件满足了,开始执行后续的任务

用Condition实现生产者消费者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* 描述: 演示用Condition实现生产者消费者模式
*/
public class ConditionDemo2 {

private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();

public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}

class Consumer extends Thread {

@Override
public void run() {
consume();
}

private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
} finally {
lock.unlock();
}
}
}
}

class Producer extends Thread {

@Override
public void run() {
produce();
}

private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满,等待有空余");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}

}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
向队列插入了一个元素,队列剩余空间9
向队列插入了一个元素,队列剩余空间8
向队列插入了一个元素,队列剩余空间7
向队列插入了一个元素,队列剩余空间6
向队列插入了一个元素,队列剩余空间5
向队列插入了一个元素,队列剩余空间4
向队列插入了一个元素,队列剩余空间3
向队列插入了一个元素,队列剩余空间2
向队列插入了一个元素,队列剩余空间1
向队列插入了一个元素,队列剩余空间0
队列满,等待有空余
从队列里取走了一个数据,队列剩余9个元素
从队列里取走了一个数据,队列剩余8个元素
从队列里取走了一个数据,队列剩余7个元素
从队列里取走了一个数据,队列剩余6个元素
从队列里取走了一个数据,队列剩余5个元素
从队列里取走了一个数据,队列剩余4个元素
从队列里取走了一个数据,队列剩余3个元素
从队列里取走了一个数据,队列剩余2个元素
从队列里取走了一个数据,队列剩余1个元素
从队列里取走了一个数据,队列剩余0个元素
队列空,等待数据
向队列插入了一个元素,队列剩余空间9
向队列插入了一个元素,队列剩余空间8
向队列插入了一个元素,队列剩余空间7
向队列插入了一个元素,队列剩余空间6
向队列插入了一个元素,队列剩余空间5
向队列插入了一个元素,队列剩余空间4
向队列插入了一个元素,队列剩余空间3
向队列插入了一个元素,队列剩余空间2
向队列插入了一个元素,队列剩余空间1
向队列插入了一个元素,队列剩余空间0
队列满,等待有空余
从队列里取走了一个数据,队列剩余9个元素
...

注意点

实际上,如果说Lock用来代替synchronized,那么Condition就是用来代替相应的Object.wait/notify的,所以在用法和性质上,几乎都是一样的

await()方法会自动释放持有的Lock锁,和Object.wait()一样,不需要自己动手先释放锁

调用await()的时候,必须持有锁,否则会抛出异常,和Object.wait()一样

CyclicBarrier循环栅栏

CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程

当有大量线程相互配合的时候,分别计算不同任务,并且需要最后统一汇总的时候,可以使用CycliBarrier

CycliBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 描述: 演示CyclicBarrier
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到场了, 大家统一出发!");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new Task(i, cyclicBarrier)).start();
}
}

static class Task implements Runnable{
private int id;
private CyclicBarrier cyclicBarrier;

public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + id + "现在前往集合地点");
try {
Thread.sleep((long) (Math.random()*10000));
System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
cyclicBarrier.await();
System.out.println("线程"+id+"出发了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
线程1现在前往集合地点
线程0现在前往集合地点
线程2现在前往集合地点
线程3现在前往集合地点
线程4现在前往集合地点
线程6现在前往集合地点
线程5现在前往集合地点
线程7现在前往集合地点
线程8现在前往集合地点
线程9现在前往集合地点
线程5到了集合地点,开始等待其他人到达
线程0到了集合地点,开始等待其他人到达
线程9到了集合地点,开始等待其他人到达
线程1到了集合地点,开始等待其他人到达
线程7到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程7出发了
线程5出发了
线程9出发了
线程0出发了
线程1出发了
线程4到了集合地点,开始等待其他人到达
线程2到了集合地点,开始等待其他人到达
线程6到了集合地点,开始等待其他人到达
线程3到了集合地点,开始等待其他人到达
线程8到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程8出发了
线程3出发了
线程6出发了
线程2出发了
线程4出发了

CyclicBarrier和CountDownLatch的区别

作用不同

CyclicBarrier要等固定数量的线程都达到类栅栏的位置才能继续执行,额CountDownLatch只需要等待数字为0,也就是说,CountDownLatch用于事件,但是CyclicBarrier用于线程

可重用性不同

CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例,而CyclicBarrier可以重复使用

多线程一之并发编程的基石

并发编程的基石之多线程概念建立

image-20200131110622023

什么是线程、进程

操作系统、进程、线程的包含关系

操作系统是包含多个进程的容器,而每个进程又都是容纳多个线程的容器

单核CPU可以执行多个进程,是因为操作系统的调度

cpu里的一个核,在某一个时间点只能处理一个进程中的一个线程,正是因为这样,所以单核处理器无法并行执行,而多核处理器可以并行

并行并发串行针对进程里的线程之间的运行状态来说的,而同步异步阻塞非阻塞是针对执行线程的被调用者和调用者来说的,两者没有联系

Oracle文档的官方定义

进程:使用fork(2)系统调用创建的UNIX环境(例如文件描述符、用户ID等),它被设置为运行程序

线程:在进程上下文中执行的一系列指令

并发工具类

并发工具类

并发工具类的分类

1. 为了并发安全的工具类

而为了并发安全 的工具类可以分为3个小部分

1)互斥同步

最简单的理解就是锁,一个线程获取资源的时候,另一个线程不能获取资源,线程之间相互敌对的

2)非互斥同步

可以两个线程同时获取资源,不要求资源独占,最典型的就是原子类

3)无同步方案

包含ThreadLocal、final关键字

并发容器精讲

面试杀手锏

ConcurrentHashMap、CopyOnWriteArrayList、阻塞队列

并发容器

并发容器概览

ConcurrentHashMap:线程安全的HashMap

CopyOnWriteArrayList:线程安全的List

BlockingQueue:这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道

ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现。可以看做一个线程安全的LinkedList

ConcurrentSkipListMap:是一个Map,使用跳表的数据结构进行快速查询

古老和过时的同步容器

Vector和Hashtable

Vector

是jdk早期的一部分,目标也是提供一个线程安全的集合类,但是随着jdk的发展已经不能满足需求了,性能不够好

1
2
3
4
5
6
7
8
9
public class VectorDemo {

public static void main(String[] args) {
Vector<String> vector = new Vector<>();
vector.add("test");
System.out.println(vector.get(0));
}

}

输出结果

1
test

Vector中的很多方法都是由synchronized修饰的,在多线程下线程性能不够好

Hashtable

1
2
3
4
5
6
7
8
public class HashtableDemo {
public static void main(String[] args) {
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("学完以后跳槽涨薪幅度", "80%");
System.out.println(hashtable.get("学完以后跳槽涨薪幅度"));
}

}

输出结果

1
80%

同样,Hashtable中的很多方法都是由synchronized修饰的,在多线程下线程性能不够好

HashMap和ArrayList

虽然这两个类不是线程安全的,但是可以用Collections.synchronizedList(new ArrayList\<E>())Collections.synchronizedMap(new HashMap<K,V>)使之变成线程安全的

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 描述:演示Collections.synchronizedList(new ArrayList<E>())
*/
public class SynList {
public static void main(String[] args) {
List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
list.add(5);
System.out.println(list.get(0));

Map<Object, Object> objectObjectMap = Collections.synchronizedMap(new HashMap<>());
}
}

同样使用synchronized来修饰,与上面的方法差不多

ConcurrentHashMap和CopyOnWriteArrayList

取代同步的HashMap和同步的ArrayList

绝大多数并发情况下,ConcurrentHashMap和CopyOnWriteArrayList的性能都更好

ConcurrentHashMap

重点、面试常考

Map简介

HashMap、Hashtabe、LinkedHashMap、TreeMap

HashMap

HashMap会根据键的HashCode来存储,由于可以直接算出HashCode的值,所以可以直接定位到需要找到的位置,它的访问速度是非常快的,允许键(key)为null来写入的,值也可以是null,但是是线程不安全的

Hashtable

是历史遗留类,很多功能与HashMap是一致的,但是线程安全的,任何时刻只有一个线程能对它进行操作,不建议使用

LinkedHashMap

HashMap的一个子类,保存了记录的插入顺序,在遍历的时候有用,遍历的顺序与插入的顺序一致

TreeMap

由于实现了SortedMap接口,所以可以根据键来排序,默认是升序,也可以自定义排序条件

以上一种实现都要求key是不可变对象,即key在创建后它的hash值不会改变

Map的基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 描述: 演示Map的基本用法
*/
public class MapDemo {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
System.out.println(map.isEmpty());
map.put("东哥", 38);
map.put("西哥", 28);
System.out.println(map.keySet());
System.out.println(map.get("西哥"));
System.out.println(map.size());
System.out.println(map.containsKey("东哥"));
map.remove("东哥");
System.out.println(map.containsKey("东哥"));

}
}

输出结果

1
2
3
4
5
6
true
[东哥, 西哥]
28
2
true
false

为什么需要ConcurrentHashMap

为什么不用Collections.synchronizedMap()

它是通过锁来保证线程的安全访问的,但是由于synchronized对于并发量高的时候并不理想,所以不采用这样的方法

为什么HashMap是线程不安全的

注意:HashMap本来就不是用于并发的

同时put碰撞导致数据丢失

如果多个线程同时put,如果计算出来的hash值是一样的话,那么这几个key会放到同一个位置,但是多个线程放在同一个位置,只能保留一个线程的数据,其他线程的数据都会丢失的,所以会丢失一部分数据

同时put扩容导致数据丢失

如果多个线程同时put,并且发现需要同时扩容,那么扩容之后的数组,也只有一个会保留下来,所以也会造成某些数据的丢失

死循环造成CPU100%

HashMap在高并发下的死循环(仅在JDK7及以前存在)

原因:在多个线程同时扩容的时候会造成链表的死循环,这样一来就没有尽头了

HashMap的分析

image-20200223222538740

image-20200223222640457

红黑树

对二叉查找树BST的一种平衡策略,会自动平衡,防止极端不平衡从而影响查找效率的情况发生

特点

每个节点要么是红色的,要么是黑色的,但根节点永远是黑色的

红色节点不能连续,即红色节点的孩子和父亲都不能是红色

从任意节点到其子树中每个叶子节点的路径都包含相同数量的黑色节点

所有的叶结点都是黑色的

HashMap关于并发的特点

  1. 非线程安全
  2. 迭代时不允许改变内容
  3. 只读的并发是安全的
  4. 如果一定要把HashMap用在并发环境,用Collections.synchronizedMap(new HashMap())

JDK1.7的ConcurrentHashMap实现和分析

image-20200223230321759
  1. Java7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成的拉链法

  2. 每个segment设置了独立的ReentrantLock锁,每个segment之间互不影响,提高了并发效率

  3. ConcurrentHashMap默认有16个Segments,所以最多可以同时支持16个线程并发的写(操作分别分布在不同的Segment),这个默认值可以在初始化的时候设置为其他值,但是一旦初始化以后,是不可以扩容的

JDK1.8的ConcurrentHashMap实现和分析

image-20200224095604376

putVal()流程

  1. 判断key、value不为空
  2. 计算hash值
  3. 根据对应位置节点的类型来赋值,或者helpTransfer,或者增长链表,或者给红黑树增加节点
  4. 检查满足阈值就红黑树化
  5. 返回oldVal

get()流程

  1. 计算hash值
  2. 找到对应的位置,根据情况进行:直接取值、或红黑树里找值、或遍历链表取值
  3. 返回找到的结果

为什么要把1.7结构改成1.8的结构

数据结构

Hash碰撞

保证并发安全

为什么超过8要转为红黑树

默认不是红黑树节点,默认是链表的形式,这是因为它所占用的内存更少

想要达到冲突为8是很难的,如果达到8就会转为红黑树,依然能提高效率

组合操作

ConcurrentHashMap为什么也不是线程安全的

replace操作

putIfAbsent

if(!map.containsKey(key)){

​ return map.put(key,value);

}else{

​ return map.get(key);

}

实际生产案例

线程安全问题需要时刻注意

CopyOnWriteArrayList

诞生的历史和原因

在JDK5引入,是List中最主要的并发工具,用来代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因是一样的

Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑

CopyOnWrite并发容器还包括CopyOnWriteArraySet,用来替代同步Set

适用场景

读操作尽可能快,而写即使慢也没有太大关系

读多写少:黑名单、每日更新、监听器,这些的迭代操作远多于修改操作

读写原则

回顾读写锁,读读共享,其他都互斥(写写互斥,读写互斥,写读互斥)

读写锁规则的升级:读取是完全不用加锁的,并且更厉害的是写入也不会阻塞读取操作,只用写入和写入之间要进行同步等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 描述:演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
*/
public class CopyOnWriteArrayListDemo1 {

public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
//CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");

Iterator<String> iterator = list.iterator();

while (iterator.hasNext()) {
System.out.println("list is" + list);
String next = iterator.next();
System.out.println(next);

if (next.equals("2")) {
list.remove("5");
}
if (next.equals("3")) {
list.add("3 found");
}
}
}
}

输出结果

1
2
3
4
5
6
7
8
list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4]
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 描述:演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
*/
public class CopyOnWriteArrayListDemo1 {

public static void main(String[] args) {
//ArrayList<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");

Iterator<String> iterator = list.iterator();

while (iterator.hasNext()) {
System.out.println("list is" + list);
String next = iterator.next();
System.out.println(next);

if (next.equals("2")) {
list.remove("5");
}
if (next.equals("3")) {
list.add("3 found");
}
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4]
3
list is[1, 2, 3, 4, 3 found]
4
list is[1, 2, 3, 4, 3 found]
5

实现原理

CopyOnWrite的含义

创建新副本,读写分离

不可变原理

迭代时候

缺点

数据一致性问题

CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性,所以如果希望写入的数据马上能读到,就不要使用CopyOnWrite容器

内存占用问题

因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存里会同时保存两个对象的内存

源码分析

数据结构

get

add

并发队列Queue

阻塞队列、非阻塞队列

为什么要使用队列

用队列可以在线程间传递数据:生产者消费者模式、银行转账

考虑锁等线程安全问题的重任从用户转移到了队列上

并发队列简介

Queue与BlockingQueue是JDK5中新增的,Queue只用来保存一组等待处理的数据,它会有很多种实现,底层是LinkedList,Queue只能从头或者尾来获取数据,不能从中间获取数据

BlockingQueue增加了可阻塞的插入和获取操作,若队列为空,取的操作会一直阻塞,直到里面有了数据,如果队列满了,插入也插入不进去,也会一直阻塞,直到有用户把其中的数据取出来,这体现了生产者消费者模式

各并发队列关系图

image-20200224174734549

阻塞队列

BlockingQueue

什么是阻塞队列

阻塞队列是具有阻塞功能的队列,所以它首先是一个队列,其次是具有阻塞功能

通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用,阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的

image-20200224185451780

阻塞功能:最有特色的两个带有阻塞功能的方法是take()方法和put()方法

take()方法:获取并移除队列的头结点,一旦在执行take的时候,队列里无数据,则阻塞,直到队列里有数据

image-20200224191055479

put()方法:插入元素,但是如果队列已满,那么就无法继续插入,并且阻塞,直到队列里有了空闲空间

image-20200224192035440

是否有界(容量有多大)

这是一个非常重要的属性,无界队列意味着里面可以容纳非常多的数据,Integer.MAX_VALUE,约为2的31次方,可以近似认为是无限容量

线程池与阻塞队列的关系:阻塞队列是线程池的重要组成部分

BlockingQueue的主要方法

put、take(会阻塞住)

add、remove、element(会抛出异常)

offer(添加元素)、poll(取出元素)、peek

主要方法介绍

ArrayBlockingQueue

有界

指定容量

公平:可以指定是否需要保证公平,如果要保证公平的话,那么等待了最长时间的线程就会被优先处理,不过这同时会带来一定的性能损耗

使用案例

有10个面试者,但只有1个面试官,大厅里有三个位置休息,每个人面试时间是10秒,模拟所有人面试的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 描述: TODO
*/
public class ArrayBlockingQueueDemo {


public static void main(String[] args) {

ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);

Interviewer r1 = new Interviewer(queue);
Consumer r2 = new Consumer(queue);
new Thread(r1).start();
new Thread(r2).start();
}
}

class Interviewer implements Runnable {

BlockingQueue<String> queue;

public Interviewer(BlockingQueue queue) {
this.queue = queue;
}

@Override
public void run() {
System.out.println("10个候选人都来啦");
for (int i = 0; i < 10; i++) {
String candidate = "Candidate" + i;
try {
queue.put(candidate);
System.out.println("安排好了" + candidate);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
queue.put("stop");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Consumer implements Runnable {

BlockingQueue<String> queue;

public Consumer(BlockingQueue queue) {

this.queue = queue;
}

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg;
try {
while(!(msg = queue.take()).equals("stop")){
System.out.println(msg + "到了");
Thread.sleep(10000);
}
System.out.println("所有候选人都结束了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
10个候选人都来啦
安排好了Candidate0
安排好了Candidate1
安排好了Candidate2
安排好了Candidate3
Candidate0到了
Candidate1到了
安排好了Candidate4
Candidate2到了
安排好了Candidate5
Candidate3到了
安排好了Candidate6
Candidate4到了
安排好了Candidate7
Candidate5到了
安排好了Candidate8
Candidate6到了
安排好了Candidate9
Candidate7到了
Candidate8到了
Candidate9到了
所有候选人都结束了

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

无界,容量能达到Integer.MAX_VALUE

内部结构:Node、两把锁

put()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

PriorityBlockingQueue

支持优先级

自然排序,而不是先进先出

无界队列

是PriorityQueue的线程安全版本

SynchronousQueue

容量为0,值得注意的是,SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要持有元素,它所做的就是直接传递(direct handoff),因此效率很高

image-20200225082849924

SynchronousQueue没有peek函数,因为peek的含义是取出头结点,但是SynchronousQueue的容量为0,所以连头结点都没有,也就没有peek方法,同理,没有iterate相关方法

因此它是一个极好的用来直接传递的并发数据结构

SynchronousQueue是线程池Executors.newCachedThreadPool()使用的阻塞队列

非阻塞队列

ConcurrentLinkedQueue

并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景,用的相对比较少

源码中的offer体现了CAS的思想,内有p.casNext方法,用了Unsafe.compareAndSwapObject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

如何选择适合自己的队列

边界

空间

吞吐量

各并发容器总结

Java.util.concurrent包提供的容器分为3类,Concurrent*、CopyOnWrite*、Blocking*

Concurrent*的特点是大部分通过CAS实现并发的

CopyOnWrite*则是通过复制一份原数据来实现的

Blocking*通过AQS实现的

项目实战

高性能缓存

从0开始迭代,一步步设计并实现

缓存是在实际生产中非常常用的工具,使用缓存可以避免重复计算,提高吞吐量

最初级的缓存可以用一个Map来实现,但一个功能完备,性能强劲的缓存,需要考虑的点就非常多了,从最简单的HashMap入手,一点点提高项目缓存的性能

最简单缓存

HashMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 描述:最简单的缓存形式:HashMap
*/
public class ImoocCache1 {

private final HashMap<String,Integer> cache = new HashMap<>();

public Integer computer(String userId) throws InterruptedException {
Integer result = cache.get(userId);
//先检查HashMap里面有没有保存过之前的计算结果
if (result == null) {
//如果缓存中找不到,那么需要现在计算一下结果,并且保存到HashMap中
result = doCompute(userId);
cache.put(userId, result);
}
return result;
}

private Integer doCompute(String userId) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
return new Integer(userId);
}

public static void main(String[] args) throws InterruptedException {
ImoocCache1 imoocCache1 = new ImoocCache1();
System.out.println("开始计算了");
Integer result = imoocCache1.computer("13");
System.out.println("第一次计算结果:"+result);
result = imoocCache1.computer("13");
System.out.println("第二次计算结果:"+result);

}
}

这样的写法有一些问题

在多线程的情况下是并发不安全的

可以在此情况下进行升级

使用synchronized实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 描述:最简单的缓存形式:HashMap
*/
public class ImoocCache1 {

private final HashMap<String,Integer> cache = new HashMap<>();

public synchronized Integer computer(String userId) throws InterruptedException {
Integer result = cache.get(userId);
//先检查HashMap里面有没有保存过之前的计算结果
if (result == null) {
//如果缓存中找不到,那么需要现在计算一下结果,并且保存到HashMap中
result = doCompute(userId);
cache.put(userId, result);
}
return result;
}

private Integer doCompute(String userId) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
return new Integer(userId);
}

public static void main(String[] args) throws InterruptedException {
ImoocCache1 imoocCache1 = new ImoocCache1();
System.out.println("开始计算了");
Integer result = imoocCache1.computer("13");
System.out.println("第一次计算结果:"+result);
result = imoocCache1.computer("13");
System.out.println("第二次计算结果:"+result);

}
}

使用synchronized有如下缺点

性能差

代码复用性差

给HashMap加final关键字

属性被声明为final后,该变量则只能被赋值一次,一旦被赋值了,fianl变量就不能被改变

解决代码复用性

使用装饰者模式

假设ExpensiveFunction类是耗时计算的实现类,实现了Computable接口,但是其本身不具有缓存的功能,也不考虑缓存的事情

1
2
3
4
5
6
7
/**
* 描述:有一个计算函数computer,用来代表耗时计算,每个计算器都要实现这个接口,这样就可以无侵入实现缓存功能
*/
public interface Computable <A,V>{

V compute(A arg) throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
/**
* 描述:耗时计算的实现类,实现了Computable接口,但是本身不具备缓存能力,不需要考虑缓存的事情
*/
public class ExpensiveFunction implements Computable<String, Integer>{

@Override
public Integer compute(String arg) throws Exception {
Thread.sleep(5000);
return Integer.valueOf(arg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 描述: 用装饰者模式,给计算器自动添加缓存功能
*/
public class ImoocCache2<A,V> implements Computable<A,V> {

private final Map<A, V> cache = new HashMap();

private final Computable<A,V> c;

public ImoocCache2(Computable<A, V> c) {
this.c = c;
}

@Override
public synchronized V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}

public static void main(String[] args) throws Exception {
ImoocCache2<String, Integer> expensiveComputer = new ImoocCache2<>(
new ExpensiveFunction());
Integer result = expensiveComputer.compute("666");
System.out.println("第一次计算结果:"+result);
result = expensiveComputer.compute("13");
System.out.println("第二次计算结果:"+result);
}
}

不足

当多个线程同时想计算的时候,需要慢慢等待,严重的时候甚至比不用缓存性能更差

性能待优化

引出锁性能优化经验,缩小锁的粒度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 描述: 缩小了synchronized的粒度,提高性能,但是依然并发不安全
*/
public class ImoocCache4<A, V> implements Computable<A, V> {

private final Map<A, V> cache = new HashMap();

private final Computable<A, V> c;

public ImoocCache4(Computable<A, V> c) {
this.c = c;
}

@Override
public V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
synchronized (this) {
cache.put(arg, result);
}
}
return result;
}

public static void main(String[] args) throws Exception {
ImoocCache4<String, Integer> expensiveComputer = new ImoocCache4<>(
new ExpensiveFunction());
Integer result = expensiveComputer.compute("666");
System.out.println("第一次计算结果:" + result);
result = expensiveComputer.compute("666");
System.out.println("第二次计算结果:" + result);
}
}

虽然提高并发效率,但是并不意味着线程安全,还需要考虑同时读写等情况

使用ConcurrentHashMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ImoocCache5<A, V> implements Computable<A, V> {

private final Map<A, V> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public ImoocCache5(Computable<A, V> c) {
this.c = c;
}

@Override
public V compute(A arg) throws Exception {
System.out.println("进入缓存机制");
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}

public static void main(String[] args) throws Exception {
ImoocCache5<String, Integer> expensiveComputer = new ImoocCache5<>(
new ExpensiveFunction());
Integer result = expensiveComputer.compute("666");
System.out.println("第一次计算结果:" + result);
result = expensiveComputer.compute("666");
System.out.println("第二次计算结果:" + result);
}
}

此时也存在缺点

在计算完成前,另一个要求计算相同值的请求到来,会导致计算两遍,这和缓存想避免多次计算的初衷恰恰相反

如图

image-20200227104708583

避免重复计算

Future和Callable的使用

现在不同的线程进来之后,确实可以同时计算,但是如果两个线程相差无几的进来请求同一个数据,就会出现重复计算,若是更多的线程请求同样的内容,却都需要重新计算,则会造成更大的浪费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* 描述: 利用Future,避免重复计算
*/
public class ImoocCache7<A, V> implements Computable<A, V> {

private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();

private final Computable<A, V> c;

public ImoocCache7(Computable<A, V> c) {
this.c = c;
}

@Override
public V compute(A arg) throws Exception {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<>(callable);
f = ft;
cache.put(arg, ft);
System.out.println("从FutureTask调用了计算函数");
ft.run();
}
return f.get();
}

public static void main(String[] args) throws Exception {
ImoocCache7<String, Integer> expensiveComputer = new ImoocCache7<>(
new ExpensiveFunction());
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666");
System.out.println("第一次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("666");
System.out.println("第三次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = expensiveComputer.compute("667");
System.out.println("第二次的计算结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();


}
}

以上方法依然存在重复计算的可能

如果有两个同时计算666的线程,同时调用cache.get()方法,那么返回的结果都为null,后面还是会创建两个任务去计算相同的值

image-20200227110045352

计算抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 描述: 耗时计算的实现类,有概率计算失败
*/
public class MayFail implements Computable<String, Integer>{

@Override
public Integer compute(String arg) throws Exception {
double random = Math.random();
if (random > 0.5) {
throw new IOException("读取文件出错");
}
Thread.sleep(3000);
return Integer.valueOf(arg);
}
}

原子类

原子类

什么是原子类有什么作用

不可分割

一个操作是不可中断的,即使是多线程的情况也可以保证

java.util.concurrent.atomic

原子类的作用和锁类似,是为了保证并发情况下线程安全,不过原子类相比于锁,有一定的优势

原子类粒度更细,原子变量可以把竞争范围缩小到变量级别,这是用户可以获得的最细粒度的情况,通常所得粒度都要大于原子变量的粒度

原子类效率更高,通常使用原子类的效率会比使用锁的效率更高,除了高度竞争的情况

ThreadLocal详解

ThreadLocal详解

两大使用场景

典型场景1

每个线程需要一个独享的对象的时候

即每个Thread类中有自己的实例副本,而各个线程之间的实例之间是不共享的,如同教材只有一本,一起做笔记就会有线程安全问题,使并发读写带来数据的不一致,而使用ThreadLocal相当于把书本复印多份,这样每个用户使用自己的教材,就不会出问题了。而每一本书(每一个实例)都是由当前的线程访问并使用,其他线程是无法访问,不是共用的,这样就解决了线程问题

通常是工具类,典型需要使用的类有SimpleDateFormat和Random,这两个类都是线程不安全的

|