簡易RPC框架-上下文

上下文

記的學英語的時候,老是不記的某個詞是什麼意思,而後就看不下去了,只能問周圍的同窗或者老師或者去查詞典,他們的建議是經過上下文去推測這個詞大概的意思,反正我那會上學時沒理解,因此英文一直比較差。html

如今英語水平也沒提升多少,儘管有點領會。java

後來慢慢理解了一些,由於有些詞有不少種意思,放在某個場景下多是一個意思,放在另一個場景下又是其它的意思,這裏不舉例子了,上文有必定的類似度。git

RPC客戶端上下文

客戶端因爲是一個獨立的環境,因此能夠認爲它處於一個屬於本身的上下文中,與其它的端隔離。在上下文中能夠指定一些公共的參數來提供給接口使用,好比:github

RPC版本號
客戶端請求ID
各種自定義的公共參數web

RPC服務端上下文

服務端同理,也處於一個屬於本身的上下文中,與其它端隔離。canvas

RPC上下文的做用

基本線程級別的訪問,讓客戶端或者服務端可以像訪問本地變量同樣訪問RPC框架級別的變量。好比咱們想將客戶端的一個請求ID傳遞給服務端,這個請求ID做用於全部接口,好比RPC的調用鏈追蹤,有兩種方式:ruby

接口中增長請求ID參數markdown

這個方案顯然是不能接受的,由於須要改的接口過多。多線程

接口不改的狀況下,在RPC框架中提供一個上下文,其中包含請求IDapp

這個方案顯然成本最小,好比這種樣調用:RpcContext.getRequestId();

 

  • 上圖中的Context是指RPC框架級變量的一個本地副本,爲了簡化調用複雜度。
  • 上圖中的Invocation是RPC框架級的變量,它與上面提到的Context相互配合,作到調用端與框架自己的解耦合

實現

定義上下文對象

在RpcContext對象中增長一個map類型的參數對象,能夠存聽任意擴展的參數。

public class RpcContext {

    private Map<String,Object> contextParameters;

    public Map<String, Object> getContextParameters() {
        return contextParameters;
    }

    public void setContextParameters(Map<String, Object> contextParameters) {
        this.contextParameters = contextParameters;
    }

    private static final ThreadLocal<RpcContext> rpcContextThreadLocal=new ThreadLocal<RpcContext>(){
        @Override
        protected RpcContext initialValue() {
            RpcContext context= new RpcContext();
            context.setContextParameters(Maps.newHashMap());
            return context;
        }
    };

    public static RpcContext getContext() {
        return rpcContextThreadLocal.get();
    }

    public static void removeContext() {
        rpcContextThreadLocal.remove();
    }

    private RpcContext() {
    }
}

RPC請求對象中增長上下文參數

RpcRequest增長以下字段,用於服務端調用。

private Map<String,Object> contextParameters;

RpcInvocation接口中增長上下文參數

在後續新增長的過濾器使用。

private Map<String,Object> contextParameters;

客戶端代理

RpcProxy在組裝RpcRequest對象時,從RpcContext中獲取最新的參數傳遞給RpcReuest,從而傳遞給服務端。

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        RpcRequest request = new RpcRequest();
        //...

        request.setContextParameters(RpcContext.getContext().getContextParameters());

        //...
    }

客戶端上下文過濾器

主要做用就是從本線線程變量中獲取參數傳遞給RpcInvocation。

註解上的order屬性文章後面詳細介紹

@ActiveFilter(group = {ConstantConfig.CONSUMER},order = -1000)
public class ClientContextFilter implements RpcFilter {

    private final static Logger logger = LoggerFactory.getLogger(ClientContextFilter.class);

    @Override
    public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
        Map<String,Object> contextParameters=invocation.getContextParameters();
        if(null==contextParameters){
            contextParameters= Maps.newHashMap();
        }
        Map<String,Object> contextParametersFromRpcContext= RpcContext.getContext().getContextParameters();
        if(null!=contextParametersFromRpcContext) {
            contextParameters.putAll(contextParametersFromRpcContext);
        }
        Object rpcResponse=invoker.invoke(invocation);
        logger.info("ClientContextFilter.invoke end");
        return rpcResponse;
    }
}

服務端上下文過濾器

