欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 锐评 > 多线程-定时任务线程池源码

多线程-定时任务线程池源码

2025/5/11 5:03:12 来源:https://blog.csdn.net/qq_44818304/article/details/146054466  浏览:    关键词:多线程-定时任务线程池源码

定时任务线程池

ScheduledThreadPoolExecutor,可以执行定时任务的线程池。这里学习它的基本原理。

定时任务线程池,和普通线程池不同的地方在于,它使用一个延迟队列,延迟队列使用最小堆作为它的数据结构,它会按照任务的执行顺序,把最先执行的任务放到第一个,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞完成后,去执行任务。对于周期性执行的任务,执行完成后,会计算下一次启动时间,然后把任务重新提交到延迟队列。

源码分析

定时任务线程池的继承体系

定时任务线程池继承了ThreadPoolExecutor,同时实现了ScheduledExecutorService,这个接口定义了定时调度相关的功能

public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {

ScheduledExecutorService:定义了定时调度的功能

public interface ScheduledExecutorService extends ExecutorService {// 定时调度1次的任务public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);// 定时调度1次的任务,有返回值public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);// 以固定频率调度的任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);// 以固定延迟调度的任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}

描述定时任务的类

描述定时任务的类:ScheduledThreadPoolExecutor的内部类ScheduledFutureTask

// ScheduledFutureTask:是定时任务线程池的内部类,封装了任务的启动时间、周期时间(隔多长时间执行一次),
// 任务在延迟队列中的索引、任务序号
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 任务的启动时间,单位是纳秒private long time;// 任务的执行周期,单位是纳秒private final long period;// 任务在队列中的索引int heapIndex;// 任务序号,通过原子类生成private final long sequenceNumber;// 持有自己的实例RunnableScheduledFuture<V> outerTask = this;// 构造方法,参数1 异步任务,参数2 结果,参数3 任务的启动时间,参数4 任务的周期时间ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}
}// ScheduledFutureTask实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {// 定时任务是否是周期性的boolean isPeriodic();
}// RunnableScheduledFuture继承了ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}// ScheduledFuture继承了Delayed,它封装了任务的延迟时间,表示任务延迟多久启动,继承
// Comparable接口,因为在循环队列中排序时需要用到
public interface Delayed extends Comparable<Delayed> {// 返回对象相关的延迟时长long getDelay(TimeUnit unit);
}

ScheduledFutureTask的继承体系上:

  • 继承了FutureTask,代表一个异步任务。 // FutureTask在之前学习Callable接口的时候已经接触到了。
  • 实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果,同时间接实现了Delayed接口,用于排序

定时任务主要有两个参数来描述任务的执行时间:

  • time:任务的启动时间,这是一个绝对时间,描述到了某个时间点,任务应该启动执行
  • period:任务的周期,描述两个任务之间间隔多长时间

延迟队列

定时任务线程池和普通线程池不一样的地方,在于它使用延迟队列,定时任务中封装好了任务的执行时间,任务的调度工作,是由延迟队列来执行的。

延迟队列的结构:

static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {// 队列内部使用的数组private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private int size = 0;// 等待队列中第一个任务的线程private Thread leader = null;
}

这里的结构很简单,主要是它的计算比较复杂,任务之间需要排序,组成一个最小堆,最先执行的任务放到前面,以及元素出队的方法、元素入队的方法。

工作机制

这里以scheduleAtFixedRate为例, 固定频率的定时任务,讲解定时任务的执行流程,其它类型的定时任务也类似。

提交定时任务

通过scheduleAtFixedRate方法,创建定时任务:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =// 构建ScheduledFutureTask实例new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),  // 任务的触发时间unit.toNanos(period));            // 任务的执行周期RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}

第一步:创建ScheduledFutureTask的实例

ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);  // 调用父类FutureTask的构造方法this.time = ns;  // 任务的触发时间,这是一个绝对时间this.period = period;  // 任务的执行周期,表示两个任务之间间隔多长时间this.sequenceNumber = sequencer.getAndIncrement();  // 当前任务的序列号,通过原子类生成
}// 父类FutureTask的构造方法
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}

第二步:添加任务到延迟队列,并且新建线程执行任务

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);  // 任务添加到延迟队列if (isShutdown() &&   // 判断线程池是否关闭,如果关闭,移除任务!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart(); // 向线程池中添加线程,确保有线程执行任务}
}// 添加任务到延迟队列的方法
public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;  // 异步任务实例final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();size = i + 1;if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) {  // 如果当前任务是第一个任务,要唤醒在条件变量上阻塞的线程leader = null;available.signal();}} finally {lock.unlock();}return true;
}// 向线程池中添加线程
void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)// 注意,这里Worker实例的第一个参数 firstTask,值为null,表示Worker只可以从队列中获取任务addWorker(null, true);else if (wc == 0)addWorker(null, false);
}

从阻塞队列中获取任务

新线程启动后,会执行Worker类的run方法 (参考ThreadPoolExecutor的执行原理),在run方法中,会从阻塞队列中获取异步任务,定时任务使用的阻塞队列是DelayedWorkQueue。

从阻塞队列中获取任务的方法:

// take方法没有指定超时时长,类似的,还有指定了超时时长的poll方法
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();  // 获取锁try {for (;;) {RunnableScheduledFuture<?> first = queue[0];  // 队列中的第一个元素,if (first == null)available.await();  // 如果队列为空,阻塞else {// 获取第一个任务的延迟时间,表示延迟指定时长后,开始执行任务long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);  // 如果延迟时长小于等于0,证明可以开始执行任务了first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;  // 设置当前线程为leadertry {available.awaitNanos(delay);  // 如果延迟时长大于0,那么线程进入阻塞状态并且指定时长} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}
}// 获取任务的延迟时间
public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);  // time是绝对时间,它减去now(),就是相对时间,也就是延迟时间
}

