并发工具类

并发工具类

并发工具类的分类

1. 为了并发安全的工具类

而为了并发安全 的工具类可以分为3个小部分

1)互斥同步

最简单的理解就是锁,一个线程获取资源的时候,另一个线程不能获取资源,线程之间相互敌对的

2)非互斥同步

可以两个线程同时获取资源,不要求资源独占,最典型的就是原子类

3)无同步方案

包含ThreadLocal、final关键字

image-20200202123810331

image-20200202124043979

2. 管理线程、提高效率的工具类

image-20200202124220778

3. 用于线程协助的工具类

image-20200202124257072

线程池–治理线程的法宝

线程池的自我介绍

线程池的重要性

线程池的提问是层层递进的,线程的很多方面很多注意点都可以作为考点

什么是“池”

软件中的“池”,可以理解为计划经济,可以复用线程,可以控制总量

如果不使用线程池,每个任务都新建一个线程处理,在线程数较少的时候可以使用for循环创建线程(任务数量为10的时候),但是如果线程数量上升到1000的时候,虽然可以完成,但是会开销太大

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package threadpool;

import java.util.concurrent.Executors;

/**
* 描述:TODO
*/
public class ForLoop {

public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
Thread thread = new Thread(new Task());
thread.start();
}
}

static class Task implements Runnable {

@Override
public void run() {
System.out.println("执行了任务");
}
}

}

这样开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并销毁线程所带来的开销问题

为什么要使用线程池

问题1:反复创建线程开销大

问题2:过多的线程会占用太多内存

为了解决以上两个问题

用少量的线程,避免内存占用过多

让这部分线程都保持工作,且可以反复执行任务,避免声明周期的损耗

线程池的好处

1.加快响应速度

2.合理利用CPU和内存

3.统一管理资源

线程池适合应用的场景

1.服务器

服务器接受到大量请求时,使用线程池技术非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率

2.实际开发中

在开发中,如果需要创建个以上的线程,那么就可以使用线程池来管理

创建和停止线程池

线程池构造函数的参数

参数名 类型 含义
corePoolSize int 核心线程数
maxPoolSize int 最大线程数
keepAliveTime long 保持存活时间
workQueue BlockingQueue 任务存储队列,通常是阻塞队列类型
threadFactory ThreadFactory 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程
Handler RejectedExecutionHandler 由于线程池无法接受你所提交的任务的拒绝策略
参数中的corePoolSize和maxPoolSize

corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务来时,再创建新线程去执行任务

线程池有可能会在核心线程数的基础上额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大maxPoolSize

1
2
3
maximumPoolSize的说明

在课程中,maximumPoolSize和maxPoolSize的含义相同,不做额外区分。实际上,在ThreadPoolExecutor类的参数中,变量名是maximumPoolSize;不过在org.springframework.scheduling.concurrent包的ThreadPoolExecutorFactoryBean类等其他类中,也有使用maxPoolSize作为参数名的情况,我们直接理解为maximumPoolSize和maxPoolSize是相同的就可以了
添加线程规则

image-20200121130316190

  1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
  2. 如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列
  3. 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务
  4. 如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝该任务

image-20200121132100492

是否需要增加线程的判断顺序是

  • corePoolSize
  • workQueue
  • maxPoolSize

举个例子

线程池:核心池大小为5,最大池大小为10,队列为100

因为线程中的请求最多会创建5个,然后任务将被添加到队列中,直到达到100。当队列已满时,将创建最新的线程maxPoolSize,最多到10个线程,如果再来任务就拒绝

增减线程的特点
  1. 通过设置corePoolSize和maximunPoolSize相同,就可以创建固定大小的线程池
  2. 实际上线程池希望保持较少的线程数,只有在负载变得很大时才增加它的数量
  3. 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务(队列有界限但是线程数没有界限)
  4. 是只有在队列填满时才创建多余corePoolSize的线程,所以如果你使用的是无界队列(例如LinedBlockingQueue),那么线程数就不会超过corePoolSize(队列没有界限)
keepAliveTime

如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超多keepAliveTime,它们就会被终止回收

ThreadFactory

专门用来创建线程

新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级,并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。

Executors.defaultThreadFactory()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

