Coroutine in Java - Quasar Fiber實現--轉載

轉自 https://segmentfault.com/a/1190000006079389?from=groupmessage&isappinstalled=0

簡介

說到協程(Coroutine),不少人會想到go,lua,erlang等語言,其實JVM上也有蠻多的實現,如PicoThread,Kilim,Quasar等,本文主要介紹其中一種Coroutine實現 -- Quasar Fiber,Quasar Fiber相對來講流行度更好一些,若是以前沒有接觸過協程(用戶級輕量級線程),能夠看下What are fibersCoroutinehtml

那麼爲何要使用協程?
協程能夠用同步的編程方式達到或接近於純異步的性能,而沒有異步帶來的Callback hell,雖然有不少機制或模式解決或解耦callback hell的問題, 但同步的編程方式更容易維護和理解(風格之爭是另一個話題了,有興趣能夠看下akka跟fiber的比較)
相比於os thread,fiber無論在內存資源仍是調度上都比前者輕量的多,相對於thread blocking, fiber blocking能夠達到比前者大幾個數量級的併發度,更有效的利用CPU資源(運行fiber的worker線程並無block)
具體你們能夠看下Why and When use Fiberjava

好像是個神奇的東西呢,咋實現的
相比於callback接口回調的異步框架,Coroutine這個暫停和恢復在沒有JVM支持下,比較難以理解,是怎麼作到的?有沒有什麼魔法?其實JVM中Coroutine的實現方式有不少(implementing-coroutines-in-java),Quasar Fiber則是經過字節碼修改技術在編譯或載入時織入必要的上下文保存/恢復代碼,經過拋異常來暫停,恢復的時候根據保存的上下文(Continuation),恢復jvm的方法調用棧和局部變量,Quasar Fiber提供相應的Java類庫來實現,對應用有必定的侵入性(很小)git

Quasar Fiber 主要有 Instrument + Continuation + Scheduler幾個部分組成github

  • Instrument 作一些代碼的植入,如park先後上下文的保存/恢復等web

  • Continuation 保存方法調用的信息,如局部變量,引用等,用戶態的stack,這個也是跟akka等基於固定callback接口的異步框架最大的區別spring

  • Scheduler 調度器,負責將fiber分配到具體的os thread執行編程

下面具體介紹下Quasar Fiber的實現細節,最好先閱讀下quasar官方文檔,不是很長bootstrap

Instrument

Weaving

quasar fiber的運行須要織入一些指令,用於調用棧的保存和恢復,quasar提供了三種方式進行織入(AOT、javaagent、ClassLoader)
quasar 會對咱們的代碼進行static call-site分析,在必要的地方織入用於保存和恢復調用棧的代碼。
哪些方法須要call site分析?這裏須要顯式的mark(jdk9不須要),以下segmentfault

  • 方法帶有Suspendable 註解api

  • 方法帶有SuspendExecution

  • 方法爲classpath下/META-INF/suspendables、/META-INF/suspendable-supers指定的類或接口,或子類
    符合上面條件的method,quasar會對其作call site分析,也許爲了效率,quasar並無對全部方法作call site分析

方法內哪些指令須要instrument(在其先後織入相關指令)?

  • 調用方法帶有Suspendable 註解

  • 調用方法帶有SuspendExecution

  • 調用方法爲classpath下/META-INF/suspendables、/META-INF/suspendable-supers指定的類或接口,或子類
    主要爲了解決第三方庫沒法添加Suspendable註解的問題

  • 經過反射調用的方法

  • 動態方法調用 MethodHandle.invoke

  • Java動態代理InvocationHandler.invoke

  • Java 8 lambdas調用

注意,instrument是在class loading的時候,不是runtime,因此這裏call site分析的好比virtual invoke指令是編譯期決定的,這裏比較容易犯錯,我總結了以下兩點
1.基於接口或基類編譯的代碼,若是實現類有可能suspend,那麼須要在接口或基類中添加suspendable annotation或suspend異常
2.若是實現類會suspend,須要添加suspendable annotation或suspend異常,固然能夠把全部實現類都聲明成suspendable(若是方法裏找不到suspend的調用,該方法將不被instrument,因此也沒有overhead,儘管這個overhead很是微小)

接下來咱們簡單看下quasar instrument都織入了哪些代碼

從上圖能夠看出,quasar instrument主要在park()前保存相關的局部變量和pc,再fiber恢復執行的時候經過pc(switch case跳轉的程序計數器,非寄存器pc) jump到park()以後的代碼並恢復局部變量,另外在方法調用先後還會push/pop相關的Contiuation

