什麼是協程
大多數的開發人員可能對進程,線程這兩個名字比較熟悉。可是爲了追求最大力度的發揮硬件的性能和提高軟件的速度,出現了協程或者叫纖程(Fiber),或者綠色線程(GreenThread)。那咱們來聊下什麼是協程,以及在java中是怎麼體現和運用協程的。html
在說協程以前,咱們先來回想下,如今大多數的程序中,都是使用了多線程技術來解決一些須要長時間阻塞的場景。JAVA中每一個線程棧默認1024K,沒有辦法開成千上萬個線程,並且就算經過JVM參數調小,CPU也沒法分配時間片給每一個線程,大多數的線程仍是在等待中,因此咱們通常會使用
Runtime.getRuntime().availableProcessors()來配置線程數的大小(或者會根據實際狀況調整,就不展開討論了),可是就算是咱們開了新的線程,該線程也多是在等待系統IO的返回或者網絡IO的返回,並且線程的切換有着大量的開銷。java
爲了解決上面說的問題,你們可能會想到回調。如今不少框架都是基於回調來解決那些耗時的操做。但層數嵌套多了反而會引發反人類的回調地獄,而且回調後就丟失原函數的上下文。其中的表明呢就好比說nodeJs。node
終於能夠來聊聊協程。它的基本原理是:在某個點掛起當前的任務,而且保存棧信息,去執行另外一個任務;等完成或達到某個條件時,在還原原來的棧信息並繼續執行。上面提到的幾個點你們會想到JVM的結構,棧, 程序計數器等等,可是JVM原生是不支持這樣的操做的(至少java是不支持的,kotlin是能夠的)。所以若是要在純java代碼裏須要使用協程的話須要引入第三方包,如kilim,Quasar。而kilim已經好久未更新了,那麼咱們來看看Quasar。python
Quasar原理
一、利用字節碼加強,將普通的java代碼轉換爲支持協程的代碼。
二、在調用pausable方法的時候,若是pause了就保存當前方法棧的State,中止執行當前協程,將控制權交給調度器
三、調度器負責調度就緒的協程
四、協程resume的時候,自動恢復State,根據協程的pc計數跳轉到上次執行的位置,繼續執行。
這些第三方的框架大部分實現是一致的。經過對字節碼直接操做,在編譯期把你寫的代碼變爲支持協程的版本,並在運行時把你全部須要用到協程的部分由他來控制和調度,同時也支持在運行期這樣作。
Quasar中使用了拋異常的方式來中斷線程,可是 實際上若是咱們捕獲了這個異常就會產生問題,因此通常是以這種方式來註冊:
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//這裏不該該捕獲到異常.
throw new AssertionError(s);
}
}
在調度方面,Quasar中默認使用了JDK7以上纔有的ForkJoinPool,它的優點就在於空閒的線程會去從其餘線程任務隊列尾部」偷取」任務來本身處理,所以也叫work-stealing功能。這個功能能夠大大的利用CPU資源,不讓線程白白空閒着。程序員
Quasar模塊golang
Fiber
Fiber能夠認爲是一個微線程,使用方式基本上和Thread相同,啓動start:
new Fiber<V>() {
@Override
protected V run() throws SuspendExecution, InterruptedException {算法
// your code
}
}.start();
new Fiber<Void>(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start(); 網絡
其實它更相似於一個CallBack,是能夠攜帶返回值的,而且能夠拋異常SuspendExecution,InterruptedException。你也能夠向其中傳遞SuspendableRunnable 或 SuspendableCallable 給Fiber的構造函數。你甚至能夠像線程同樣調用join(),或者get()來阻塞線程等待他完成。
當Fiber比較大的時候,Fiber能夠在調用parkAndSerialize 方法時被序列化,在調用unparkSerialized時被反序列化。
從以上咱們能夠看出Fiber與Thread很是相似,極大的減小了遷移的成本。多線程
FiberScheduler
FiberScheduler是Quasar框架中核心的任務調度器,負責管理任務的工做者線程WorkerThread,以前提到的他是一個FiberForkJoinScheduler。
ForkJoinPool的默認初始化個數爲Runtime.getRuntime().availableProcessors()。框架
instrumentation
當一個類被加載時,Quasar的instrumentation模塊 (使用 Java agent時) 搜索suspendable 方法。每個suspendable 方法 f經過下面的方式 instrument:
它搜索對其它suspendable方法的調用。對suspendable方法g的調用,一些代碼會在這個調用g的先後被插入,它們會保存和恢復fiber棧本地變量的狀態,記錄這個暫停點。在這個「suspendable function chain」的最後,咱們會發現對Fiber.park的調用。park暫停這個fiber,扔出 SuspendExecution異常。
當g block的時候,SuspendExecution異常會被Fiber捕獲。 當Fiber被喚醒(使用unpark), 方法f會被調用, 執行記錄顯示它被block在g的調用上,因此程序會當即跳到f調用g的那一行,而後調用它。最終咱們會到達暫停點,而後繼續執行。當g返回時, f中插入的代碼會恢復f的本地變量。
過程聽起來很複雜,可是它只會帶來3% ~ 5%的性能的損失。
下面看一個簡單的例子, 方法m2聲明拋出SuspendExecution異常,方法m1調用m2和m3,因此也聲明拋出這個異常,最後這個異常會被Fiber所捕獲:
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m = "m1"; System.out.println("m1 begin"); m = m2(); m = m3(); System.out.println("m1 end"); System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return "m2";
}
static String m3() throws SuspendExecution, InterruptedException {
return "m3";
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber<Void>("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { m1(); } }).start();
}
}
// 反編譯後的代碼
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}
我並無更深刻的去了解Quasar的實現細節以及調度算法,有興趣的讀者能夠翻翻它的代碼。
實戰
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>
public class Helloworld {
@Suspendable
static void m1() throws InterruptedException, SuspendExecution {
String m = "m1"; //System.out.println("m1 begin"); m = m2(); //System.out.println("m1 end"); //System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
String m = m3(); Strand.sleep(1000); return m;
}
//or define in META-INF/suspendables
@Suspendable
static String m3() {
List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList()); return l.toString();
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
int count = 10000; testThreadpool(count); testFiber(count);
}
static void testThreadpool(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); ExecutorService es = Executors.newFixedThreadPool(200); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { es.submit(() -> { long start = System.currentTimeMillis(); try { m1(); } catch (InterruptedException e) { e.printStackTrace(); } catch (SuspendExecution suspendExecution) { suspendExecution.printStackTrace(); } start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); }); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("thread pool took: " + t + ", latency: " + l + " ms"); es.shutdownNow();
}
static void testFiber(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { new Fiber<Void>("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { long start = System.currentTimeMillis(); m1(); start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); } }).start(); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("fiber took: " + t + ", latency: " + l + " ms");
}
}
OUTPUT:
1
2
thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms
能夠看到很明顯的時間差距,存在多線程阻塞的狀況下,協程的性能很是的好,可是。若是把sleep這段去掉,Fiber的性能反而更差:
這說明Fiber並不意味着它能夠在全部的場景中均可以替換Thread。當fiber的代碼常常會被等待其它fiber阻塞的時候,就應該使用fiber。
對於那些須要CPU長時間計算的代碼,不多遇到阻塞的時候,就應該首選thread
擴展
其實協程這個概念在其餘的語言中有原生的支持,如:
kotlin 1.30以後已經穩定
: https://www.kotlincn.net/docs...
golang : https://gobyexample.com/gorou...
python : http://www.gevent.org/content...~
在這些語言中協程就看起來至少沒這麼奇怪或者難以理解了,並且開發起開也相比java簡單不少。
總結
協程的概念也不算是很新了,可是在像Java這樣的語言或者特定的領域並非很火,也並無徹底普及。不是很明白是它的學習成本高,仍是說應用場景是在過小了。可是當我聽到這個概念的時候確實是挺好奇,也挺好奇的。也但願以後會有更多的框架和特性來簡化咱們苦逼程序員的開發~~