線程池&異步編程

多線程

建立線程的四種方式

一、繼承Threadjava

Thread01 thread = new Thread01();
thread.start();

二、實現Runable接口spring

Runable01 runable = new Runable();
new Thread(runable).start();

三、實現Callable接口 + FutureTask(能夠拿到返回結果,能夠處理異常)多線程

Callable01 callable01 = new Callable01(); 
FutureTask<Integer> futureTask = new FutureTask<>(new callable01());
new Thread(futureTask).start();
Integer result = futureTask.get(); // 阻塞等待,直到整個線程執行完成,得到返回結果
  • FutureTask源碼
public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
	// Runnable也能夠得到返回結果
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

四、線程池 (最經常使用) ,應該將全部多線程異步任務都交給線程池執行app

Excutors(少用)框架

public static ExecutorService service = Executors.newFixedThreadPool(10);
原生ThreadExcutorPool(經常使用)

7大參數源碼less

/**
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     */
    public ThreadPoolExecutor(
        int corePoolSize, // 核心線程數,建立好就準備就緒的線程數量,一直存在,空閒也不會被釋放,除非設置allowCoreThreadTimeOut
        int maximumPoolSize, // 最大線程數量,用於控制資源,空閒線程超過指定的keepAliveTime時間會被釋放
        long keepAliveTime, // 存活時間。若是當前的線程數大於核心線程數,那麼只要線程空閒時間大於指定keepAliveTime,就釋放該線程
        TimeUnit unit, // 存活時間的時間單位
        BlockingQueue<Runnable> workQueue, // 阻塞式工做隊列,若是任務不少,就會將多的任務放進該隊列,只要有線程空閒就會去隊列取新任務執行
        ThreadFactory threadFactory, // 線程的建立工廠
        RejectedExecutionHandler handler  // 若是workQueue工做隊列滿了,按照指定的handler方法執行拒絕策略執行任務 
    )
開發中通常只用使用線程池ThreadExecutorPool,能夠下降線程建立和銷燬帶來的性能損耗、提升響應速度、提升線程可管理性

線程池原理

其實java線程池的實現原理很簡單,說白了就是一個線程集合workerSet和一個阻塞隊列workQueue。當用戶向線程池提交一個任務(也就是線程)時,線程池會先將任務放入workQueue中。workerSet中的線程會不斷的從workQueue中獲取線程而後執行。當workQueue中沒有任務的時候,worker就會阻塞,直到隊列中有任務了就取出來繼續執行異步

image-20201213140533744

  1. corePoolSize: 規定線程池有幾個線程(worker)在運行。
  2. maximumPoolSize: 當workQueue滿了,不能添加任務的時候,這個參數纔會生效。規定線程池最多隻能有多少個線程(worker)在執行。
  3. keepAliveTime: 超出corePoolSize大小的那些線程的生存時間,這些線程若是長時間沒有執行任務而且超過了keepAliveTime設定的時間,就會消亡。
  4. unit: 生存時間對於的單位
  5. workQueue: 存聽任務的隊列
  6. threadFactory: 建立線程的工廠
  7. handler: 當workQueue已經滿了,而且線程池線程數已經達到maximumPoolSize,將執行拒絕策略。

任務提交後的流程分析

用戶經過submit提交一個任務。線程池會執行以下流程:async

  1. 判斷當前運行的worker數量是否超過corePoolSize, 若是不超過corePoolSize。就建立一個worker直接執行該任務。—— 線程池最開始是沒有worker在運行的
  2. 若是正在運行的worker數量超過或者等於corePoolSize,那麼就將該任務加入到workQueue隊列中去。
  3. 若是workQueue隊列滿了,也就是offer方法返回false的話,就檢查當前運行的worker數量是否小於maximumPoolSize,若是小於就建立一個worker直接執行該任務。
  4. 若是當前運行的worker數量是否大於等於maximumPoolSize,那麼就執行RejectedExecutionHandler來拒絕這個任務的提交

源碼解析

咱們先來看一下ThreadPoolExecutor中的幾個關鍵屬性。ide

