簡易RPC框架-過濾器機制

過濾器

字面義上理解的過濾器相似下圖,從一堆物品中篩選出符合條件的留下,不符合的丟棄。css

GOF 職責鏈

GOF中有一種設計模式叫職責鏈,或者叫責任鏈,常規的UML圖以下:java

正統的職責鏈是將一個請求發給第一個接收者,接收者判斷是否屬於本身能處理的,若是能處理則執行操做並停止請求下發,流程到此爲止。若是不能處理則將請求下發給下一個接收者一直到最後一個接收者。git

變體職責鏈

上面提到正統的職責鏈有一個特色:當找到符合條件的執行者後流程停止並不會將請求繼續下發給後續的執行者。這類場景比較適合好比審覈操做,一個報銷單由主管,經理,CTO一級一級的審批不能越權。github

解耦合/細分web

在實際項目中,每每會遇到很是複雜的業務場景,有多是須要執行的方法特別多,也有多是由於須要執行的方法有可能事先不知道,須要在運行時才能判斷。若是將這些邏輯所有寫在一個類或者一個方法中就會出現這樣的問題:算法

  • 耦合度高,一個方法或者一個類須要關聯全部業務方法以及相關類
  • 不易擴展,須要執行的業務常常發生變化,若是每次變化都去修改統一的方法或者類,不符合開閉原則,維護成本很是高

因此就有了下圖,一堆須要執行的方法發給第一個接收者,接收者判斷哪些方法是本身能夠執行的,有執行的就執行,而後不管是否有可執行的方法在處理完成後都將請求繼續下發給後面的接收者。每一個接收者完成本身負責的內容,多個接收者完成了複雜任務的分解。後端

RPC 過濾器

RPC過濾器與Spring MVC中的Filter做用基本相同,其中很大一個做用就是動態的給客戶端或者是服務端增長切面功能,好比:設計模式

  • 權限控制
  • 加密解密
  • 訪問日誌
  • 限流控制
  • 併發控制
  • ......

下圖是我在RPC項目中實現過濾器機制的UML示意圖服務器

RpcFilter

定義一個過濾器接口,只包含一個invoke方法。網絡

public interface RpcFilter<T> {
    <T> T invoke(RpcInvoker invoker, RpcInvocation invocation);
}

RpcInvoker

這是RPC客戶端以及服務端執行服務端方法或者是將客戶端請求發送給服務端時須要調用的方法接口。

這個角色在Netty中也能夠叫Handle,這個接口與上面的RpcFilter有點相似,只是在RPC框架中體現的角色不一樣而已,具體看UML圖可知道二者關係。

public interface RpcInvoker {
    Object invoke(RpcInvocation invocation);
}

RpcServerInvoker

服務端的一個執行者實現,包含兩個核心功能:

構建職責鏈

  • this.filterMap,是經過註解獲取到的一組過濾器,此處不詳細講由於與本文關係不大
  • RpcInvoker的invoker方法實際調用的是RpcFilter的方法,並將自身實例傳遞給RpcFilter,目的是構建職責鏈的關聯關係
public RpcInvoker buildInvokerChain(final RpcInvoker invoker) {
    RpcInvoker last = invoker;
    List<RpcFilter> filters = Lists.newArrayList(this.filterMap.values());

    if (filters.size() > 0) {
        for (int i = filters.size() - 1; i >= 0; i --) {
            final RpcFilter filter = filters.get(i);
            final RpcInvoker next = last;
            last = new RpcInvoker() {
                @Override
                public Object invoke(RpcInvocation invocation) {
                    return filter.invoke(next, invocation);
                }
            };
        }
    }
    return last;
}

此處並無考慮職責鏈的排序,能夠經過過濾器的註解上增長排序數字來解決。目前我寫的過濾器註解中並無實現排序功能,能夠增長一個order的屬性,而後在須要指定順序的過濾器上增長對應屬性值來支持。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ActiveFilter {
    String[] group() default {};
    String[] value() default {};
}

發請求給職責鏈

服務端在讀取到客戶端的請求後,首先經過構建職責鏈獲得RpcInvoker,而後調用RpcInvoker的invoke方法將請求下發。

@Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) {

        this.executor.execute(new Runnable() {
            @Override
            public void run() {
                RpcInvoker rpcInvoker=....
                RpcResponse response=(RpcResponse) rpcInvoker.invoke......
    }

過濾器案例

通常HTTP請求在不一樣網絡角色中處理請求的能力會呈現爲一個漏斗型,越上層職責越輕,越往下層職責越重,所對應的就是越上層處理請求量越大,越下層處理請求量越小。好比負載均衡器只負責請求轉發而不負責具體的任務執行,然後端的Service服務器會執行大量的IO操做或者是消耗cpu的計算任務等,因此這二者在處理請求的量上每每是數量級的。

當出現大量請求時,爲了有效的保護後端服務的穩定性(儘可能不出現宕機),除了橫向擴展服務器外還能夠經過一些軟件手段緩解後端服務的壓力,這就是一般說的限流,本文由於須要簡單實現一個限制的過濾器,因此直接引用現成的限流算法:令牌桶。

下面是客戶端請求限流的一個簡單實現,客戶端在給服務端發起請求以前須要獲取令牌,若是獲取到則發送請求,若是獲取不到一直等待。固然爲了防止死鎖,能夠調用帶超時時間的獲取令牌方法。

@ActiveFilter(group = {ConstantConfig.CONSUMER})
public class AccessLimitFilter implements RpcFilter {

    private final static Logger logger = LoggerFactory.getLogger(AccessLimitFilter.class);

    @Override
    public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
        logger.info("before acquire");
        AccessLimitManager.acquire();

        Object rpcResponse=invoker.invoke(invocation);
        logger.info("after acquire");
        return rpcResponse;
    }

    static class AccessLimitManager{
        private final static RateLimiter rateLimiter=RateLimiter.create(2);

        public static void acquire(){
            rateLimiter.acquire();
        }
    }
}

實現的比較粗糙,桶的大小是寫死的,應該實現爲可配置型,後續抽空完善下。

本文源碼

https://github.com/jiangmin168168/jim-framework

文中代碼是依賴上述項目的,若是有不明白的可下載源碼

引用

本文中的圖取自於網格

相關文章
相關標籤/搜索