欢迎关注我的公众号: Java编程技术乐园。分享技术,一起精进Quartz!
做一个积极的人
编码、改bug、提升自己
我有一个乐园,面向编程,春暖花开!
QuartzSchedulerThread.run()是主要处理任务的方法!下面进行分析,方便自己查看! 我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!
如还没有阅读过Quartz学习——scheduler.start()启动源码分析请先阅读这一篇!
QuartzSchedulerThread.run()主要是在有可用线程的时候获取需要执行Trigger并出触发进行任务的调度!
解释:
{0} : 表的前缀 ,如表qrtztrigger ,{0}== qrtz
{1} :quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName
1 、QuartzSchedulerThread.run()源码讲解
/** * <p> * The main processing loop of the <code>QuartzSchedulerThread</code>. * </p> */ @Override public void run() { boolean lastAcquireFailed = false; while (!halted.get()) { try { // check if we're supposed to pause... //检查我们是否应该暂停... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... //等待直到togglePause(false)被调用... sigLock.wait(1000L); } catch (InterruptedException ignore) { } } if (halted.get()) { break; } } //2.1获取可用线程的数量 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); //将永远是true,由于blockForAvailableThreads的语义... if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List<OperableTrigger> triggers = null;//定义触发器集合 long now = System.currentTimeMillis();//获取当前的时间 clearSignaledSchedulingChange(); try { //2.2 从jobStore中获取下次要触发的触发器集合 //idleWaitTime == 30L * 1000L; 当调度程序发现没有当前触发器要触发,它应该等待多长时间再检查... triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); lastAcquireFailed = false; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if(!lastAcquireFailed) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } lastAcquireFailed = true; continue; } catch (RuntimeException e) { if(!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } lastAcquireFailed = true; continue; } //判断返回的触发器存在 if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); //若有没有触发的Trigger,下次触发时间 next_fire_time 这个会在启动的时候有个默认的misfire机制,如上一篇中分析的 。setNextFireTime(); 即start()启动时候的当前时间。 long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) {//这里为什么是2 ???不懂??? synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers //这种情况发生,如果releaseIfScheduleChangedSignificantly 决定 释放Trigger if(triggers.isEmpty()) continue; // set triggers to 'executing' //将触发器设置为“正在执行” List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { //2.3 通知JobStore调度程序现在正在触发其先前已获取(保留)的给定触发器(执行其关联的作业)。 List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; //下面的2.3方法返回的数据赋值到bndles } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } //循环List<TriggerFiredResult> bndles 集合,获取TriggerFiredResult和TriggerFiredBundle等 for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) //如果触发器被暂停,阻塞或其他类似的事件阻止它在这时被触发,或者如果调度器被关闭(暂停),则可以获得'null' if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } JobRunShell shell = null; try { //创建 JobRunShell ,并初始化 shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... //这种情况不应该发生,因为它表示调度程序正在关闭或线程池或线程池中并发使用的错误 - 文档说不要这样做... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract ////应该永远不会发生,如果threadPool.blockForAvailableThreads()遵循约定 continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; //idleWaitTime == 30L * 1000L; idleWaitVariablness == 7 * 1000; //计算getRandomizedIdleWaitTime()的值 : idleWaitTime - random.nextInt(idleWaitVariablness); synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... //删除对调度程序内容的引用以帮助垃圾回收... qs = null; qsRsrcs = null; }
2.1获取可用线程的数量
// 参考 quartz2.2源码分析3-线程模型:https://my.oschina.net/chengxiaoyuan/blog/674603 //获取线程池,两个实现 SimpleThreadPool 和 ZeroSizeThreadPool, 一般使用SimpleThreadPool,在quartz.properties 中配置 public int blockForAvailableThreads() {//方法具体实现,没有可用线程等待。。。。 synchronized(nextRunnableLock) { while((availWorkers.size() < 1 || handoffPending) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } return availWorkers.size(); } }
2.2 从jobStore中获取下次要触发的触发器集合
//acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //获取要触发的下一个N个触发器的句柄,并由调用调度器将它们标记为“保留”。 //noLaterThan if > 0,那么JobStore应该只返回一个触发器,该触发器不会晚于此值中所表示的时间触发: public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow) throws JobPersistenceException { String lockName; //isAcquireTriggersWithinLock() false , maxCount == 1 if(isAcquireTriggersWithinLock() || maxCount > 1) { lockName = LOCK_TRIGGER_ACCESS; } else { lockName = null; } return executeInNonManagedTXLock(lockName, new TransactionCallback<List<OperableTrigger>>() { public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException { //2.2.1 到了这里,继续往下,这里返回需要下次执行的Trigger return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow); } }, new TransactionValidator<List<OperableTrigger>>() { public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException { try { //选择所有fired-trigger记录的状态 List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId()); Set<String> fireInstanceIds = new HashSet<String>(); for (FiredTriggerRecord ft : acquired) { fireInstanceIds.add(ft.getFireInstanceId()); } for (OperableTrigger tr : result) { if (fireInstanceIds.contains(tr.getFireInstanceId())) { return true; } } return false; } catch (SQLException e) { throw new JobPersistenceException("error validating trigger acquisition", e); } } }); }
2.2.1看 acquireNextTrigger方法
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow) throws JobPersistenceException { if (timeWindow < 0) { throw new IllegalArgumentException(); } List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>(); Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>(); final int MAX_DO_LOOP_RETRY = 3; int currentLoopCount = 0; long firstAcquiredTriggerFireTime = 0; do { currentLoopCount ++; try { //2.2.2查询下次触发的触发器 Triggerkey ,在继续看这个方法里面 List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount); // No trigger is ready to fire yet. //还没有触发器准备好。 keys 为上面查询出来的!!! if (keys == null || keys.size() == 0) return acquiredTriggers; //循环TriggerKey name 和 group for(TriggerKey triggerKey: keys) { //如果我们的触发器不再可用,请尝试一个新的触发器。 (retrieve:检索) OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey); //查询Trigger 并封装!retrieveTrigger 的 sql "SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" if(nextTrigger == null) { continue; // next trigger } //如果触发器的作业设置为@DisallowConcurrentExecution,并且它已经添加到结果中,则将其放回到timeTriggers设置并继续搜索下 //一个触发器。 JobKey jobKey = nextTrigger.getJobKey(); //为给定的作业名称/组名称选择JobDetail对象。 封装成JobDetailImpl 对象 JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper()); //selectJobDetail 的sql :"SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?" if (job.isConcurrentExectionDisallowed()) {//相关联的Job类是否携带DisallowConcurrentExecution注释。 if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) { continue; // next trigger } else { acquiredJobKeysForNoConcurrentExec.add(jobKey); } } // We now have a acquired trigger, let's add to return list. // If our trigger was no longer in the expected state, try a new one. //我们现在有一个获取触发器,让我们添加到返回列表。 如果我们的触发器不再处于预期状态,请尝试新的触发器。 int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING); //updateTriggerStateFromOtherState 如果给定触发器处于给定的旧状态,则将其更新为给定的新状态 //第一个? newState ; 第四个 ? oldState updateTriggerStateFromOtherState的sql "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?" if (rowsUpdated <= 0) { continue; // next trigger } nextTrigger.setFireInstanceId(getFiredTriggerRecordId()); //插入一个fired Trigger, 更新当前执行的时间(FIRED_TIME)和下次要执行的时间(SCHED_TIME) //如下看详情请查看源码的这个方法 以及执行的sql,主要代码如下注释: /** "INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" //FIRED_TIME ps.setBigDecimal(5, new BigDecimal(String.valueOf(System.currentTimeMillis()))); //SCHED_TIME ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime()))); */ getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null); //下次执行的Trigger放入acquiredTriggers List中 acquiredTriggers.add(nextTrigger); if(firstAcquiredTriggerFireTime == 0) firstAcquiredTriggerFireTime = nextTrigger.getNextFireTime().getTime(); } // if we didn't end up with any trigger to fire from that first // batch, try again for another batch. We allow with a max retry count. if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) { continue; } // We are done with the while loop. break; } catch (Exception e) { throw new JobPersistenceException( "Couldn't acquire next trigger: " + e.getMessage(), e); } } while (true); // Return the acquired trigger list //返回获取的触发器列表 return acquiredTriggers; } }
2.2.2查询下次触发的触发器 selectTriggerToAcquire
/** 选择下一次需要触发器的Trigger,它将在两个给定时间戳之间按照触发时间的升序触发,然后按优先级降序。 *参数: conn数据库Connection noLaterThan触发器的getNextFireTime()的最大值(独占) noEarlierThan触发器的最大值getMisfireTime() maxCount允许在返回列表中获取的最大触发器数量。 备注: noLaterThan : System.currentTimeMillis() + 30*1000L(30秒) noEarlierThan :System.currentTimeMillis() + 60*1000L(one minute:一分钟) */ selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount) 最终这个方法执行的sql: "SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC" //搜索出下次需要执行的Trigger //NEXT_FIRE_TIME <= ?(noLaterThan) and NEXT_FIRE_TIME >= ?(noEarlierThan) noEarlierThan <= 下次触发的时间 <= noLaterThan
注:这里的sql和MISFIREINSTR 这个值有关系,默认情况MISFIREINSTR = 0;除非你自己设置的MISFIRE处理机制! 这个地方要想弄明白,需要去了解Quartz源码——Quartz调度器的Misfire处理规则(四)——scheduler.start()启动源码分析(二)中2.1 恢复job recoverJobs(); 中讲解代码!
我测试执行的一条sql:
com.mysql.jdbc.JDBC4PreparedStatement@72163468: "SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM qrtz_TRIGGERS WHERE SCHED_NAME = 'dufy_test' AND TRIGGER_STATE = 'WAITING' AND NEXT_FIRE_TIME <= 1481126708931 AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= 1481126618934)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC" 重点这里:(MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= 1481126618934))
提个小小问题, 什么情况MISFIRE_INSTR == -1 ???
2.3 通知JobStore调度程序现在正在触发其先前已获取(保留)的给定触发器(执行其关联的作业)。
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException { return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS, new TransactionCallback<List<TriggerFiredResult>>() { public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException { List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>(); TriggerFiredResult result; for (OperableTrigger trigger : triggers) { try { TriggerFiredBundle bundle = triggerFired(conn, trigger);//这个里面比较复杂,分析!!! //然后封装成TriggerFiredResult 返回, 回到2.3开始的地方run方法中! result = new TriggerFiredResult(bundle); } catch (JobPersistenceException jpe) { result = new TriggerFiredResult(jpe); } catch(RuntimeException re) { result = new TriggerFiredResult(re); } results.add(result); } return results; } }, new TransactionValidator<List<TriggerFiredResult>>() { .... }); } protected TriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger) throws JobPersistenceException { JobDetail job; Calendar cal = null; // Make sure trigger wasn't deleted, paused, or completed... //确保触发器未被删除,暂停或完成... try { // if trigger was deleted, state will be STATE_DELETED //如果触发器被删除,状态将是STATE_DELETED String state = getDelegate().selectTriggerState(conn, trigger.getKey()); //查询到数据,返回查询到的当前状态,否则返回 STATE_DELETED ,删除状态 //sql:"SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" if (!state.equals(STATE_ACQUIRED)) { return null; } } catch (SQLException e) { throw new JobPersistenceException("Couldn't select trigger state: " + e.getMessage(), e); } try { //恢复job,根据 trigger.getJobKey() 获取Job的name 和 group job = retrieveJob(conn, trigger.getJobKey()); //sql:"SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?" if (job == null) { return null; } } catch (JobPersistenceException jpe) { try { getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); getDelegate().updateTriggerState(conn, trigger.getKey(), STATE_ERROR); } catch (SQLException sqle) { getLog().error("Unable to set trigger state to ERROR.", sqle); } throw jpe; } // trigger 有CalendarName 去查询qrtz_calendar 表 if (trigger.getCalendarName() != null) { cal = retrieveCalendar(conn, trigger.getCalendarName()); //sql: "SELECT * FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?" if (cal == null) { return null; } } try { //更新触发的触发器记录。 将更新字段“触发实例”,“触发时间”和“状态”。STATE_EXECUTING:执行状态 getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job); //sql:"UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?" //ps.setBigDecimal(2, new BigDecimal(String.valueOf(System.currentTimeMillis()))); //ps.setBigDecimal(3, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime()))); } catch (SQLException e) { throw new JobPersistenceException("Couldn't insert fired trigger: " + e.getMessage(), e); } Date prevFireTime = trigger.getPreviousFireTime(); // call triggered - to update the trigger's next-fire-time state... // 2.3.1 更新触发器的下一个触发时间状态... trigger.triggered(cal); String state = STATE_WAITING; boolean force = true; //相关联的Job类是否携带DisallowConcurrentExecution注释。 if (job.isConcurrentExectionDisallowed()) { state = STATE_BLOCKED; force = false; try { getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_WAITING); getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_ACQUIRED); getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_PAUSED_BLOCKED, STATE_PAUSED); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't update states of blocked triggers: " + e.getMessage(), e); } } //getNextFireTime == null 说明trigger执行完成,没有下次触发的时间了 if (trigger.getNextFireTime() == null) { state = STATE_COMPLETE; force = true; } //2.3.2插入或者更新一个Trigger ,进入看看 storeTrigger(conn, trigger, job, true, state, force, false); //清除'dirty'标志(将dirty标志设置为false)。 job.getJobDataMap().clearDirtyFlag(); //创建一个 TriggerFiredBundle的对象,封装数据 return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup() .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime()); }
2.3.1 更新触发器的下一个触发时间状态…
//当调度程序决定“触发”触发器(执行关联的作业)时调用,以便为触发器更新自身以进行下一次触发(如果有)。 public void triggered(Calendar calendar) { timesTriggered++; //默认 timesTriggered==0 previousFireTime = nextFireTime; //下次执行的next_time赋值给pre_time nextFireTime = getFireTimeAfter(nextFireTime);//获取下次触发的时间 while (nextFireTime != null && calendar != null && !calendar.isTimeIncluded(nextFireTime.getTime())) { nextFireTime = getFireTimeAfter(nextFireTime); if(nextFireTime == null) break; //avoid infinite loop java.util.Calendar c = java.util.Calendar.getInstance(); c.setTime(nextFireTime); if (c.get(java.util.Calendar.YEAR) > YEAR_TO_GIVEUP_SCHEDULING_AT) { nextFireTime = null; } } }
2.3.2插入或者更新一个Trigger ,进入看看
protected void storeTrigger(Connection conn, OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state, boolean forceState, boolean recovering) throws JobPersistenceException { //检查Trigger是否存在 , trigger 的name 和 group boolean existingTrigger = triggerExists(conn, newTrigger.getKey()); if ((existingTrigger) && (!replaceExisting)) { throw new ObjectAlreadyExistsException(newTrigger); } try { boolean shouldBepaused; if (!forceState) { shouldBepaused = getDelegate().isTriggerGroupPaused( conn, newTrigger.getKey().getGroup()); if(!shouldBepaused) { shouldBepaused = getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED); if (shouldBepaused) { getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup()); } } if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) { state = STATE_PAUSED; } } //job 为null ,重新去查询一遍 jobDetail 根据 Trigger的name和group if(job == null) { job = getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), getClassLoadHelper()); } if (job == null) { throw new JobPersistenceException("The job (" + newTrigger.getJobKey() + ") referenced by the trigger does not exist."); } //是否有注解 ,上面传过来的recovering参数 == false if (job.isConcurrentExectionDisallowed() && !recovering) { state = checkBlockedState(conn, job.getKey(), state); } //existingTrigger存在的话,进行更新操作,否则插入操作 if (existingTrigger) { //2.3.2.1这个里面还是很复杂,贴代码出来了 getDelegate().updateTrigger(conn, newTrigger, state, job); getDelegate().updateTrigger(conn, newTrigger, state, job); } else { //插入和更新很多类似地方 具体代码不贴出来了 ! getDelegate().insertTrigger(conn, newTrigger, state, job); } } catch (Exception e) { throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" + newTrigger.getJobKey() + "' job:" + e.getMessage(), e); } }
2.3.2.1这个里面还是很复杂,贴代码出来了
//更新基本触发器数据。 public int updateTrigger(Connection conn, OperableTrigger trigger, String state, JobDetail jobDetail) throws SQLException, IOException { // save some clock cycles by unnecessarily writing job data blob ... //通过不必要地写入作业数据blob来保存一些时钟周期... boolean updateJobData = trigger.getJobDataMap().isDirty();//确定Map是否标记为dirty。 ByteArrayOutputStream baos = null; if(updateJobData && trigger.getJobDataMap().size() > 0) { baos = serializeJobData(trigger.getJobDataMap()); } PreparedStatement ps = null; int insertResult = 0; try { if(updateJobData) { ps = conn.prepareStatement(rtp(UPDATE_TRIGGER)); --------------------------------------------------------------------- "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" --------------------------------------------------------------------- } else { ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA)); --------------------------------------------------------------------- "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" --------------------------------------------------------------------- } ps.setString(1, trigger.getJobKey().getName()); ps.setString(2, trigger.getJobKey().getGroup()); ps.setString(3, trigger.getDescription()); long nextFireTime = -1; if (trigger.getNextFireTime() != null) {//触发器下次执行的时间不为null //备注:一般执行完成,即是complete 状态,getNextFireTime == null nextFireTime = trigger.getNextFireTime().getTime(); } ps.setBigDecimal(4, new BigDecimal(String.valueOf(nextFireTime))); long prevFireTime = -1; if (trigger.getPreviousFireTime() != null) {//触发器之前执行的时间不为 null //备注:触发器第一次执行的时候 getNextFireTime 为 null prevFireTime = trigger.getPreviousFireTime().getTime(); } ps.setBigDecimal(5, new BigDecimal(String.valueOf(prevFireTime))); ps.setString(6, state); //获取对于代理 ;如JDBC 或者RAM TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger); String type = TTYPE_BLOB; if(tDel != null) type = tDel.getHandledTriggerTypeDiscriminator(); ps.setString(7, type); ps.setBigDecimal(8, new BigDecimal(String.valueOf(trigger .getStartTime().getTime()))); long endTime = 0; if (trigger.getEndTime() != null) { endTime = trigger.getEndTime().getTime(); } ps.setBigDecimal(9, new BigDecimal(String.valueOf(endTime))); ps.setString(10, trigger.getCalendarName()); ps.setInt(11, trigger.getMisfireInstruction()); ps.setInt(12, trigger.getPriority()); if(updateJobData) { setBytes(ps, 13, baos); ps.setString(14, trigger.getKey().getName()); ps.setString(15, trigger.getKey().getGroup()); } else { ps.setString(13, trigger.getKey().getName()); ps.setString(14, trigger.getKey().getGroup()); } insertResult = ps.executeUpdate(); if(tDel == null) updateBlobTrigger(conn, trigger); else //更新扩展触发器属性 tDel.updateExtendedTriggerProperties(conn, trigger, state, jobDetail); --------------------------------------------------------------- "UPDATE {0}SIMPLE_TRIGGERS SET REPEAT_COUNT = ?, REPEAT_INTERVAL = ?, TIMES_TRIGGERED = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" --------------------------------------------------------------------- } finally { closeStatement(ps); } return insertResult;//返回执行的结果 为int }
谢谢你的阅读,如果您觉得这篇博文对你有帮助,请点赞或者喜欢,让更多的人看到!祝你每天开心愉快!
不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!
愿你我在人生的路上能都变成最好的自己,能够成为一个独挡一面的人
? 每天都在变得更好的阿飞云