ThreadPoolExecutor

介绍

ThreadPoolExecutor是Java中线程池的核心实现类

1
2
3
4
classDiagram
Executor <|-- ExecutorService : 继承
ExecutorService <|-- AbstractExecutorService : 实现
AbstractExecutorService <|-- ThreadPoolExecutor : 继承

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize 核心线程池大小
* @param maximumPoolSize 最大线程池大小
* @param keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)成为核心线程的有效时间
* @param unit keepAliveTime的时间单位
* @param workQueue 阻塞任务队列
* @param threadFactory 线程工厂,对线程命名
* @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

任务调度

所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

任务存储策略

使用不同的队列可以实现不一样的任务存取策略

  • ArrayBlockingQueue: 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁
  • LinkedBlockingQueue: 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度是Integer.MAX_VALUE,所以默认创建的队列有容量危险
  • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的排序
  • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素
  • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲60秒后会被回收
  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其他队列,LinkedTransferQueue队列多了transfertryTransfer方法
  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列,队列的头和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降低一半

RejectedExecutionHandler拒绝策略

  • AbortPolicy: 默认策略,直接抛出异常阻止系统正常运行

  • CallerRunsPolicy: “调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将任务回馈至发起方比如main线程

  • DiscardOldestPolicy: 抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

  • DiscardPolicy: 直接丢弃任务,不给予任何处理也不跑出异常,如果允许任务丢失,这是最好的一种方案

execute()和submit()区别

  1. 接收的参数不一样
  2. submit有返回值,而execute没有
  3. submit方便Exception处理:可以通过捕获Future.get抛出的异常

例子:

1
2
3
4
5
6
7
8
9
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("test-666-%d").build();
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
threadPoolExecutor.execute(()->{
...
});
<!--threadPoolExecutor.submit(()->{-->
<!-- ...-->
<!--});-->
threadPoolExecutor.shutdown();
分享到: