歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習java
1 文章概述
咱們在服務端開發時若是須要實現異步調用,首先聲明一個線程池,並將調用業務方法封裝成一個任務提交至線程池,若是不須要獲取返回值則封裝爲Runnable,須要獲取返回值則封裝爲Callable並經過Future對象接受結果。spring
class CalcTask1 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("task1耗時計算"); Thread.sleep(1000L); return 100; } } class CalcTask2 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("task2耗時計算"); Thread.sleep(3000L); return 200; } } public class CallableTest { public static void test1() throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); CalcTask1 task1 = new CalcTask1(); Future<Integer> f1 = executor.submit(task1); CalcTask2 task2 = new CalcTask2(); Future<Integer> f2 = executor.submit(task2); Integer result1 = f1.get(); Integer result2 = f2.get(); System.out.println("final result=" + (result1 + result2)); executor.shutdown(); } public static void test2() throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>(); CalcTask1 task1 = new CalcTask1(); CalcTask2 task2 = new CalcTask2(); tasks.add(task1); tasks.add(task2); for (int i = 0; i < tasks.size(); i++) { Future<Integer> future = executor.submit(tasks.get(i)); System.out.println("result=" + future.get()); } executor.shutdown(); } }
1.1 什麼是消費異步化
在使用DUBBO進行異步化調用時不須要這麼麻煩,DUBBO基於NIO非阻塞能力使得服務消費者無需啓用多線程就能夠實現並行調用多個服務,在此咱們給出基於2.7.0版本調用實例。編程
1.1.1 生產者
(1) 服務聲明
public interface CalcSumService { public Integer sum(int a, int b); } public class CalcSumServiceImpl implements CalcSumService { @Override public Integer sum(int a, int b) { return a + b; } } public interface CalcSubtractionService { public Integer subtraction(int a, int b); } public class CalcSubtractionServiceImpl implements CalcSubtractionService { @Override public Integer subtraction(int a, int b) { return a - b; } }
(2) 配置文件
<beans> <dubbo:application name="java-front-provider" /> <dubbo:registry address="zookeeper://127.0.0.1:2181" /> <dubbo:protocol name="dubbo" port="9999" /> <bean id="calcSumService" class="com.java.front.dubbo.demo.provider.service.CalcSumServiceImpl" /> <bean id="calcSubtractionService" class="com.java.front.dubbo.demo.provider.service.CalcSubtractionServiceImpl" /> <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSumService" ref="calcSumService" /> <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" ref="calcSubtractionService" /> </beans>
(3) 服務發佈
public class Provider { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath*:META-INF/spring/dubbo-provider.xml"); context.start(); System.out.println(context); System.in.read(); } }
1.1.2 消費者
(1) 配置文件
<beans> <dubbo:application name="java-front-consumer" /> <dubbo:registry address="zookeeper://127.0.0.1:2181" /> <dubbo:reference id="calcSumService" interface="com.java.front.dubbo.demo.provider.service.CalcSumService" timeout="10000"> <dubbo:method name="sum" async="true" /> </dubbo:reference> <dubbo:reference id="calcSubtractionService" interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" timeout="10000"> <dubbo:method name="subtraction" async="true" /> </dubbo:reference> </beans>
(2) 服務消費
public class Consumer { public static void main(String[] args) throws Exception { testAsync(); System.in.read(); } public static void testAsync() { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法運算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture(); /** 減法運算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture(); /** 輸出結果 **/ int sumResult = futureSum.get(); int subtractionResult = futureSubtraction.get(); System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult); } catch (Exception e) { e.printStackTrace(); } } }
1.2 爲何消費異步化
異步化能夠將本來串行的調用並行化,減小執行時間從而提高性能。假設上述實例加法服務須要100ms,減法服務須要200ms,那麼串行化執行時間爲兩者之和300ms:設計模式
若是消費異步化那麼執行時間減小爲兩者最大值200ms,異步化所帶來的性能提高不言而喻:微信
2 保護性暫停模式
分析DUBBO源碼以前咱們首先介紹一種多線程設計模式:保護性暫停模式。咱們設想這樣一種場景:線程A生產數據,線程B讀取這個數據。咱們必須面對一種狀況:線程B準備讀取數據時,此時線程A尚未生產出數據。在這種狀況下線程B不能一直空轉,也不能當即退出,線程B要等到生產數據完成並拿到數據以後才退出。多線程
那麼在數據沒有生產出這段時間,線程B須要執行一種等待機制,這樣能夠達到對系統保護目的,這就是保護性暫停。架構
public class MyData implements Serializable { private static final long serialVersionUID = 1L; private String message; public MyData(String message) { this.message = message; } } class Resource { private MyData data; private Object lock = new Object(); public MyData getData() { synchronized (lock) { while (data == null) { try { // 沒有數據則釋放鎖並暫停等待被喚醒 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return data; } } public void sendData(MyData data) { synchronized (lock) { // 生產數據後喚醒消費線程 this.data = data; lock.notifyAll(); } } } public class ProtectDesignTest { public static void main(String[] args) { Resource resource = new Resource(); new Thread(() -> { try { MyData data = new MyData("hello"); System.out.println(Thread.currentThread().getName() + "生產數據=" + data); // 模擬發送耗時 TimeUnit.SECONDS.sleep(3); resource.sendData(data); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); new Thread(() -> { MyData data = resource.getData(); System.out.println(Thread.currentThread().getName() + "接收到數據=" + data); }, "t2").start(); } }
在上述代碼實例中線程1生產數據,線程2消費數據,Resource類經過wait/notify實現了保護性暫停模式,關於保護性暫停模式請參看我以前《保護性暫停模式詳解以及其在DUBBO應用源碼分析》這篇文章。app
3 源碼分析
本章節咱們分析對比2.6.9和2.7.0兩個版本源碼,之因此選取這兩個版本是由於2.7.0是一個里程碑版本,異步化能力獲得了明顯加強。異步
3.1 version_2.6.9
3.1.1 異步調用
咱們首先看看這個版本異步調用使用方式,生產者內容和消費者配置文件同第一章節再也不贅述,咱們重點分析服務消費代碼。async
public class AsyncConsumer { public static void main(String[] args) throws Exception { test1(); System.in.read(); } public static void test1() throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法運算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); Future<Integer> futureSum = RpcContext.getContext().getFuture(); /** 減法運算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); Future<Integer> futureSubtraction = RpcContext.getContext().getFuture(); /** 輸出結果 **/ int sumResult = futureSum.get(); int subtractionResult = futureSubtraction.get(); System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult); } }
消費者最終執行DubboInvoker.doInvoke,這個方法包含異步調用核心:
public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 單向調用 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } // 異步調用 else if (isAsync) { // 發起請求給生產者 ResponseFuture future = currentClient.request(inv, timeout); // 設置future對象至上下文 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); // 返回空結果 return new RpcResult(); } // 同步調用 else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }
若是包含async屬性則表示異步調用,第一步發送調用請求給生產者,第二步設置Future對象至上下文,第三步當即返回空結果。那麼在服務消費時關鍵一步就是獲取Future對象,因此咱們在調用時要從上下文獲取Future對象:
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); Future<Integer> futureSum = RpcContext.getContext().getFuture();
使用Future對象獲取結果:
int sumResult = futureSum.get();
進入FutureAdapter.get()方法:
public class FutureAdapter<V> implements Future<V> { private final ResponseFuture future; public V get() throws InterruptedException, ExecutionException { try { return (V) (((Result) future.get()).recreate()); } catch (RemotingException e) { throw new ExecutionException(e.getMessage(), e); } catch (Throwable e) { throw new RpcException(e); } } }
進入ResponseFuture.get()方法,咱們能夠看到保護性暫停模式應用,當生產者線程沒有返回數據則阻塞並等待被喚醒:
public class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); @Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { // 遠程調用未完成則等待被喚醒 done.await(timeout, TimeUnit.MILLISECONDS); // 超時時間未完成則退出 if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // 拋出超時異常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } }
當消費者接收到生產者響應時會調用received方法喚醒相關阻塞線程,這時阻塞在get方法中的線程便可獲取到數據:
public class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); public static void received(Channel channel, Response response) { try { // 根據惟一請求號獲取Future DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { // 喚醒相關阻塞線程 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } } }
3.1.2 設置回調函數
咱們如今調用get方法會阻塞在那裏等到結果,那麼有沒有一種方式當結果返回時就當即調用咱們設置的回調函數?答案是有。
public class AsyncConsumer { public static void main(String[] args) throws Exception { test2(); System.in.read(); } public static void test2() throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法運算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); /** 執行回調函數 **/ ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() { @Override public void done(Object response) { System.out.println("sumResult=" + response); } @Override public void caught(Throwable exception) { exception.printStackTrace(); } }); /** 減法運算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); /** 執行回調函數 **/ ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() { @Override public void done(Object response) { System.out.println("subtractionResult=" + response); } @Override public void caught(Throwable exception) { exception.printStackTrace(); } }); } }
DefaultFuture能夠設置callback回調函數,當結果返回時若是回調函數不爲空則執行:
public class DefaultFuture implements ResponseFuture { private volatile ResponseCallback callback; private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { // 執行回調函數 invokeCallback(callback); } } private void invokeCallback(ResponseCallback c) { ResponseCallback callbackCopy = c; if (callbackCopy == null) { throw new NullPointerException("callback cannot be null."); } c = null; Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null. url:" + channel.getUrl()); } if (res.getStatus() == Response.OK) { try { // 執行成功回調 callbackCopy.done(res.getResult()); } catch (Exception e) { logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e); } } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { try { TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); // 發生超時回調 callbackCopy.caught(te); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); } } else { try { RuntimeException re = new RuntimeException(res.getErrorMessage()); callbackCopy.caught(re); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); } } } }
3.2 version_2.7.0
CompletableFuture在這個版本中被引入實現異步調用,能夠使用此類強大的異步編程API加強異步能力,咱們首先回顧1.1.2章節實例:
public class Consumer { public static void testAsync() { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" }); System.out.println(context); context.start(); /** 加法運算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture(); /** 減法運算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture(); /** 輸出結果 **/ int sumResult = futureSum.get(); int subtractionResult = futureSubtraction.get(); System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult); } catch (Exception e) { e.printStackTrace(); } } }
在上述消費者代碼的實例中咱們只是應用了CompletableFuture.get()方法,並無發揮其強大功能。咱們對上述實例稍加改造,兩個CompletionStage任務都執行完成後,兩個任務結果會一塊兒交給thenCombine進行處理:
public class Consumer { public static void testAsync() { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer2.xml" }); System.out.println(context); context.start(); /** 加法運算 **/ CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService"); calcSumService.sum(3, 2); CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture(); /** 減法運算 **/ CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService"); calcSubtractionService.subtraction(3, 2); CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture(); /** 乘法運算 **/ CompletableFuture<Integer> multiplyResult = futureSum.thenCombine(futureSubtraction, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer t, Integer u) { return (t * u); } }); System.out.println("multiplyResult=" + multiplyResult.get()); } catch (Exception e) { e.printStackTrace(); } } }
DubboInvoker代碼有所變化:
public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否爲異步調用 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否爲future異步方式 boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv); // 是否須要響應結果 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 超時時間 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 單向調用 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } // 異步請求 else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result; if (isAsyncFuture) { result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } // 同步請求 else { RpcContext.getContext().setFuture(null); Result result = (Result) currentClient.request(inv, timeout).get(); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }
咱們看到與2.6.9版本相同的是FutureAdapter一樣會被設置到上下文,可是FutureAdapter自己已經發生了變化:
public class FutureAdapter<V> extends CompletableFuture<V> { private final ResponseFuture future; private CompletableFuture<Result> resultFuture; public FutureAdapter(ResponseFuture future) { this.future = future; this.resultFuture = new CompletableFuture<>(); // 設置回調函數至DefaultFuture future.setCallback(new ResponseCallback() { // 設置響應結果至CompletableFuture @Override public void done(Object response) { Result result = (Result) response; FutureAdapter.this.resultFuture.complete(result); V value = null; try { value = (V) result.recreate(); } catch (Throwable t) { FutureAdapter.this.completeExceptionally(t); } FutureAdapter.this.complete(value); } // 設置異常結果至FutureAdapter @Override public void caught(Throwable exception) { FutureAdapter.this.completeExceptionally(exception); } }); } public ResponseFuture getFuture() { return future; } public CompletableFuture<Result> getResultFuture() { return resultFuture; } }
咱們在服務消費時經過getResultFuture方法獲取CompletableFuture,這個對象值在回調時被設置,回調時機一樣在DefaultFuture.doReceived方法裏面:
public class DefaultFuture implements ResponseFuture { private volatile ResponseCallback callback; private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { // 執行回調函數代碼同version_2.6.9 invokeCallback(callback); } } }
4 文章總結
本文第一介紹了DUBBO消費異步化是什麼,以及異步化爲何會帶來性能提高。第二介紹了保護性暫停模式,這是實現異步化的基礎。最後咱們閱讀了兩個不一樣版本異步化源碼,瞭解了DUBBO異步化演進過程,但願本文對你們有所幫助。
歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習