分佈式事務 TCC-Transaction 源碼分析 —— Dubbo 支持

摘要: 原創出處 http://www.iocoder.cn/TCC-Transaction/dubbo-support/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!html

本文主要基於 TCC-Transaction 1.2.3.3 正式版java


🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:git

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。

1. 概述

本文分享 Dubbo 支持github

TCC-Transaction 經過 Dubbo 隱式傳參的功能,避免本身對業務代碼的入侵。可能有同窗不太理解爲何說 TCC-Transaction 對業務代碼有必定的入侵性,一塊兒來看個代碼例子:spring

public interface CapitalTradeOrderService {
    String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto);
}
複製代碼
  • 代碼來自 tcc-transaction-http-sample 。聲明遠程調用時,增長了參數 TransactionContext。固然你也能夠經過本身使用的遠程調用框架作必定封裝,避免入侵。

以下是對 Dubbo 封裝了後,Dubbo Service 方法的例子:api

public interface CapitalTradeOrderService {

    @Compensable
    String record(CapitalTradeOrderDto tradeOrderDto);

}
複製代碼
  • 代碼來自 http-transaction-dubbo-sample 。是否是不須要傳入參數 TransactionContext。固然,註解是確定須要的,不然 TCC-Transaction 怎麼知道哪些方法是 TCC 方法。

TCC-Transaction 經過 Dubbo Proxy 的機制,實現 @Compensable 屬性自動生成,增長開發體驗,也避免出錯。緩存


Dubbo 支持( Maven 項目 tcc-transaction-dubbo ) 總體代碼結構以下:微信

  • proxy
  • context

咱們分紅兩個小節分享這兩個包實現的功能。併發

筆者暫時對 Dubbo 瞭解的不夠深刻,若是有錯誤的地方,還煩請指出,謝謝。app

你行好事會由於獲得讚揚而愉悅
同理,開源項目貢獻者會由於 Star 而更加有動力
爲 TCC-Transaction 點贊!傳送門

ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》

2. Dubbo 代理

將 Dubbo Service 方法上的註解 @Compensable ,自動生成註解的 confirmMethodcancelMethodtransactionContextEditor 屬性,例子代碼以下:

@Compensable(propagation=Propagation.SUPPORTS, confirmMethod="record", cancelMethod="record", transactionContextEditor=DubboTransactionContextEditor.class)
public String record(RedPacketTradeOrderDto paramRedPacketTradeOrderDto) {
    // ... 省略代碼
}
複製代碼
  • 該代碼經過 Javassist 生成的 Proxy 代碼的示例。
  • propagation=Propagation.SUPPORTS :支持當前事務,若是當前沒有事務,就以非事務方式執行。爲何不使用 REQUIRED ?若是使用 REQUIRED 事務傳播級別,事務恢復重試時,會發起新的事務。
  • confirmMethodcancelMethod 使用和 try 方法相同方法名本地發起遠程服務 TCC confirm / cancel 階段,調用相同方法進行事務的提交或回滾。遠程服務的 CompensableTransactionInterceptor 會根據事務的狀態是 CONFIRMING / CANCELLING 來調用對應方法。
  • transactionContextEditor=DubboTransactionContextEditor.class,使用 Dubbo 事務上下文編輯器,在「3. Dubbo 事務上下文編輯器」詳細分享。

Dubbo Service Proxy 提供了兩種生成方式:

  • JavassistProxyFactory,基於 Javassist 方式
  • JdkProxyFactory,基於 JDK 動態代理機制

這塊內容咱們不拓展開,感興趣的同窗點擊以下文章:

Dubbo 的 Invoker 模型是很是關鍵的概念,看下圖:

2.1 JavassistProxyFactory

2.1.1 Javassist

Javassist 是一個開源的分析、編輯和建立 Java 字節碼的類庫。經過使用Javassist 對字節碼操做能夠實現動態 」AOP」 框架。

關於 Java 字節碼的處理,目前有不少工具,如 bcel,asm( cglib只是對asm又封裝了一層 )。不過這些都須要直接跟虛擬機指令打交道。

Javassist 的主要的優勢,在於簡單,並且快速,直接使用 Java 編碼的形式,而不須要了解虛擬機指令,就能動態改變類的結構,或者動態生成類。

2.1.2 TccJavassistProxyFactory

org.mengyun.tcctransaction.dubbo.proxy.javassist.TccJavassistProxyFactory,TCC Javassist 代理工廠。實現代碼以下:

public class TccJavassistProxyFactory extends JavassistProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) TccProxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

}
複製代碼
  • 項目啓動時,調用 TccJavassistProxyFactory#getProxy(...) 方法,生成 Dubbo Service 調用 Proxy。
  • com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler,Dubbo 調用處理器,點擊鏈接查看代碼。

2.1.3 TccProxy & TccClassGenerator

