長文詳解:DUBBO源碼使用了哪些設計模式

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我微信「java_front」一塊兒交流學習html


0 文章概述

DUBBO做爲RPC領域優秀開源的框架在業界十分流行,本文咱們閱讀其源碼並對其使用到的設計模式進行分析。須要說明的是本文所說的設計模式更加廣義,不只包括標準意義上23種設計模式,還有一些代碼中常見通過檢驗的代碼模式例如雙重檢查鎖模式、多線程保護性暫停模式等等。java


1 模板方法

模板方法模式定義一個操做中的算法骨架,通常使用抽象類定義算法骨架。抽象類同時定義一些抽象方法,這些抽象方法延遲到子類實現,這樣子類不只遵照了算法骨架約定,也實現了本身的算法。既保證了規約也兼顧靈活性。這就是用抽象構建框架,用實現擴展細節。mysql

DUBBO源碼中有一個很是重要的核心概念Invoker,咱們能夠理解爲執行器或者說一個可執行對象,可以根據方法的名稱、參數獲得相應執行結果,這個特性體現了代理模式咱們後面章節再說,本章節咱們先分析其中的模板方法模式。web

public abstract class AbstractInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            Throwable te = e.getTargetException();
            if (te == null) {
                return new RpcResult(e);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return new RpcResult(te);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return new RpcResult(e);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}
複製代碼

AbstractInvoker做爲抽象父類定義了invoke方法這個方法骨架,而且定義了doInvoke抽象方法供子類擴展,例如子類InjvmInvoker、DubboInvoker各自實現了doInvoke方法。算法

InjvmInvoker是本地引用,調用時直接從本地暴露生產者容器獲取生產者Exporter對象便可。spring

class InjvmInvoker<T> extends AbstractInvoker<T> {

    @Override
    public Result doInvoke(Invocation invocation) throws Throwable {
        Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
        if (exporter == null) {
            throw new RpcException("Service [" + key + "] not found.");
        }
        RpcContext.getContext().setRemoteAddress(Constants.LOCALHOST_VALUE, 0);
        return exporter.getInvoker().invoke(invocation);
    }
}
複製代碼

DubboInvoker相對複雜一些,須要考慮同步異步調用方式,配置優先級,遠程通訊等等。sql