阻塞队列负责按照任务的执行时间,对任务进行排序,最先执行的任务放在队列的第一位,这里没有展示排序的逻辑,排序是按照最小堆的逻辑来排序的。线程从阻塞队列中获取任务,会计算第一个任务的延迟时长,然后等待指定时长,在执行任务,这就是定时任务可以在指定时长后启动的逻辑,如果延迟队列中没有任务,线程会一直等待,同时,向延迟队列中添加任务时,如果发现当前任务是第一个任务,会唤醒正在等待的线程。

执行定时任务

从延迟队列中获取到任务后,线程会执行ScheduledFutureTask的run方法,因为ScheduledFutureTask间接继承了Runnable接口

// ScheduledFutureTask的run方法
public void run() {boolean periodic = isPeriodic();  // 任务是否是周期性的if (!canRunInCurrentRunState(periodic))  // 判断线程池是否还在运行cancel(false);else if (!periodic)  // 如果不是周期性的任务,直接执行,这里执行的是FutureTask中的run方法ScheduledFutureTask.super.run();// 如果是周期性的任务,执行完之后计算下次执行时间,然后重新提交任务实例到阻塞队列else if (ScheduledFutureTask.super.runAndReset()) {  setNextRunTime();  // 计算下次任务的执行时间,这个方法会更新任务的time属性reExecutePeriodic(outerTask);  // 再次向线程池中提交任务实例}
}// 判断任务是否是周期性的
public boolean isPeriodic() {// 参考之前ScheduledFutureTask的实例的创建过程,period代表任务的执行周期,// 这个值不为0,证明是周期性的任务return period != 0;
}

1、为什么执行任务时会执行FutureTask中的run方法?因为在FutureTask的run方法中,会调用用户编写的run方法,也就是异步任务,ScheduledFutureTask中的run方法负责整体流程。

2、如果是周期性的任务,执行FutureTask中的runAndReset方法,它和run方法有什么不同?它执行完任务后,不会设置返回值,同时会把任务设置为初始状态,这个方法是为了执行多次的异步任务而设计的。

3、周期性的任务,执行完任务后,如何计算下次任务的执行时间?

// 计算任务下次执行时间的方法:
private void setNextRunTime() {long p = period;  // 任务的执行周期if (p > 0)time += p;    // 当前时间加上周期,固定频率(scheduleAtFixedRate)的定时任务走这段逻辑elsetime = triggerTime(-p);  // 更新任务的执行时间,固定延迟(scheduleWithFixedDelay)的定时任务走这段逻辑
}

这里需要解释一下,time属性是任务的执行时间,是一个绝对时间,表示到了某个点,例如 2020-01-01 00:00:00 这个固定的点,启动定时任务,period,是两个任务之间的间隔时长,例如,每隔10分钟,执行一次定时任务。对于固定频率的定时任务和固定延迟的定时任务,它们在创建任务实例的过程中稍有不同:

// 创建固定频率的定时任务
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),   // 计算time的值unit.toNanos(period));             // 计算period的值,注意,period是正数
// 创建固定延迟的定时任务
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),   // 计算time的值unit.toNanos(-delay));             // 计算period的值,注意,period是负数

一个执行周期是正数,一个执行周期时负数,所以在计算任务下次执行时间的方法中,它们会走向不同的链路,把该方法重新粘贴到下面,重新再看:

// 计算任务下次执行时间的方法:
private void setNextRunTime() {long p = period;  // 任务的执行周期if (p > 0)time += p;    // 固定频率,上次任务执行时间加上执行周期,就是下次执行时间elsetime = triggerTime(-p);  // 固定延迟,当前时间加上执行周期,就是下次执行时间, // triggerTime方法在下面
}long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

所以,固定频率执行的任务,如果上一次的任务执行超时,直到下一次任务该启动时还没有执行完成,一旦上一次任务执行完成,下一次任务立刻启动,因为上一次任务执行完成后,计算下一次任务的执行时间,发现执行时间在当前时间之前,所以线程获取任务时不会阻塞,会立刻取出任务,然后执行。固定延迟的任务,是根据上次任务结束时间来计算下次任务开始时间的,所以它是固定延迟。

总结

定时任务的执行过程:

  • 第一步:向线程池提交定时任务(schedule方法)
  • 第二步:创建定时任务实例(ScheduleFutureTask实例)
  • 第三步:把定时任务添加到延迟队列,延迟队列会对任务进行排序,最先执行的定时任务放到开头
  • 第四步:新建线程,从延迟队列中获取定时任务,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞结束后,执行定时任务
  • 第五步:执行完成后,计算下一次任务的执行时间,然后重新向线程池中提交任务实例

Q&A

只执行一次的定时任务和周期性的定时任务,分别是如何执行的?

周期性的定时任务,在执行完一次后,会计算下次任务的启动时间,然后再次向阻塞队列中提交任务实例,只执行一次的定时任务则不会

线程是如何在指定时间启动定时任务的?

阻塞队列会把需要最先执行的定时任务放在队列的开头,线程会获取第一个任务的延迟时间,然后根据延迟时间休眠指定时长,休眠结束后,执行定时任务。

按照固定频率执行的定时任务和按照固定延迟执行的定时任务,分别是如何执行的?

按照固定频率执行的定时任务,下次任务的执行时间 = 上次任务的启动时间 + 周期

按照固定延迟执行的定时任务,下次任务的执行时间 = 上次任务的结束时间 + 周期

依据不同的计算方式,计算出下次任务的执行时间,然后提交任务实例到队列中

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词