instrument還會對JUC(java.util.concurrent)中的Thread.park,替換成Fiber.park,這樣park to thread就變成park to fiber,因此使用juc的代碼,能夠不用修改的跑在Fiber上
quasar在織入代碼的同時,會對處理的類和方法加上Instrumented註解,以在運行期檢查是否Instrumented,Instrumented註解包含了一個suspendableCallSites數組,用來存放方法體內suspendable call的line number
contiuations/stack詳細請看contiuations章節

QuasarInstrumentor

無論哪一種織入方式,都是經過建立QuasarInstrumentor來處理Class的字節流
QuasarInstrumentor內部使用ASM來處理Class的字節流,經過SuspendableClassifier類來判斷是否須要instrument
SuspendableClassifier有兩個子類,分別爲DefaultSuspendableClassifier和SimpleSuspendableClassifier
DefaultSuspendableClassifier 掃描classpath下SuspendableClassifier的實現,而且調用其接口判斷是否須要instrument,也會調用SimpleSuspendableClassifier
SimpleSuspendableClassifier 經過/META-INF/suspendables、/META-INF/suspendable-supers判斷

Quasar-core.jar包中suspendable-supers包含java nio及juc lock/future等接口,由於這些接口沒法改變簽名,而quasar織入是在編譯或載入時,沒法知道具體實現類是否Suspendable,因此需顯式指定

Method Instrument實現細節

這裏是整個Quasar Fiber是實現原理中最爲關鍵的地方,也是你們疑問最多的地方,你們有興趣能夠看下源代碼,大概1000多行的ASM操做,既能夠鞏固JVM知識又能深刻原理理解Fiber,這裏我不打算引入過多ASM的知識,主要從實現邏輯上進行介紹

InstrumentClass 繼承ASM的ClassVisitor,對Suspendable的方法先後進行織入
InstrumentClass visitEnd中會建立InstrumentMethod,具體織入的指令在InstrumentMethod中處理
結合上面的instrument示例代碼圖,不妨先思考幾個問題

  • 怎麼找到suspend call

  • 怎麼保存、恢復局部變量,棧幀等

  • switch case跳轉如何織入

  • suspend call在try catch塊中,如何處理

  • 什麼狀況下在suspend call先後能夠不織入也能正常運行

1.怎麼找到suspend call
InstrumentMethod.callsSuspendables這個方法會遍歷方法的instructions,
若是instruction是method invoke,則判斷是否爲suspend call(判斷邏輯見上面章節)
若是instruction爲suspend call,則把instrunction序號和source line number分別紀錄到suspCallsBcis及suspCallsSourceLines這兩個數組,供後面邏輯使用

2.switch case跳轉織入是如何實現的
如今咱們知道了怎麼找到method中的suspend call,那如何把這些suspend calls拆分紅instrument示例圖中那樣呢(switch case,pc...)
這個拆分過程在InstrumentMethod.collectCodeBlocks
根據上面計算的suspend call的數組,分配label數組,而後根據pc計數器(詳細見後續章節)進行跳轉label
label是JVM裏用於jump類指令,如(GOTO,IFEQ,TABLESWITCH等)
quasar會把織入的上下文保存恢復指令及代碼原始的指令生成到對應label

3.怎麼保存、恢復局部變量,棧幀

- 在方法開始執行
1.調用Stack.nextMethodEntry,開啓新的method frame

- 在方法結束執行
1.Stack.popMethod, 進行出棧

- 在調用Suspendable方法以前,增長如下邏輯
1.調用Stack.pushMethod 保存棧幀信息
2.依次調用Stack.put保存操做數棧數據
3.依次調用Stack.put保存局部變量

- 在Suspendable方法調用後
1.依次調用Stack.get恢復局部變量
2.依次調用Stack.get恢復操做數棧
  恢復局部變量和操做數棧的區別是前者在get後調用istore

由於Stack.put有3個參數,因此這裏每一個put實際上是多條jvm指令

aload_x //若是是保存操做數棧,這條指令不須要,由於值已經在操做數棧了
aload_x //load Stack引用
iconst_x //load Stack idx
invokestatic  co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
/**
   Stack.put會根據不一樣類型進行處理,Object或Array保存到dataObject[],其餘保存到dataLong[]
**/
public static void push(long value, Stack s, int idx) 
public static void push(float value, Stack s, int idx)
public static void push(double value, Stack s, int idx) 
public static void push(Object value, Stack s, int idx)
public static void push(int value, Stack s, int idx)

