线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作。
线程池作用:
- 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
- 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
- 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
- 线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
- 池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销
线程池ThreadPoolExecutor
线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作。
线程池作用:
- 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
- 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
- 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
- 线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
- 池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销
构造方法
构造方法:
1
2
3
4
5
6
7public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)参数介绍:
- corePoolSize:核心线程数,定义了最小可以同时运行的线程数量
- maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值(maximumPoolSize - corePoolSize)又叫救急线程数
- keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于
corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的救急线程不会立即销毁,而是会等到keepAliveTime
时间超过销毁 - unit:keepAliveTime 参数的时间单位
- workQueue:阻塞队列,存放被提交但尚未被执行的任务
- threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字
- handler:拒绝策略,线程到达最大线程数且阻塞队列已满,此时仍有新任务来时会执行拒绝策略
- RejectedExecutionHandler 下有 4 个实现类:
- AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
- CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
- DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务
- RejectedExecutionHandler 下有 4 个实现类:
根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池。
- Executors 提供了四种线程池的创建:
newFixedThreadPool
、newCachedThreadPool
、newSingleThreadExecutor
、newScheduledThreadPool
工作原理
- 创建线程池,这时没有创建线程(懒惰),等待提交过来的任务请求,调用 execute 方法才会创建线程
- 当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务(当前线程数 < corePoolSize)
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列(当前线程数 >= corePoolSize && 阻塞队列没满)
- 如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize,那么会创建救急线程立刻运行这个任务,这对于阻塞队列中的任务不公平。因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行,而没有先执行阻塞队列中的等待任务(阻塞队列满 && 当前线程数 < maximumPoolSize)
- 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行(阻塞队列满 && 当前线程数 >= maximumPoolSize)
- 当一个线程完成任务时,会从队列中取下一个任务来执行
- 当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小
提交任务
ExecutorService 类 API:
方法 说明 void execute(Runnable command) 执行任务(Executor 类 API) Future<?> submit(Runnable task) 提交任务 task() Future submit(Callable task) 提交任务 task,用返回值 Future 获得任务执行结果 List invokeAll(Collection<? extends Callable> tasks) 提交 tasks 中所有任务 List invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) 提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常 T invokeAny(Collection<? extends Callable> tasks) 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 - execute 和 submit 都属于线程池的方法,对比:
- execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是封装成 FutureTask,然后调用 execute 执行
- execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出
- execute 和 submit 都属于线程池的方法,对比:
关闭线程池
ExecutorService 类 API:
方法 说明 void shutdown() 线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定任务) List shutdownNow() 线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回 boolean isShutdown() 不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true boolean isTerminated() 线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true boolean awaitTermination(long timeout, TimeUnit unit) 调用 shutdown 后,由于调用 shutdown 的线程不会等待所有任务运行结束才向下运行,而是立刻继续向下运行,所以如果它想在线程池的状态为 TERMINATED 后才做些事情,可以利用此方法等待 ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值。
状态表示:
1
2
3
4
5
6
7
8
9
10// 111 000000000000000000,转换成整数后其实就是一个【负数】
private static final int RUNNING = -1 << COUNT_BITS;
// 000 000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;状态 高3位 接收新任务 处理阻塞任务队列 说明 RUNNING 111 Y Y SHUTDOWN 000 N Y 不接收新任务,但处理阻塞队列剩余任务 STOP 001 N N 中断正在执行的任务,并抛弃阻塞队列任务 TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结 TERMINATED 011 - - 终止状态
newFixedThreadPool
newFixedThreadPool:创建一个拥有 n 个线程的线程池
1
2
3
4public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为
Integer.MAX_VALUE
,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出) - 适用于任务量已知,相对耗时的长期任务
newCachedThreadPool
newCachedThreadPool:创建一个可扩容的线程池
1
2
3
4public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}- 核心线程数是 0, 最大线程数是 29 个 1(二进制),全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM
- SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
- 适合任务数比较密集,但每个任务执行时间较短的情况
newSingleThreadExecutor
newSingleThreadExecutor:创建一个只有 1 个线程的单线程池
1
2
3
4
5public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}- 保证所有任务按照指定顺序执行,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放
单线程池对比单线程:
- 创建一个单线程来执行任务,如果任务执行出现异常,那么线程会终止,不会再执行后续提交的其他任务,而线程池会再新建一个线程,保证池的正常工作,保证提交的其他任务也能被执行
newScheduledThreadPool
- 如果我们想延时执行任务,可以使用 “任务调度线程池”。在 “任务调度线程池” 功能加入之前,可以使用
java.util.Timer
来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。使用 “任务调度线程池” 可以解决这个问题。
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:
- 使用内部类 ScheduledFutureTask 封装任务
- 使用内部类 DelayedWorkQueue 作为线程池队列
- 重写 onShutdown 方法去处理 shutdown 后的任务
- 提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展
常用 API:
ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u)
:延迟执行任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit)
:定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位,(period = 下一次任务开始执行的时间 - 这一次任务开始执行的时间),如果任务的执行时间 > period,则下一次任务会紧挨着这一次任务结束后执行。1
2
3
4
5
6
7
8
9
10
11
12
13
14public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
System.out.println("start..." + new Date());
pool.scheduleAtFixedRate(() -> {
System.out.println("running..." + new Date());
Thread.sleep(2000);
}, 1, 1, TimeUnit.SECONDS);
}
/*start...Sat Apr 24 18:08:12 CST 2021 启动
running...Sat Apr 24 18:08:13 CST 2021 延时1s后任务开始执行
running...Sat Apr 24 18:08:15 CST 2021 任务执行耗时2s,因为任务执行耗时 > period,所以下一次任务紧接着就开始执行
running...Sat Apr 24 18:08:17 CST 2021ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit)
:定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、单位,(delay = 下一次任务开始执行的时间 - 这一次任务结束执行的时间)1
2
3
4
5
6
7
8
9
10
11
12
13public static void main(String[] args){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
System.out.println("start..." + new Date());
pool.scheduleWithFixedDelay(() -> {
System.out.println("running..." + new Date());
Thread.sleep(2000);
}, 1, 1, TimeUnit.SECONDS);
}
/*start...Sat Apr 24 18:11:41 CST 2021 启动
running...Sat Apr 24 18:11:42 CST 2021 延时1s后开始执行
running...Sat Apr 24 18:11:45 CST 2021 任务执行耗时2s,加上delay的1s,所以下一次任务在3s后开始执行
running...Sat Apr 24 18:11:48 CST 2021
如何处理任务执行时出现的异常:
- 在任务执行中就用 try-catch 捕捉异常,相当于是任务自己处理异常
- 通过返回的 FutureTask 得到异常,如果任务执行出现异常,FutureTask 中封装的就是异常信息