dubbo事件通知機制(1)

此文已由做者嶽猛受權網易雲社區發佈。
html

歡迎訪問網易雲社區,瞭解更多網易技術產品運營經驗。git


dubbo事件通知機制:http://dubbo.io/books/dubbo-user-book/demos/events-notify.htmlweb

1、使用方式安全

兩個服務:async

  • DemoService:真正要調用的服務ide

  • Notify:事件通知服務(用在consumer端)oop

provider測試

1 package com.alibaba.dubbo.demo;
2 
3 public interface DemoService {
4     String sayHello(String name);
5 }


1 public class DemoServiceImpl implements DemoService {
2     @Override
3     public String sayHello(String name) {
4         throw new RpcException("ex, param: " + name);//測試onthrow方法
5 //        return "Hello " + name;//測試onreturn方法
6     }
7 }


consumerurl

通知服務:Notifyspa


1 package com.alibaba.dubbo.demo.consumer.eventnotify;
2 
3 public interface Notify {
4     void oninvoke(String name); // 調用以前
5     void onreturnWithoutParam(String result); // 調用以後
6     void onreturn(String result, String name); // 調用以後
7     void onthrow(Throwable ex, String name);  // 出現異常
8 }
  1 package com.alibaba.dubbo.demo.consumer.eventnotify;
 2 
 3 public class NotifyService implements Notify {
 4     @Override
 5     public void oninvoke(String name) {
 6         System.out.println("======oninvoke======, param: " + name);
 7     }
 8 
 9     @Override
10     public void onreturnWithoutParam(String result) {
11         System.out.println("======onreturn======, result: " + result);
12     }
13 
14     @Override
15     public void onreturn(String result, String name) {
16         System.out.println("======onreturn======, param: " + name + ", result: " + result);
17     }
18 
19     @Override
20     public void onthrow(Throwable ex, String name) {
21         System.out.println("======onthrow======, param: " + name + ", exception: " + ex.getMessage());
22     }
23 }


xml配置:

1     <bean id="notifyService"  class="com.alibaba.dubbo.demo.consumer.eventnotify.NotifyService"/>
2     <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
3         <dubbo:method name="sayHello" timeout="60000" oninvoke="notifyService.oninvoke" onreturn="notifyService.onreturnWithoutParam" onthrow="notifyService.onthrow"/>
4     </dubbo:reference>

以後就能夠運行Consumer啓動類,以後調用demoService.sayHello(String name)了。

注意

  • oninvoke方法:

    • 必須具備與真實的被調用方法sayHello相同的入參列表:例如,oninvoke(String name)

  • onreturn方法:

    • 至少要有一個入參且第一個入參必須與sayHello的返回類型相同,接收返回結果:例如,onreturnWithoutParam(String result)

    • 能夠有多個參數,多個參數的狀況下,第一個後邊的全部參數都是用來接收sayHello入參的:例如, onreturn(String result, String name)

  • onthrow方法:

    • 至少要有一個入參且第一個入參類型爲Throwable或其子類,接收返回結果;例如,onthrow(Throwable ex)

    • 能夠有多個參數,多個參數的狀況下,第一個後邊的全部參數都是用來接收sayHello入參的:例如,onthrow(Throwable ex, String name)

  • 若是是consumer在調用provider的過程當中,出現異常時不會走onthrow方法的,onthrow方法只會在provider返回的RpcResult中含有Exception對象時,纔會執行。(dubbo中下層服務的Exception會被放在響應RpcResult的exception對象中傳遞給上層服務

 

2、源碼解析

整個事件通知的邏輯都在FutureFilter中,來看一下源碼:


1 /**
  2  * EventFilter
  3  */
  4 @Activate(group = Constants.CONSUMER)
  5 public class FutureFilter implements Filter {
  6 
  7     protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
  8 
  9     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
 10         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
 11 
 12         //1 調用服務以前:執行xxxService.oninvoke方法
 13         fireInvokeCallback(invoker, invocation);
 14         //2 調用服務
 15         Result result = invoker.invoke(invocation);
 16         //3 調用服務以後
 17         if (isAsync) {
 18             asyncCallback(invoker, invocation);
 19         } else {
 20             syncCallback(invoker, invocation, result); 21         }
 22         //4 返回調用結果
 23         return result;
 24     }
 25 
 26     private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
 27         if (result.hasException()) {
 28             //3.1 調用服務以後:若是返回結果異常信息(注意:若是是consumer本身throw的異常,會在2的時候直接拋走,不會走到這裏),直接執行xxxService.onthrow方法
 29             fireThrowCallback(invoker, invocation, result.getException());
 30         } else {
 31             //3.2 調用服務以後:若是返回值正常,執行xxxService.onreturn方法
 32             fireReturnCallback(invoker, invocation, result.getValue());
 33         }
 34     }
 35 
 36     private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
 37         Future<?> f = RpcContext.getContext().getFuture();
 38         if (f instanceof FutureAdapter) {
 39             ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
 40             // 3.1 調用服務以後:設置回調ResponseCallback對象到DefaultFuture中,當provider返回響應時,執行DefaultFuture.doReceived方法,該方法會調用ResponseCallback對象的done或者caught方法
 41             future.setCallback(new ResponseCallback() {
 42                 public void done(Object rpcResult) {
 43                     if (rpcResult == null) {
 44                         logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
 45                         return;
 46                     }
 47                     ///must be rpcResult
 48                     if (!(rpcResult instanceof Result)) {
 49                         logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
 50                         return;
 51                     }
 52                     Result result = (Result) rpcResult;
 53                     if (result.hasException()) {
 54                         fireThrowCallback(invoker, invocation, result.getException());
 55                     } else {
 56                         fireReturnCallback(invoker, invocation, result.getValue());
 57                     }
 58                 }
 59 
 60                 public void caught(Throwable exception) {
 61                     fireThrowCallback(invoker, invocation, exception);
 62                 }
 63             });
 64         }
 65     }
 66 
 67     /**
 68      * 反射執行xxxService.oninvoke方法:必須具備與真實的被調用方法sayHello相同的入參列表。
 69      */
 70     private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
 71         final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
 72         final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
 73 
 74         if (onInvokeMethod == null && onInvokeInst == null) {
 75             return;
 76         }
 77         if (onInvokeMethod == null || onInvokeInst == null) {
 78             throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
 79         }
 80         if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) {
 81             onInvokeMethod.setAccessible(true);
 82         }
 83         // 獲取真實方法sayHello傳入的參數
 84         Object[] params = invocation.getArguments();
 85         try {
 86             onInvokeMethod.invoke(onInvokeInst, params);
 87         } catch (InvocationTargetException e) {
 88             fireThrowCallback(invoker, invocation, e.getTargetException());
 89         } catch (Throwable e) {
 90             fireThrowCallback(invoker, invocation, e);
 91         }
 92     }
 93 
 94     /**
 95      * 反射執行xxxService.onreturn方法:至少要有一個入參,接收返回結果
 96      */
 97     private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
 98         final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
 99         final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
100 
101         //not set onreturn callback
102         if (onReturnMethod == null && onReturnInst == null) {
103             return;
104         }
105 
106         if (onReturnMethod == null || onReturnInst == null) {
107             throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
108         }
109         if (onReturnMethod != null && !onReturnMethod.isAccessible()) {
110             onReturnMethod.setAccessible(true);
111         }
112 
113         Object[] args = invocation.getArguments();
114         Object[] params;
115         Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
116         if (rParaTypes.length > 1) {
117             // onreturn(xx, Object[]) 兩個參數:第一個參數與真實方法sayHello方法返回結果類型相同,第二個接收全部的真實請求參數
118             if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
119                 params = new Object[2];
120                 params[0] = result; // 真實方法的執行結果
121                 params[1] = args;   // 真實方法sayHello傳入的參數
122             // onreturn(xx, Object... args) 多個參數:第一個參數與真實方法sayHello方法返回結果類型相同,後邊幾個接收全部的真實請求參數
123             } else {
124                 params = new Object[args.length + 1];
125                 params[0] = result; // 真實方法的執行結果
126                 System.arraycopy(args, 0, params, 1, args.length);
127             }
128         } else {
129             // onreturn(xx) 只有一個參數:接收返回執行結果
130             params = new Object[]{result}; // 執行結果
131         }
132         try {
133             onReturnMethod.invoke(onReturnInst, params);
134         } catch (InvocationTargetException e) {
135             fireThrowCallback(invoker, invocation, e.getTargetException());
136         } catch (Throwable e) {
137             fireThrowCallback(invoker, invocation, e);
138         }
139     }
140 
141     /**


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊


相關文章:
【推薦】 Dubbo與Hadoop RPC的區別
【推薦】 git subrepo

相關文章
相關標籤/搜索