org.mengyun.tcctransaction.dubbo.proxy.javassist.TccProxy,TCC Proxy 工廠,生成 Dubbo Service 調用 Proxy 。筆者認爲,TccProxy 改爲 TccProxyFactory 更合適,緣由在下文。

org.mengyun.tcctransaction.dubbo.proxy.javassist.TccClassGenerator,TCC 類代碼生成器,基於 Javassist 實現。

🦅案例

一個 Dubbo Service,TccProxy 會動態生成兩個類:

  • Dubbo Service 調用 Proxy
  • Dubbo Service 調用 ProxyFactory,生成對應的 Dubbo Service Proxy

例如 Dubbo Service 接口以下:

public interface RedPacketTradeOrderService {

    @Compensable
    String record(RedPacketTradeOrderDto tradeOrderDto);
}
複製代碼

生成 Dubbo Service 調用 ProxyFactory 以下 :

public class TccProxy3 extends TccProxy implements TccClassGenerator.DC {
  public Object newInstance(InvocationHandler paramInvocationHandler) {
    return new proxy3(paramInvocationHandler);
  }
}
複製代碼
  • TccProxy 提供 #newInstance(handler) 方法,建立 Proxy,因此筆者認爲,TccProxy 改爲 TccProxyFactory 更合適。
  • org.mengyun.tcctransaction.dubbo.proxy.javassist.TccClassGenerator.DC 動態生成類標記,標記該類由 TccClassGenerator 生成的。

生成 Dubbo Service 調用 Proxy 以下 :

public class proxy3 implements TccClassGenerator.DC, RedPacketTradeOrderService, EchoService {
    
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy3() {}

    public proxy3(InvocationHandler paramInvocationHandler) {
        this.handler = paramInvocationHandler;
    }

    @Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = DubboTransactionContextEditor.class)
    public String record(RedPacketTradeOrderDto paramRedPacketTradeOrderDto) {
        Object[] arrayOfObject = new Object[1];
        arrayOfObject[0] = paramRedPacketTradeOrderDto;
        Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
        return (String) localObject;
    }

    public Object $echo(Object paramObject) {
        Object[] arrayOfObject = new Object[1];
        arrayOfObject[0] = paramObject;
        Object localObject = this.handler.invoke(this, methods[1], arrayOfObject);
        return (Object) localObject;
    }
}
複製代碼
  • com.alibaba.dubbo.rpc.service.EchoService,Dubbo Service 回聲服務接口,用於服務健康檢查,Dubbo Service 默認自動實現該接口,點擊鏈接查看代碼。
  • org.mengyun.tcctransaction.dubbo.proxy.javassist.TccClassGenerator.DC 動態生成類標記,標記該類由 TccClassGenerator 生成的。

🦅實現

調用 TccProxy#getProxy(...) 方法,得到 TCC Proxy 工廠,實現代碼以下:

1: // 【TccProxy.java】
  2: public static TccProxy getProxy(ClassLoader cl, Class<?>... ics) {
  3:     // 校驗接口超過上限
  4:     if (ics.length > 65535) {
  5:         throw new IllegalArgumentException("interface limit exceeded");
  6:     }
  7: 
  8:     // use interface class name list as key.
  9:     StringBuilder sb = new StringBuilder();
 10:     for (Class<?> ic : ics) {
 11:         String itf = ic.getName();
 12:         // 校驗是否爲接口
 13:         if (!ic.isInterface()) {
 14:             throw new RuntimeException(itf + " is not a interface.");
 15:         }
 16:         // 加載接口類
 17:         Class<?> tmp = null;
 18:         try {
 19:             tmp = Class.forName(itf, false, cl);
 20:         } catch (ClassNotFoundException ignored) {
 21:         }
 22:         if (tmp != ic) { // 加載接口類失敗
 23:             throw new IllegalArgumentException(ic + " is not visible from class loader");
 24:         }
 25:         sb.append(itf).append(';');
 26:     }
 27:     String key = sb.toString();
 28: 
 29:     // get cache by class loader.
 30:     Map<String, Object> cache;
 31:     synchronized (ProxyCacheMap) {
 32:         cache = ProxyCacheMap.get(cl);
 33:         if (cache == null) {
 34:             cache = new HashMap<String, Object>();
 35:             ProxyCacheMap.put(cl, cache);
 36:         }
 37:     }
 38: 
 39:     // 得到 TccProxy 工廠
 40:     TccProxy proxy = null;
 41:     synchronized (cache) {
 42:         do {
 43:             // 從緩存中獲取 TccProxy 工廠
 44:             Object value = cache.get(key);
 45:             if (value instanceof Reference<?>) {
 46:                 proxy = (TccProxy) ((Reference<?>) value).get();
 47:                 if (proxy != null) {
 48:                     return proxy;
 49:                 }
 50:             }
 51:             // 緩存中不存在,設置生成 TccProxy 代碼標記。建立中時,其餘建立請求等待,避免併發。
 52:             if (value == PendingGenerationMarker) {
 53:                 try {
 54:                     cache.wait();
 55:                 } catch (InterruptedException ignored) {
 56:                 }
 57:             } else {
 58:                 cache.put(key, PendingGenerationMarker);
 59:                 break;
 60:             }
 61:         }
 62:         while (true);
 63:     }
 64: 
 65:     long id = PROXY_CLASS_COUNTER.getAndIncrement();
 66:     String pkg = null;
 67:     TccClassGenerator ccp = null; // proxy class generator
 68:     TccClassGenerator ccm = null; // proxy factory class generator
 69:     try {
 70:         // 建立 Tcc class 代碼生成器
 71:         ccp = TccClassGenerator.newInstance(cl);
 72: 
 73:         Set<String> worked = new HashSet<String>(); // 已處理方法簽名集合。key:方法簽名
 74:         List<Method> methods = new ArrayList<Method>(); // 已處理方法集合。
 75: 
 76:         // 處理接口
 77:         for (Class<?> ic : ics) {
 78:             // 非 public 接口,使用接口包名
 79:             if (!Modifier.isPublic(ic.getModifiers())) {
 80:                 String npkg = ic.getPackage().getName();
 81:                 if (pkg == null) {
 82:                     pkg = npkg;
 83:                 } else {
 84:                     if (!pkg.equals(npkg)) { // 實現了兩個非 public 的接口,
 85:                         throw new IllegalArgumentException("non-public interfaces from different packages");
 86:                     }
 87:                 }
 88:             }
 89:             // 添加接口
 90:             ccp.addInterface(ic);
 91:             // 處理接口方法
 92:             for (Method method : ic.getMethods()) {
 93:                 // 添加方法簽名到已處理方法簽名集合
 94:                 String desc = ReflectUtils.getDesc(method);
 95:                 if (worked.contains(desc)) {
 96:                     continue;
 97:                 }
 98:                 worked.add(desc);
 99:                 // 生成接口方法實現代碼
100:                 int ix = methods.size();
101:                 Class<?> rt = method.getReturnType();
102:                 Class<?>[] pts = method.getParameterTypes();
103:                 StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
104:                 for (int j = 0; j < pts.length; j++) {
105:                     code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
106:                 }
107:                 code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
108:                 if (!Void.TYPE.equals(rt)) {
109:                     code.append(" return ").append(asArgument(rt, "ret")).append(";");
110:                 }
111:                 methods.add(method);
112:                 // 添加方法
113:                 Compensable compensable = method.getAnnotation(Compensable.class);
114:                 if (compensable != null) {
115:                     ccp.addMethod(true, method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
116:                 } else {
117:                     ccp.addMethod(false, method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
118:                 }
119:             }
120:         }
121: 
122:         // 設置包路徑
123:         if (pkg == null) {
124:             pkg = PACKAGE_NAME;
125:         }
126: 
127:         // create ProxyInstance class.
128:         // 設置類名
129:         String pcn = pkg + ".proxy" + id;
130:         ccp.setClassName(pcn);
131:         // 添加靜態屬性 methods
132:         ccp.addField("public static java.lang.reflect.Method[] methods;");
133:         // 添加屬性 handler
134:         ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
135:         // 添加構造方法,參數 handler
136:         ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
137:         // 添加構造方法,參數 空
138:         ccp.addDefaultConstructor();
139:         // 生成類
140:         Class<?> clazz = ccp.toClass();
141:         // 設置靜態屬性 methods
142:         clazz.getField("methods").set(null, methods.toArray(new Method[0]));
143: 
144:         // create TccProxy class.
145:         // 建立 Tcc class 代碼生成器
146:         ccm = TccClassGenerator.newInstance(cl);
147:         // 設置類名
148:         String fcn = TccProxy.class.getName() + id;
149:         ccm.setClassName(fcn);
150:         // 添加構造方法,參數 空
151:         ccm.addDefaultConstructor();
152:         // 設置父類爲 TccProxy.class
153:         ccm.setSuperClass(TccProxy.class);
154:         // 添加方法 #newInstance(handler)
155:         ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
156:         // 生成類
157:         Class<?> pc = ccm.toClass();
158:         // 建立 TccProxy 對象
159:         proxy = (TccProxy) pc.newInstance();
160:     } catch (RuntimeException e) {
161:         throw e;
162:     } catch (Exception e) {
163:         throw new RuntimeException(e.getMessage(), e);
164:     } finally {
165:         // release TccClassGenerator
166:         if (ccp != null) {
167:             ccp.release();
168:         }
169:         if (ccm != null) {
170:             ccm.release();
171:         }
172:         // 喚醒緩存 wait
173:         synchronized (cache) {
174:             if (proxy == null) {
175:                 cache.remove(key);
176:             } else {
177:                 cache.put(key, new WeakReference<TccProxy>(proxy));
178:             }
179:             cache.notifyAll();
180:         }
181:     }
182:     return proxy;
183: }
複製代碼
  • 第 3 至 7 行 :校驗接口超過上限。

  • 第 8 至 27 行 :使用接口集合類名以 ; 分隔拼接,做爲 Proxy 的惟一標識。例如 :key=org.mengyun.tcctransaction.sample.dubbo.redpacket.api.RedPacketAccountService;com.alibaba.dubbo.rpc.service.EchoService;

  • 第 29 至 37 行 :得到 Proxy 對應的 ClassLoader。這裏咱們看下靜態屬性 ProxyCacheMap 的定義:

    /** * Proxy 對象緩存 * key :ClassLoader * value.key :Tcc Proxy 標識。使用 Tcc Proxy 實現接口名拼接 * value.value :Tcc Proxy 工廠對象 */
    private static final Map<ClassLoader, Map<String, Object>> ProxyCacheMap = new WeakHashMap<ClassLoader, Map<String, Object>>();
    複製代碼
  • 第 39 至 63 行 :一直得到 TCC Proxy 工廠直到成功。

    • 第 43 至 50 行 :從緩存中獲取 TCC Proxy 工廠。
    • 第 51 至 60 行 :若緩存中不存在,設置正在生成 TccProxy 代碼標記。建立中時,其餘建立請求等待,避免併發。
  • 第 65 行 :PROXY_CLASS_COUNTER,Proxy Class 計數,用於生成 Proxy 類名自增。代碼以下:

    private static final AtomicLong PROXY_CLASS_COUNTER = new AtomicLong(0);
    複製代碼
  • 第 66 至 67 行

    • ccm,生成 Dubbo Service 調用 ProxyFactory 的代碼生成器
    • ccp,生成 Dubbo Service 調用 Proxy 的代碼生成器
  • 第 70 至 142 行 :生成 Dubbo Service 調用 Proxy 的代碼

    • 第 70 至 71 行 :調用 TccClassGenerator#newInstance(loader) 方法, 建立生成 Dubbo Service 調用 Proxy 的代碼生成器。實現代碼以下:

      // TccClassGenerator.java
      public final class TccClassGenerator {
          
          /** * CtClass hash 集合 * key:類名 */
          private ClassPool mPool;
          
          public static TccClassGenerator newInstance(ClassLoader loader) {
              return new TccClassGenerator(getClassPool(loader));
          }
          
          private TccClassGenerator(ClassPool pool) {
              mPool = pool;
          }
      }
      複製代碼
      • ClassPool 是一個 CtClass 對象的 hash 表,類名作爲 key 。ClassPool 的 #get(key) 搜索 hash 表找到與指定 key 關聯的 CtClass 對象。若是沒有找到 CtClass 對象,#get(key) 讀一個類文件構建新的 CtClass 對象,它是被記錄在 hash 表中而後返回這個對象。
    • 第 76 至 120 行,處理接口。

      • 第 79 至 88 行,生成類的包名。

      • 第 89 至 90 行,調用 TccClassGenerator#addInterface(cl) 方法,添加生成類的接口( Dubbo Service 接口 )。實現代碼以下:

        /** * 生成類的接口集合 */
        private Set<String> mInterfaces;
        
        public TccClassGenerator addInterface(Class<?> cl) {
           return addInterface(cl.getName());
        }
        
        public TccClassGenerator addInterface(String cn) {
           if (mInterfaces == null) {
               mInterfaces = new HashSet<String>();
           }
           mInterfaces.add(cn);
           return this;
        }
        複製代碼
        • x
      • 第 93 至 98 行,添加方法簽名到已處理方法簽名集合。多個接口可能存在相同的接口方法,跳過相同的方法,避免衝突。

      • 第 99 至 110 行,生成 Dubbo Service 調用實現代碼。案例代碼以下:

        public String record(RedPacketTradeOrderDto paramRedPacketTradeOrderDto) {
            Object[] arrayOfObject = new Object[1];
            arrayOfObject[0] = paramRedPacketTradeOrderDto;
            Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
            return (String)localObject;
          }
        複製代碼
      • 第 112 至 118 行 :調用 TccClassGenerator#addMethod(...) 方法,添加生成的方法。實現代碼以下:

        /** * 生成類的方法代碼集合 */
        private List<String> mMethods;
        
        /** * 帶 @Compensable 方法代碼集合 */
        private Set<String> compensableMethods = new HashSet<String>();
        
        public TccClassGenerator addMethod(boolean isCompensableMethod, String name, int mod, Class<?> rt, Class<?>[] pts, Class<?>[] ets, String body) {
           // 拼接方法
           StringBuilder sb = new StringBuilder();
           sb.append(modifier(mod)).append(' ').append(ReflectUtils.getName(rt)).append(' ').append(name);
           sb.append('(');
           for (int i = 0; i < pts.length; i++) {
               if (i > 0)
                   sb.append(',');
               sb.append(ReflectUtils.getName(pts[i]));
               sb.append(" arg").append(i);
           }
           sb.append(')');
           if (ets != null && ets.length > 0) {
               sb.append(" throws ");
               for (int i = 0; i < ets.length; i++) {
                   if (i > 0)
                       sb.append(',');
                   sb.append(ReflectUtils.getName(ets[i]));
               }
           }
           sb.append('{').append(body).append('}');
           // 是否有 @Compensable 註解
           if (isCompensableMethod) {
               compensableMethods.add(sb.toString());
           }
           return addMethod(sb.toString());
        }
        
        public TccClassGenerator addMethod(String code) {
           if (mMethods == null) {
               mMethods = new ArrayList<String>();
           }
           mMethods.add(code);
           return this;
        }
        複製代碼
    • 第 122 至 130 行,生成類名( 例如,org.mengyun.tcctransaction.dubbo.proxy.javassist.proxy3 ),並調用 TccClassGenerator#setClassName(...) 方法,設置類名。實現代碼以下:

      /** * 生成類的類名 */
      private String mClassName;
      
      public TccClassGenerator setClassName(String name) {
         mClassName = name;
         return this;
      }
      複製代碼
      • x
    • 第 131 至 134 行,調用 TccClassGenerator#addField(...) 方法,添加靜態屬性 methods ( Dubbo Service 方法集合 )和屬性 handler ( Dubbo InvocationHandler )。實現代碼以下:

      /** * 生成類的屬性集合 */
      private List<String> mFields;
      
      public TccClassGenerator addField(String code) {
         if (mFields == null) {
             mFields = new ArrayList<String>();
         }
         mFields.add(code);
         return this;
      }
      複製代碼
      • x
    • 第 135 至 136 行,調用 TccClassGenerator#addConstructor(...) 方法,添加參數爲 handler 的構造方法。實現代碼以下:

      /** * 生成類的非空構造方法代碼集合 */
      private List<String> mConstructors;
      
      public TccClassGenerator addConstructor(int mod, Class<?>[] pts, Class<?>[] ets, String body) {
         // 構造方法代碼
         StringBuilder sb = new StringBuilder();
         sb.append(modifier(mod)).append(' ').append(SIMPLE_NAME_TAG);
         sb.append('(');
         for (int i = 0; i < pts.length; i++) {
             if (i > 0)
                 sb.append(',');
             sb.append(ReflectUtils.getName(pts[i]));
             sb.append(" arg").append(i);
         }
         sb.append(')');
         if (ets != null && ets.length > 0) {
             sb.append(" throws ");
             for (int i = 0; i < ets.length; i++) {
                 if (i > 0)
                     sb.append(',');
                 sb.append(ReflectUtils.getName(ets[i]));
             }
         }
         sb.append('{').append(body).append('}');
         //
         return addConstructor(sb.toString());
      }
      
      public TccClassGenerator addConstructor(String code) {
         if (mConstructors == null) {
             mConstructors = new LinkedList<String>();
         }
         mConstructors.add(code);
         return this;
      }
      
      public TccClassGenerator addConstructor(String code) {
         if (mConstructors == null) {
             mConstructors = new LinkedList<String>();
         }
         mConstructors.add(code);
         return this;
      }
      複製代碼
      • x
    • 第 137 至 138 行,調用 TccClassGenerator#addDefaultConstructor() 方法,添加默認空構造方法。實現代碼以下:

      /** * 默認空構造方法 */
      private boolean mDefaultConstructor = false;
      
      public TccClassGenerator addDefaultConstructor() {
         mDefaultConstructor = true;
         return this;
      }
      複製代碼
      • x
    • 第 139 行,調用 TccClassGenerator#toClass() 方法,生成類。實現代碼以下:

      1: public Class<?> toClass() {
        2:    // mCtc 非空時,進行釋放;下面會進行建立 mCtc
        3:    if (mCtc != null) {
        4:        mCtc.detach();
        5:    }
        6:    long id = CLASS_NAME_COUNTER.getAndIncrement();
        7:    try {
        8:        CtClass ctcs = mSuperClass == null ? null : mPool.get(mSuperClass);
        9:        if (mClassName == null) { // 類名
       10:            mClassName = (mSuperClass == null || javassist.Modifier.isPublic(ctcs.getModifiers())
       11:                    ? TccClassGenerator.class.getName() : mSuperClass + "$sc") + id;
       12:        }
       13:        // 建立 mCtc
       14:        mCtc = mPool.makeClass(mClassName);
       15:        if (mSuperClass != null) { // 繼承類
       16:            mCtc.setSuperclass(ctcs);
       17:        }
       18:        mCtc.addInterface(mPool.get(DC.class.getName())); // add dynamic class tag.
       19:        if (mInterfaces != null) { // 實現接口集合
       20:            for (String cl : mInterfaces) {
       21:                mCtc.addInterface(mPool.get(cl));
       22:            }
       23:        }
       24:        if (mFields != null) { // 屬性集合
       25:            for (String code : mFields) {
       26:                mCtc.addField(CtField.make(code, mCtc));
       27:            }
       28:        }
       29:        if (mMethods != null) { // 方法集合
       30:            for (String code : mMethods) {
       31:                if (code.charAt(0) == ':') {
       32:                    mCtc.addMethod(CtNewMethod.copy(getCtMethod(mCopyMethods.get(code.substring(1))), code.substring(1, code.indexOf('(')), mCtc, null));
       33:                } else {
       34:                    CtMethod ctMethod = CtNewMethod.make(code, mCtc);
       35:                    if (compensableMethods.contains(code)) {
       36:                        // 設置 @Compensable 屬性
       37:                        ConstPool constpool = mCtc.getClassFile().getConstPool();
       38:                        AnnotationsAttribute attr = new AnnotationsAttribute(constpool, AnnotationsAttribute.visibleTag);
       39:                        Annotation annot = new Annotation("org.mengyun.tcctransaction.api.Compensable", constpool);
       40:                        EnumMemberValue enumMemberValue = new EnumMemberValue(constpool);
       41:                        enumMemberValue.setType("org.mengyun.tcctransaction.api.Propagation");
       42:                        enumMemberValue.setValue("SUPPORTS");
       43:                        annot.addMemberValue("propagation", enumMemberValue);
       44:                        annot.addMemberValue("confirmMethod", new StringMemberValue(ctMethod.getName(), constpool));
       45:                        annot.addMemberValue("cancelMethod", new StringMemberValue(ctMethod.getName(), constpool));
       46:                        ClassMemberValue classMemberValue = new ClassMemberValue("org.mengyun.tcctransaction.dubbo.context.DubboTransactionContextEditor", constpool);
       47:                        annot.addMemberValue("transactionContextEditor", classMemberValue);
       48:                        attr.addAnnotation(annot);
       49:                        ctMethod.getMethodInfo().addAttribute(attr);
       50:                    }
       51:                    mCtc.addMethod(ctMethod);
       52:                }
       53:            }
       54:        }
       55:        if (mDefaultConstructor) { // 空參數構造方法
       56:            mCtc.addConstructor(CtNewConstructor.defaultConstructor(mCtc));
       57:        }
       58:        if (mConstructors != null) { // 帶參數構造方法
       59:            for (String code : mConstructors) {
       60:                if (code.charAt(0) == ':') {
       61:                    mCtc.addConstructor(CtNewConstructor.copy(getCtConstructor(mCopyConstructors.get(code.substring(1))), mCtc, null));
       62:                } else {
       63:                    String[] sn = mCtc.getSimpleName().split("\\$+"); // inner class name include $.
       64:                    mCtc.addConstructor(CtNewConstructor.make(code.replaceFirst(SIMPLE_NAME_TAG, sn[sn.length - 1]), mCtc));
       65:                }
       66:            }
       67:        }
       68: // mCtc.debugWriteFile("/Users/yunai/test/" + mCtc.getSimpleName().replaceAll(".", "/") + ".class");
       69:        // 生成
       70:        return mCtc.toClass();
       71:    } catch (RuntimeException e) {
       72:        throw e;
       73:    } catch (NotFoundException e) {
       74:        throw new RuntimeException(e.getMessage(), e);
       75:    } catch (CannotCompileException e) {
       76:        throw new RuntimeException(e.getMessage(), e);
       77:    }
       78: }
      複製代碼
      • 基於 Javassist 生成類。這裏不作拓展解釋,配合《Java學習之javassist》一塊兒理解。
      • 第 18 行,添加 org.mengyun.tcctransaction.dubbo.proxy.javassist.TccClassGenerator.DC 動態生成類標記,標記該類由 TccClassGenerator 生成的。
      • 第 34 至 50 行,設置 @Compensable 默認屬性。
    • 第 141 至 142 行,設置 Dubbo Service 方法集合設置到靜態屬性 methods 上。

  • 第 144 至 157 行,生成 Dubbo Service 調用 Proxy 工廠的代碼

    • 第 146 行,調用 TccClassGenerator#newInstance(loader) 方法, 建立生成 Dubbo Service 調用 Proxy 工廠 的代碼生成器。

    • 第 147 至 149 行,生成類名( 例如,org.mengyun.tcctransaction.dubbo.proxy.javassist.TccProxy3 ),並調用 TccClassGenerator#setClassName(...) 方法,設置類名。

    • 第 150 至 151 行,調用 TccClassGenerator#addDefaultConstructor() 方法,添加默認空構造方法。

    • 第 152 至 153 行,調用 TccClassGenerator#mSuperClass() 方法,設置繼承父類 TccProxy。實現代碼以下:

      /** * 生成類的父類名字 */
      private String mSuperClass;
      
      public TccClassGenerator setSuperClass(Class<?> cl) {
         mSuperClass = cl.getName();
         return this;
      }
      複製代碼
      • x
    • 第 154 至 155 行,調用 TccClassGenerator#addInterface(cl) 方法,添加生成 Proxy 實現代碼的方法。代碼案例以下:

      public Object newInstance(InvocationHandler paramInvocationHandler) {
           return new proxy3(paramInvocationHandler);
      }
      複製代碼
      • x
    • 第 156 至 157 行,調用 TccClassGenerator#toClass() 方法,生成類

  • 第 159 行,調用 TccProxy#newInstance() 方法,建立 Proxy 。實現代碼以下:

    /** * get instance with default handler. * * @return instance. */
    public Object newInstance() {
       return newInstance(THROW_UNSUPPORTED_INVOKER);
    }
    
    /** * get instance with special handler. * * @return instance. */
    abstract public Object newInstance(InvocationHandler handler);
    複製代碼
    • #newInstance(handler),抽象方法,上面第 154 至 155 行生成。TccJavassistProxyFactory 調用該方法,得到 Proxy 。
  • 第 165 至 171 行,釋放 TccClassGenerator 。實現代碼以下:

    public void release() {
       if (mCtc != null) {
           mCtc.detach();
       }
       if (mInterfaces != null) {
           mInterfaces.clear();
       }
       if (mFields != null) {
           mFields.clear();
       }
       if (mMethods != null) {
           mMethods.clear();
       }
       if (mConstructors != null) {
           mConstructors.clear();
       }
       if (mCopyMethods != null) {
           mCopyMethods.clear();
       }
       if (mCopyConstructors != null) {
           mCopyConstructors.clear();
       }
    }
    複製代碼
  • 第 172 至 180 行,設置 Proxy 工廠緩存,並喚醒等待線程。

**ps:**代碼比較多,收穫會比較多,算是 Javassist 實戰案例了。TCC-Transaction 做者在實現上述類,可能參考了 Dubbo 自帶的實現:

2.1.4 配置 Dubbo Proxy

// META-INF.dubbo/com.alibaba.dubbo.rpc.ProxyFactory
tccJavassist=org.mengyun.tcctransaction.dubbo.proxy.javassist.TccJavassistProxyFactory

// tcc-transaction-dubbo.xml
<dubbo:provider proxy="tccJavassist"/>
複製代碼

目前 Maven 項目 tcc-transaction-dubbo 已經默認配置,引入便可。

2.2 JdkProxyFactory

2.2.1 JDK Proxy

《 Java JDK 動態代理(AOP)使用及實現原理分析》

2.2.2 TccJdkProxyFactory

org.mengyun.tcctransaction.dubbo.proxy.jd.TccJdkProxyFactory,TCC JDK 代理工廠。實現代碼以下:

public class TccJdkProxyFactory extends JdkProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        T proxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new TccInvokerInvocationHandler(proxy, invoker));
    }

}
複製代碼
  • 項目啓動時,調用 TccJavassistProxyFactory#getProxy(...) 方法,生成 Dubbo Service 調用 Proxy。
  • 第一次調用 Proxy#newProxyInstance(...) 方法,建立調用 Dubbo Service 服務的 Proxy。com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler,Dubbo 調用處理器,點擊鏈接查看代碼。
  • 第二次調用 Proxy#newProxyInstance(...) 方法,建立對調用 Dubbo Service 的 Proxy 的 Proxy。爲何會有兩層 Proxy?答案在下節 TccInvokerInvocationHandler 。