//這個屬性是用來存放 當前運行的worker數量以及線程池狀態的
//int是32位的,這裏把int的高3位拿來充當線程池狀態的標誌位,後29位拿來充當當前運行worker的數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存聽任務的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set來存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//歷史達到的worker數最大值
private int largestPoolSize;
//當隊列滿了而且worker的數量達到maxSize的時候,執行具體的拒絕策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存時間
private volatile long keepAliveTime;
//常駐worker的數量
private volatile int corePoolSize;
//最大worker的數量,通常當workQueue滿了纔會用到這個參數
private volatile int maximumPoolSize;

1. 提交任務相關源碼

下面是execute方法的源碼函數

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //workerCountOf(c)會獲取當前正在運行的worker數量
        if (workerCountOf(c) < corePoolSize) {
            //若是workerCount小於corePoolSize,就建立一個worker而後直接執行該任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //isRunning(c)是判斷線程池是否在運行中,若是線程池被關閉了就不會再接受任務
        //後面將任務加入到隊列中
        if (isRunning(c) && workQueue.offer(command)) {
            //若是添加到隊列成功了,會再檢查一次線程池的狀態
            int recheck = ctl.get();
            //若是線程池關閉了,就將剛纔添加的任務從隊列中移除
            if (! isRunning(recheck) && remove(command))
                //執行拒絕策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //若是加入隊列失敗,就嘗試直接建立worker來執行任務
        else if (!addWorker(command, false))
            //若是建立worker失敗,就執行拒絕策略
            reject(command);
}

添加worker的方法addWorker源碼

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //使用自旋+cas失敗重試來保證線程競爭問題
        for (;;) {
            //先獲取線程池的狀態
            int c = ctl.get();
            int rs = runStateOf(c);

            // 若是線程池是關閉的,或者workQueue隊列非空,就直接返回false,不作任何處理
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //根據入參core 來判斷能夠建立的worker數量是否達到上限,若是達到上限了就拒絕建立worker
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //沒有的話就嘗試修改ctl添加workerCount的值。這裏用了cas操做,若是失敗了下一個循環會繼續重試,直到設置成功
                if (compareAndIncrementWorkerCount(c))
                    //若是設置成功了就跳出外層的那個for循環
                    break retry;
                //重讀一次ctl,判斷若是線程池的狀態改變了,會再從新循環一次
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            //建立一個worker,將提交上來的任務直接交給worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //加鎖,防止競爭
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    //仍是判斷線程池的狀態
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //若是worker的線程已經啓動了,會拋出異常
                        if (t.isAlive()) 
                              throw new IllegalThreadStateException();
                        //添加新建的worker到線程池中
                        workers.add(w);
                        int s = workers.size();
                        //更新歷史worker數量的最大值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //設置新增標誌位
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //若是worker是新增的,就啓動該線程
                if (workerAdded) {
                    t.start();
                     //成功啓動了線程,設置對應的標誌位
                    workerStarted = true;
                }
            }
        } finally {
            //若是啓動失敗了,會觸發執行相應的方法
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

2. Worker的結構

Worker是ThreadPoolExecutor內部定義的一個內部類。咱們先看一下Worker的繼承關係

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

它實現了Runnable接口,因此能夠拿來當線程用。同時它還繼承了AbstractQueuedSynchronizer同步器類,主要用來實現一個不可重入的鎖。

一些屬性還有構造方法:

//運行的線程,前面addWorker方法中就是直接經過啓動這個線程來啓動這個worker
final Thread thread;
//當一個worker剛建立的時候,就先嚐試執行這個任務
Runnable firstTask;
//記錄完成任務的數量
volatile long completedTasks;
Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //建立一個Thread,將本身設置給他,後面這個thread啓動的時候,也就是執行worker的run方法
            this.thread = getThreadFactory().newThread(this);
}

worker的run方法

public void run() {
            //這裏調用了ThreadPoolExecutor的runWorker方法
            runWorker(this);
}

ThreadPoolExecutor的runWorker方法

