簡介: Java異步非阻塞編程的幾種方式
一個很簡單的業務邏輯,其餘後端服務提供了一個接口,咱們須要經過接口調用,獲取到響應的數據。java
逆地理接口:經過經緯度獲取這個經緯度所在的省市區縣以及響應的code:git
curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"
{"adcode":"510722"}
服務端執行,最簡單的同步調用方式:apache
服務端響應以前,IO會阻塞在:
java.net.SocketInputStream#socketRead0 的native方法上:編程
經過jstack日誌,能夠發現,此時這個Thread會一直在runable的狀態:後端
"main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84) at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282) at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165) at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207) .......
線程模型示例:app
同步最大的問題是在IO等待的過程當中,線程資源沒有獲得充分的利用,對於大量IO場景的業務吞吐量會有必定限制。curl
在JDK 1.5 中,JUC提供了Future抽象:異步
固然並非全部的Future都是這樣實現的,如
io.netty.util.concurrent.AbstractFuture 就是經過線程輪詢去。socket
這樣作的好處是,主線程能夠不用等待IO響應,能夠去作點其餘的,好比說再發送一個IO請求,能夠等到一塊兒返回:async
"main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162) at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201) ..... "AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet) - locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet) - locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:748)
主線程在等待結果返回過程當中依然須要等待,沒有根本解決此問題。
第二節中,依然須要主線程等待,獲取結果,那麼可不能夠在主線程完成發送請求後,不再用關心這個邏輯,去執行其餘的邏輯?那就可使用Callback機制。
如此一來,主線程不再須要關心發起IO後的業務邏輯,發送完請求後,就能夠完全去幹其餘事情,或者回到線程池中再供調度。若是是HttpServer,那麼須要結合Servlet 3.1的異步Servlet。
使用Callback方式,從線程模型中看,發現線程資源已經獲得了比較充分的利用,整個過程當中已經沒有線程阻塞。
回調地獄,當Callback的線程還須要執行下一個IO調用的時候,這個時候進入回調地獄模式。
典型的應用場景如,經過經緯度獲取行政區域adcode(逆地理接口),而後再根據得到的adcode,獲取當地的天氣信息(天氣接口)。
在同步的編程模型中,幾乎不會涉及到此類問題。
Callback方式的核心缺陷
那麼有沒有辦法解決Callback Hell的問題?固然有,JDK 1.8中提供了CompletableFuture,先看看它是怎麼解決這個問題的。
將逆地理的Callback邏輯,封裝成一個獨立的CompletableFuture,當異步線程回調時,調用 future.complete(T) ,將結果封裝。
將天氣執行的Call邏輯,也封裝成爲一個獨立的CompletableFuture ,完成以後,邏輯同上。
compose銜接,whenComplete輸出:
每個IO操做,都可以封裝爲獨立的CompletableFuture,從而避免回調地獄。
CompletableFuture,只有兩個屬性:
weatherFuture這個方法是如何被調用的呢?
經過堆棧能夠發現,是在
reverseCodeFuture.complete(result) 的時候,而且也將得到的adcode做爲參數執行接下來的邏輯。
這樣一來,就完美解決回調地獄問題,在主的邏輯中,看起來像是在同步的進行編碼。
Info-Service中,大量使用的 Vert.x Future 也是相似的解決的方案,不過設計上使用Handler的概念。
核心執行的邏輯差很少:
這固然不是Vertx的所有,固然這是題外話了。
異步編程對吞吐量以及資源有好處,可是有沒有統一的抽象去解決此類問題內,答案是 Reactive Streams。
核心抽象:Publisher Subscriber Processor Subscription ,整個包裏面,只有這四個接口,沒有實現類。
在JDK 9裏面,已經被做爲一種規範封裝到 java.util.concurrent.Flow :
一個簡單的例子:
Flux & Mono
做者:開發者小助手_LS
原文連接本文爲阿里雲原創內容,未經容許不得轉載