異步任務調度

問題

當客戶端向服務層同時提交多個任務(Task)時,爲了充分發揮服務器的處理計算能力,普通單線程已經不能知足咱們的需求了,須要採用多線程或多進程的技術來併發處理task,服務器在處理併發任務時須要知足一下幾個要求:java

  • 1.任務提交後,但願服務器儘量快的處理並返回結果
  • 2.多個任務同時處理時並不影響性能
  • 3.當某個任務執行過程當中出錯了,容許重試,而且不會阻礙其它task的處理

分析

  • 1.多個任務同時提交,而且但願服務器儘量快的處理並返回結果,採用多線程技術來實現,針對每一個任務,若是都分配一個線程去處理的話,那麼當同時提交成千上萬的任務的話,很容易致使服務器系統資源耗盡,服務器宕機。在分配任務時,建立和銷燬線程很浪費資源,經過線程池技術來提升線程的執行效率,減去任務執行過程當中的線程建立和銷燬的時間,大大提升任務執行效率。經過線程池技術,使得系統初始化時存放必定量的線程,客戶端提交task給服務器後,將各個task分配給現有的空閒線程去處理,可是當提交不少task的話,很明顯,線程池裏面的線程數量也會不足,那如何解決呢?在客戶端提交task到服務器後,現將task提交到任務隊列中(TaskQueue),系統執行task時,從任務隊列中提取task再分配給線程池中的空閒線程去執行任務。
  • 2.當咱們發佈任務後,任務交給了服務層取處理,在處理的過程當中不免會遇到一些特殊的問題,致使任務執行失敗,爲了保證任務能夠有效的執行並實現咱們須要的任務,還須要爲任務處理提供一個重試機制,這樣當系統執行任務失敗的時候,就能夠啓動重試機制來再次處理提交的任務,增長任務執行的成功率。

設計

image

  • Client
    Client 模擬的是多個客戶端,submit()方法用於提交task,提交的這個task類型爲AsyncTask,該類爲專用的任務類,下文會講述。
  • AsyncExecutorService
    首先這個類的目的是建立出一個線程池 executor來,用於資源分配,須要初始化一些參數,好比線程數量,最大線程數量,線程存活時長,重試次數以及重試等待時間等,這其中還有一個隱藏的參數--> 任務隊列,分配資源時從任務隊列中隨機選擇任務,進行分配,當有任務執行完成後,存在空閒線程時,該executor又會進行資源再分配,從任務隊列總選擇相應的任務進行執行。
    該類還提供了執行任務的接口,從客戶端傳遞過來的任務,在該類進行接收,並執行內部邏輯。
  • AsyncTask
    AsyncTask是一個Runnable類,整正執行任務的地方,這行的任務經過 抽象方法 doWork()對外提供,這樣就能夠實現不一樣的任務操做。在該類中主要實現了重試機制的邏輯,當任務執行失敗時進行相應次數的重試執行,保證任務執行成功。

代碼

Client.java服務器

package com.ngsky.async;

/**
 * @Description TODO
 * @Author daxiong
 * @Date 7/7/2018 10:48 AM
 **/
public class AsyncTaskTest {
    public static void main(String[] args){
        AsyncExecutorService executorService = new AsyncExecutorService(5, 10, 30, TimeUnit.SECONDS, 2, 5);
        long beginTime = System.currentTimeMillis();
        for(int i = 0; i < 1000;i++){
            MockTask mockTask = new MockTask();
            mockTask.setName("task" + i);
            mockTask.setRetryTimes(3);
            mockTask.setRetryWaitTime(2000);
            executorService.execute(mockTask);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("The task spend on " + ((endTime - beginTime) / 60L) + " minutes");
    }

}
// every task spend 20s, (1000 * 2) / 60 = 33 minutes
class MockTask extends AsyncTask{
    public void doWork() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

AsyncTask.java多線程

package com.ngsky.async;

/**
 * @Description TODO
 * @Author daxiong
 * @Date 7/7/2018 10:14 AM
 **/
public abstract class AsyncTask implements Runnable {

    private String name;  // task name
    private boolean successDone; // whether the task completed successful
    private int retryTimes;  // if the task was't finished successful, system allow retry to do the task for retrytimes
    private int retryWaitTime;  // system cant't retry to do work immediately, but execute after retryWaitTime