final void runWorker(Worker w) {
        //獲取當前線程
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        //執行unlock方法,容許其餘線程來中斷本身
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //若是前面的firstTask有值,就直接執行這個任務
            //若是沒有具體的任務,就執行getTask()方法從隊列中獲取任務
            //這裏會不斷執行循環體,除非線程中斷或者getTask()返回null纔會跳出這個循環
            while (task != null || (task = getTask()) != null) {
                //執行任務前先鎖住,這裏主要的做用就是給shutdown方法判斷worker是否在執行中的
                //shutdown方法裏面會嘗試給這個線程加鎖,若是這個線程在執行,就不會中斷它
                w.lock();
               //判斷線程池狀態,若是線程池被強制關閉了,就立刻退出
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //執行任務前調用。預留的方法,可擴展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //真正的執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       //執行任務後調用。預留的方法,可擴展
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //記錄完成的任務數量
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
}

下面來看一下getTask()方法,這裏面涉及到keepAliveTime的使用,從這個方法咱們能夠看出先吃池是怎麼讓超過corePoolSize的那部分worker銷燬的。

private Runnable getTask() {
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 若是線程池已經關閉了,就直接返回null,
            //若是這裏返回null,調用的那個worker就會跳出while循環,而後執行完銷燬線程
            //SHUTDOWN狀態表示執行了shutdown()方法
            //STOP表示執行了shutdownNow()方法
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //獲取當前正在運行中的worker數量
            int wc = workerCountOf(c);

            // 若是設置了核心worker也會超時或者當前正在運行的worker數量超過了corePoolSize,就要根據時間判斷是否要銷燬線程了
            //其實就是從隊列獲取任務的時候要不要設置超時間時間,若是超過這個時間隊列尚未任務進來,就會返回null
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //若是上一次循環從隊列獲取到的未null,這時候timedOut就會爲true了
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //經過cas來設置WorkerCount,若是多個線程競爭,只有一個能夠設置成功
                //最後若是沒設置成功,就進入下一次循環,說不定下一次worker的數量就沒有超過corePoolSize了,也就不用銷燬worker了
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //若是要設置超時時間,就設置一下咯
                //過了這個keepAliveTime時間尚未任務進隊列就會返回null,那worker就會銷燬
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //若是r爲null,就設置timedOut爲true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
}

3. 添加Callable任務的實現源碼

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
}

要添加一個有返回值的任務的實現也很簡單。其實就是對任務作了一層封裝,將其封裝成Future,而後提交給線程池執行,最後返回這個future。
這裏的 newTaskFor(task) 方法會將其封裝成一個FutureTask類。
外部的線程拿到這個future,執行get()方法的時候,若是任務自己沒有執行完,執行線程就會被阻塞,直到任務執行完。
下面是FutureTask的get方法

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //判斷狀態,若是任務還沒執行完,就進入休眠,等待喚醒
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        //返回值
        return report(s);
}

FutureTask中經過一個state狀態來判斷任務是否完成。當run方法執行完後,會將state狀態置爲完成,同時喚醒全部正在等待的線程。咱們能夠看一下FutureTask的run方法

public void run() {
        //判斷線程的狀態
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //執行call方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //這個方法裏面會設置返回內容,而且喚醒因此等待中的線程
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
}

4. shutdown和shutdownNow方法的實現

shutdown方法會將線程池的狀態設置爲SHUTDOWN,線程池進入這個狀態後,就拒絕再接受任務,而後會將剩餘的任務所有執行完

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //檢查是否能夠關閉線程
            checkShutdownAccess();
            //設置線程池狀態
            advanceRunState(SHUTDOWN);
            //嘗試中斷worker
            interruptIdleWorkers();
             //預留方法,留給子類實現
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
}

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍歷全部的worker
            for (Worker w : workers) {
                Thread t = w.thread;
                //先嚐試調用w.tryLock(),若是獲取到鎖,就說明worker是空閒的,就能夠直接中斷它
                //注意的是,worker本身自己實現了AQS同步框架,而後實現的相似鎖的功能
                //它實現的鎖是不可重入的,因此若是worker在執行任務的時候,會先進行加鎖,這裏tryLock()就會返回false
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
}