2.2.3 TccInvokerInvocationHandler

org.mengyun.tcctransaction.dubbo.proxy.jdk.TccInvokerInvocationHandler,TCC 調用處理器,在調用 Dubbo Service 服務時,使用 ResourceCoordinatorInterceptor 攔截處理。實現代碼以下:

1: public class TccInvokerInvocationHandler extends InvokerInvocationHandler {
  2: 
  3:     /** 4: * proxy 5: */
  6:     private Object target;
  7: 
  8:     public TccInvokerInvocationHandler(Invoker<?> handler) {
  9:         super(handler);
 10:     }
 11: 
 12:     public <T> TccInvokerInvocationHandler(T target, Invoker<T> invoker) {
 13:         super(invoker);
 14:         this.target = target;
 15:     }
 16: 
 17:     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 18:         Compensable compensable = method.getAnnotation(Compensable.class);
 19:         if (compensable != null) {
 20:             // 設置 @Compensable 屬性
 21:             if (StringUtils.isEmpty(compensable.confirmMethod())) {
 22:                 ReflectionUtils.changeAnnotationValue(compensable, "confirmMethod", method.getName());
 23:                 ReflectionUtils.changeAnnotationValue(compensable, "cancelMethod", method.getName());
 24:                 ReflectionUtils.changeAnnotationValue(compensable, "transactionContextEditor", DubboTransactionContextEditor.class);
 25:                 ReflectionUtils.changeAnnotationValue(compensable, "propagation", Propagation.SUPPORTS);
 26:             }
 27:             // 生成切面
 28:             ProceedingJoinPoint pjp = new MethodProceedingJoinPoint(proxy, target, method, args);
 29:             // 執行
 30:             return FactoryBuilder.factoryOf(ResourceCoordinatorAspect.class).getInstance().interceptTransactionContextMethod(pjp);
 31:         } else {
 32:             return super.invoke(target, method, args);
 33:         }
 34:     }
 35: 
 36: }