java編譯期可知局部變量表和操做數棧個數,上面put或get依賴這些信息,Stack具體邏輯見後面章節

4.什麼狀況下在suspend call先後能夠不織入也能正常運行 
這裏實際上是一個優化,就是若是method內部只有一個suspend call,且先後沒有以下指令

  • side effects,包括方法調用,屬性設置

  • 向前jump

  • monitor enter/exit

那麼,quasar並不會對其instrument,也就不須要collectCodeBlocks分析,由於不須要保存、恢復局部變量

5.suspend call在try catch塊中,如何處理
若是suspend call在一個大的try catch中,而咱們又須要在中間用switch case切分,彷佛是個比較棘手的問題,
因此在織入代碼前,須要對包含suspend call的try catch作切分,將suspend call單獨包含在try catch當中,經過ASM MethodNode.tryCatchBlocks.add添加新try catch塊,
quasar先獲取MethodNode的tryCatchBlocks進行遍歷,若是suspend call的指令序號在try catch塊內,那麼就須要切分,以便織入代碼

Fiber

下面介紹下Quasar Fiber中的提供給用戶的類和接口
Strand是quasar裏對Thread和Fiber統一的抽象,Fiber是Strand的用戶級線程實現,Thread是Strand內核級線程的實現

Fiber主要有幾下幾個功能

new

@SuppressWarnings("LeakingThisInConstructor")
public Fiber(String name, FiberScheduler scheduler, int stackSize, SuspendableCallable<V> target)
屬性 類型 說明
name String fiber名稱
scheduler FiberScheduler 調度器,默認爲FiberForkJoinScheduler
stackSize int stack大小,默認32
target SuspendableCallable<V> 具體業務代碼,在SuspendableCallable.run()裏

構造函數主要完成如下幾件事情

  • 設置state爲State.NEW

  • 初始化Stack(用於保存fiber調用棧信息,Continuations的具體實現)

  • 校驗target是否Instrumented

  • 將當前fiber封裝成一個能夠由scheduler調度的task,默認爲FiberForkJoinTask

  • 保存Thread的inheritableThreadLocals和contextClassLoader到Fiber

start

Fiber.start() 邏輯比較簡單,以下

  • 將fiber state切換到State.STARTED

  • 調用task的submit,提交給scheduler運行
    這裏默認的爲FiberForkJoinScheduler,FiberForkJoinScheduler會提交到內部的ForkJoinPool,並hash到其中一個work queue

exec

fiber scheduler的worker thread從work quere獲取到task,並調用fiber.exec()
fiber.exec()主要步驟以下

  • cancel timeout task

  • 將Thread的threadlocals、inheritableThreadLocals、contextClassLoader分別與fiber的互換,實現了local to fiber而不是local to thread,這裏須要特別注意

  • 因此基於thread local和context classloader的代碼基本上都能運行在fiber上

  • state = State.RUNNING;

  • 運行業務邏輯(方法fiber.run())

  • state = State.TERMINATED;

Fiber暫停時如何處理
fiber task切換有兩種方式,一種是fiber task正常結束, 一種是fiber task拋SuspendExecution

fiber.exec()裏會catch SuspendExecution,並交出執行權限,具體步驟以下

  • stack sp = 0; // fiber恢復執行須要從最開始的frame恢復

  • 設置fiber狀態 TIMED_WAITING/WAITING

  • 恢復線程的Thread的threadlocals、inheritableThreadLocals、contextClassLoader

調用棧信息已經在park()以前保存到stack中(見instrument章節),因此這裏無需處理

park

暫停當前fiber的執行,並交出執行權
fiber task狀態: RUNNABLE -> PARKING -> PARKED

fiber狀態: RUNNING -> WAITING

Fiber.park方法以下,只能在當前fiber調用

static boolean park(Object blocker, ParkAction postParkActions, long timeout, TimeUnit unit) throws SuspendExecution

park主要邏輯以下

  • 設置fiber狀態

  • 若是設置了timeout,則向FiberTimedScheduler新增ScheduledFutureTask,用於超時檢查

  • 設置fiber.postPark = postParkActions,用於上面exec方法捕獲異常後執行

  • 拋異常,移交執行權限, 後續邏輯見exec章節移交執行權限

unpark

恢復fiber的執行
fiber task狀態: PARKED -> RUNNABLE

fiber狀態: WAITING -> RUNNING

unpark主要也是作兩件事情,一是設置狀態,二是把fiber task從新submit到scheduler

