Java之協程(quasar)

 1、前面咱們簡單的說了一下,Python中的協程原理。這裏補充Java的協程實現過程。有須要能夠查看python之協程html

  2、Java協程,其實作Java這麼久我也沒有怎麼聽過Java協程的東西,可是一直有有聽到微線程/協程的概念,這不在學習Python的時候接觸到了協程一詞。而後返回來去了解Java的協程問題,可是看了不少資料,發現官網以及不少地方都沒有涉及到協程的東西,沒有辦法,只能經過強大的社區來學習協程的相關東西。java

  3、這裏主要關注的是:quasar。python

  1)協程的目的:當咱們在使用多線程的時候,若是存在長時間的I/O操做。這個時候線程一直處於阻塞狀態,若是線程不少的時候,會存在不少線程處於空閒狀態,形成了資源應用不完全。相對的協程不同了,在單線程中多個任務來回自行若是出現長時間的I/O操做,讓其讓出目前的協程調度,執行下一個任務。固然可能全部任務,所有卡在同一個點上,可是這只是針對於單線程而言,當全部數據正常返回時,會同時處理當前的I/O操做。golang

  2)多線程測試(這裏使用100萬個線程,來測試內存佔用) 編程

複製代碼

for (int i = 0; i < 1000000; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

複製代碼

  結果:多線程

  直接卡死了,內存溢出併發

  

  可想而知,若是存在100萬個線程,開銷是有多大。學習

  3)協程測試測試

  a、阿里雲搜索到的依賴包阿里雲

    <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>

  b、測試內存佔用量

複製代碼

  public static void main(String[] args) throws Exception {
        //使用阻塞隊列來獲取結果。
        LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 1000000; i++) {
            int finalI = i;
            //這裏的Fiber有點像Callable,能夠返回數據
            Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
                //這裏用於測試內存佔用量
                Fiber.sleep(100000);
                System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
                return finalI;
            });
            //開始執行
            fiber.start();
            //加入隊列
            fiberQueue.add(fiber);
        }
        while (true) {
            //阻塞
            Fiber<Integer> fiber = fiberQueue.take();
            System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
        }
    }

複製代碼

  結果:

  堆:

  

   估計:1個G左右。

  內存:

  

  估計:1個G左右,也就是每個fiber佔用1Kb左右。

   c、正常測試

  修改一下參數:

複製代碼

public static void main(String[] args) throws Exception {
        //使用阻塞隊列來獲取結果。
        LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            //這裏的Fiber有點像Callable,能夠返回數據
            Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
                //這裏用於測試內存佔用量
                Fiber.sleep(1000);
                System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
                return finalI;
            });
            //開始執行
            fiber.start();
            //加入隊列
            fiberQueue.add(fiber);
        }
        while (true) {
            //阻塞
            Fiber<Integer> fiber = fiberQueue.take();
            System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
        }
    }

複製代碼

  結果:

  

  4)能夠看出併發的狀態仍是很不錯的,固然這只是多個任務執行而已。

  4、經過上面的測試,能夠看出,quasar中Fiber,很像Callable的用法、並且在內存佔用上面減小了不少,固然,堆的數量確實很多,可是能夠接受。

  仍是要說明:協程的方式更多用來作I/O密集型的操做。計算密集型的仍是使用線程更加合理。

  5、原理,我估麼着看了一下源碼,複雜度很高,這裏不作深究。

  原理參考:次時代Java編程(一):Java裏的協程

  一、Quasar裏的Fiber實際上是一個continuation,他能夠被Quasar定義的scheduler調度,一個continuation記錄着運行實例的狀態,並且會被隨時中斷,而且也會隨後在他被中斷的地方恢復。Quasar實際上是經過修改bytecode來達到這個目的,因此運行Quasar程序的時候,你須要先經過java-agent在運行時修改你的代碼,固然也能夠在編譯期間這麼幹。golang的內置了本身的調度器,Quasar則默認使用ForkJoinPool這個JDK7之後纔有的,具備work-stealing功能的線程池來當調度器。work-stealing很是重要,由於你不清楚哪一個Fiber會先執行完,而work-stealing能夠動態的從其餘的等等隊列偷一個context過來,這樣能夠最大化使用CPU資源。

  二、那這裏你會問了,Quasar怎麼知道修改哪些字節碼呢,其實也很簡單,Quasar會經過java-agent在運行時掃描哪些方法是能夠中斷的,同時會在方法被調用前和調度後的方法內插入一些continuation邏輯,若是你在方法上定義了@Suspendable註解,那Quasar會對調用該註解的方法作相似下面的事情。

  三、這裏假設你在方法f上定義了@Suspendable,同時去調用了有一樣註解的方法g,那麼全部調用f的方法會插入一些字節碼,這些字節碼的邏輯就是記錄當前Fiber棧上的狀態,以便在將來能夠動態的恢復。(Fiber相似線程也有本身的棧)。在suspendable方法鏈內Fiber的父類會調用Fiber.park,這樣會拋出SuspendExecution異常,從而來中止線程的運行,好讓Quasar的調度器執行調度。這裏的SuspendExecution會被Fiber本身捕獲,業務層面上不該該捕獲到。若是Fiber被喚醒了(調度器層面會去調用Fiber.unpark),那麼f會在被中斷的地方從新被調用(這裏Fiber會知道本身在哪裏被中斷),同時會把g的調用結果(g會return結果)插入到f的恢復點,這樣看上去就好像g的return是f的local variables了,從而避免了callback嵌套。

  四、上面囉嗦了一大堆,其實簡單點講就是,想辦法讓運行中的線程棧停下來,好讓Quasar的調度器介入。JVM線程中斷的條件只有兩個,一個是拋異常,另一個就是return。這裏Quasar就是經過拋異常的方式來達到的,因此你會看到我上面的代碼會拋出SuspendExecution。可是若是你真捕獲到這個異常,那就說明有問題了,因此通常會這麼寫。

相關文章
相關標籤/搜索