线程池
定义:线程池是一种用于管理和复用线程的机制。
作用及优点
- 降低资源消耗:重用已创建的线程,避免了线程的频繁创建和销毁。
- 提高线程的管理性:线程池可以统一管理、分配、调优和监控,有效控制线程的最大并发数,避免大量线程抢占共享资源而导致阻塞的问题。
- 提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
ThreadPoolExecutor
线程池涉及的接口
线程池主要源于接口Executor,最终由ThreadPoolExecutor实现,主要涉及如下图几个类。
线程池构造函数涉及的参数说明
ThreadPoolExecutor的构造函数主要有四种,七种参数类型,这些参数类型会影响线程池的功能特性。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {}
- corePoolSize
核心线程数大小
1、如果核心线程池有空闲位置,这时新的任务就会被核心线程池新建一个线程执行。
2、默认情况下,核心线程执行完不会被销毁。可通过ThreadPoolExecutor的allowCoreThreadTimeOut 属性设置为true,那么闲置的核心线程在等待新任务到来时会有超时策略,这个时间间隔由keepAliveTime 所指定,当等待时间超过 keepAliveTime所指定的时长后,核心线程就会被终止。 - maximumPoolSize
线程池能创建最大的线程数量
如果核心线程池和缓存队列都已经满了,新的任务进来就会创建新的线程来执行。但是数量不能超过maximunPoolSize,否侧会采取拒绝接受任务策略。 - keepAliveTime
非核心线程闲置时的超时时长
1、超过这个时长,非核心线程会被销毁回收。
2、核心线程通过属性allowCoreThreadTimeOut,设置成true后空闲时也会按这个时间来处理。 - TimeUnit unit
keepAliveTime 参数的时间单位
1、常用的单位有TimeUnit.MILLISECONDS(亳秒)、TimeUnit.SECONDS(秒)以及 TimeUnit.MINUTES(分钟)。 - workQueue
任务队列
1、用来存放等待被执行的任务。 - threadFactory
线程工厂
1、主要用来创建线程。 - handler
任务拒绝策略
1、线程数量大于最大线程数就会采用拒绝处理策略。
2、主要有如下4中策略
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
线程池执行流程
- 线程池中的线程数量未达到核心线程的数量,则会启动一个核心线程来执行任务。
- 线程池中的线程数量已经达到或者超过核心线程的数量,那么任务会被插入到任务队列中排队等待执行。
- 插入任务的时候任务队列已满,且线程数量未达到线程池规定的最大值,那么会启动一个非核心线程来执行任务。
- 否则就拒绝执行此任务,根据RejectedExecutionHandler 策略执行。
线程池分类
线程池主要分为4种类型,通过Executors工具类提供的工厂方法来创建不同类型的线程池。
1、FixedThreadPool(固定大小线程池)
- 线程数量固定,初始化时指定线程数量,并在整个生命周期中保持这个数量不变。
- 当任务提交到线程池时,如果当前有空闲线程,则任务会立即执行;如果没有空闲线程,则任务会被放入等待队列中,等待有空闲线程时再执行。
2、CachedThreadPool(可缓存线程池)
- 只有非核心线程,最大值是Integer.MAX_VALUE。
- 当任务提交到线程池时,如果有空闲线程,则任务会立即执行;如果没有空闲线程,则线程池会创建一个新的线程来执行任务。
- 当线程池中的线程在60秒内没有执行任务处于空闲状态时,该线程将被终止并从池中移除。
- CachedThreadPool的任务队列是SynchronousQueue ,本质相当于一个空集合,这将导致任何任务都会立即被执行,因为在这种场景下SynchronousQueue 是无法插入任务的。SynchronousQueue是一个非常特殊的队列,在很多情况下可以把它简单理解为一个无法存储元素的队列。
3、ScheduledThreadPool(定时线程池)
- 核心线程国定,非核心线程没有限制。
- 适用于可以按照指定的时间间隔或延迟来执行任务的情况。
4、SingleThreadExecutor(单线程线程池)
- 使用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行。
- 当任务提交到线程池时,如果当前有线程在执行任务,则新任务会被放入等待队列中,等待当前任务执行完毕后再执行。
线程池使用方法
- 创建线程池:通过Executors工具类的静态方法创建线程池实例,例如Executors.newFixedThreadPool(int nThreads)、Executors.newSingleThreadExecutor()等。
- 提交任务:使用线程池的execute(Runnable command)方法或submit(Runnable task)、submit(Callable task)方法提交任务给线程池执行。execute方法用于提交不需要返回值的任务,而submit方法用于提交需要返回值的任务。
- 关闭线程池:通过调用线程池的shutdown()方法关闭线程池,不再接受新任务,但会继续执行已提交的任务。如果需要立即关闭线程池并尝试停止正在执行的任务,可以使用shutdownNow()方法。
1、FixedThreadPool(固定大小线程池)
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(2);for (int i = 0; i < 10; i++) {final int taskId = i;fixedExecutorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("正在执行任务i="+taskId);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}});}
2、CachedThreadPool(可缓存线程池)
ExecutorService cachedExecutorService = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int taskId = i;cachedExecutorService.submit(() -> {System.out.println("Executing task " + taskId + " on thread " + Thread.currentThread().getName());try {// 模拟任务执行时间Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}
3、ScheduledThreadPool(定时线程池)
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);// 定时任务:延迟5秒后执行scheduledExecutorService.schedule(() -> {System.out.println("Executing delayed task on thread " + Thread.currentThread().getName());}, 5, TimeUnit.SECONDS);// 周期性任务:初始延迟0秒后,每隔2秒执行一次scheduledExecutorService.scheduleAtFixedRate(() -> {System.out.println("Executing periodic task on thread " + Thread.currentThread().getName() + " at " + System.currentTimeMillis());}, 0, 2, TimeUnit.SECONDS);
4、SingleThreadExecutor(单线程线程池)
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();for (int i = 0; i < 5; i++) {final int taskId = i;singleExecutorService.submit(() -> {System.out.println("Executing task " + taskId + " on thread " + Thread.currentThread().getName());try {// 模拟任务执行时间Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}
源码分析
线程池接口分析
- Executor
public interface Executor {/*** 执行任务方法* @param command 任务* @throws 如果此任务无法被接受执行,则出现RejectedExecutionException异常* @throws 任务为空抛出NullPointerException*/void execute(Runnable command);
}
- ExecutorService
public interface ExecutorService extends Executor {/*** 启动线程池的关闭序列,线程池将不再接受新的任务提交。* 但是,已经提交的任务(包括正在执行的 和等待执行的任务)会继续执行,直到它们完成。* 将线程池置为SHUNTDOWM状态*/void shutdown();/*** 尝试立即关闭线程池,尝试停止所有正在执行的任务,并返回等待执行的任务列表。* 行为:调用shutdownNow()后,线程池会尝试停止所有正在执行的任务(通过中断线程来实现),* 并且不再接受新任务。同时,它会返回一个列表,其中包含等待执行的任务。* 将线程池置为STOP状态* @return 返回等待执行的任务列表* @throws SecurityException.*/List<Runnable> shutdownNow();/*** 判断线程池是否已关闭.*/boolean isShutdown();/*** 判断线程池中的子线程是否已全部终止* @return {@code true} if all tasks have completed following shut down*/boolean isTerminated();/*** 在调用shutdown后调用该方法,让线程池在指定时间内关闭,* 不管任务是否执行完成,在指定时间内还在执行任务则抛出异常中断线程* @param timeout 等待任务完成的最大时间量* @param unit 参数的时间单位*/boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;/*** 向线程池提交一个Callable类型的异步任务,当线程池执行后返回执行结果* @param task 任务*/<T> Future<T> submit(Callable<T> task);/*** 向线程池提交一个Runnable类型的异步任务,线程池执行完成后将返回指定类型的执行结果* @throws NullPointerException if the task is null*/<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);/*** 传入一个Collection类型的异步任务集合,批量执行并返回执行结果* @param tasks 任务集合*/<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
ThreadPoolExecutor分析
初始化
构造函数主要有四种,参数配置不同。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue)public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
线程池状态
- 线程池里面的状态主要由参数 ctl控制,这个ctl参数是一个原子操作的 AtomicInteger。
- ctl主要有两个参数,runState(线程池运行状态)、workerCount(工作的线程数)。
- ctl 高 3 位是用于表示当前线程池的状态,低29位用于存放当前的线程数。
- 线程池状态主要有5种。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits//线程池初始化(创建出来之后)处于此状态,能够接收新任务,以及对已添加的任务进行处理。private static final int RUNNING = -1 << COUNT_BITS;//当调用shutdown()方法时改为此状态,在此状态时,不接收新任务,但能处理已添加的任务。private static final int SHUTDOWN = 0 << COUNT_BITS;//调用shutdownNow()方法时处于此状态,在此状态时,不接收新任务,不处理已添加的任务,并且会尝试中断正在处理的任务。private static final int STOP = 1 << COUNT_BITS;//1、当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。//2、当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。private static final int TIDYING = 2 << COUNT_BITS;//线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。线程池彻底终止,就变成TERMINATED状态。private static final int TERMINATED = 3 << COUNT_BITS;
ctl相关的方法
// Packing and unpacking ctl//获取线程池的状态private static int runStateOf(int c) { return c & ~COUNT_MASK; }// 获取线程池的工作线程数private static int workerCountOf(int c) { return c & COUNT_MASK; }// 根据工作线程数和线程池状态获取 ctlprivate static int ctlOf(int rs, int wc) { return rs | wc; }
提交执行任务
- 如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
- 如果运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
- 如果加入 BlockingQueue 成功,需要二次检查线程池的状态如果线程池没有处于 Running,则从 BlockingQueue 移除任务,启动拒绝策略。
- 如果线程池处于 Running状态,则检查工作线程(worker)是否为0。如果为0,则创建新的线程来处理任务。如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。
- 如果加入 BlockingQueue 失败,则创建新的线程来处理任务。
- 如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 工作的线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {//执行addWork,提交为核心线程,提交成功直接return。if (addWorker(command, true))return;//提交失败重新获取ctlc = ctl.get();}//如果工作线程数大于核心线程数,则检查线程池状态是否是正在运行,且将新任务向阻塞队列提交。if (isRunning(c) && workQueue.offer(command)) {//需要再次检查,判断加入到阻塞队里中的任务是否可以被执行int recheck = ctl.get();//如果线程池状态不为running,将任务从阻塞队列里面移除,启用拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 如果线程池的工作线程为零,则调用addWoker提交任务,从队列获取else if (workerCountOf(recheck) == 0)addWorker(null, false);}//任务添加到队列失败,提交任务执行else if (!addWorker(command, false))reject(command);}
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.//通过当前状态判断是否可以添加任务if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {//获取工作线程数//判断是否大于线程池上限,即是否大于核心线程数,或者最大线程数if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;///CAS 增加工作线程数if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}//上面的逻辑主要是判断是否能够够添加线程,如果可以就cas的增加工作线程数量boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//获取锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();//1、线程池处于running//2、线程池处于SHUTDOWN,且是不带任务的(说明可能是阻塞队列还有未执行完的任务)if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();//添加线程workers.add(w);workerAdded = true;int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}//添加成功则启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {//添加失败需要回滚if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
线程池线程复用
- 在线程池内部的线程被封装成一个Worker对象来操作的。而当我们使用Worker.thread.start()启动线程时,会调用Worker中重写的run()方法执行任务。
- 通过一个死循环让当前线程一直处于运行状态,阻止OS将当前工作线程回收,从而做到线程的复用。而关于死循环的条件则比较简单,判断task是否为空,在调用方法执行的时候会先获取外部传递的任务,如果没有获取到外部传递的任务则调用getTask()方法获取任务队列中的任务并执行。
任务执行
public void run() {runWorker(this);}
final void runWorker(Worker w) {//获取当前线程和任务Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//运行线程中断w.unlock(); // allow interrupts//是否因为异常跳出循环boolean completedAbruptly = true;try {//1、task != null 表示线程池直接执行的任务,如核心线程数未满//2、(task = getTask()) != null 从阻塞队列获取任务//2.1、如果在任务队列中获取到了任务则直接执行已经获取的任务//2.2、如果任务队列为空,没有任务则反复执行空循环阻塞当前线程死亡(getTask()方法的操作)while (task != null || (task = getTask()) != null) {w.lock();// 如果当前状态>=STOP状态,要确保线程中断// If pool is stopping, ensure thread is interrupted;// 如果线程池为停止,请确保当前线程未被中断// if not, ensure thread is not interrupted. //需要重新检测并且清除中断// this requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//生命周期钩子方法,方法是在每个任务执行之前调用的。你可以通过重写这个方法来自定义每个任务执行前的行为。beforeExecute(wt, task);try {//执行任务的run方法task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//关闭线程processWorkerExit(w, completedAbruptly);}}
从队列获取任务
- 效验线程池状态,一切正常时开始任务的获取逻辑,使用的是阻塞时获取方式,如果任务队列中没有任务,当前线程会阻塞等待,直到任务队列中有新的任务时才会获取并返回执行。
- 如果线程池设置了存活时间,那么当前线程会阻塞到存活时间的阈值,如果超出存活时间会返回null。
private Runnable getTask() {// 表示上次从队列获取任务是否超时boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// Check if queue empty only if necessary.// 状态处于SHUTDOWN且队列任务为空或则STOP状态,将WorkerCount递减并返回nullif (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}//工作线程数int wc = workerCountOf(c);// Are workers subject to culling?// 判断是否需要超时控制//1、allowCoreThreadTimeOut=true表示核心线程需要超时控制//2、wc > corePoolSize 表示非核心线程,需要超时控制boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//1、工作线程超过最大线程数//2、上次获取超时且本次需要超时控制if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//如果需要超时控制则用poll阻塞超时获取Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//获取超时timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
关闭线程
- 线程池中工作线程的销毁是由processWorkerExit()方法来完成的,在这个方法中首先会判断当前线程是因为执行出现异常还是超出存活时间导致需要发生回收的。
- 如果是因为超出存活时间,先判断线程池状态之后再从工作集中移除当前线程即可。
- 如果是由于异常导致的则需要先对线程池的工作线程数进行自减,然后再移除工作集中的工作线程,最后再调用addWorker()添加一个工作线程保证线程池内工作线程的数量。
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;//移除线程workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}