這裏除了手工調用fiber的park,unpark來暫停和恢復fiber外,能夠用FiberAsync類來將基於callback的異步調用封裝成fiber blocking,基於fiber的第三方庫comsat就是經過將bio替換成nio,而後再封裝成FiberAsync來實現的,FiberAsync可參考http://blog.paralleluniverse....

狀態切換

fiber運行狀態由兩部分組成,一個是fiber自己的狀態,一個是scheduler task的狀態

fiber狀態

狀態 描述
NEW Strand created but not started
STARTED Strand started but not yet running
RUNNING Strand is running
WAITING Strand is blocked
TIMED_WAITING Strand is blocked with a timeout
TERMINATED Strand has terminated

task狀態,這裏以默認的FiberForkJoinTask爲例

狀態 描述
RUNNABLE 可運行
LEASED unpark時狀態是RUNNABLE,設置爲LEASED
PARKED 中止
PARKING 中止中

運行狀態切換圖

Continuation

Fiber/Coroutine = Continuation + scheduler能夠看出,Continuation在Fiber中是相當重要的,他保存了fiber恢復執行時的必要數據,如pc,sp等

Quasar 中Continuation的實現爲Stack類

Stack

Stack類是quasar 對Fiber Continuation的實現類,該類由quasar instrument調用,以保存和恢復方法調用棧信息

屬性 類型 說明
sp int 表明當前操做的frame序號
dataLong long[] holds primitives on stack as well as each method's entry
dataObject Object[] holds refs on stack,防止jvm gc回收方法局部對象

dataLong中每個long,表明一個method frame,具體定義以下

  • entry (PC) : 14 bits, 程序計數器,用於swich case跳轉

  • num slots : 16 bits, 當前method frame佔用多少個slot

  • prev method slots : 16 bits , 上一個method frame佔用多少個slot,主要用於pop跳轉

我簡單畫了一個stack例子,其中pc,slots,prev slots用逗號分隔,xxxxxx表明method frame額外的一些數據
下面idx和data分別代碼dataLong的序號和內容

idx data 說明
5 0L 下一個frame的存儲位置,sp指向該節點
4 xxxxxxx 方法2局部變量c
3 xxxxxxx 方法2局部變量b
2 7 , 3 , 2 方法2,pc計數器爲7,佔用3個slot,上一個方法佔用2個slots
1 xxxxxxx 方法1局部變量a
0 1,2 , 0 方法1,pc計數器爲1,佔用2個slot,上一個方法佔用0個slots

quasar會在instrument階段織入stack/continuation邏輯,具體以下

  • 調用Suspendable方法以前,調用Stack.pushMethod

  • 在Suspendable方法開始, 調用Stack.nextMethodEntry

  • 在Suspendable方法結束, 調用Stack.popMethod

下面咱們依次看下這幾個方法的邏輯

Stack.pushMethod

  • 保存當前pc

  • 保存當前slots數量

  • 將下一個frame設置成0L

Stack.nextMethodEntry

  • 將sp移動到當前frame位置

  • 將上一個freme的slots數量設置到當前frame的prev slots字段

Stack.popMethod

  • 按照當前frame的prev slots進行出棧操做

Scheduler

scheduler顧名思義,是執行fiber代碼的地方,quasar裏用ForkJoinPool作爲默認scheduler的線程池,

ForkJoinPool的優點這裏再也不強調,咱們主要關注下Quasar中如何使用ForkJoinPool來調度fiber task

FiberForkJoinScheduler

quasar裏默認的fiber task scheduler,是JUC ForkJoinPool的wrapper類, ForkJoinPool具體細節參考ForkJoinPool

//主要屬性
private final ForkJoinPool fjPool;//具體執行task的線程池
private final FiberTimedScheduler timer;//監控fiber timeout的scheduler
private final Set<FiberWorkerThread> activeThreads;//保存fiber worker線程

FiberForkJoinTask

wrapper了fiber的ForkJoinTask

//主要屬性
private final ForkJoinPool fjPool;
private final Fiber<V> fiber;

FiberTimedScheduler

quasar自實現的timeout scheduler,用於fiber timeout的處理
FiberTimedScheduler默認的work queue爲SingleConsumerNonblockingProducerDelayQueue,這是一個多生產單消費的無鎖隊列,內部是一個lock-free的基於skip list的優先級鏈表,有興趣能夠看下具體的實現,也值得一看
scheduler實現邏輯就比較簡單了,從SingleConsumerNonblockingProducerDelayQueue內部的優先級隊列取數據,若是超時了則調用fiber.unpark()