通常我们用默认的ThreadFactory就可以

工作队列

有3中最常见的队列类型:

  1. 直接交接:SynchronousQueue

    如果任务不是特别多的话,只是把任务通过队列进行简单的中转,来交给线程处理,就可以使用这个队列,这个队列本身是没有容量的,所以使用这个队列时maxPoolSize要设置大一些

  2. 无界队列:LinkedBlockingQuere

    此队列不会被塞满,可以防止流量突增,但也有问题,处理的速度跟不上提交的速度,就会造成内存浪费,并且可能造成OOM异常

  3. 有界队列:ArrayBlockingQueue

    可以设置队列大小,此时maxPoolSize就有意义了,当队列满了之后就需要创建新的线程

  4. 延迟队列:DelayWorkQueue

  5. workStealingPool

停止线程池的正确方法

1shutdown

把线程池关闭

运行这个方法不一定使线程停止,实际上这个方法只是初始化整个关闭过程,因为线程在执行到一半的时候,或者线程中有正在执行的任务,包括队列中有等待执行的任务,此时不是说停就停的。在运行这个方法之后,线程池就知道了我们想让他停止,此时线程池会将之前没有执行完的任务执行完再关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 描述:演示关闭线程池
*/
public class ShutDown {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);

executorService.shutdown();
executorService.execute(new ShutDownTask());
}
}

class ShutDownTask implements Runnable {

@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
pool-1-thread-7
pool-1-thread-4
pool-1-thread-6
pool-1-thread-1
pool-1-thread-5
pool-1-thread-3
pool-1-thread-9
pool-1-thread-2
pool-1-thread-8
pool-1-thread-10
pool-1-thread-10
pool-1-thread-5
pool-1-thread-6
pool-1-thread-1
pool-1-thread-3
pool-1-thread-9
pool-1-thread-2
pool-1-thread-8
pool-1-thread-7
pool-1-thread-4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task threadpool.ShutDownTask@7ea987ac rejected from java.util.concurrent.ThreadPoolExecutor@12a3a380[Shutting down, pool size = 10, active threads = 10, queued tasks = 70, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at threadpool.ShutDown.main(ShutDown.java:20)
pool-1-thread-4
pool-1-thread-5
pool-1-thread-10

报出RejectedExecutionException错误,说明在执行executorService.execute(new ShutDownTask());的时候已经遭到拒绝,不能再执行新的线程,shutdown起到了效果

2isShutdown

线程池是否已经停止的状态

并不是完全停止,而是是否进入停止的状态,如果执行shutdown,这个方法就会返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ShutDown {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());

}
}

class ShutDownTask implements Runnable {


@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
false
true
pool-1-thread-1
pool-1-thread-4
pool-1-thread-2
pool-1-thread-3
3isTerminated

线程池是否完全终止,当正在执行的任务,队列里的任务都清空了,才会是isTerminated

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ShutDown {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());
System.out.println(executorService.isTerminated());

}
}

class ShutDownTask implements Runnable {


@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
pool-1-thread-10
pool-1-thread-8
pool-1-thread-9
false
true
false
pool-1-thread-6
pool-1-thread-10
pool-1-thread-2
4awaitTermination

检测一段时间内是否线程终止执行,如果没有都执行完,则返回false,如果都执行完则返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ShutDown {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
boolean b = executorService.awaitTermination(3l, TimeUnit.SECONDS);
System.out.println(b);
}
}

class ShutDownTask implements Runnable {


@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pool-1-thread-7
pool-1-thread-2
pool-1-thread-5
pool-1-thread-6
pool-1-thread-1
pool-1-thread-3
false
pool-1-thread-7
pool-1-thread-10
pool-1-thread-9
pool-1-thread-8
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
shutdownNow

立刻停止线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ShutDown {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
//放进线程但是没有执行的任务,可以记录下来以后执行
List<Runnable> runnables = executorService.shutdownNow();
}
}

class ShutDownTask implements Runnable {


@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
pool-1-thread-5
pool-1-thread-3
pool-1-thread-4
pool-1-thread-10
pool-1-thread-1
pool-1-thread-2
pool-1-thread-8
pool-1-thread-7
pool-1-thread-4
pool-1-thread-8
pool-1-thread-7
pool-1-thread-6
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-9
pool-1-thread-5
pool-1-thread-10
pool-1-thread-9被中断了
pool-1-thread-2被中断了
pool-1-thread-6被中断了
pool-1-thread-10被中断了
pool-1-thread-5被中断了
pool-1-thread-7被中断了
pool-1-thread-8被中断了
pool-1-thread-3被中断了
pool-1-thread-1被中断了
pool-1-thread-4被中断了

常见线程池的特点和用法

FixedThreadPool

CachedThreadPool

image-20200202165854428

ScheduledThreadPool

SingleThreadExecutor

线程池应该手动创建还是自动创建

手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险

自动创建线程(也就是直接调用JDK封装好的构造函数)可能带来的问题

自动创建线程

①newFixedThreadPool

newFixedThreadPool使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 描述:演示newFixedThreadPool
*/
public class FixedThreadPoolTest {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}

class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}

newFixedThreadPool类

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor中第一个参数nThreads是用户传进去的线程数corePoolSize,第二个参数nThreads是maxPoolSize,即corePoolSize与maxPoolSize相等,第三次参数是存活时间0L,第四个参数用来表示存活时间单位,第五个参数是队列类型,由于传进去的LinkedBlockingQueue是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM

可能出错的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 描述:演示newFixedThreadPool出错的情况
*/
public class FixedThreadPoolOOM {

private static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new SubThread());
}
}

}

class SubThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

JVM设置

image-20200202161637484

输出结果

1
2
3
4
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at threadpool.FixedThreadPoolOOM.main(FixedThreadPoolOOM.java:14)

②newSingleThreadExecutor

此线程池中只有一个线程

newSingleThreadExecutor使用

1
2
3
4
5
6
7
8
9
10
11
/**
* 描述:TODO
*/
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}

newSingleThreadExecutor类

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

可以看出,这里和newFixedThreadPool的原理基本一样,只不过把线程数的核心数量和最大数量都直接设置成1,同样也传入一个无限容量的队列,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存

③newCachedThreadPool

可以缓存

特点:无界线程池,具有自动回收多余线程的功能

原理图

image-20200122111013945

newCachedThreadPool的使用

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 描述: TODO
*/
public class CachedThreadPool {

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}

newCachedThreadPool类

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

这里的弊端在于第二个参数maximumPoolSize被设置为Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM

④newScheduledThreadPool

支持定时及周期性任务执行的线程池

方式1:等待5秒之后进行执行线程

1
2
3
4
5
6
7
8
9
10
/**
* 描述: TODO
*/
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
threadPool.schedule(new Task(), 5, TimeUnit.SECONDS);
//threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
}
}

方式2:以一定频率重复执行

每隔3秒执行一次

1
2
3
4
5
6
7
8
9
10
11
/**
* 描述: TODO
*/
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
//threadPool.schedule(new Task(), 5, TimeUnit.SECONDS);
//1是等待1秒后开始执行,3是间隔3秒执行
threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
}
}

正确的创建线程池的方法

根据不同的业务场景,自己设置线程池参数,比如内存有多大,给线程取什么名字等等

线程池里的线程数量设定为多少比较合适

CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右

耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法

结合以上两种得出结论

合适线程数=CPU核心数*(1+平均等待时间、平均工作时间)

总结

以上4种线程池的构造函数的参数

image-20200122212028580

阻塞队列分析

1

FixedThreadPool和SingleThreadExecutor的Queue是LinkedBlockingQueue?

corePoolSize是有限的,就不得不设置阻塞队列是无线的

2

CachedThreadPool使用的Queue是SynchronousQueue?

maxPoolSize是无限的,所以可以使用SynchronousQueue,任务过来直接使用线程执行就行

3

对ScheduledThreadPool来说,它使用的是延迟队列DelayWorkQueue,延迟队列的能力就是把里面的任务根据时间先后去做延时

4

workStealingPool是JDK1.8加入的

这个线程池和之前的都有很大不同

1)这里的任务可以产生子任务的时候才适用,比如说树的遍历,处理矩阵的时候

2)使用这个线程池有一定的窃取能力,执行顺序不能保证

任务太多,怎么拒绝

拒绝时机

  1. 当Executor关闭时,提交新任务会被拒绝

  2. 当Executor对最大线程和工作队列容量使用有限边界并已经饱和时

image-20200202222225941

4种拒绝策略

当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略

  1. CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }}
    这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
  2. AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();}
    这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
  3. DiscardPolicy:不能执行的任务将被删除
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
  4. DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}
    该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

钩子方法,给线程池加点料

可以在每个任务执行的执行前后添加日志、统计等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* 描述:演示每个任务执行前后放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {

private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;


public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
handler);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}

public void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");

}
}

实现原理、源码分析

线程池组成部分

线程池管理器

工作线程

任务队列

任务接口(Task)

image-20200203002538726

Executor家族

线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等这么多和线程池相关的类,之间有什么关系

image-20200203004145317

Executor接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Executor {

/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}

execute方法就是用来执行任务的

ExecutorService接口

具有初步管理线程池的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public interface ExecutorService extends Executor {
void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
public abstract class AbstractExecutorService implements ExecutorService {

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

/**
* the main mechanics of invokeAny.
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.

try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

final long deadline = System.nanoTime() + nanos;
final int size = futures.size();

// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}

for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

}

Executors类

是工具类

image-20200203005800921

线程池实现任务复用的原理

核心:用相同的线程执行不同的任务

线程池的状态

RUNNING:接受新任务并处理排队任务

SHUTDOWN:不接受新任务,但处理排队任务

STOP:不接受任务,也不处理排队任务,中断正在进行的任务

TIDYING:中文是整洁,理解中文就容易理解这个状态,所有的任务 都已经终止,workerCount为零时,线程转换到TIDYING状态,并将运行terminate()钩子方法

TERMINATED:terminate()运行完成

使用线程池的注意点

避免任务堆积

避免线程数过度增加

排查线程泄露

文章目录
  1. 1. 并发工具类
    1. 1.1. 并发工具类的分类
      1. 1.1.0.1. 1. 为了并发安全的工具类
      2. 1.1.0.2. 2. 管理线程、提高效率的工具类
      3. 1.1.0.3. 3. 用于线程协助的工具类
  2. 1.2. 线程池–治理线程的法宝
    1. 1.2.1. 线程池的自我介绍
      1. 1.2.1.1. 线程池的重要性
      2. 1.2.1.2. 什么是“池”
      3. 1.2.1.3. 为什么要使用线程池
      4. 1.2.1.4. 线程池的好处
      5. 1.2.1.5. 线程池适合应用的场景
    2. 1.2.2. 创建和停止线程池
      1. 1.2.2.1. 线程池构造函数的参数
        1. 1.2.2.1.1. 参数中的corePoolSize和maxPoolSize
        2. 1.2.2.1.2. 添加线程规则
        3. 1.2.2.1.3. 增减线程的特点
        4. 1.2.2.1.4. keepAliveTime
        5. 1.2.2.1.5. ThreadFactory
        6. 1.2.2.1.6. 工作队列
      2. 1.2.2.2. 停止线程池的正确方法
        1. 1.2.2.2.1. 1shutdown
        2. 1.2.2.2.2. 2isShutdown
        3. 1.2.2.2.3. 3isTerminated
        4. 1.2.2.2.4. 4awaitTermination
        5. 1.2.2.2.5. shutdownNow
    3. 1.2.3. 常见线程池的特点和用法
      1. 1.2.3.1. 线程池应该手动创建还是自动创建
      2. 1.2.3.2. 线程池里的线程数量设定为多少比较合适
      3. 1.2.3.3. 总结
      4. 1.2.3.4. 阻塞队列分析
    4. 1.2.4. 任务太多,怎么拒绝
      1. 1.2.4.1. 拒绝时机
      2. 1.2.4.2. 4种拒绝策略
    5. 1.2.5. 钩子方法,给线程池加点料
    6. 1.2.6. 实现原理、源码分析
      1. 1.2.6.1. 线程池组成部分
      2. 1.2.6.2. Executor家族
    7. 1.2.7. 线程池实现任务复用的原理
      1. 1.2.7.1. 线程池的状态
    8. 1.2.8. 使用线程池的注意点
|