shutdownNow作的比較絕,它先將線程池狀態設置爲STOP,而後拒絕全部提交的任務。最後中斷左右正在運行中的worker,而後清空任務隊列。

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //檢測權限
            advanceRunState(STOP);
            //中斷全部的worker
            interruptWorkers();
            //清空任務隊列
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
}

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍歷全部worker,而後調用中斷方法
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
}

CompletableFuture異步編排

  • 建立異步對象

無回調結果的異步方法:CompletableFuture.runAsync(Runnable runnable, Executor executor)

public static void main(String[] args) {
        System.out.println("main。。。start。。。");
        CompletableFuture.runAsync(() -> {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運行結果:" + i);
        }, executor);
        System.out.println("main。。。end。。。");
    }

帶有回調結果的異步方法:CompletableFuture.supplyAsyn(Supplier supplier, Executor executor)

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main。。。start。。。");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運行結果:" + i);
            return i;
        }, executor);
        Integer integer = future.get();
        System.out.println("main。。。end。。。" + integer);
    }
  • 計算完成時回調方法

方法完成後的感知 whenCompleteAsync 和 exceptionally

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main。。。start。。。");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("運行結果:" + i);
            return i;
        }, executor).whenCompleteAsync((res, exception) -> {
            // 只能獲得異常信息,沒法修改返回數據
            System.out.println("異步任務完成了,結果是:" + res + "異常是:" + exception);
        }).exceptionally(throwable -> {
            // 能夠獲取異常信息,同時能夠返回默認值
            return 10;  // 修改返回值future.get() 結果爲10
        });
        Integer integer = future.get();
        System.out.println("main。。。end。。。" + integer);
    }
/** 結果:
    main。。。start。。。
    當前線程:11
    異步任務完成了,結果是:null異常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    main。。。end。。。10
**/
  • 方法完成後的處理 handle
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("運行結果:" + i);
            return i;
        }, executor).handle((res, thr) -> {
            if(res != null) {
                return res * 2; // 修改future返回結果
            }
            if(thr != null) { // 異常
                return 0
            }
            return 0;
        });
  • 線程串行化

一、thenRunAsync:不能獲取到上一步執行結果,無返回值

CompletableFuture.supplyAsync(() -> {
    System.out.println("當前線程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("運行結果:" + i);
    return i;
}, executor).thenRunAsync(() -> {  // 沒有返回值, 不能獲取到上一步的執行結果
    System.out.println("任務2啓動了。。");
}, executor);

二、thenAcceptAsync:能接收到上一步結果但無返回值

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("當前線程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("運行結果:" + i);
    return i;
}, executor).thenAcceptAsync((res) -> {  // 沒有返回值
    System.out.println("任務2啓動了。。" + res);
}, executor);

三、thenApplAsyncy:能接收到上一步返回結果,也有返回值

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("當前線程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("運行結果:" + i);
    return i;
}, executor).thenApplyAsync((res) -> {  // 沒有返回值
    System.out.println("任務2啓動了。。" + res);
    return "hello" + res;
}, executor);
  • 兩任務組合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("當前線程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("運行結果:" + i);
    return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "hello";
}, executor);

future1.runAfterBothAsync(future2, () -> { // 沒法感知前兩個任務的結果
    System.out.println("任務3開始。。");
}, executor);

future1.thenAcceptBothAsync(future2, (f1, f2) -> { // 能獲取到前兩個任務的結果
    System.out.println("任務3開始。。。以前的結果:" + f1 + "=>" +f2);
},executor);

// 既能獲取到前兩個任務的返回結果, 又能最終的返回結果future
CompletableFuture<String> future = future1.thenCombineAsync(future2, (f1, f2) -> {
            return f1 + ":" + f2 + "-> haha";
        }, executor);


// 兩個任務只要有一個完成,就執行任務3, 不能接受結果,沒有返回值
future1.runAfterEitherAsync(future2, () -> {  
            System.out.println("任務3開始。。。以前的結果:" + res);
        }, executor);