    public void run() {
        System.out.println("(task) " + getName() + " beginning execute....");
        long beginTime = System.currentTimeMillis();
        int currentRetryTimes = 1;
        try {
            doWork();
            System.out.println("(task) " + getName() + " completed successful!");
            this.setSuccessDone(true);
        } catch (Exception e) {
            System.out.println("(task) " + " execute filed..., message " + e);
            this.setSuccessDone(false);
        }
        if (getRetryTimes() <= 0) return;
        while (!isSuccessDone() && currentRetryTimes <= getRetryTimes()) {
            System.out.println("(task) " + "Executing retry " + currentRetryTimes + "th!" );
            if (getRetryWaitTime() > 0) {
                try {
                    Thread.sleep(getRetryWaitTime());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                doWork();
                System.out.println("(task) " + getName() + " completed successful!");
                this.setSuccessDone(true);
            } catch (Exception e) {
                System.out.println("(task) " + getName() + " was failed, unknown reason! Please try again!");
                this.setSuccessDone(false);
                currentRetryTimes++;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("(task) " + " spend on " + (endTime - beginTime) + "ms and result is " + (this.isSuccessDone() ? "successful!" : "failed,Please check your task!"));
    }

    public abstract void doWork() throws Exception;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isSuccessDone() {
        return successDone;
    }

    public void setSuccessDone(boolean successDone) {
        this.successDone = successDone;
    }

    public int getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(int retryTimes) {
        this.retryTimes = retryTimes;
    }

    public int getRetryWaitTime() {
        return retryWaitTime;
    }

    public void setRetryWaitTime(int retryWaitTime) {
        this.retryWaitTime = retryWaitTime;
    }
}

AsyncExecutorService.java併發

package com.ngsky.async;

import java.util.concurrent.*;

/**
 * @Description async service interface
 * @Author daxiong
 * @Date 7/7/2018 9:16 AM
 **/
public class AsyncExecutorService {

    private int corePoolSize;
    private int maximumPoolSize;
    private long keepLiveTime;
    private TimeUnit timeUnit;
    private int retryTimes;
    private int retryWaitTime;
    private LinkedBlockingQueue<Runnable> blockingQueue;
    private ThreadPoolExecutor executor;

    public AsyncExecutorService(int corePoolSize, int maximumPoolSize, long keepLiveTimes,
                                TimeUnit timeUnit, int retryTimes, int retryWaitTimes){
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepLiveTime = keepLiveTimes;
        this.timeUnit = timeUnit;
        this.retryTimes = retryTimes;
        this.retryWaitTime = retryWaitTimes;
        init();
    }

    private void init(){
        System.out.println("Async executor initializing...");
        blockingQueue = new LinkedBlockingQueue<Runnable>();
        executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepLiveTime, timeUnit, blockingQueue);
    }

    // init task information
    private AsyncTask initTask(AsyncTask task){
        System.out.println("(task) " + task.getName() + " initializing...");
        if(retryTimes > 0) task.setRetryTimes(retryTimes);
        if(retryWaitTime > 0) task.setRetryWaitTime(retryWaitTime);
        return task;
    }

    public void execute(AsyncTask task){
        task = initTask(task);
        executor.execute(task);
    }

    public <T> Future<T> submit(Callable<T> job){
        return executor.submit(job);
    }

    public void shutdown(){
        if(executor != null)
            executor.shutdown();
    }

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public void setMaximumPoolSize(int maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    public long getKeepLiveTime() {
        return keepLiveTime;
    }

    public void setKeepLiveTime(long keepLiveTime) {
        this.keepLiveTime = keepLiveTime;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public int getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(int retryTimes) {
        this.retryTimes = retryTimes;
    }

    public int getRetryWaitTime() {
        return retryWaitTime;
    }

    public void setRetryWaitTime(int retryWaitTime) {
        this.retryWaitTime = retryWaitTime;
    }

    public LinkedBlockingQueue<Runnable> getBlockingQueue() {
        return blockingQueue;
    }

    public void setBlockingQueue(LinkedBlockingQueue<Runnable> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }
}

執行結果 image異步

總結

異步任務調度相關的理論和實現還有許多地方沒有涉及到,這裏作一個拋磚引玉,好比任務執行過程當中的攔截操做,線程池初始化時大小如何選擇等等。async

任務調度針對的是高併發任務請求,要求保證系統性能,以最快的速度處理任務,提升處理效率和成功率。高併發

選擇的技術:多線程,線程池,併發處理,任務調度,資源分配,重試機制。性能


再接再礪,寫好每一篇博客,不要求天天寫一篇,但要求每一篇都要是精華,都要飽含質量!this

相關文章
相關標籤/搜索