public class DubboInvoker<T> extends AbstractInvoker<T> {

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            // 超時時間方法級別配置優先級最高
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Result result;
                if (isAsyncFuture) {
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
複製代碼

2 動態代理

代理模式核心是爲一個目標對象提供一個代理,以控制對這個對象的訪問,咱們能夠經過代理對象訪問目標對象,這樣能夠加強目標對象功能。數據庫

代理模式分爲靜態代理與動態代理,動態代理又分爲JDK代理和Cglib代理,JDK代理只能代理實現類接口的目標對象,可是Cglib沒有這種要求。apache


2.1 JDK動態代理

動態代理本質是經過生成字節碼的方式將代理對象織入目標對象,本文以JDK動態代理爲例。編程

第一步定義業務方法,即須要被代理的目標對象:

public interface StudentJDKService {
    public void addStudent(String name);
    public void updateStudent(String name);
}

public class StudentJDKServiceImpl implements StudentJDKService {
  
    @Override
    public void addStudent(String name) {
      System.out.println("add student=" + name);
	}

	@Override
	public void updateStudent(String name) {
		System.out.println("update student=" + name);
	}
}
複製代碼

第二步定義一個事務代理對象:

public class TransactionInvocationHandler implements InvocationHandler {

	private Object target;

	public TransactionInvocationHandler(Object target) {
		this.target = target;
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		System.out.println("------前置通知------");
		Object rs = method.invoke(target, args);
		System.out.println("------後置通知------");
		return rs;
	}
}
複製代碼

第三步定義代理工廠:

public class ProxyFactory {

	public Object getProxy(Object target, InvocationHandler handler) {
		ClassLoader loader = this.getClass().getClassLoader();
		Class<?>[] interfaces = target.getClass().getInterfaces();
		Object proxy = Proxy.newProxyInstance(loader, interfaces, handler);
		return proxy;
	}
}
複製代碼

第四步進行測試:

public class ZTest {

	public static void main(String[] args) throws Exception {
		testSimple();
	}

	public static void testSimple() {
		StudentJDKService target = new StudentJDKServiceImpl();
		TransactionInvocationHandler handler = new TransactionInvocationHandler(target);
		ProxyFactory proxyFactory = new ProxyFactory();
		Object proxy = proxyFactory.getProxy(target, handler);
		StudentJDKService studentService = (StudentJDKService) proxy;
		studentService.addStudent("JAVA前線");
	}
}
複製代碼

ProxyGenerator.generateProxyClass是生成字節碼文件核心方法,咱們看一看動態字節碼到底如何定義:

public class ZTest {

    public static void main(String[] args) throws Exception {
        createProxyClassFile();
    }

    public static void createProxyClassFile() {
        String name = "Student$Proxy";
        byte[] data = ProxyGenerator.generateProxyClass(name, new Class[] { StudentJDKService.class });
        FileOutputStream out = null;
        try {
            String fileName = "c:/test/" + name + ".class";
            File file = new File(fileName);
            out = new FileOutputStream(file);
            out.write(data);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            if (null != out) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
複製代碼

最終生成字節碼文件以下,咱們看到代理對象被織入了目標對象:

import com.xpz.dubbo.simple.jdk.StudentJDKService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

public final class Student$Proxy extends Proxy implements StudentJDKService {
    private static Method m1;

    private static Method m2;

    private static Method m4;

    private static Method m3;

    private static Method m0;

    public Student$Proxy(InvocationHandler paramInvocationHandler) {
        super(paramInvocationHandler);
    }

    public final boolean equals(Object paramObject) {
        try {
            return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final String toString() {
        try {
            return (String)this.h.invoke(this, m2, null);
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final void updateStudent(String paramString) {
        try {
            this.h.invoke(this, m4, new Object[] { paramString });
            return;
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final void addStudent(String paramString) {
        try {
            this.h.invoke(this, m3, new Object[] { paramString });
            return;
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final int hashCode() {
        try {
            return ((Integer)this.h.invoke(this, m0, null)).intValue();
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    static {
        try {
            m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });
            m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
            m4 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("updateStudent", new Class[] { Class.forName("java.lang.String") });
            m3 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("addStudent", new Class[] { Class.forName("java.lang.String") });
            m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
            return;
        } catch (NoSuchMethodException noSuchMethodException) {
            throw new NoSuchMethodError(noSuchMethodException.getMessage());
        } catch (ClassNotFoundException classNotFoundException) {
            throw new NoClassDefFoundError(classNotFoundException.getMessage());
        }
    }
}
複製代碼

2.2 DUBBO源碼應用

那麼在DUBBO源碼中動態代理是如何體現的呢?咱們知道消費者在消費方法時實際上執行的代理方法,這是消費者在refer時生成的代理方法。

代理工廠AbstractProxyFactory有兩個子類:

JdkProxyFactory
JavassistProxyFactory
複製代碼

經過下面源碼咱們能夠分析獲得DUBBO經過InvokerInvocationHandler代理了invoker對象:

public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }
}

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}
複製代碼

InvokerInvocationHandler將參數信息封裝至RpcInvocation進行傳遞:

public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[JAVA前線], attachments={}]
        RpcInvocation rpcInvocation = createInvocation(method, args);
        return invoker.invoke(rpcInvocation).recreate();
    }

    private RpcInvocation createInvocation(Method method, Object[] args) {
        RpcInvocation invocation = new RpcInvocation(method, args);
        if (RpcUtils.hasFutureReturnType(method)) {
            invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
            invocation.setAttachment(Constants.ASYNC_KEY, "true");
        }
        return invocation;
    }
}
複製代碼

3 策略模式

在1995年出版的《設計模式:可複用面向對象軟件的基礎》給出了策略模式定義:

Define a family of algorithms, encapsulate each one, and make them interchangeable. Strategy lets the algorithm vary independently from clients that use it

定義一系列算法,封裝每個算法,並使它們能夠互換。策略模式可使算法的變化獨立於使用它們的客戶端代碼。

在設計模式原則中有一條開閉原則:對擴展開放,對修改關閉,我認爲這是設計模式中最重要設計原則:

(1) 當需求變化時應該經過擴展而不是經過修改已有代碼來實現變化,這樣就保證代碼的穩定性,避免牽一髮而動全身

(2) 擴展也不是隨意擴展,由於事先定義了算法,擴展也是根據算法擴展,體現了用抽象構建框架,用實現擴展細節

(3) 標準意義的二十三種設計模式說到底最終都是在遵循開閉原則


3.1 策略模式實例

假設咱們如今須要解析一段文本,這段文本有多是HTML也有多是TEXT,若是不使用策略模式應該怎麼寫呢?

public enum DocTypeEnum {
    HTML(1, "HTML"),
    TEXT(2, "TEXT");

    private int value;
    private String description;

    private DocTypeEnum(int value, String description) {
        this.value = value;
        this.description = description;
    }
    
    public int value() {  
        return value;  
    }    
}

public class ParserManager {

    public void parse(Integer docType, String content) {
        // 文本類型是HTML
        if(docType == DocTypeEnum.HTML.getValue()) {
            // 解析邏輯
        }
        // 文本類型是TEXT
        else if(docType == DocTypeEnum.TEXT.getValue()) {
            // 解析邏輯
        }
    }
}
複製代碼

這種寫法功能上沒有問題,可是當本文類型愈來愈多時,那麼parse方法將會愈來愈冗餘和複雜,if else代碼塊也會愈來愈多,因此咱們要使用策略模式。

第一步定義業務類型和業務實體:

public enum DocTypeEnum {
    HTML(1, "HTML"),
    TEXT(2, "TEXT");

    private int value;
    private String description;

    private DocTypeEnum(int value, String description) {
        this.value = value;
        this.description = description;
    }

    public int value() {
        return value;
    }
}

public class BaseModel {
    // 公共字段
}

public class HtmlContentModel extends BaseModel {
    // HTML自定義字段
}

public class TextContentModel extends BaseModel {
    // TEXT自定義字段
}
複製代碼

第二步定義策略:

public interface Strategy<T extends BaseModel> {
    public T parse(String sourceContent);
}

@Service
public class HtmlStrategy implements Strategy {

    @Override
    public HtmlContentModel parse(String sourceContent) {
        return new HtmlContentModel("html");
    }
}

@Service
public class TextStrategy implements Strategy {

    @Override
    public TextContentModel parse(String sourceContent) {
        return new TextContentModel("text");
    }
}
複製代碼

第三步定義策略工廠:

@Service
public class StrategyFactory implements InitializingBean {
    
    private Map<Integer,Strategy> strategyMap = new HashMap<>();  
    @Resource
    private Strategy<HtmlContentModel> htmlStrategy ;
    @Resource
    private Strategy<TextContentModel> textStrategy ;

    @Override
   public void afterPropertiesSet() throws Exception {
       strategyMap.put(RechargeTypeEnum.HTML.value(), htmlStrategy);			
       strategyMap.put(RechargeTypeEnum.TEXT.value(),textStrategy);
   }

   public Strategy getStrategy(int type) {
       return strategyMap.get(type);
   }
} 
複製代碼

第四步定義策略執行器:

@Service
public class StrategyExecutor<T extends BaseModel> {

    @Resource
    private StrategyFactory<T> strategyFactory;

    public T parse(String sourceContent, Integer type) {
        Strategy strategy = StrategyFactory.getStrategy(type);
        return strategy.parse(sourceContent);
    }
}
複製代碼

第五步執行測試用例:

public class Test {

    @Resource
    private StrategyExecutor  executor;

    @Test
    public void test() {
        // 解析HTML
        HtmlContentModel content1 = (HtmlContentModel) executor.parse("<a>測試內容</a>",  DocTypeEnum.HTML.value());
        System.out.println(content1);

        // 解析TEXT
        TextContentModel content2 = (TextContentModel)executor.calRecharge("測試內容",  DocTypeEnum.TEXT.value());
        System.out.println(content2);
    }
}
複製代碼

若是新增文本類型再擴展新策略便可,咱們再回顧策略模式定義會有更深的體會:定義一系列算法,封裝每個算法,並使它們能夠互換。策略模式可使算法的變化獨立於使用它們的客戶端代碼。


3.2 DUBBO源碼應用

在上述實例中咱們將策略存儲在map容器,咱們思考一下還有沒有其它地方能夠存儲策略?答案是配置文件。下面就要介紹SPI機制,我認爲這個機制在廣義上實現了策略模式。

SPI(Service Provider Interface)是一種服務發現機制,本質是將接口實現類的全限定名配置在文件中,並由服務加載器讀取配置文件加載實現類,這樣能夠在運行時動態爲接口替換實現類,咱們經過SPI機制能夠爲程序提供拓展功能。


3.2.1 JDK SPI

咱們首先分析JDK自身SPI機制,定義一個數據驅動接口並提供兩個驅動實現,最後經過serviceLoader加載驅動。

(1) 新建DataBaseDriver工程並定義接口

public interface DataBaseDriver {
    String connect(String hostIp);
}
複製代碼

(2) 打包這個工程爲JAR

<dependency>
  <groupId>com.javafont.spi</groupId>
  <artifactId>DataBaseDriver</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>
複製代碼

(3) 新建MySQLDriver工程引用上述依賴並實現DataBaseDriver接口

import com.javafont.database.driver.DataBaseDriver;

public class MySQLDataBaseDriver implements DataBaseDriver {
    @Override
    public String connect(String hostIp) {
        return "MySQL DataBase Driver connect";
    }
}
複製代碼

(4) 在MySQLDriver項目新建文件

src/main/resources/META-INF/services/com.javafont.database.driver.DataBaseDriver
複製代碼

(5) 在上述文件新增以下內容

com.javafont.database.mysql.driver.MySQLDataBaseDriver
複製代碼

(6) 按照上述相同步驟建立工程OracleDriver

(7) 打包上述兩個項目

<dependency>
  <groupId>com.javafont.spi</groupId>
  <artifactId>MySQLDriver</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

<dependency>
  <groupId>com.javafont.spi</groupId>
  <artifactId>OracleDriver</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>
複製代碼

(8) 新建測試項目引入上述MySQLDriver、OracleDriver

public class DataBaseConnector {
    public static void main(String[] args) {
        ServiceLoader<DataBaseDriver> serviceLoader = ServiceLoader.load(DataBaseDriver.class);
        Iterator<DataBaseDriver> iterator = serviceLoader.iterator();
        while (iterator.hasNext()) {
            DataBaseDriver driver = iterator.next();
            System.out.println(driver.connect("localhost"));
        }
    }
}

// 輸出結果
// MySQL DataBase Driver connect
// Oracle DataBase Driver connect
複製代碼

咱們並無指定使用哪一個驅動進行數據庫鏈接,而是經過ServiceLoader方式加載全部實現了DataBaseDriver接口的實現類。假設咱們只須要使用MySQL數據庫驅動那麼直接引入相應依賴便可。


3.2.2 DUBBO SPI

咱們發現JDK SPI機制仍是有一些不完善之處,例如經過ServiceLoader會加載全部實現了某個接口的實現類,不能經過一個key去指定獲取某個實現類,可是DUBBO本身實現的SPI機制解決了這個問題。

例如Protocol接口有以下實現類:

org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
複製代碼

咱們如今要將這些類配置在配置文件,配置文件在以下目錄:

META-INF/services/
META-INF/dubbo/
META-INF/dubbo/internal/
複製代碼

配置方式和JDK SPI方式配置不同,每一個實現類都有key與之對應:

dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
複製代碼

使用時經過擴展點方式加載實現類:

public class ReferenceConfig<T> extends AbstractReferenceConfig {

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    private T createProxy(Map<String, String> map) {
        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        }
    }
}
複製代碼

getAdaptiveExtension()是加載自適應擴展點,javassist會爲自適應擴展點生成動態代碼:

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}
複製代碼

extension對象就是根據url中protocol屬性爲Injvm最終加載InjvmProtocol對象,這就獲取到了咱們制定的業務對象。因此我認爲SPI體現了策略模式。


4 裝飾器模式

裝飾器模式能夠動態將責任附加到對象上,在不改變原始類接口狀況下,對原始類功能進行加強,而且支持多個裝飾器的嵌套使用。實現裝飾器模式須要如下組件:

(1) Component(抽象構件)

核心業務抽象:可使用接口或者抽象類

(2) ConcreteComponent(具體構件)

實現核心業務:最終執行的業務代碼

(3) Decorator(抽象裝飾器)

抽象裝飾器類:實現Component而且組合一個Component對象

(4) ConcreteDecorator(具體裝飾器)

具體裝飾內容:裝飾核心業務代碼


4.1 裝飾器實例

有一名足球運動員要去踢球,咱們用球鞋和球襪爲他裝飾一番,這樣可使其戰力值增長,咱們使用裝飾器模式實現這個實例。

(1) Component

/** * 抽象構件(能夠用接口替代) */
public abstract class Component {

    /** * 踢足球(業務核心方法) */
    public abstract void playFootBall();
}
複製代碼

(2) ConcreteComponent

/** * 具體構件 */
public class ConcreteComponent extends Component {

    @Override
    public void playFootBall() {
        System.out.println("球員踢球");
    }
}
複製代碼

(3) Decorator

/** * 抽象裝飾器 */
public abstract class Decorator extends Component {
    private Component component = null;

    public Decorator(Component component) {
        this.component = component;
    }

    @Override
    public void playFootBall() {
        this.component.playFootBall();
    }
}
複製代碼

(4) ConcreteDecorator

/** * 球襪裝飾器 */
public class ConcreteDecoratorA extends Decorator {

    public ConcreteDecoratorA(Component component) {
        super(component);
    }

    /** * 定義球襪裝飾邏輯 */
    private void decorateMethod() {
        System.out.println("換上球襪戰力值增長");
    }

    /** * 重寫父類方法 */
    @Override
    public void playFootBall() {
        this.decorateMethod();
        super.playFootBall();
    }
}

/** * 球鞋裝飾器 */
public class ConcreteDecoratorB extends Decorator {

    public ConcreteDecoratorB(Component component) {
        super(component);
    }

    /** * 定義球鞋裝飾邏輯 */
    private void decorateMethod() {
        System.out.println("換上球鞋戰力值增長");
    }

    /** * 重寫父類方法 */
    @Override
    public void playFootBall() {
        this.decorateMethod();
        super.playFootBall();
    }
}
複製代碼

(5) 運行測試

public class TestDecoratorDemo {
    public static void main(String[] args) {
        Component component = new ConcreteComponent();
        component = new ConcreteDecoratorA(component);
        component = new ConcreteDecoratorB(component);
        component.playFootBall();
    }
}

// 換上球鞋戰力值增長
// 換上球襪戰力值增長
// 球員踢球
複製代碼

4.2 DUBBO源碼應用

DUBBO是經過SPI機制實現裝飾器模式,咱們以Protocol接口進行分析,首先分析裝飾器類,抽象裝飾器核心要點是實現了Component而且組合一個Component對象。

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
}

public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
}
複製代碼

在配置文件中配置裝飾器:

filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
複製代碼

經過SPI加載擴展點時會使用裝飾器裝飾具體構件:

public class ReferenceConfig<T> extends AbstractReferenceConfig {

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    private T createProxy(Map<String, String> map) {
        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        }
    }
}
複製代碼

最終生成refprotocol爲以下對象:

ProtocolFilterWrapper(ProtocolListenerWrapper(InjvmProtocol))
複製代碼

5 責任鏈模式

責任鏈模式將請求發送和接收解耦,讓多個接收對象都有機會處理這個請求。這些接收對象串成一條鏈路並沿着這條鏈路傳遞這個請求,直到鏈路上某個接收對象可以處理它。咱們介紹責任鏈模式兩種應用場景和四種代碼實現方式,最後介紹了DUBBO如何應用責任鏈構建過濾器鏈路。


5.1 應用場景:命中當即中斷

咱們實現一個關鍵詞過濾功能。系統設置三個關鍵詞過濾器,輸入內容命中任何一個過濾器規則就返回校驗不經過,鏈路當即中斷無需繼續進行。

(1) 實現方式一

public interface ContentFilter {
    public boolean filter(String content);
}

public class AaaContentFilter implements ContentFilter {
    private final static String KEY_CONTENT = "aaa";

    @Override
    public boolean filter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}

public class BbbContentFilter implements ContentFilter {
    private final static String KEY_CONTENT = "bbb";

    @Override
    public boolean filter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}

public class CccContentFilter implements ContentFilter {
    private final static String KEY_CONTENT = "ccc";

    @Override
    public boolean filter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}
複製代碼

具體過濾器已經完成,下面構造過濾器責任鏈路:

@Service
public class ContentFilterChain {
    private List<ContentFilter> filters = new ArrayList<ContentFilter>();

    @PostConstruct
    public void init() {
        ContentFilter aaaContentFilter = new AaaContentFilter();
        ContentFilter bbbContentFilter = new BbbContentFilter();
        ContentFilter cccContentFilter = new CccContentFilter();
        filters.add(aaaContentFilter);
        filters.add(bbbContentFilter);
        filters.add(cccContentFilter);
    }

    public void addFilter(ContentFilter filter) {
        filters.add(filter);
    }

    public boolean filter(String content) {
        if (CollectionUtils.isEmpty(filters)) {
            throw new RuntimeException("ContentFilterChain is empty");
        }
        for (ContentFilter filter : filters) {
            boolean isValid = filter.filter(content);
            if (!isValid) {
                System.out.println("校驗不經過");
                return isValid;
            }
        }
        return Boolean.TRUE;
    }
}

public class Test {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        ContentFilterChain chain = (ContentFilterChain) context.getBean("contentFilterChain");
        System.out.println(context);
        boolean result1 = chain.filter("ccc");
        boolean result2 = chain.filter("ddd");
        System.out.println("校驗結果1=" + result1);
        System.out.println("校驗結果2=" + result2);
    }
}
複製代碼

(2) 實現方式二

public abstract class FilterHandler {

