先看play.jobs.JobsPlugin。java
public void onApplicationStart() { int core = Integer.parseInt(Play.configuration.getProperty("play.jobs.pool", "10")); executor = new ScheduledThreadPoolExecutor(core, new PThreadFactory("jobs"), new ThreadPoolExecutor.AbortPolicy()); }
在onAppliactionStart()方法中,實例化一個ScheduledThreadPollExecutor作executor。 接受afterApplicationStart事件中,纔會處理Job。算法
public void afterApplicationStart() { List> jobs = new ArrayList>(); for (Class clazz : Play.classloader.getAllClasses()) { if (Job.class.isAssignableFrom(clazz)) { jobs.add(clazz); } } scheduledJobs = new ArrayList(); for (final Class clazz : jobs) { // @OnApplicationStart if (clazz.isAnnotationPresent(OnApplicationStart.class)) { //check if we're going to run the job sync or async OnApplicationStart appStartAnnotation = clazz.getAnnotation(OnApplicationStart.class); if( !appStartAnnotation.async()) { //run job sync try { Job job = ((Job) clazz.newInstance()); scheduledJobs.add(job); job.run(); if(job.wasError) { if(job.lastException != null) { throw job.lastException; } throw new RuntimeException("@OnApplicationStart Job has failed"); } } catch (...) { ... } } else { //run job async try { Job job = ((Job) clazz.newInstance()); scheduledJobs.add(job); //start running job now in the background @SuppressWarnings("unchecked") Callable callable = (Callable)job; executor.submit(callable); } catch (...) { ... } } } // @On if (clazz.isAnnotationPresent(On.class)) { try { Job job = ((Job) clazz.newInstance()); scheduledJobs.add(job); scheduleForCRON(job); } catch (InstantiationException ex) { throw new UnexpectedException("Cannot instanciate Job " + clazz.getName()); } catch (IllegalAccessException ex) { throw new UnexpectedException("Cannot instanciate Job " + clazz.getName()); } } // @Every if (clazz.isAnnotationPresent(Every.class)) { try { Job job = (Job) clazz.newInstance(); scheduledJobs.add(job); String value = job.getClass().getAnnotation(Every.class).value(); if (value.startsWith("cron.")) { value = Play.configuration.getProperty(value); } value = Expression.evaluate(value, value).toString(); if(!"never".equalsIgnoreCase(value)){ executor.scheduleWithFixedDelay(job, Time.parseDuration(value), Time.parseDuration(value), TimeUnit.SECONDS); } } catch (InstantiationException ex) { throw new UnexpectedException("Cannot instanciate Job " + clazz.getName()); } catch (IllegalAccessException ex) { throw new UnexpectedException("Cannot instanciate Job " + clazz.getName()); } } } public static void scheduleForCRON(Job job) { ... try { ... executor.schedule((Callable)job, nextDate.getTime() - now.getTime(), TimeUnit.MILLISECONDS); ... } catch (Exception ex) { throw new UnexpectedException(ex); } }
第一步讀取全部類中的Job類,並判斷是不是OnApplicationStart標記,若是是sync同步的,就會直接run。若是async異步的,會加入executor中,雖然執行時間間隔爲0毫秒,可是實際執行時間由executor決定。 若是被On標記,play會解析On註解中表達式的值。app
這裏須要注意,若是是以cron開頭,就會讀取配置文件中的值,這點以前還沒發現哈。 而後根據表達式觸發時間與目前時間計算延遲時間,並加入executor中。 框架
若是是Every標記,也會像上面的同樣處理註解中表達式,不一樣的是,play會將這個Job和他的執行時間注入到executor,不用再手動規定延遲執行時間,由executor徹底接管執行。 異步
那麼被On標記的job如何達到週期執行的效果呢?關鍵在play.jobs.Job類中。async
public class Job extends Invoker.Invocation implements Callable { public void doJob() throws Exception { } public V doJobWithResult() throws Exception { doJob(); return null; } private V withinFilter(play.libs.F.Function0 fct) throws Throwable { for (PlayPlugin plugin : Play.pluginCollection.getEnabledPlugins() ){ if (plugin.getFilter() != null) { return (V)plugin.getFilter().withinFilter(fct); } } return null; } public V call() { Monitor monitor = null; try { if (init()) { before(); V result = null; try { ... result = withinFilter(new play.libs.F.Function0() { public V apply() throws Throwable { return doJobWithResult(); } }); ... } catch (...) { ... } after(); return result; } } catch (Throwable e) { onException(e); } finally { ... _finally(); } return null; } @Override public void _finally() { super._finally(); if (executor == JobsPlugin.executor) { JobsPlugin.scheduleForCRON(this); } } }
run()方法中調用call方法。ide
call方法中的代碼也有init/before/after與請求調用過程相似,由於Job也得初始化相應的上下文。也要經過過濾器,這樣JPA事務管理也對Job有用,過濾以後會執行doJobWithReslut()方法,這樣就出了框架,進入應用了。this
在finally模塊中進入__finally()方法後,判斷job.executor與JobsPlugin.executor的是否同樣,由於在上面處理之中,用On標記的賦值了executor,而every標記的沒有賦值。這就是區別,相同的話會再次進入JobsPlugin.scheduleForCRON方法,根據表達式觸發時間與目前時間計算延遲時間,並加入executor中。 google
如此一次一次就達到週期執行效果。 Job中其餘幾種算法:lua
public void every(String delay) { every(Time.parseDuration(delay)); } public void every(int seconds) { JobsPlugin.executor.scheduleWithFixedDelay(this, seconds, seconds, TimeUnit.SECONDS); }
every方法,能夠達到設定every註解同樣的效果,我猜想應該是匿名內部類時使用,由於不能直接加註解,能夠調用every方法。
public Promise in(String delay) { return in(Time.parseDuration(delay)); } public Promise in(int seconds) { final Promise smartFuture = new Promise(); JobsPlugin.executor.schedule(getJobCallingCallable(smartFuture), seconds, TimeUnit.SECONDS); return smartFuture; }
in方法,設定延遲時間,延遲執行job。
public Promise now() { final Promise smartFuture = new Promise(); JobsPlugin.executor.submit(getJobCallingCallable(smartFuture)); return smartFuture; }
now方法,馬上執行。
public Promise afterRequest() { InvocationContext current = Invoker.InvocationContext.current(); if(current == null || !Http.invocationType.equals(current.getInvocationType())) { return now(); } final Promise smartFuture = new Promise(); Callable callable = getJobCallingCallable(smartFuture); JobsPlugin.addAfterRequestAction(callable); return smartFuture; }
afterRequest方法,請求執行,利用事件機制,在beforeInvocation與afterInvocation事件清除和執行job。
in/now/afterRequest三個方法,返回的是Promise對象,是Play對Promise模式的實現,至於什麼是Premise模式,你們能夠google一下。