Play源碼深刻之五:Job模塊的原理

先看play.jobs.JobsPlugin。java

public void onApplicationStart() {
    int core = Integer.parseInt(Play.configuration.getProperty("play.jobs.pool", "10"));
    executor = new ScheduledThreadPoolExecutor(core, new PThreadFactory("jobs"), new ThreadPoolExecutor.AbortPolicy());
}

在onAppliactionStart()方法中,實例化一個ScheduledThreadPollExecutor作executor。 接受afterApplicationStart事件中,纔會處理Job。算法

public void afterApplicationStart() {
    List> jobs = new ArrayList>();
    for (Class clazz : Play.classloader.getAllClasses()) {
        if (Job.class.isAssignableFrom(clazz)) {
            jobs.add(clazz);
        }
    }
    scheduledJobs = new ArrayList();
    for (final Class clazz : jobs) {
        // @OnApplicationStart
        if (clazz.isAnnotationPresent(OnApplicationStart.class)) {
            //check if we're going to run the job sync or async
            OnApplicationStart appStartAnnotation = clazz.getAnnotation(OnApplicationStart.class);
            if( !appStartAnnotation.async()) {
                //run job sync
                try {
                    Job job = ((Job) clazz.newInstance());
                    scheduledJobs.add(job);
                    job.run();
                    if(job.wasError) {
                        if(job.lastException != null) {
                            throw job.lastException;
                        }
                        throw new RuntimeException("@OnApplicationStart Job has failed");
                    }
                } catch (...) {
                    ...
                }
            } else {
                //run job async
                try {
                    Job job = ((Job) clazz.newInstance());
                    scheduledJobs.add(job);
                    //start running job now in the background
                    @SuppressWarnings("unchecked")
                    Callable callable = (Callable)job;
                    executor.submit(callable);
                } catch (...) {
                    ...
                }
            }
        }

        // @On
        if (clazz.isAnnotationPresent(On.class)) {
            try {
                Job job = ((Job) clazz.newInstance());
                scheduledJobs.add(job);
                scheduleForCRON(job);
            } catch (InstantiationException ex) {
                throw new UnexpectedException("Cannot instanciate Job " + clazz.getName());
            } catch (IllegalAccessException ex) {
                throw new UnexpectedException("Cannot instanciate Job " + clazz.getName());
            }
        }
        // @Every
        if (clazz.isAnnotationPresent(Every.class)) {
            try {
                Job job = (Job) clazz.newInstance();
                scheduledJobs.add(job);
                String value = job.getClass().getAnnotation(Every.class).value();
                if (value.startsWith("cron.")) {
                    value = Play.configuration.getProperty(value);
                }
                value = Expression.evaluate(value, value).toString();
                if(!"never".equalsIgnoreCase(value)){
                    executor.scheduleWithFixedDelay(job, Time.parseDuration(value), Time.parseDuration(value), TimeUnit.SECONDS);
                }
            } catch (InstantiationException ex) {
                throw new UnexpectedException("Cannot instanciate Job " + clazz.getName());
            } catch (IllegalAccessException ex) {
                throw new UnexpectedException("Cannot instanciate Job " + clazz.getName());
            }   
    }
}

public static  void scheduleForCRON(Job job) {
    ...
    try {
        ...
        executor.schedule((Callable)job, nextDate.getTime() - now.getTime(), TimeUnit.MILLISECONDS);
        ...
    } catch (Exception ex) {
        throw new UnexpectedException(ex);
    }
}

第一步讀取全部類中的Job類,並判斷是不是OnApplicationStart標記,若是是sync同步的,就會直接run。若是async異步的,會加入executor中,雖然執行時間間隔爲0毫秒,可是實際執行時間由executor決定。 若是被On標記,play會解析On註解中表達式的值。app

這裏須要注意,若是是以cron開頭,就會讀取配置文件中的值,這點以前還沒發現哈。 而後根據表達式觸發時間與目前時間計算延遲時間,並加入executor中。 框架

若是是Every標記,也會像上面的同樣處理註解中表達式,不一樣的是,play會將這個Job和他的執行時間注入到executor,不用再手動規定延遲執行時間,由executor徹底接管執行。 異步

那麼被On標記的job如何達到週期執行的效果呢?關鍵在play.jobs.Job類中。async

public class Job extends Invoker.Invocation implements Callable {

    public void doJob() throws Exception {
    }

    public V doJobWithResult() throws Exception {
        doJob();
        return null;
    }

    private V withinFilter(play.libs.F.Function0 fct) throws Throwable {
        for (PlayPlugin plugin :  Play.pluginCollection.getEnabledPlugins() ){
           if (plugin.getFilter() != null) {
              return (V)plugin.getFilter().withinFilter(fct);
           }
        }
        return null;
    }

    public V call() {
        Monitor monitor = null;
        try {
            if (init()) {
                before();
                V result = null;

                try {
                    ... 
                    result = withinFilter(new play.libs.F.Function0() {
                        public V apply() throws Throwable {
                          return doJobWithResult();
                        }
                    });
                    ...
                } catch (...) {
                    ...
                }
                after();
                return result;
            }
        } catch (Throwable e) {
            onException(e);
        } finally {
            ...
            _finally();
        }
        return null;
    }

    @Override
    public void _finally() {
        super._finally();
        if (executor == JobsPlugin.executor) {
            JobsPlugin.scheduleForCRON(this);
        }
    }
}

run()方法中調用call方法。ide

call方法中的代碼也有init/before/after與請求調用過程相似,由於Job也得初始化相應的上下文。也要經過過濾器,這樣JPA事務管理也對Job有用,過濾以後會執行doJobWithReslut()方法,這樣就出了框架,進入應用了。this

在finally模塊中進入__finally()方法後,判斷job.executor與JobsPlugin.executor的是否同樣,由於在上面處理之中,用On標記的賦值了executor,而every標記的沒有賦值。這就是區別,相同的話會再次進入JobsPlugin.scheduleForCRON方法,根據表達式觸發時間與目前時間計算延遲時間,並加入executor中。 google

如此一次一次就達到週期執行效果。 Job中其餘幾種算法:lua

public void every(String delay) {
        every(Time.parseDuration(delay));
    }

    public void every(int seconds) {
        JobsPlugin.executor.scheduleWithFixedDelay(this, seconds, seconds, TimeUnit.SECONDS);
    }

every方法,能夠達到設定every註解同樣的效果,我猜想應該是匿名內部類時使用,由於不能直接加註解,能夠調用every方法。

public Promise in(String delay) {
        return in(Time.parseDuration(delay));
    }
  
    public Promise in(int seconds) {
        final Promise smartFuture = new Promise();
        JobsPlugin.executor.schedule(getJobCallingCallable(smartFuture), seconds, TimeUnit.SECONDS);
        return smartFuture;
    }

in方法,設定延遲時間,延遲執行job。

public Promise now() {
        final Promise smartFuture = new Promise();
        JobsPlugin.executor.submit(getJobCallingCallable(smartFuture));
        return smartFuture;
    }

now方法,馬上執行。

public Promise afterRequest() {
    InvocationContext current = Invoker.InvocationContext.current();
    if(current == null || !Http.invocationType.equals(current.getInvocationType())) {
      return now();
    }

    final Promise smartFuture = new Promise();
    Callable callable = getJobCallingCallable(smartFuture);
    JobsPlugin.addAfterRequestAction(callable);
    return smartFuture;
  }

afterRequest方法,請求執行,利用事件機制,在beforeInvocation與afterInvocation事件清除和執行job。 

in/now/afterRequest三個方法,返回的是Promise對象,是Play對Promise模式的實現,至於什麼是Premise模式,你們能夠google一下。

相關文章
相關標籤/搜索