    /** 下一個節點 **/
    protected FilterHandler successor = null;

    public void setSuccessor(FilterHandler successor) {
        this.successor = successor;
    }

    public final boolean filter(String content) {
        /** 執行自身方法 **/
        boolean isValid = doFilter(content);
        if (!isValid) {
            System.out.println("校驗不經過");
            return isValid;
        }
        /** 執行下一個節點鏈路 **/
        if (successor != null && this != successor) {
            isValid = successor.filter(content);
        }
        return isValid;
    }
    /** 每一個節點過濾方法 **/
    protected abstract boolean doFilter(String content);
}

public class AaaContentFilterHandler extends FilterHandler {
    private final static String KEY_CONTENT = "aaa";

    @Override
    protected boolean doFilter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}

// 省略其它過濾器代碼
複製代碼

具體過濾器已經完成,下面構造過濾器責任鏈路:

@Service
public class FilterHandlerChain {
    private FilterHandler head = null;
    private FilterHandler tail = null;

    @PostConstruct
    public void init() {
        FilterHandler aaaHandler = new AaaContentFilterHandler();
        FilterHandler bbbHandler = new BbbContentFilterHandler();
        FilterHandler cccHandler = new CccContentFilterHandler();
        addHandler(aaaHandler);
        addHandler(bbbHandler);
        addHandler(cccHandler);
    }

    public void addHandler(FilterHandler handler) {
        if (head == null) {
            head = tail = handler;
        }
        /** 設置當前tail繼任者 **/
        tail.setSuccessor(handler);

        /** 指針從新指向tail **/
        tail = handler;
    }

    public boolean filter(String content) {
        if (null == head) {
            throw new RuntimeException("FilterHandlerChain is empty");
        }
        /** head發起調用 **/
        return head.filter(content);
    }
}

public class Test {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        FilterHandlerChain chain = (FilterHandlerChain) context.getBean("filterHandlerChain");
        System.out.println(context);
        boolean result1 = chain.filter("ccc");
        boolean result2 = chain.filter("ddd");
        System.out.println("校驗結果1=" + result1);
        System.out.println("校驗結果2=" + result2);
    }
}
複製代碼

5.2 應用場景:全鏈路執行

咱們實現一個考題生成功能。在線考試系統根據不一樣年級生成不一樣考題。系統設置三個考題生成器,每一個生成器都會執行,根據學生年級決定是否生成考題,無需生成則執行下一個生成器。

(1) 實現方式一

public interface QuestionGenerator {
    public Question generateQuestion(String gradeInfo);
}

public class AaaQuestionGenerator implements QuestionGenerator {

    @Override
    public Question generateQuestion(String gradeInfo) {
        if (!gradeInfo.equals("一年級")) {
            return null;
        }
        Question question = new Question();
        question.setId("aaa");
        question.setScore(10);
        return question;
    }
}

