零、简要
按照上一篇的介绍,Quartz是个作业调度系统,核心组件有Job/Trigger/Scheduler等,如何使用Quartz我们已经知晓,此次细心看看其内部的代码逻辑。
一、核心部件
Quartz内部部件关系如图所示
一个Job对应多个Trigger,每个Trigger被触发,即代表着Job执行一次。
1.Scheduler:任务调度器,是实际执行任务调度的控制器。在spring中通过SchedulerFactoryBean封装起来。
2.Trigger:触发器,用于定义任务调度的时间规则,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比较多。CronTrigger在spring中封装在CronTriggerFactoryBean中。
3.JobDetail:用来描述Job实现类及其它相关的静态信息,如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现,如果任务调度只需要执行某个类的某个方法,就可以通过MethodInvokingJobDetailFactoryBean来调用。
4.Job:是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务,默认是无状态的,若要将Job设置成有状态的,在quartz中是给实现的Job添加@DisallowConcurrentExecution注解(以前是实现StatefulJob接口,现在已被Deprecated),在与spring结合中可以在spring配置文件的job detail中配置concurrent参数。
二、Scheduler
在Quartz内部,每个调度器Scheduler有一个负责调度的线程QuartzSchedulerThread,该线程启动后不断地抓取即将触发的Trigger,做了一番封装后,放入负责执行的线程池ThreadPool,大致逻辑如下时序图:
2.1
时序图中间的QuartzSchedulerThread是最核心的部分,它撑起了整个Quartz的运行,看看该线程的启动逻辑:
(代码段1)
1 | public void run() { |
调度线程在最开始的时候已经生成并启动,一直处于等待状态,直到QuartzScheduler调用了schedThread.togglePause(false),将paused设置为false,调度线程才启动。
2.2
schedThread.togglePause(false)方法如以下代码:
(代码段2)
1 | void togglePause(boolean pause) { |
2.3
调度线程开始干活,第一步,是查看是否有空闲线程可以用来触发job,具体是调用ThreadPool.blockForAvailableThreads()
(代码段3)
1 | public int blockForAvailableThreads() { |
2.4
调度线程获知有空闲线程,它会从JobStore中获取最近的Trigger,并进入等待,直到触发时间的到来
(代码段4)
1 | try { |
2.5
触发时间到来,调度线程先是将Trigger和它指向的Job包装成一个RunShell(安全壳)
(代码段5)
1 | JobRunShell shell = null; |
2.6
初始化Shell的逻辑如下:
(代码段6)
1 | public void initialize(QuartzScheduler qs, TriggerFiredBundle firedBundle) |
2.7
调度线程得到了RunShell,后者实现了Runnable接口,可以让线程去执行,所以下一步是放入线程池,让线程池执行Job,调用ThreadPool().runInThread(shell)
(代码段7)
1 | public boolean runInThread(Runnable runnable) { |
2.8
RunShell实现了Runnable接口,它负责触发Job的执行,记录事务,记录Job执行的全程,通知各种触发器等
(代码段8)
1 | public void run() { |
二、Job
2.1 Job的保存
对于用户来说,在新建Job类的基础上,每new一个JobDetail,都代表着建立了一个Job实例,比如
(代码段10)
1 | // 新建一个Job |
最后一行的scheduler.scheduleJob,进一步调用JobStore.storeJobAndTrigger,将JobDetail和Trigger存放进JobStore之中。
在这引出了JobStore的概念,看一下它的官方定义: The interface to be implemented by classes that want to provide a org.quartz.Job and org.quartz.Trigger storage mechanism for the org.quartz.core.QuartzScheduler’s use。非常的简明概要,用来保存JobStore和Trigger,一个调度器有一个JobStore。
JobStore有三种实现,RAMJobStore、JDBC-JobStoreTX、JDBC-JobStoreCMT。
2.1.1 RAMJobStore
RAMJobStore用于存储内存中的调度信息(jobs,Triggers和日历)。RAMJobStore快速轻便,但是当进程终止时,所有调度信息都会丢失。
RAMJobStore存取JobDetail和Trigger的时候,使用synchronized加锁,如下面所示
(代码段11)
1 | public void storeTrigger(SchedulingContext ctxt, Trigger newTrigger, |
2.1.2 JDBC-JobStore
JDBCJobStore用于在关系数据库中存储调度信息(jobs,Triggers和日历),这意味着所有数据都会落地,系统崩溃后重新启动可以恢复Job和Trigger的运行。在Quartz中,JDBCJobStore的具体实现是JobStoreSupport,JobStoreTX和JobStoreCMT都继承自它。
JobStoreSupport中保存了DataSource、数据库表名、数据库重试时长和具体的锁类型等消息。
JobStoreSupport既支持数据库锁,也在内部实现了一个简单的信号量锁(将锁资源放在threadlocal中,若申请不到锁资源,则一直wait)。
a) 信号量锁的实现:
(代码段12)
1 | public synchronized boolean obtainLock(Connection conn, String lockName) { |
b) 数据库锁实现
数据库锁的原理如下:
1.在执行某种需要加锁的『关键的逻辑』之前,先启用一个事务,并执行一个需要数据库加锁的语句(比如for update或update),令当前线程得到数据库锁。
2.线程得到数据库锁,开始执行『关键的逻辑』。
3.线程执行完毕『关键的逻辑』,若一切正常,则提交事务,否则回滚事务,线程失去数据库锁。
具体的代码如下:
(代码段13)
1 | public boolean obtainLock(Connection conn, String lockName) |
执行sql获取锁的代码逻辑则是在行锁StdRowLockSemaphore和更新锁UpdateLockRowSemaphore,MySQL锁介绍可看MySQL innodb中各种SQL语句加锁分析
行锁信号量StdRowLockSemaphore使用for update语句获取MySQL锁,代码如下
(代码段14)
1 |
|
更新锁UpdateLockRowSemaphore执行Update语句来获取MySQL锁,Quartz之所以在行锁的基础上,额外又实现更新锁,是因为部分数据库不支持for update,比如MSSQL。更新锁UpdateLockRowSemaphore执行的sql语句如下:
(代码段15)1
2
3
4public static final String UPDATE_FOR_LOCK =
"UPDATE " + TABLE_PREFIX_SUBST + TABLE_LOCKS +
" SET " + COL_LOCK_NAME + " = " + COL_LOCK_NAME +
" WHERE " + COL_LOCK_NAME + " = ? ";
2.1.3 JDBC-JobStoreTX
JobStoreTX自身使用数据库的事务,不依赖spring等『容器』的事务,它独立于『容器』。
实际上,JobStoreTX的逻辑和JobStoreSupport一致,在保存Job等资料的时候,是放在事务中进行的,如下:
(代码段16)
1 | protected Object executeInNonManagedTXLock( |
保存Job的代码如下:
(代码段17)
1 | protected void storeJob(Connection conn, SchedulingContext ctxt, |
2.1.4 JDBC-JobStoreCMT
和JobStoreTX相比,JobStoreCMT使用容器内的事务管理器,意味着它不能自己调用connection的commit或rollback,需要交给容器的事务管理器,这一点从JobStoreCMT的加锁执行方法executeInLock可以看出来:
(代码段18)
1 | protected Object executeInLock( |
2.2 如何扫描出到时间的job
对任何的job来说,只有指向它的触发器Trigger到时间了,job才会被执行。简单地说,job什么时候执行,由触发器Trigger决定。
2.3 如何调度job
类似上面一条,在Quartz中,调度器Scheduler调度的对象是触发器Trigger,调度器不关心job处于何种状态。
2.4 如何执行job
当调度器Scheduler发现Trigger到了触发时间,会触发Trigger,连带着执行Trigger指向的job
2.6 触发器和job的关系
Trigger和Job是多对一的关系,一个Trigger只能对应一个Job,一个Job可以对应多个Trigger。
2.7 如何恢复崩溃的job
job在执行过程中,如果发生了异常,job的execute被中断,Quartz根据异常中的code判断是否需要重新启动job,代码如下:
(代码段19)
1 | try { |
三、触发器
3.1 如何扫描出到时间的trigger
Trigger和Job一样,存放在JobStore之中,调度器扫描最近的触发器时,委托JobStore去查找,以RAMJobStore为例,它的acquireNextTrigger方法代码如下:
(代码段20)
1 | public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan) { |
3.2 如何暂停、如何重启
暂停一个job,本质上仅是暂停它的Trigger,重启同理。
(代码段21)
1 | public void pauseTrigger(Connection conn, SchedulingContext ctxt, |
四、集群部署
4.1 集群部署原理
Quartz中,独立的节点并不与其它节点进行通行,而是通过数据库中的数据感知别的节点。每个节点有个ClusterManager,它是个独立的线程,定时到数据库中『签到』,如果发现别的调度器很久没来签到,则将其视为已故障,并重新恢复故障调度器的触发器。
节点『签到』的实现:
(代码段22)
1 | protected List clusterCheckIn(Connection conn) |
判断集群故障的逻辑:
(代码段23)
1 |
|
恢复其它集群触发器的逻辑:
(代码段24)
1 | protected void clusterRecover(Connection conn, List failedInstances) |