當客戶端向服務層同時提交多個任務(Task)時,爲了充分發揮服務器的處理計算能力,普通單線程已經不能知足咱們的需求了,須要採用多線程或多進程的技術來併發處理task,服務器在處理併發任務時須要知足一下幾個要求:java
Client.java服務器
package com.ngsky.async; /** * @Description TODO * @Author daxiong * @Date 7/7/2018 10:48 AM **/ public class AsyncTaskTest { public static void main(String[] args){ AsyncExecutorService executorService = new AsyncExecutorService(5, 10, 30, TimeUnit.SECONDS, 2, 5); long beginTime = System.currentTimeMillis(); for(int i = 0; i < 1000;i++){ MockTask mockTask = new MockTask(); mockTask.setName("task" + i); mockTask.setRetryTimes(3); mockTask.setRetryWaitTime(2000); executorService.execute(mockTask); } long endTime = System.currentTimeMillis(); System.out.println("The task spend on " + ((endTime - beginTime) / 60L) + " minutes"); } } // every task spend 20s, (1000 * 2) / 60 = 33 minutes class MockTask extends AsyncTask{ public void doWork() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
AsyncTask.java多線程
package com.ngsky.async; /** * @Description TODO * @Author daxiong * @Date 7/7/2018 10:14 AM **/ public abstract class AsyncTask implements Runnable { private String name; // task name private boolean successDone; // whether the task completed successful private int retryTimes; // if the task was't finished successful, system allow retry to do the task for retrytimes private int retryWaitTime; // system cant't retry to do work immediately, but execute after retryWaitTime public void run() { System.out.println("(task) " + getName() + " beginning execute...."); long beginTime = System.currentTimeMillis(); int currentRetryTimes = 1; try { doWork(); System.out.println("(task) " + getName() + " completed successful!"); this.setSuccessDone(true); } catch (Exception e) { System.out.println("(task) " + " execute filed..., message " + e); this.setSuccessDone(false); } if (getRetryTimes() <= 0) return; while (!isSuccessDone() && currentRetryTimes <= getRetryTimes()) { System.out.println("(task) " + "Executing retry " + currentRetryTimes + "th!" ); if (getRetryWaitTime() > 0) { try { Thread.sleep(getRetryWaitTime()); } catch (InterruptedException e) { e.printStackTrace(); } } try { doWork(); System.out.println("(task) " + getName() + " completed successful!"); this.setSuccessDone(true); } catch (Exception e) { System.out.println("(task) " + getName() + " was failed, unknown reason! Please try again!"); this.setSuccessDone(false); currentRetryTimes++; } } long endTime = System.currentTimeMillis(); System.out.println("(task) " + " spend on " + (endTime - beginTime) + "ms and result is " + (this.isSuccessDone() ? "successful!" : "failed,Please check your task!")); } public abstract void doWork() throws Exception; public String getName() { return name; } public void setName(String name) { this.name = name; } public boolean isSuccessDone() { return successDone; } public void setSuccessDone(boolean successDone) { this.successDone = successDone; } public int getRetryTimes() { return retryTimes; } public void setRetryTimes(int retryTimes) { this.retryTimes = retryTimes; } public int getRetryWaitTime() { return retryWaitTime; } public void setRetryWaitTime(int retryWaitTime) { this.retryWaitTime = retryWaitTime; } }
AsyncExecutorService.java併發
package com.ngsky.async; import java.util.concurrent.*; /** * @Description async service interface * @Author daxiong * @Date 7/7/2018 9:16 AM **/ public class AsyncExecutorService { private int corePoolSize; private int maximumPoolSize; private long keepLiveTime; private TimeUnit timeUnit; private int retryTimes; private int retryWaitTime; private LinkedBlockingQueue<Runnable> blockingQueue; private ThreadPoolExecutor executor; public AsyncExecutorService(int corePoolSize, int maximumPoolSize, long keepLiveTimes, TimeUnit timeUnit, int retryTimes, int retryWaitTimes){ this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.keepLiveTime = keepLiveTimes; this.timeUnit = timeUnit; this.retryTimes = retryTimes; this.retryWaitTime = retryWaitTimes; init(); } private void init(){ System.out.println("Async executor initializing..."); blockingQueue = new LinkedBlockingQueue<Runnable>(); executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepLiveTime, timeUnit, blockingQueue); } // init task information private AsyncTask initTask(AsyncTask task){ System.out.println("(task) " + task.getName() + " initializing..."); if(retryTimes > 0) task.setRetryTimes(retryTimes); if(retryWaitTime > 0) task.setRetryWaitTime(retryWaitTime); return task; } public void execute(AsyncTask task){ task = initTask(task); executor.execute(task); } public <T> Future<T> submit(Callable<T> job){ return executor.submit(job); } public void shutdown(){ if(executor != null) executor.shutdown(); } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } public int getMaximumPoolSize() { return maximumPoolSize; } public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } public long getKeepLiveTime() { return keepLiveTime; } public void setKeepLiveTime(long keepLiveTime) { this.keepLiveTime = keepLiveTime; } public TimeUnit getTimeUnit() { return timeUnit; } public void setTimeUnit(TimeUnit timeUnit) { this.timeUnit = timeUnit; } public int getRetryTimes() { return retryTimes; } public void setRetryTimes(int retryTimes) { this.retryTimes = retryTimes; } public int getRetryWaitTime() { return retryWaitTime; } public void setRetryWaitTime(int retryWaitTime) { this.retryWaitTime = retryWaitTime; } public LinkedBlockingQueue<Runnable> getBlockingQueue() { return blockingQueue; } public void setBlockingQueue(LinkedBlockingQueue<Runnable> blockingQueue) { this.blockingQueue = blockingQueue; } public ThreadPoolExecutor getExecutor() { return executor; } public void setExecutor(ThreadPoolExecutor executor) { this.executor = executor; } }
執行結果 異步
異步任務調度相關的理論和實現還有許多地方沒有涉及到,這裏作一個拋磚引玉,好比任務執行過程當中的攔截操做,線程池初始化時大小如何選擇等等。async
任務調度針對的是高併發任務請求,要求保證系統性能,以最快的速度處理任務,提升處理效率和成功率。高併發
選擇的技術:多線程,線程池,併發處理,任務調度,資源分配,重試機制。性能
再接再礪,寫好每一篇博客,不要求天天寫一篇,但要求每一篇都要是精華,都要飽含質量!this