// 省略其它生成器代碼
複製代碼

具體生成器已經編寫完成,下面構造生成器責任鏈路:

@Service
public class QuestionChain {
    private List<QuestionGenerator> generators = new ArrayList<QuestionGenerator>();

    @PostConstruct
    public void init() {
        QuestionGenerator aaaQuestionGenerator = new AaaQuestionGenerator();
        QuestionGenerator bbbQuestionGenerator = new BbbQuestionGenerator();
        QuestionGenerator cccQuestionGenerator = new CccQuestionGenerator();
        generators.add(aaaQuestionGenerator);
        generators.add(bbbQuestionGenerator);
        generators.add(cccQuestionGenerator);
    }

    public List<Question> generate(String gradeInfo) {
        if (CollectionUtils.isEmpty(generators)) {
            throw new RuntimeException("QuestionChain is empty");
        }
        List<Question> questions = new ArrayList<Question>();
        for (QuestionGenerator generator : generators) {
            Question question = generator.generateQuestion(gradeInfo);
            if (null == question) {
                continue;
            }
            questions.add(question);
        }
        return questions;
    }
}

public class Test {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        System.out.println(context);
        QuestionChain chain = (QuestionChain) context.getBean("questionChain");
        List<Question> questions = chain.generate("一年級");
        System.out.println(questions);
    }
}
複製代碼

