0%

线程池ThreadPoolExecutor

  • 线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作。

    线程池作用:

    1. 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
    2. 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
    3. 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
    • 线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
    • 池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销

线程池ThreadPoolExecutor

  • 线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作。

    线程池作用:

    1. 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
    2. 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
    3. 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
    • 线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
    • 池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销

构造方法

  • 构造方法:

    1
    2
    3
    4
    5
    6
    7
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)

    参数介绍:

    • corePoolSize:核心线程数,定义了最小可以同时运行的线程数量
    • maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值(maximumPoolSize - corePoolSize)又叫救急线程数
    • keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的救急线程不会立即销毁,而是会等到 keepAliveTime 时间超过销毁
    • unit:keepAliveTime 参数的时间单位
    • workQueue:阻塞队列,存放被提交但尚未被执行的任务
    • threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字
    • handler:拒绝策略,线程到达最大线程数且阻塞队列已满,此时仍有新任务来时会执行拒绝策略
      • RejectedExecutionHandler 下有 4 个实现类:
        • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
        • CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量
        • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
        • DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务

    根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池。

    • Executors 提供了四种线程池的创建:newFixedThreadPoolnewCachedThreadPoolnewSingleThreadExecutornewScheduledThreadPool

工作原理

img

  1. 创建线程池,这时没有创建线程(懒惰),等待提交过来的任务请求,调用 execute 方法才会创建线程
  2. 当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务(当前线程数 < corePoolSize)
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列(当前线程数 >= corePoolSize && 阻塞队列没满)
    • 如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize,那么会创建救急线程立刻运行这个任务,这对于阻塞队列中的任务不公平。因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行,而没有先执行阻塞队列中的等待任务(阻塞队列满 && 当前线程数 < maximumPoolSize)
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行(阻塞队列满 && 当前线程数 >= maximumPoolSize)
  3. 当一个线程完成任务时,会从队列中取下一个任务来执行
  4. 当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小

提交任务

  • ExecutorService 类 API:

    方法说明
    void execute(Runnable command)执行任务(Executor 类 API)
    Future<?> submit(Runnable task)提交任务 task()
    Future submit(Callable task)提交任务 task,用返回值 Future 获得任务执行结果
    ListinvokeAll(Collection<? extends Callable> tasks)提交 tasks 中所有任务
    ListinvokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常
    T invokeAny(Collection<? extends Callable> tasks)提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    • execute 和 submit 都属于线程池的方法,对比:
      • execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是封装成 FutureTask,然后调用 execute 执行
      • execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出

关闭线程池

  • ExecutorService 类 API:

    方法说明
    void shutdown()线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定任务)
    List shutdownNow()线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回
    boolean isShutdown()不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true
    boolean isTerminated()线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true
    boolean awaitTermination(long timeout, TimeUnit unit)调用 shutdown 后,由于调用 shutdown 的线程不会等待所有任务运行结束才向下运行,而是立刻继续向下运行,所以如果它想在线程池的状态为 TERMINATED 后才做些事情,可以利用此方法等待
  • ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值。

    • 状态表示:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      // 111 000000000000000000,转换成整数后其实就是一个【负数】
      private static final int RUNNING = -1 << COUNT_BITS;
      // 000 000000000000000000
      private static final int SHUTDOWN = 0 << COUNT_BITS;
      // 001 000000000000000000
      private static final int STOP = 1 << COUNT_BITS;
      // 010 000000000000000000
      private static final int TIDYING = 2 << COUNT_BITS;
      // 011 000000000000000000
      private static final int TERMINATED = 3 << COUNT_BITS;
      状态高3位接收新任务处理阻塞任务队列说明
      RUNNING111YY
      SHUTDOWN000NY不接收新任务,但处理阻塞队列剩余任务
      STOP001NN中断正在执行的任务,并抛弃阻塞队列任务
      TIDYING010--任务全执行完毕,活动线程为 0 即将进入终结
      TERMINATED011--终止状态

      img

newFixedThreadPool

  • newFixedThreadPool:创建一个拥有 n 个线程的线程池

    1
    2
    3
    4
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    • LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为 Integer.MAX_VALUE,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出)
    • 适用于任务量已知,相对耗时的长期任务

newCachedThreadPool

  • newCachedThreadPool:创建一个可扩容的线程池

    1
    2
    3
    4
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
    • 核心线程数是 0, 最大线程数是 29 个 1(二进制),全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM
    • SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
    • 适合任务数比较密集,但每个任务执行时间较短的情况

newSingleThreadExecutor

  • newSingleThreadExecutor:创建一个只有 1 个线程的单线程池

    1
    2
    3
    4
    5
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    • 保证所有任务按照指定顺序执行,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放

    单线程池对比单线程:

    • 创建一个单线程来执行任务,如果任务执行出现异常,那么线程会终止,不会再执行后续提交的其他任务,而线程池会再新建一个线程,保证池的正常工作,保证提交的其他任务也能被执行

newScheduledThreadPool

  • 如果我们想延时执行任务,可以使用 “任务调度线程池”。在 “任务调度线程池” 功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。使用 “任务调度线程池” 可以解决这个问题。
1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
  • 任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:

    • 使用内部类 ScheduledFutureTask 封装任务
    • 使用内部类 DelayedWorkQueue 作为线程池队列
    • 重写 onShutdown 方法去处理 shutdown 后的任务
    • 提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展
  • 常用 API:

    • ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u):延迟执行任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行

    • ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit):定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位,(period = 下一次任务开始执行的时间 - 这一次任务开始执行的时间),如果任务的执行时间 > period,则下一次任务会紧挨着这一次任务结束后执行。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      public static void main(String[] args) {
      ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
      System.out.println("start..." + new Date());

      pool.scheduleAtFixedRate(() -> {
      System.out.println("running..." + new Date());
      Thread.sleep(2000);
      }, 1, 1, TimeUnit.SECONDS);
      }

      /*start...Sat Apr 24 18:08:12 CST 2021 启动
      running...Sat Apr 24 18:08:13 CST 2021 延时1s后任务开始执行
      running...Sat Apr 24 18:08:15 CST 2021 任务执行耗时2s,因为任务执行耗时 > period,所以下一次任务紧接着就开始执行
      running...Sat Apr 24 18:08:17 CST 2021
    • ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit):定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、单位,(delay = 下一次任务开始执行的时间 - 这一次任务结束执行的时间

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public static void main(String[] args){
      ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
      System.out.println("start..." + new Date());

      pool.scheduleWithFixedDelay(() -> {
      System.out.println("running..." + new Date());
      Thread.sleep(2000);
      }, 1, 1, TimeUnit.SECONDS);
      }
      /*start...Sat Apr 24 18:11:41 CST 2021 启动
      running...Sat Apr 24 18:11:42 CST 2021 延时1s后开始执行
      running...Sat Apr 24 18:11:45 CST 2021 任务执行耗时2s,加上delay的1s,所以下一次任务在3s后开始执行
      running...Sat Apr 24 18:11:48 CST 2021
  • 如何处理任务执行时出现的异常:

    1. 在任务执行中就用 try-catch 捕捉异常,相当于是任务自己处理异常
    2. 通过返回的 FutureTask 得到异常,如果任务执行出现异常,FutureTask 中封装的就是异常信息
---------------The End---------------