複製代碼
  • 第 18 至 26 行,設置帶有 @Compensable 屬性的默認屬性。

  • 第 28 行,生成方法切面 org.mengyun.tcctransaction.dubbo.proxy.jdk.MethodProceedingJoinPoint。實現代碼以下:

    public class MethodProceedingJoinPoint implements ProceedingJoinPoint, JoinPoint.StaticPart {
    
        /** * 代理對象 */
        private Object proxy;
        /** * 目標對象 */
        private Object target;
        /** * 方法 */
        private Method method;
        /** * 參數 */
        private Object[] args;
        
        @Override
        public Object proceed() throws Throwable {
            // Use reflection to invoke the method.
            try {
                ReflectionUtils.makeAccessible(method);
                return method.invoke(target, args);
            } catch (InvocationTargetException ex) {
                // Invoked method threw a checked exception.
                // We must rethrow it. The client won't see the interceptor.
                throw ex.getTargetException();
            } catch (IllegalArgumentException ex) {
                throw new SystemException("Tried calling method [" +
                        method + "] on target [" + target + "] failed", ex);
            } catch (IllegalAccessException ex) {
                throw new SystemException("Could not access method [" + method + "]", ex);
            }
        }
    
        @Override
        public Object proceed(Object[] objects) throws Throwable {
            // throw new UnsupportedOperationException(); // TODO 芋艿:疑問
            return proceed();
        }
        
        // ... 省略不重要的方法和對象
    }
    複製代碼
    • 該類參考 org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint 實現。
    • TODO【1】 proxy 和 target 是否保留一個便可?
    • 在切面處理完成後,調用 #proceed(...) 方法,進行遠程 Dubbo Service 服務調用。
    • TODO【2】#proceed(objects) 拋出 throw new UnsupportedOperationException();。須要跟做者確認下。
  • 調用 ResourceCoordinatorAspect#interceptTransactionContextMethod(...) 方法,對方法切面攔截處理。爲何無需調用 CompensableTransactionAspect 切面?由於傳播級別爲 Propagation.SUPPORTS,不會發起事務。