(2) 實現方式二

public abstract class GenerateHandler {

    /** 下一個節點 **/
    protected GenerateHandler successor = null;

    public void setSuccessor(GenerateHandler successor) {
        this.successor = successor;
    }

    public final List<Question> generate(String gradeInfo) {
        List<Question> result = new ArrayList<Question>();

        /** 執行自身方法 **/
        Question question = doGenerate(gradeInfo);
        if (null != question) {
            result.add(question);
        }

        /** 執行下一個節點鏈路 **/
        if (successor != null && this != successor) {
            List<Question> successorQuestions = successor.generate(gradeInfo);
            if (null != successorQuestions) {
                result.addAll(successorQuestions);
            }
        }
        return result;
    }
    /** 每一個節點生成方法 **/
    protected abstract Question doGenerate(String gradeInfo);
}

public class AaaGenerateHandler extends GenerateHandler {

    @Override
    protected Question doGenerate(String gradeInfo) {
        if (!gradeInfo.equals("一年級")) {
            return null;
        }
        Question question = new Question();
        question.setId("aaa");
        question.setScore(10);
        return question;
    }
}

// 省略其它生成器代碼
複製代碼

具體生成器已經完成,下面構造生成器責任鏈路:

@Service
public class GenerateChain {
    private GenerateHandler head = null;
    private GenerateHandler tail = null;

    @PostConstruct
    public void init() {
        GenerateHandler aaaHandler = new AaaGenerateHandler();
        GenerateHandler bbbHandler = new BbbGenerateHandler();
        GenerateHandler cccHandler = new CccGenerateHandler();
        addHandler(aaaHandler);
        addHandler(bbbHandler);
        addHandler(cccHandler);
    }

    public void addHandler(GenerateHandler handler) {
        if (head == null) {
            head = tail = handler;
        }
        /** 設置當前tail繼任者 **/
        tail.setSuccessor(handler);

        /** 指針從新指向tail **/
        tail = handler;
    }

    public List<Question> generate(String gradeInfo) {
        if (null == head) {
            throw new RuntimeException("GenerateChain is empty");
        }
        /** head發起調用 **/
        return head.generate(gradeInfo);
    }
}