服務端上下文過濾器與客戶端的做用相反,是從RpcInvocation中獲取參數傳遞給本地線程變量RpcContext,後面在執行服務端方法時就能夠方便的經過RpcContext獲取指定變量。

@ActiveFilter(group = {ConstantConfig.PROVIDER},order = -1000)
public class ServerContextFilter implements RpcFilter {

    private final static Logger logger = LoggerFactory.getLogger(ServerContextFilter.class);

    @Override
    public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
        Map<String,Object> contextParameters=invocation.getContextParameters();
        RpcContext.getContext().setContextParameters(contextParameters);
        Object rpcResponse=invoker.invoke(invocation);
        logger.info("ServerContextFilter.invoke end");
        return rpcResponse;
    }
}

過濾器排序

由於咱們的RpcContext是個本地線程變量,並且Rpc服務端是多線程處理業務,因此須要在請求結束後及時的清理掉相關本地線程變量信息。這就須要清理上下文的過濾動做在最後執行,不然有會出現服務端方法尚未執行就被清空了參數。建立一個工具類,專門用來處理獲取客戶端以及服務端過濾器。

過濾器註解增長排序屬性

增長order字段,升級排列。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ActiveFilter {
    String[] group() default {};
    String[] value() default {};
    int order() default 999999999;
}

過濾器工具類

建立ActiveFilterUtil,包含下面兩個函數。

過濾器排序函數

獲取特定註解的類,而後根據註解上的排序屬性升序排序。

private static List<Object> getActiveFilter(){
    List<Object> rpcFilterList= Lists.newArrayList();
    Map<String, Object> rpcFilterMapObject = ApplicationContextUtils.getApplicationContext().getBeansWithAnnotation(ActiveFilter.class);
    if (null!=rpcFilterMapObject) {
        rpcFilterList = Lists.newArrayList(rpcFilterMapObject.values());
        Collections.sort(rpcFilterList, new Comparator<Object>() {
            @Override
            public int compare(Object o1, Object o2) {
                ActiveFilter activeFilterO1 = o1.getClass().getAnnotation(ActiveFilter.class);
                ActiveFilter activeFilterO2 = o2.getClass().getAnnotation(ActiveFilter.class);
                return activeFilterO1.order() > activeFilterO2.order() ? 1 : -1;
            }
        });
    }
    return rpcFilterList;
}

獲取RPC過濾器列表

提供給客戶端以及服務端的一個協助方法,便於客戶端以及服務構建過濾器職責鏈。

public static Map<String,RpcFilter> getFilterMap(boolean isServer){
    List<Object> rpcFilterList=getActiveFilter();
    Map<String,RpcFilter> filterMap=new HashMap<>();
    for (Object filterBean : rpcFilterList) {
        Class<?>[] interfaces = filterBean.getClass().getInterfaces();
        ActiveFilter activeFilter=filterBean.getClass().getAnnotation(ActiveFilter.class);
        String includeFilterGroupName=!isServer?ConstantConfig.CONSUMER:ConstantConfig.PROVIDER;
        if(null!=activeFilter.group()&& Arrays.stream(activeFilter.group()).filter(p->p.contains(includeFilterGroupName)).count()==0){
            continue;
        }
        for(Class<?> clazz:interfaces) {
            if(clazz.isAssignableFrom(RpcFilter.class)){
                filterMap.put(filterBean.getClass().getName(),(RpcFilter) filterBean);
            }
        }
    }
    return filterMap;
}

客戶端以及服務端初始化

獲取過濾器map的邏輯改成調用上面ActiveFilterUtil.getFilterMap方法。

使用示例

客戶端設置參數

設置一個RPC版本號的參數。

@RequestMapping("/{productId}")
    public Product getById(@PathVariable final long productId) throws UnknownHostException {
        RpcContext.getContext().addContextParameter("rpc-version","1.0");
        //...
    }

服務端獲取參數

簡單的在服務端打印出RPC版本號。

 logger.info("get context parameter from server,rpc-version={}",String.valueOf(RpcContext.getContext().getContextParameter("rpc-version")));

輸出日誌以下:

本文源碼

https://github.com/jiangmin168168/jim-framework

文中代碼是依賴上述項目的,若是有不明白的可下載源碼

相關文章
相關標籤/搜索