并发编程之定时任务&定时线程池原理解析


并发编程之定时任务&定时线程池原理解析

文章插图
作者公众号:一角钱技术(org_yijiaoqian)
前言线程池的具体实现有两种 , 分别是ThreadPoolExecutor 默认线程池和ScheduledThreadPoolExecutor 定时线程池 , 上一篇已经分析过ThreadPoolExecutor原理与使用了 , 本篇我们来重点分析下ScheduledThreadPoolExecutor的原理与使用 。
  • 《并发编程之Executor线程池原理与源码解读》
ScheduledThreadPoolExecutorScheduledThreadPoolExecutor 与 ThreadPoolExecutor 线程池的概念有些区别 , 它是一个支持任务周期性调度的线程池 。
ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor , 同时通过实现 ScheduledExecutorSerivce 来扩展基础线程池的功能 , 使其拥有了调度能力 。其整个调度的核心在于内部类 DelayedWorkQueue  , 一个有序的延时队列 。
定时线程池类的类结构图如下:
并发编程之定时任务&定时线程池原理解析

文章插图
 
ScheduledThreadPoolExecutor 的出现 , 很好的弥补了传统 Timer 的不足 , 具体对比看下表:
【并发编程之定时任务&定时线程池原理解析】TimerScheduledThreadPoolExecutor线程单线程多线程多任务任务之间相互影响任务之间不影响调度时间绝对时间相对时间异常单任务异常 , 后续任务受影响无影响
工作原理它用来处理延时任务或定时任务
并发编程之定时任务&定时线程池原理解析

文章插图
 
它接收SchduledFutureTask类型的任务 , 是线程池调度任务的最小单位 , 有三种提交任务的方式:
  1. schedule , 特定时间延时后执行一次任务
  2. scheduledAtFixedRate , 固定周期执行任务(与任务执行时间无关 , 周期是固定的)
  3. scheduledWithFixedDelay , 固定延时执行任务(与任务执行时间有关 , 延时从上一次任务完成后开始)

并发编程之定时任务&定时线程池原理解析

文章插图
 
它采用 DelayedWorkQueue 存储等待的任务
  1. DelayedWorkQueue 内部封装了一个 PriorityQueue  , 它会根据 time 的先后时间排序 , 若 time 相同则根据 sequenceNumber 排序;
  2. DelayedWorkQueue 也是一个无界队列;
因为前面讲阻塞队列实现的时候 , 已经对DelayedWorkQueue进行了说明 , 更多内容请查看《阻塞队列 — DelayedWorkQueue源码分析》
工作线程的执行过程:
  • 工作线程会从DelayedWorkerQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间 , 再次放回DelayedWorkerQueue 。
take方法是什么时候调用的呢? 在ThreadPoolExecutor中 , getTask方法 , 工作线程会循环地从workQueue中取任务 。但定时任务却不同 , 因为如果一旦getTask方法取出了任务就开始执行了 , 而这时可能还没有到执行的时间 , 所以在take方法中 , 要保证只有在到指定的执行时间的时候任务才可以被取走 。
PS:对于以上原理的理解 , 可以通过下面的源码分析加深印象 。
源码分析构造方法ScheduledThreadPoolExecutor有四个构造形式:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  new DelayedWorkQueue());}public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,     new DelayedWorkQueue(), threadFactory);}public ScheduledThreadPoolExecutor(int corePoolSize,                                       RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  new DelayedWorkQueue(), handler);}public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,     new DelayedWorkQueue(), threadFactory, handler);}


推荐阅读