public class Test {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        GenerateChain chain = (GenerateChain) context.getBean("generateChain");
        System.out.println(context);
        List<Question> result = chain.generate("一年級");
        System.out.println(result);
    }
}
複製代碼

5.3 DUBBO源碼應用

生產者和消費者最終執行對象都是過濾器鏈路最後一個節點,整個鏈路包含多個過濾器進行業務處理。咱們看看生產者和消費者默認過濾器鏈路。

生產者過濾器鏈路
EchoFilter > ClassloaderFilter > GenericFilter > ContextFilter > TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter > AbstractProxyInvoker

消費者過濾器鏈路
ConsumerContextFilter > FutureFilter > MonitorFilter > DubboInvoker
複製代碼

ProtocolFilterWrapper做爲鏈路生成核心經過匿名類方式構建過濾器鏈路,咱們以消費者構建過濾器鏈路爲例:

public class ProtocolFilterWrapper implements Protocol {
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {

        // invoker = DubboInvoker
        Invoker<T> last = invoker;

        // 查詢符合條件過濾器列表
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;

                // 構造一個簡化Invoker
                last = new Invoker<T>() {
                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        // 構造過濾器鏈路
                        Result result = filter.invoke(next, invocation);
                        if (result instanceof AsyncRpcResult) {
                            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                            return asyncResult;
                        } else {
                            return filter.onResponse(result, invoker, invocation);
                        }
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // RegistryProtocol不構造過濾器鏈路
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        Invoker<T> invoker = protocol.refer(type, url);
        return buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
}
複製代碼

6 保護性暫停模式

在多線程編程實踐中咱們確定會面臨線程間數據交互的問題。在處理這類問題時須要使用一些設計模式,從而保證程序的正確性和健壯性。

保護性暫停設計模式就是解決多線程間數據交互問題的一種模式。本文先從基礎案例介紹保護性暫停基本概念和實踐,再由淺入深,最終分析DUBBO源碼中保護性暫停設計模式使用場景。


6.1 保護性暫停實例

咱們設想這樣一種場景:線程A生產數據,線程B讀取數據這個數據。

可是有一種狀況:線程B準備讀取數據時,此時線程A尚未生產出數據。

在這種狀況下線程B不能一直空轉,也不能當即退出,線程B要等到生產數據完成並拿到數據以後才退出。

那麼在數據沒有生產出這段時間,線程B須要執行一種等待機制,這樣能夠達到對系統保護目的,這就是保護性暫停。

保護性暫停有多種實現方式,本文咱們用synchronized/wait/notify的方式實現。

class Resource {
    private MyData data;
    private Object lock = new Object();

    public MyData getData(int timeOut) {
        synchronized (lock) {
            // 運行時長
            long timePassed = 0;
            // 開始時間
            long begin = System.currentTimeMillis();
            // 若是結果爲空
            while (data == null) {
                try {
                    // 若是運行時長大於超時時間退出循環
                    if (timePassed > timeOut) {
                        break;
                    }
                    // 若是運行時長小於超時時間表示虛假喚醒 -> 只需再等待時間差值
                    long waitTime = timeOut - timePassed;

                    // 等待時間差值
                    lock.wait(waitTime);

                    // 結果不爲空直接返回
                    if (data != null) {
                        break;
                    }
                    // 被喚醒後計算運行時長
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超時未獲取到結果");
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

/** * 保護性暫停實例 */
public class ProtectDesignTest {

    public static void main(String[] args) {
        Resource resource = new Resource();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生產數據=" + data);
                // 模擬發送耗時
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData(1000);
            System.out.println(Thread.currentThread().getName() + "接收到數據=" + data);
        }, "t2").start();
    }
}
複製代碼

6.2 加一個編號

如今再來設想一個場景:如今有三個生產數據的線程一、二、3,三個獲取數據的線程四、五、6,咱們但願每一個獲取數據線程都只拿到其中一個生產線程的數據,不能多拿也不能少拿。

這裏引入一個Futures模型,這個模型爲每一個資源進行編號並存儲在容器中,例如線程1生產的數據被拿走則從容器中刪除,一直到容器爲空結束。

@Getter
@Setter
public class MyNewData implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final AtomicLong ID = new AtomicLong(0);
    private Long id;
    private String message;

    public MyNewData(String message) {
        this.id = newId();
        this.message = message;
    }

    /** * 自增到最大值會回到最小值(負值能夠做爲識別ID) */
    private static long newId() {
        return ID.getAndIncrement();
    }

    public Long getId() {
        return this.id;
    }
}

class MyResource {
    private MyNewData data;
    private Object lock = new Object();

    public MyNewData getData(int timeOut) {
        synchronized (lock) {
            long timePassed = 0;
            long begin = System.currentTimeMillis();
            while (data == null) {
                try {
                    if (timePassed > timeOut) {
                        break;
                    }
                    long waitTime = timeOut - timePassed;
                    lock.wait(waitTime);
                    if (data != null) {
                        break;
                    }
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超時未獲取到結果");
            }
            return data;
        }
    }

    public void sendData(MyNewData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

class MyFutures {
    private static final Map<Long, MyResource> FUTURES = new ConcurrentHashMap<>();

    public static MyResource newResource(MyNewData data) {
        final MyResource future = new MyResource();
        FUTURES.put(data.getId(), future);
        return future;
    }

    public static MyResource getResource(Long id) {
        return FUTURES.remove(id);
    }

    public static Set<Long> getIds() {
        return FUTURES.keySet();
    }
}


/** * 保護性暫停實例 */
public class ProtectDesignTest {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 3; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    MyNewData data = new MyNewData("hello_" + index);
                    MyResource resource = MyFutures.newResource(data);
                    // 模擬發送耗時
                    TimeUnit.SECONDS.sleep(1);
                    resource.sendData(data);
                    System.out.println("生產數據data=" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
        TimeUnit.SECONDS.sleep(1);

        for (Long i : MyFutures.getIds()) {
            final long index = i;
            new Thread(() -> {
                MyResource resource = MyFutures.getResource(index);
                int timeOut = 3000;
                System.out.println("接收數據data=" + resource.getData(timeOut));
            }).start();
        }
    }
}
複製代碼

6.3 DUBBO源碼應用

咱們順着這一個鏈路跟蹤代碼:消費者發送請求 > 提供者接收請求並執行,而且將運行結果發送給消費者 >消費者接收結果。

(1) 消費者發送請求

消費者發送的數據包含請求ID,而且將關係維護進FUTURES容器

final class HeaderExchangeChannel implements ExchangeChannel {

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

class DefaultFuture implements ResponseFuture {

    // FUTURES容器
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 請求ID
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
}

複製代碼

(2) 提供者接收請求並執行,而且將運行結果發送給消費者

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        // response與請求ID對應
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            channel.send(res);
            return;
        }
        // message = RpcInvocation包含方法名、參數名、參數值等
        Object msg = req.getData();
        try {

            // DubboProtocol.reply執行實際業務方法
            CompletableFuture<Object> future = handler.reply(channel, msg);

            // 若是請求已經完成則發送結果
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
}
複製代碼

(3) 消費者接收結果

如下DUBBO源碼很好體現了保護性暫停這個設計模式,說明參看註釋

class DefaultFuture implements ResponseFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    public static void received(Channel channel, Response response) {
        try {
            // 取出對應的請求對象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                               + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }


    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {

                    // 放棄鎖並使當前線程阻塞,直到發出信號中斷它或者達到超時時間
                    done.await(timeout, TimeUnit.MILLISECONDS);

                    // 阻塞結束後再判斷是否完成
                    if (isDone()) {
                        break;
                    }

                    // 阻塞結束後判斷是否超時
                    if(System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // response對象仍然爲空則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 接收到服務器響應賦值response
            response = res;
            if (done != null) {
                // 喚醒get方法中處於等待的代碼塊
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}
複製代碼

7 雙重檢查鎖模式

單例設計模式能夠保證在整個應用某個類只能存在一個對象實例,而且這個類只提供一個取得其對象實例方法,一般這個對象建立和銷燬比較消耗資源,例如數據庫鏈接對象等等。咱們分析一個雙重檢查鎖實現的單例模式實例。

public class MyDCLConnection {
    private static volatile MyDCLConnection myConnection = null;

    private MyDCLConnection() {
        System.out.println(Thread.currentThread().getName() + " -> init connection");
    }

    public static MyDCLConnection getConnection() {
        if (null == myConnection) {
            synchronized (MyDCLConnection.class) {
                if (null == myConnection) {
                    myConnection = new MyDCLConnection();
                }
            }
        }
        return myConnection;
    }
}
複製代碼

在DUBBO服務本地暴露時使用了雙重檢查鎖模式判斷exporter是否已經存在避免重複建立:

public class RegistryProtocol implements Protocol {

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                    final Exporter<T> strongExporter = (Exporter<T>) protocol.export(invokerDelegete);
                    exporter = new ExporterChangeableWrapper<T>(strongExporter, originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
}
複製代碼

8 文章總結

本文咱們結合DUBBO源碼分析了模板方法模式、動態代理模式、策略模式、裝飾器模式、責任鏈模式、保護性暫停模式、雙重檢查鎖模式,我認爲在閱讀源碼時要學習其中優秀的設計模式和代碼片斷,這樣才能夠提升代碼水平,但願本文對你們有所幫助。

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我微信「java_front」一塊兒交流學習

相關文章
相關標籤/搜索