// 兩個任務只要有一個完成,就執行任務3, 感知結果,本身沒有返回值
future1.acceptEitherAsync(future2, (res) -> {
            System.out.println("任務3開始。。。以前的結果:" + res);
        }, executor);

// 兩個任務只要有一個完成,就執行任務3, 既能感知結果,本身也有返回值
CompletableFuture<Object> future = future1.applyToEitherAsync(future2, res -> {
            System.out.println("任務3開始。。。以前的結果:" + res);
        }executor);
  • 多任務組合操做

    // 執行完全部任務才執行 allOf.get()
    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
    System.out.println("main。。。end。。。" + allOf.get());
    // 有一個執行完就執行 anyOf.get()
    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
    System.out.println("main。。。end。。。" + anyOf.get());

Java異步實現

1、建立線程

@Test
public void test0() throws Exception {
  System.out.println("main函數開始執行");
  Thread thread=new Thread(new Runnable() {
    @Override
    public void run() {
      System.out.println("===task start===");
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("===task finish===");
    }
  });

  thread.start();
  System.out.println("main函數執行結束");

}

2、Future

jdk8以前的實現方式,在JUC下增長了Future,從字面意思理解就是將來的意思,但使用起來卻着實有點雞肋,並不能實現真正意義上的異步,獲取結果時須要阻塞線程,或者不斷輪詢。

@Test
public void test1() throws Exception {

    System.out.println("main函數開始執行");

    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<Integer> future = executor.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {

            System.out.println("===task start===");
            Thread.sleep(5000);
            System.out.println("===task finish===");
            return 3;
        }
    });
    //這裏須要返回值時會阻塞主線程,若是不須要返回值使用是OK的。倒也還能接收
    //Integer result=future.get();
    System.out.println("main函數執行結束");

    System.in.read();

}

3、CompletableFuture

使用原生的CompletableFuture實現異步操做,加上對lambda的支持,能夠說實現異步任務已經發揮到了極致。

@Test
public void test2() throws Exception {
    System.out.println("main函數開始執行");
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println("===task start===");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("===task finish===");
            return 3;
        }
    }, executor);
    future.thenAccept(e -> System.out.println(e));
    System.out.println("main函數執行結束");
}

4、Spring的Async註解

使用spring實現異步須要開啓註解,可使用xml方式或者java config的方式。

xml方式: <task:annotation-driven />

<task:annotation-driven executor="executor" />
<task:executor id="executor"
        pool-size="2" 線程池的大小
        queue-capacity="100" 排隊隊列長度 
        keep-alive="120" 線程保活時間(單位秒)
        rejection-policy="CALLER_RUNS" 對拒絕的任務處理策略 />

java方式:

@EnableAsync
public class MyConfig {

    @Bean
    public TaskExecutor executor(){
        ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); //核心線程數
        executor.setMaxPoolSize(20);  //最大線程數
        executor.setQueueCapacity(1000); //隊列大小
        executor.setKeepAliveSeconds(300); //線程最大空閒時間
        executor.setThreadNamePrefix("fsx-Executor-"); //指定用於新建立的線程名稱的前綴。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

(1)@Async

@Test
public void test3() throws Exception {
    System.out.println("main函數開始執行");
    myService.longtime();
    System.out.println("main函數執行結束");
}

 @Async
public void longtime() {
    System.out.println("我在執行一項耗時任務");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("完成");

}

(2)AsyncResult

若是須要返回值,耗時方法返回值用AsyncResult包裝。

@Test
public void test4() throws Exception {
    System.out.println("main函數開始執行");
    Future<Integer> future=myService.longtime2();
    System.out.println("main函數執行結束");
    System.out.println("異步執行結果:"+future.get());
}

 @Async
public Future<Integer> longtime2() {
    System.out.println("我在執行一項耗時任務");

    try {
        Thread.sleep(8000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("完成");
    return new AsyncResult<>(3);
}
相關文章
相關標籤/搜索