monitor

能夠經過JMX監控fiber的運行狀態,work queue的堆積,fiber的數量,調度延遲等

comsat

comsat在quasar fiber基礎上提供了一些庫,使得跟fiber的集成更加容易,好比與servlet、springboot、drapwizard集成

https://github.com/puniverse/...

COMSAT (or Comsat) is a set of open source libraries that integrate Quasar with various web or enterprise technologies (like HTTP services and database access). With Comsat, you can write web applications that are scalable and performing and, at the same time, are simple to code and maintain.

Comsat is not a web framework. In fact, it does not add new APIs at all (with one exception, Web Actors, mentioned later). It provides implementation to popular (and often, standard) APIs like Servlet, JAX-RS, and JDBC, that can be used efficiently within Quasar fibers.

遇到的問題與解決

本人在應用中集成Fiber的時候遇到了很多問題,有些問題也反映了Quasar Fiber不是很完善,這裏列出來供你們參考下

Netty PoolByteBufAllocator 在 Fiber調用 致使Memory Leak

因爲Quasar字節碼的處理,ThreadLocal在fiber上調用,實際是"local to Fiber",而不是"local to Thread", 若是要繞過Fiber取underlying的ThreadLocal,須要用TrueThreadLocal

Netty的PoolByteBufAllocator$PoolThreadLocalCache用到了ThreadLocal,若是運行在fiber上,每次PoolThreadLocalCache.get()都會返回新的PoolThreadCache對象(由於每一個請求起一個新的fiber處理,非WebActor模式)

而在PoolThreadCache的構造函數裏,會調用ThreadDeathWatcher.watch,把當前線程和PoolThreadLocalCache.get()返回的對象 add到全局ThreadDeathWatcher列表,以便相關線程中止的時候能釋放內存池

可是對於fiber就會有問題了, PoolThreadLocalCache.get()不斷的返回新的對象,而後add到ThreadDeathWatcher,而正真運行fiber的fiber-Fork/JoinPool的worker線程並不會終止,最終致使ThreadDeathWatcher裏watcher列表愈來愈多,致使memory leak,100% full gc time

問題總結:fiber上ThreadLocal返回的對象,逃逸到了全局對象裏,而netty只會在真正的線程(os thread)終止時釋放內存

解決辦法: 不使用Netty的對象池,或則mock netty代碼換成用TrueThreadLocal

啓動的時候會有[quasar] WARNING: Can’t determine super class of xxx

Quasar這個告警只會在啓動的時候出現,能夠忽略,Quasar暫時沒有開關能夠swith off

Fabio: As for the first warning, this is the relevant code and it basically means Quasar’s instrumentor couldn’t load a class’ superclass. This can happen because the class is not present or, more likely, because the classloader where that instrumentation code is running doesn’t allow to access it. Adding to that the strange warning about the agent not being running, I think the latter is most probably the case.

If the application runs you can just ignore the warnings (they should be printed only at instrumentation time, so bootstrap/warming stage) or if you can share a minimal project setup I could help having a deeper look to figure out what’s happening exactly.

https://groups.google.com/for...

FJP worker運行時若是有疑似blocking,會有WARNING hogging the CPU or blocking a thread

you can disable the warning by setting a system property with "-Dco.paralleluniverse.fibers.detectRunawayFibers=false」

獨立Tomcat + Quasar Agent FiberHttpServlet報NPE

[quasar] ERROR: Unable to instrument class co/paralleluniverse/fibers/servlet/FiberHttpServlet

From the full logs I see that my setup and your setup are different though: I’m using an embedded Tomcat while you’re running Tomcat as a standalone servlet container and using the agent for instrumentation. Unfortunately the agent doesn’t currently work with standalone Tomcat and you need to use the instrumenting loader.

官方推薦:獨立Tomcat + QuasarWebAppClassLoader 或者 內嵌容器 + Quasar Agent

WARNING: Uninstrumented methods on the call stack (marked with **)

Quasar不能修改第三方庫爲@Suspend, 能夠顯式的把相關的方法放入META-INF/suspendables

獨立Tomcat + QuasarWebAppClassLoader UnableToInstrumentException (harmless)

這是個Comsat的bug,可是無害,能夠忽略

UnableToInstrumentException: Unable to instrument co/paralleluniverse/fibers/Fiber#onResume()V because of catch for SuspendExecution
google group comsat issues 25

相關文章
相關標籤/搜索