2.2.4 配置 Dubbo Proxy

// META-INF.dubbo/com.alibaba.dubbo.rpc.ProxyFactory
tccJdk=org.mengyun.tcctransaction.dubbo.proxy.jdk.TccJdkProxyFactory

// appcontext-service-dubbo.xml
<dubbo:provider proxy="tccJdk"/>

<dubbo:reference proxy="tccJdk" id="captialTradeOrderService" interface="org.mengyun.tcctransaction.sample.dubbo.capital.api.CapitalTradeOrderService" timeout="5000"/>
複製代碼
  • ProxyFactory 的 tccJdk 在 Maven 項 tcc-transaction-dubbo 已經聲明。
  • 聲明 dubbo:providerproxy="tccJdk"
  • 聲明 dubbo:referenceproxy="tccJdk",不然不生效。

3. Dubbo 事務上下文編輯器

org.mengyun.tcctransaction.dubbo.context.DubboTransactionContextEditor,Dubbo 事務上下文編輯器實現,實現代碼以下:

public class DubboTransactionContextEditor implements TransactionContextEditor {

    @Override
    public TransactionContext get(Object target, Method method, Object[] args) {
        String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);
        if (StringUtils.isNotEmpty(context)) {
            return JSON.parseObject(context, TransactionContext.class);
        }
        return null;
    }

    @Override
    public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
        RpcContext.getContext().setAttachment(TransactionContextConstants.TRANSACTION_CONTEXT, JSON.toJSONString(transactionContext));
    }

}
複製代碼
  • 經過 Dubbo 的隱式傳參的方式,避免在 Dubbo Service 接口上聲明 TransactionContext 參數,對接口產生必定的入侵。

666. 彩蛋

知識星球

HOHO,對動態代理又學習了一遍,蠻 High 的。

這裏推薦動態代理無關,和 Dubbo 相關的文章:

胖友,分享一波朋友圈可好。

相關文章
相關標籤/搜索