You should never do your asynchronous work alone.
— Jon Brisbin
完成Reactor 1後寫到
You should never do your asynchronous work alone.
— Stephane Maldini
完成Reactor 2後寫到
名稱解釋:back pressure:背壓。在交換機在阻止外來數據包發送到堵塞端口的時候可能會發生丟包。而背壓就是考驗交換機在這個時候避免丟包的能力。不少的交換機當發送或接收緩衝區溢出的時候經過將阻塞信號發送回源地址來實現背壓。交換機在全雙工時使用IEEE802.3x流控制達到一樣目的。java
首先,咱們使用groovy示例來展現core模塊的功能:react
//Initialize context and get default dispatcher Environment.initialize() //RingBufferDispatcher with 8192 slots by default def dispatcher = Environment.sharedDispatcher() //Create a callback Consumer<Integer> c = { data -> println "some data arrived: $data" } //Create an error callback Consumer<Throwable> errorHandler = { it.printStackTrace } //Dispatch data asynchronously r.dispatch(1234, c, errorHandler) Environment.terminate()
下面,咱們使用Stream reactive實現來看:git
//standalone async processor def processor = RingBufferProcessor.<Integer>create() //send data, will be kept safe until a subscriber attaches to the processor processor.onNext(1234) processor.onNext(5678) //consume integer data processor.subscribe(new Subscriber<Integer>(){ void onSubscribe(Subscription s){ //unbounded subscriber s.request Long.MAX } void onNext(Integer data){ println data } void onError(Throwable err){ err.printStackTrace() } void onComplete(){ println 'done!' } } //Shutdown internal thread and call complete processor.onComplete()
Core模塊概覽github
Reactor core模塊的子單元:redis
Common IO和功能類型,一些是直接從java8 功能接口回遷的。
Function、Supplier、consumer、Predicate、BiConsumer、BiFunction
Tuples
Resource、Pausable、Timer
Buffer,Codec和一組預約義的Codec。
Environment 上下文
Dispatcher 協議和一組預約義的Dispatcher。
預約義的Reactive Stream Processor
reactor-core能夠用來逐漸替代另外的消息傳遞策略、調度時間任務或者以小的功能塊組織代碼。這種突破使開發者與其它Reactive基礎庫更好的合做,特別是對於沒有耐心的開發者,沒有了對RingBuffer的理解負擔。spring
注意:Reactor-core隱藏了LMAX disruptor,所以不會出現也不會和現有的Disruptor依賴衝突。編程
功能模塊json
功能模塊重用是核心,一般狀況下在你使用Reactor時就須要的功能。所以,功能編程酷在哪裏?其中一個核心理念是將可執行代碼看作別的數據。另外一點,相似於Closure或者匿名函數,此時業務邏輯由最初的調用者決定。它一樣避免了過量的If/SWITCH模塊,而且這種分離是概念更清晰:每一個模塊完成一個功能且不須要共享任何東西。安全
組織功能模塊服務器
每一個功能組件都給出它的通常任務的明確意圖:
Consumer:簡單回調--一勞永逸的
BiCounsumer:兩個參數的簡單回調,一般用在序列比較,例如:前一個和下一個參數。
Function:轉換邏輯--請求/應答
BiFunction:兩個參數的轉換,一般用在累加器,比較前一個和下一個參數,返回一個新的值。
Supplier:工廠邏輯--輪詢
Predicate:測試路徑--過濾
注意:咱們也將Publisher和Subscriber視做功能塊,勇於稱之爲Reactive功能塊。儘管如此,它們做爲基礎組件,普遍應用到Reactor及其其它地方。Stream API接收reactor.fn參數,爲你建立合適的Subscriber。
好消息是在功能模塊中包裝可執行指令能夠向磚塊同樣進行復用。
Consumer<String> consumer = new Consumer<String>(){ @Override void accept(String value){ System.out.println(value); } }; //Now in Java 8 style for brievety Function<Integer, String> transformation = integer -> ""+integer; Supplier<Integer> supplier = () -> 123; BiConsumer<Consumer<String>, String> biConsumer = (callback, value) -> { for(int i = 0; i < 10; i++){ //lazy evaluate the final logic to run callback.accept(value); } }; //note how the execution flows from supplier to biconsumer biConsumer.accept( consumer, transformation.apply( supplier.get() ) );
最初聽起來,這可能不是一個引人注目的革命性變革。可是這種基本思惟模式的改變,將揭示咱們使異步代碼變的穩健和可組合性的使命是多麼難得。Dispatcher分發器將輸入數據和錯誤回調分發給consumer來處理。Reactor Stream模塊將更好的使用這些組件。
當使用Ioc容器如spring時,一個好的開發者將利用Java的配置屬性來返回一個無狀態的功能bean。而後能夠優美的注入到stream Pipeline或者分發他們的執行代碼中的block中。
元組
你能夠注意到這些接口,它們對輸入參數和比較少的固定數量的參數的泛型有很好的支持。你怎麼傳遞超過1個或者超過2個的參數呢?答案是使用元組Tuple,Tuple相似於csv中一個單獨實例的同樣,能夠在在功能性編程中保證它們的類型安全和支持多個數量的參數。
之前面的例子爲例,咱們嘗試提供兩個參數的BiConsumer而使用單個參數的Consumer
Consumer<Tuple2<Consumer<String>, String>> biConsumer = tuple -> { for(int i = 0; i < 10; i++){ //Correct typing, compiler happy tuple.getT1().accept(tuple.getT2()); } }; biConsumer.accept( Tuple.of( consumer, transformation.apply(supplier.get()) ) );
注意:Tuple須要分配更多的空間,所以在比較或者鍵值信號等通常使用場景中更多直接使用Bi***組件。
Environment和Dispatcher
功能性構建塊已經準備就緒,讓咱們使用它們來進行異步編程。第一步是到Dispatcher分區。
在咱們啓動任意Dispatcher前,須要保證能夠有效的建立它們。一般,建立它們的代價比較高,緣由是須要預分配一個內存分區來保持分配的信號,這就是前言中介紹的著名的運行時分配和啓動時預分配的不一樣對比。所以提出了一個名爲"Environment"共享上下文概念,使用它來管理這些不一樣類型的Dispatcher,從而避免沒必要要的建立開銷。
Environment
reactor的使用者(或者可用的擴展庫如@Spring)建立或者中止Environment。它們自動從META_INF/reactor/reactor-environment.properties處讀取配置文件。
注意,屬性文件能夠改變,經過在classpath下的META-INFO/reactor目錄下一個新的屬性配置能夠改變屬性文件。
經過傳遞下面的環境變量reactor.profiles.active來在運行時段改變默認的配置文件。
java - jar reactor-app.jar -Dreactor.profiles.active=turbo
啓動和中止Environment
Environment env = Environment.initialize(); //Current registered environment is the same than the one initialized Assert.isTrue(Environment.get() == env); //Find a dispatcher named "shared" Dispatcher d = Environment.dispatcher("shared"); //get the Timer bound to this environment Timer timer = Environment.timer(); //Shutdown registered Dispatchers and Timers that might run non-daemon threads Environment.terminate(); //An option could be to register a shutdownHook to automatically invoke terminate.
注意:在一個給定的Jvm應用中,最好只維護一個Enviroment.在大多數狀況下,使用Environment.initializeIfEmpty()就徹底ok。
Dispacher分發器
從Reactor 1開始,Dispatcher就存在了。Dispatcher一般抽象消息傳遞的方法,和Java Executor有相似的通用約定。事實上Dispatcher繼承自Executor。
Dispatcher對有數據信號的傳送方式及消費者同步或異步執行的錯誤信息有一套比較嚴格的類型限制約定。這種方式在面對經典的Executors時解決了第一個問題--錯誤隔離。效果以下:
錯誤消費者的調用不須要終端當前分配的資源。若是沒有指定,它默認從當前存在的Environment中去尋找,並使用指定給它的errorJournalConsumer。
異步Dispatche提供的第二個獨特的特徵是運行使用尾部遞歸策略來再次調度。尾部遞歸的應用場景是分發器發現Dispatcher的classLoader已經分配到正在運行的線程,這時,噹噹前消費者返回時將要執行的task放入到隊列中。
使用一個相似於 Groovy Spock test的異步的多線程分發器:
import reactor.core.dispatch.* //... given: def sameThread = new SynchronousDispatcher() def diffThread = new ThreadPoolExecutorDispatcher(1, 128) def currentThread = Thread.currentThread() Thread taskThread = null def consumer = { ev -> taskThread = Thread.currentThread() } def errorConsumer = { error -> error.printStackTrace() } when: "a task is submitted" sameThread.dispatch('test', consumer, errorConsumer) then: "the task thread should be the current thread" currentThread == taskThread when: "a task is submitted to the thread pool dispatcher" def latch = new CountDownLatch(1) diffThread.dispatch('test', { ev -> consumer(ev); latch.countDown() }, errorConsumer) latch.await(5, TimeUnit.SECONDS) // Wait for task to execute then: "the task thread should be different when the current thread" taskThread != currentThread
注意:
如Java Executor同樣,它們缺乏了咱們將加入到Reactor 2.x的一個特色:Reactive stream協議。這時在Reactor中僅有幾個未完成事項中的一個未完成事項--沒有將Reactive stream標準直接綁定到Reactor中。而後,你能夠在Stream章節部分找到快速結合Reactor stream的方法。
表3 Dispatcher家族介紹
Dispatcher | From Environment | Description | Strengths | Weaknesses |
---|---|---|---|---|
RingBuffer |
sharedDispatcher() |
An LMAX DisruptorRingBuffer based Dispatcher. |
Small latency peaks tolerated Fastest Async Dispatcher, 10-15M+ dispatch/sec on commodity hardware Support ordering |
'Spin' Loop when getting the next slot on full capcity Single Threaded, no concurrent dispatch |
Mpsc |
sharedDispatcher() if Unsafe not available |
Alternative optimized message-passing structure. |
Latency peaks tolerated 5-10M+ dispatch/sec on commodity hardware Support ordering |
Unbounded and possibly using as much available heap memory as possible Single Threaded, no concurrent dispatch |
WorkQueue |
workDispatcher() |
An LMAX DisruptorRingBuffer based Dispatcher. |
Latency Peak tolerated for a limited time Fastest Multi-Threaded Dispatcher, 5-10M+ dispatch/sec on commodity hardware |
'Spin' Loop when getting the next slot on full capcity Concurrent dispatch Doesn’t support ordering |
Synchronous |
dispatcher("sync") or SynchronousDispatcher. INSTANCE |
Runs on the current thread. |
Upstream and Consumer executions are colocated Useful for Test support Support ordering if the reentrant dispatch is on the current thread |
No Tail Recursion support Blocking |
TailRecurse |
tailRecurse() or TailRecurse Dispatcher. INSTANCE |
Synchronous Reentrant Dispatcher that enqueue dispatches when currently dispatching. |
Upstream and Consumer executions are colocated Reduce execution stack, greatly expanded by functional call chains |
Unbounded Tail Recurse depth Blocking Support ordering (Thread Stealing) |
ThreadPoolExecutor |
newDispatcher(int, int, DispatcherType. THREAD_POOL_EXECUTOR) |
Use underlying ThreadPoolExecutor message-passing |
Multi-Threaded Blocking Consumers, permanent latency tolerated 1-5M+ dispatch/sec on commodity hardware |
Concurrent run on a given consumer executed twice or more Unbounded by default Doesn’t support ordering |
Traceable Delegating |
N/A |
Decorate an existing dispatcher with TRACE level logs. |
Dispatch tapping Runs slower than the delegated dispatcher alone |
Log overhead (runtime, disk) |
你可能已經注意到了,一些Dispatcher事單線程的,特別是RingBufferDispatcher和MpsDispatcher。更進一步,根據Reactive Stream規範,Subscriber/Processor的實現是不容許併發通知的。這一點尤爲對Reactor Streams產生了影響,使用Stream.dispachOn(Dispatcher)和一個Dispatcher來給併發信號的顯示失敗留後門。
而後,有一個方法來避免這個缺點,使用Dispatcher池DispatcherSupplier。實際上,做爲Supplier的工廠,Supplier.get()方法根據有趣的共享策略:輪詢、最少使用。。等間接提供一個Dispatcher。
Enviroment提供了一個靜態方法去建立、並註冊到當前活躍Environment的Dispatcher池:一組輪詢的返回Dispatcher。一旦就緒,Supplier提供對Dispatcher數目的控制。
不一樣於通常的Dispatcher,Environment提供了一站式的管理服務:
Environment.initialize(); //.... //Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...) DispatcherSupplier supplier = Environment.newCachedDispatchers(2); Dispatcher d1 = supplier.get(); Dispatcher d2 = supplier.get(); Dispatcher d3 = supplier.get(); Dispatcher d4 = supplier.get(); Assert.isTrue( d1 == d3 && d2 == d4); supplier.shutdown(); //Create and register a new pool of 3 dispatchers DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool"); DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool"); Assert.isTrue( supplier1 == supplier2 ); supplier1.shutdown();
Dispatcher儘量快的計算接收的任務,然而,Timer定時器提供一次性或者週期性的調度API。Reactor Core模塊默認提供了一個HashWheelTimer定時器,它自動綁定到任意的新的Environment中。HashWheelTimer對處理大量的、併發的、內存調度任務有巨大的優點,它是替換java TaskScheduler的一個強大的選項。
注意:它不是一個持久化的調度器,應用關閉時task將會丟失。下個正式版本Timer定時器將會有一些改變,例如使用redis增長持久化/共享,請關注。
建立一個簡單的定時器:
import reactor.fn.timer.Timer //... given: "a new timer" Environment.initializeIfEmpty() Timer timer = Environment.timer() def latch = new CountDownLatch(10) when: "a task is submitted" timer.schedule( { Long now -> latch.countDown() } as Consumer<Long>, period, TimeUnit.MILLISECONDS ) then: "the latch was counted down" latch.await(1, TimeUnit.SECONDS) timer.cancel() Environment.terminate()
核心Processor用來作比Dispatcher更集中的job:支持背壓計算異步task。
提供了org.reactivestreams.Processor接口的直接實現,所以能夠很好的和別的Reactive Stream廠商一塊兒工做。
記住:Processor便是Subscriber也是Publisher,所以你能夠在想要的地方(source,processing,sink)將一個Processor插入到Reactive stream chain中。
注意:規範不推薦直接使用Processor.onNext(d)。
基於RingBuffer的Reactive Stream Processor的優勢以下:
高吞吐量
重啓時不會丟掉沒有消費的數據,且從最近的沒有消費的數據開始執行
若沒有Subscriber監聽,數據不會丟失(不想Reactor-stream的Broadcaster會丟掉數據)
若在消息處理過程當中取消Subscriber,信號將會安全的從新執行,實際上它能在RingBufferProcessor上很好的工做。
靈活的背壓,它容許任意時間內有限數量的背壓,Subscriber會消費掉而且請求更多的數據。
傳播的背壓,由於它是一個Processor,它能夠經過訂閱方式傳遞消息。
多線程的出/入Processor。
它們的惟一缺點是它們在運行時建立它們會消耗大量的資源,緣由是它們不像它們的兄弟RingBufferDispatcher能夠很容易的共享,這種特性使它們更適應於高吞吐量的預約義數據管道。
Reactor的RingBufferProcessor組件本質上是Disruptor的RingBuffer,設計的目的是儘量的和原生的效率同樣。使用場景是:你須要分發task到另一個線程,且該線程具備低耗、高吞吐量還在你的工做流中管理背壓。
我使用RingBufferProcessor來計算遠程異步調用的各類輸出:AMQP, SSD存儲和內存存儲,Process徹底處理掉易變的延遲,每秒百萬級別的消息的數據源歷來沒有阻塞過。
— 友好的Reactor使用者
RingBufferProcessor的使用場景
圖7 在跟定時間T內,一個ringbufferprocessor,2個消費同一個sequence的Subscriber。
你可使用靜態工具方法去建立一個ringbufferprocessor:
Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32); //1 Stream<Integer> s = Streams.wrap(p); //2 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //3 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //4 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //5 input.subscribe(p); //5
1.建立一個Processor,讓它具備32個slot的內部RingBuffer。
2. 從Reactive Streams Processor建立一個Reactor。
3. 每一個請求調用consume方法在本身的線程內建立一個Disruptor的EventProcessor。
4. 每一個請求調用consume方法在本身的線程內建立一個Disruptor的EventProcessor。
5. 每一個請求調用consume方法在本身的線程內建立一個Disruptor的EventProcessor。
6. 向一個Reactive Streams Publisher訂閱這個Processor。
傳遞到Processor的Subscribe.onNext(Buffer)方法的每一個數據元素將廣播給全部的消費者。這個Processor沒有使用輪詢分發,由於它在RingBufferWorkProcess中,RingBufferWorkProcess下面將要討論。若傳遞一、二、3三個整數到Processor,能夠看到控制檯輸出結果以下:
Thread[test-2,5,main] data=1 Thread[test-1,5,main] data=1 Thread[test-3,5,main] data=1 Thread[test-1,5,main] data=2 Thread[test-2,5,main] data=2 Thread[test-1,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=3
每一個線程接收到傳給Process的全部數據,每一個線程順序得到數據,由於內部使用RingBuffer管理
slot來發布數據。
不像標準的RingBufferProcessor只廣播它的值給全部的消費者,RingBufferWorkProcessor基於消費者的多少來分發請求值。Processor接收信息,而後輪詢發送到不一樣的線程中(由於每一個消費者有本身獨立的線程),然而使用內部RingBuffer來有效管理消息的發佈。
咱們構造了一個可擴展的、多種htp微服務器請求負載均衡的RingBufferWorkProcessor.說它看起來快過光速多是我錯了,另外gc的壓力徹底可控。
— 使用RingBufferWorkProcessor的Reactor友好者
使用RingBufferWorkProcessor很是簡單,你只要改變上面示例代碼的引用到靜態的create方法建立。使用RingBufferWorkProcessor以下,其它的代碼時同樣的。
Processor<Integer, Integer> p = RingBufferWorkProcessor.create("test", 32);
建立一個具備32個slot的內部RingBuffer的Processor。
如今,發佈消息到Processor時,將不會廣播給每個consumer,會根據消費者的數目分發給不一樣的消費者。運行示例,結果以下:
Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-1,5,main] data=1
注意,RingBufferWorkProcessor會重複終端的信號、檢測正在中止工做的Subscriber的取消異常,最終會被別的Subscriber執行一次。咱們保證適合事件至少發送一次。若你理解這個語義,你可能會當即說「等等,RingBufferWorkProcessor怎麼做爲一個消息代理工做啦?」 答案是確定的。
字節碼操做對大量數據管道配置的應用是一個核心關注點。reactor-net普遍使用字節碼操做來對接收的字節碼進行編組和分組或者經過IO發送。
reactor.io.buffer.Buffer是java byteBuffer處理的一個裝飾器,增長了一些列的操做。目的是經過使用ByteBuffer的limit和讀取/覆蓋預先分配的字節來減小字節的複製。追蹤ByteBuffer的位置是開發人員口頭的問題,Buffer簡化了這些,咱們只須要關注這個簡單的工具就能夠了。
下面是一個簡單的Buffer操做示例:
import reactor.io.buffer.Buffer //... given: "an empty Buffer and a full Buffer" def buff = new Buffer() def fullBuff = Buffer.wrap("Hello World!") when: "a Buffer is appended" buff.append(fullBuff) then: "the Buffer was added" buff.position() == 12 buff.flip().asString() == "Hello World!"
Buffer的一個有用的應用是Buffer.View,多個操做例如split都會返回Buffer.View。它提供了一個無需拷貝的方式去掃描和檢索ByteBuffer的字節碼。Buffer.View一樣也是一種Buffer。
使用一個分隔符和Buffer.view使塊數據讀取能夠複用一樣的字節碼
byte delimiter = (byte) ';'; byte innerDelimiter = (byte) ','; Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d"); List<Buffer.View> views = buffer.split(delimiter); int viewCount = views.size(); Assert.isTrue(viewCount == 4); for (Buffer.View view : views) { System.out.println(view.asString()); //prints "a" then "b-1,b-2", then "c" and finally "d" if(view.indexOf(innerDelimiter) != -1){ for(Buffer.View innerView : view.split(innerDelimiter)){ System.out.println(innerView.asString()); //prints "b-1" and "b-2" } } }
使用Buffer應用到普通的分組和編組對開發者來講可能顯得不夠高級,Reactor提供了一系列名稱爲Codec的預約義的轉換器。一些Codec須要在classpath路徑下添加一些額外的依賴,如json操做的Jackson依賴。
codec以兩種方式工做:第一,繼承Function去直接編碼並返回編碼好的數據,一般以Buffer的形式返回。這很是棒,但僅限於與無狀態的Codec才能起效,另一個可選的方法是使用Codec.encoder來返回編碼函數。
Codec.encoder()對比Codec.apply(Source) Codec.encoder() 返回一個惟一的編碼函數,這個編碼函數不能被不一樣線程共享。 Codec.apply(Source) 直接編碼(並保存分配的編碼器), 但Codec自己能夠在線程間共享。
對大部分實現了Buffer的codec來講,Codec一樣也能夠根據source類型去解碼數據。
解碼數據源,須要使用Codec.decoder()獲取解碼函數。和編碼不一樣的是,沒有爲編碼目的而重寫的快捷方法。和編碼相同的是,解碼函數不能在線程間共享。
有兩種形式的Code.decoder()函數,Codec.decoder()是一個阻塞的解碼函數,它直接從傳遞源數據解碼返回解碼後的數據。Codec.decoder(Consumer)用做非阻塞的解碼,它返回null,一旦解碼只觸發的Consumer,它能夠和其它異步工具結合使用。
使用一個預約義的codec示例以下:
import reactor.io.json.JsonCodec //... given: 'A JSON codec' def codec = new JsonCodec<Map<String, Object>, Object>(Map); def latch = new CountDownLatch(1) when: 'The decoder is passed some JSON' Map<String, Object> decoded; def callbackDecoder = codec.decoder{ decoded = it latch.countDown() } def blockingDecoder = codec.decoder() //yes this is real simple async strategy, but that's not the point here :) Thread.start{ callbackDecoder.apply(Buffer.wrap("{\"a\": \"alpha\"}")) } def decodedMap = blockingDecoder.apply(Buffer.wrap("{\"a\": \"beta\"}") then: 'The decoded maps have the expected entries' latch.await() decoded.size() == 1 decoded['a'] == 'alpha' decodedMap['a'] == 'beta'
可用的核心Codec
名稱 | 描述 | 須要的依賴 |
---|---|---|
ByteArrayCodec |
Wrap/unwrap byte arrays from/to Buffer. |
N/A |
DelimitedCodec |
Split/Aggregate Buffer and delegate to the passed Codec for unit marshalling. |
N/A |
FrameCodec |
Split/Aggregate Buffer into |
N/A |
JavaSerializationCodec |
Deserialize/Serialize Buffers using Java Serialization. |
N/A |
PassThroughCodec |
Leave the Buffers untouched. |
N/A |
StringCodec |
Convert String to/from Buffer |
N/A |
LengthFieldCodec |
Find the length and decode/encode the appropriate number of bytes into/from Buffer |
N/A |
KryoCodec |
Convert Buffer into Java objects using Kryo with Buffers |
|
JsonCodec,JacksonJsonCodec |
Convert Buffer into Java objects using Jackson with Buffers |
|
SnappyCodec |
A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer |
|
GZipCodec |
A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer |
N/A |
參考文獻:
1. http://baike.baidu.com/link?url=kXnm3flViIx-4E7PxZtYVgb3xY5tlwovUqog2u_TgCCiN7FSFkxt7ze-Qio5j1FXPmIz2DGV2_lbOBoLeyXdaa
2. http://projectreactor.io/docs/reference/