在文章 中我们知道,实现线程一共有三种方式,当需要线程的时候,通过这几种方式来创建线程执行任务,如果需要很多的任务同时执行的时候,就可以创建多个线程来执行,但是,线程的创建和销毁是一个耗时操作,如果频繁的创建和销毁线程,则会影响系统的效率;此时,我们可以使用线程池来执行任务,也就是说,事先先创建好几个线程放着,如果有任务来了,则取其中的一个线程取执行它,当该线程执行完任务后,并不销毁,而是留着执行下一个任务,这样就不用频繁的创建和销毁线程了。在 Java 中,提供了 ThreadPoolExecutor 类来实现线程池的相关操作,下面就来分析下该类的一个实现原理。
一:类图
要想看某个类的源码,首先要看下它的类图,理清它的一个继承关系,让脑海中大概有个印象。ThreadPoolExecutor 的类图如下所示:
Executor
ThreadPoolExecutor 实现的顶层接口为 Executor 接口,该接口只定义了一个方法 execute() ,用来执行线程的,注意该方法是没有返回值的,且还可能会抛出 RejectedExecutionException 和 NullPointerException 异常。
public interface Executor { /** * @param command the runnable task * @throws RejectedExecutionException if this task cannot be accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command);}
ExecutorService
ExecutorService 继承 Executor ,它也是一个接口,它提供了如下方法,
public interface ExecutorService extends Executor { //关闭线程池,先前提交的任务将被执行,但不会再接受任何新任务。 如果已经关闭,再次调用没有其他影响。 //此方法不会等待先前提交的任务完成执行。 void shutdown(); //尝试停止所有正在执行的任务,也不会处理正在等待的任务,并返回等待执行的任务列表。 ListshutdownNow(); //是否关闭 boolean isShutdown(); //如果在调用shutdown后所有任务都已完成,则返回true。 //值得注意的是,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为true。 boolean isTerminated(); //调用shutdown后,会阻塞直到所有任务计算完毕,或者超时,或者线程被中断 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //提交Callable任务到线程池中执行,并返回任务的计算结果 Future submit(Callable task); //提交Runnable任务到线程池中执行,并返回结果 Future submit(Runnable task, T result); /提交Runnable任务到线程池中执行,并返回结果/ Future submit(Runnable task); //执行所有任务,返回结果集合 List > invokeAll(Collection > tasks) throws InterruptedException; //执行所有任务,返回结果集合,有超时 List > invokeAll(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException; //执行任务集合中任意一个任务 T invokeAny(Collection > tasks) throws InterruptedException, ExecutionException; //执行任务集合中任意一个任务,有超时 T invokeAny(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
ExecutorService 也提供了提交任务的方法 submit(),它和 execute() 方法区别就是 submit() 可以有返回值,而 execute() 方法没有返回值。
AbstractExecutorService
AbstractExecutorService 实现了 ExecutorService 接口,并提供了某些方法的默认实现,还提供了一些额外的方法:
public abstract class AbstractExecutorService implements ExecutorService { protectedRunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask (runnable, value); } protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask (callable); } public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask; } public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; }
ThreadPoolExecutor
接下来看下 ThreadPoolExecutor,它是线程池实现的一个主要的类。
二:线程池的状态
在 ThreadPoolExecutor 类中,提供了几个变量来表示线程池的状态:
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int RUNNING = -1 << COUNT_BITS; // -536870912 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0 private static final int STOP = 1 << COUNT_BITS; // 536870912 private static final int TIDYING = 2 << COUNT_BITS; // 1073741824 private static final int TERMINATED = 3 << COUNT_BITS; // 1610612736
这几个状态主要用 runState 来表示,看下这几个状态是什么意思:
RUNNING : 表示线程池可以接受新任务并处理排队任务
SHUTDOWN : 不接受新任务,但处理排队任务
STOP : 不接受新任务,不处理排队任务
TIDYING : 所有任务都已终止,workerCount为零,转换到状态TIDYING的线程将运行 terminate()钩子方法
TERMINATED : terminate() 方法已经完成
这 5 个状态之间的转换如下:
调用 shutdown() 方法后,状态从 RUNNING 变为 SHUTDOWN,这种变化也可能在 finalize() 方法中发生
调用 shutdownNow() 方法后,状态将从 RUNNING 或者 SHUTDOWN 状态变为 STOP 状态
当队列和线程池中任务都为空时,状态将从 SHUTDOWN 变为 TIDYING 状态
当队列中没有任务的时候,状态将从 STOP 状态变为 TIDYING 状态
当terminate() 方法已经完成的时候,状态从 TIDYING 变为 TERMINATED
转换图如下:
三:构造方法
接下来看下 ThreadPoolExecutor 的一些属性:
// workQueue 主要用来存放任务的,任何的 BlockingQueue 都可用保存任务;如果当前的线程数量小于 corePoolSize 核// 心线程池的大小,会创建新的线程来执行任务,而不会把任务放到队列中;如果当前正在运行的线程数大于等于 corePoolSize核心线程池的大小,// 则会把提交的任务放到队列中;选择队列一般有如下三种:// 1:SynchronousQueue// 2:LinkedBlockingQueue// 3:ArrayBlockingQueueprivate final BlockingQueueworkQueue;// 锁private final ReentrantLock mainLock = new ReentrantLock();// 工作集合,只有获取到锁才能访问private final HashSet workers = new HashSet ();// 线程池中曾经出现过最大的线程数private int largestPoolSize;// 完成的任务数 private long completedTaskCount;// 创建线程的工厂,如果不提供,则默认是使用默认的工厂,它们都属于一个线程组,相同的优先级等,可自定义。private volatile ThreadFactory threadFactory;// 拒绝策略,当线程池已关闭时,提交任务会如何处理,或者,当队列已满,且线程数量已达到线程池允许的最大数量时,提交任务会如何处理,// ThreadPoolExecutor 类提供了四个内部类来进行处理:// 1: AbortPolicy:该策略会抛出一个异常。(默认)// 2: CallerRunsPolicy:它直接在execute方法的调用线程中运行被拒绝的任务,若线程池已关闭,则任务将被丢弃。// 3: DiscardPolicy: 直接丢弃任务// 4: DiscardOldestPolicy : 丢弃最早的未处理的任务,然后执行该任务private volatile RejectedExecutionHandler handler;// 线程的保活时间,当线程池中线程的个数大于 corePoolSize 且不再执行任务的时候,最多存货的时间,超过时间则销毁private volatile long keepAliveTime;// 如果为false(默认值),即使空闲,核心线程也会保持活动状态。 // 如果为true,则核心线程使用keepAliveTime超时等待工作。private volatile boolean allowCoreThreadTimeOut;// 核心线程池的大小,这些线程永远不会销毁,除非设置了allowCoreThreadTimeOutprivate volatile int corePoolSize;// 最大线程池的大小private volatile int maximumPoolSize;
了解了上述的属性定义之后,接下来看下 ThreadPoolExecutor 的构造方法,ThreadPoolExecutor 提供了 4 个构造方法:
下面来看下第四个构造的定义,其他三个都会调用第四个进行创建且一些参数提供了默认值而已。
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小 int maximumPoolSize, // 最大线程池大小 long keepAliveTime, // 线程保活时间 TimeUnit unit, // 保活时间单位 BlockingQueueworkQueue, // 存放任务的缓存队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler) // 拒绝策略{ if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
构造的参数就是上述说的属性,各个参数的作用在上面已做说明。
当任务提交到线程池后,任务会怎么被执行呢?接下来就来看下,任务提交后,如何执行。任务提交到线程池后,会由 execute() 方法来执行,虽然 submit() 也可以提交任务到线程池中执行,但是在上面的 AbstractExecutorService 类的解析过程中可知,submit 也会调用 execute() 来执行任务,接下来看下 execute() 方法的一个流程:
// 执行任务,任务可能会创建新的线程来执行,也可能才线程池中取出已有的线程执行;如果线程池已经关闭或缓存队列已满且线程池中的线程数量达到允许的最大值,// 则提交的任务将由 RejectedExecutionHandler 进行处理// 提交一个任务有以下三个步骤:// 1: 如果线程池中正在运行的线程数量小于 corePoolSize, 则会新创建一个线程来执行任务// 2:如果当前工作线程数量大于等于 corePoolSize 的话,则会把任务插入到缓存队列中,如果任务插入队列成功,则仍然需要再次检查是否应该添加任务。如线程池已经关闭等// 3:如果队列满了,添加任务到队列失败,则会尝试去新创建一个线程来执行任务,如果线程池中线程的数量已经达到 maximumPoolSize,则会由 RejectedExecutionHandler 进行处理public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 线程池的状态主要由 ctl 控制,它记录着 runState 和 workCount int c = ctl.get(); // 如果当前的工作线程数小于 corePoolSize ,则会创建新的的线程来执行任务,并把线程放入到线程池中 if (workerCountOf(c) < corePoolSize) { // 添加任务,第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断; // 如果为true,根据corePoolSize来判断;如果为false,则根据maximumPoolSize来判断 if (addWorker(command, true)) return; // 添加失败,重新获取状态 c = ctl.get(); } // 如果当前线程池的状态是运行状态,且把任务添加到队列成功 if (isRunning(c) && workQueue.offer(command)) { // 再次获取线程池的状态 int recheck = ctl.get(); // 如果线程池的状态不在是运行状态,则需要删除该任务,因为上面已经把任务添加到队列中了;之后由 RejectedExecutionHandler 来处理该任务 if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池还是运行状态,且 工作的线程数量为0,则添加任务。 // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果执行到这里,有两种情况: // 1. 线程池已经不是运行状态; // 2. 线程池是运行状态,但workerCount >= corePoolSize并且workQueue已满,这时,再次调用addWorker方法, // 但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;如果失败由RejectedExecutionHandler 处理该任务 else if (!addWorker(command, false)) reject(command);}
总结上述添加任务的过程如下:
1.如果当前线程池中线程的数量小于 corePoolSize,则会创建新的线程来执行任务
2.如果当前线程池中线程的数量大于等于 corePoolSize,且缓存队列没有满,则把任务插入到队列中
3.如果当前线程池中线程的数量大于等于 corePoolSize 且 小于线程池允许的最大数量,且队列没有满,则创建新的线程执行任务
4.如果当前线程池中线程的数量大于线程池允许的最大数量,且队列已满,则有拒绝策略处理
在 ThreadPoolExecutor 类中,还有一个内部类 Worker 类,它主要是用来维护运行任务的线程中断状态,它实现 Runnable 和 AbstractQueuedSynchronizer,表示它也是一个线程,关于队列同步器 AbstractQueuedSynchronizer,可以查看文章
// 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 正在运行的线程,可为null,即thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。 final Thread thread; // 要第一次运行的任务,可为null,即firstTask用来保存传入的任务 Runnable firstTask; // 每个线程任务计数器 volatile long completedTasks; //创建worker Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 使用线程工厂创建线程 this.thread = getThreadFactory().newThread(this); } // 运行线程 public void run() { runWorker(this); } // 是否获取到锁 // 0 表示没有获取到锁 // 1 表示获取到锁 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试去获取锁,获取到返回true,获取不到返回false protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 解锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 都调用队列同步器的方法 public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
以上就是 ThreadPoolExecutor 分析:
参考: