簡易RPC框架-熔斷降級機制

熔斷與降級

爲何在RPC環節中有熔斷以及降級的需求,詳細的緣由這裏很少解釋,從網上搜索一張圖作示意。javascript

熔斷

我理解熔段主要解決以下幾個問題:css

  • 當所依賴的對象不穩定時,可以起到快速失敗的目的
  • 快速失敗後,可以根據必定的算法動態試探所依賴對象是否恢復

好比產品詳細頁獲取產品的好評總數時,因爲後端服務異常致使客戶端每次都須要等到超時。若是短期內服務不能恢復,那麼這段時間內的全部請求時間都將是最大的超時時間,這類消費時間又得不到正確結果的現象是不能容忍的。因此遇到這類狀況,就須要根據必定的算法斷定服務短期不可用,將後面的請求進行快速失敗處理,這樣能夠節省服務等待時間。java

同時,後端服務是有可能自主或者人爲在必定時間內恢復的,因此以前被斷定爲快速失敗的服務,須要有能力去試探服務是否已經恢復。git

上面提到的快速失敗以及自主恢復現象就是熔斷github

降級

降級是指本身的待遇降低了,從RPC調用環節來說,就是去訪問一個本地的假裝者而不是真實的服務,但這對調用端來講是沒有區別的。拿電商展現某個產品的詳細頁來講:web

  • 當加載評論時,因爲評論服務不可用,此時能夠返回一些默認的評論
  • 當加載產品庫存,因爲庫存服務不可用,此時能夠固定顯示一個庫存數

上面提供返回默認評論,固定庫存的服務就是假裝服務,這類服務通常不依賴其它服務,穩定性最高。由假裝者提供服務給客戶端的現象就是服務降級。算法

RPC如何支持熔斷與降級

一種最簡單的辦法就是借用hystrix來實現。後端

引入包依賴

因爲示例未採用註解式方案,因此只須要引用下面兩個包便可。app

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>${hystrix-version}</version>
</dependency>
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>${hystrix-version}</version>
</dependency>

實現命令模式

hystrix遵循命令模式,這裏能夠往這個標準的UML圖上去套。ide

建立一個新的類,RpcHystrixCommand,繼承自HystrixCommand便可。

我這裏採用線程隔離方式。

構造函數參數

因爲須要遠程調用,因此構造函數須要接收遠程調用所需求必要參數

/**
 * 遠程目標方法
 */
private Method method;

/**
 * 遠程目標接口
 */
private Object obj;

/**
 * 遠程方法所須要的參數
 */
private Object[] params;

/**
 * 遠程接口客戶端引用註解
 */
private RpcReference rpcReference;

/**
 * RPC客戶端配置
 */
private ReferenceConfig referenceConfig;

構造函數方法簽名:

public RpcHystrixCommand(Object obj, Method method, Object[] params, RpcReference rpcReference, ReferenceConfig referenceConfig)

初始化hystrix

這裏只是一個示例,因此參數設置比較隨意,詳細的可參考文檔。

super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CircuitBreakerRpcHystrixCommandGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerRpcHystrixCommandKey"))
                        .andCommandPropertiesDefaults(
                                HystrixCommandProperties.Setter()
                                        .withCircuitBreakerEnabled(true)
                                        .withCircuitBreakerRequestVolumeThreshold(1)
                                        .withCircuitBreakerErrorThresholdPercentage(50)
                                        .withCircuitBreakerSleepWindowInMilliseconds(5*1000)
                                        .withMetricsRollingStatisticalWindowInMilliseconds(10*1000)
                        )
                       .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("CircuitBreakerRpcHystrixCommandPool"))
                       .andThreadPoolPropertiesDefaults(
                                HystrixThreadPoolProperties.Setter().withCoreSize(100)
                )
        );

run()函數

run()函數就是正常調用時所須要執行的方法,將調用遠程通訊的邏輯遷移到此,因爲此處不涉及今天講的熔斷降級,因此不用關內心面的代碼。

@Override
protected Object run() {
  // 執行遠程調用
}

擴展rpcReference註解以支持降級

在以前的註解中增長一個屬性,用來配置服務假裝者所屬的類對象

public @interface RpcReference {
    /**
     * 服務降級的假裝者類對象
     * @return
     */
    Class<?> fallbackServiceClazz() default Object.class;
}

getFallback()函數

當快速失敗時,咱們但願返回一些預先準備好的值給到客戶端,實現這個需求就須要實現這個fallback函數。

假裝者的邏輯因爲是客戶端控制,因此咱們經過參數來動態支持。 經過rpcReference註解能夠獲取配置的假裝者

protected Object getFallback() {

        Method[] methods = this.rpcReference.fallbackServiceClazz().getMethods();
        for (Method methodFallback : methods) {
            if(this.method.getName().equals(methodFallback.getName())){
                try {
                    Object fallbackServiceMock= ApplicationContextUtils.getApplicationContext().getBean(this.rpcReference.fallbackServiceClazz());
                    return  methodFallback.invoke(fallbackServiceMock,this.params);
                } catch (IllegalAccessException e) {
                    logger.error("RpcHystrixCommand.getFallback error",e);
                } catch (InvocationTargetException e) {
                    logger.error("RpcHystrixCommand.getFallback error",e);
                }
            }
        }
        throw new RpcException("service fallback unimplement");
    }

RpcProxy嵌入熔斷降級機制

代理的invoke方法,將改調用命令模式的execute方法來代替。

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

    RpcHystrixCommand rpcHystrixCommand=new RpcHystrixCommand(proxy,method,args,this.reference,this.referenceConfig);
    return rpcHystrixCommand.execute();
}

客戶端使用

dubbo有一個mock機制,功能有些弱,有興趣能夠自行研究。我這裏更加傾向於根據邏輯來判斷是否使用熔斷降級,降級的邏輯須要有更多的支持。

Spring Cloud的熔斷降級的作法與個人相似,它是經過註解在接口上

@FeignClient(value = "JIM-CLOUD-PROVIDER-SERVER",fallback = ProductServiceHystrix.class)
public interface ProductService {
    @RequestMapping(value = "/product/{productId}",method = RequestMethod.GET)
    String getById(@PathVariable("productId") final long productId);

}

建立假裝者接口

定義假裝者接口,約定成員方法的簽名與真身相同。

public interface ProductCommentMockService {
    Product getById(Long productId);
}

實現假裝者接口

實現假裝者接口,這裏不光是簡單的固定數據,可心任意編寫假裝者業務邏輯,與普通的service bean 沒有區別。

@Service
public class ProductCommentMockServiceImpl implements ProductCommentMockService {
    @Override
    public Product getById(Long productId) {

        Product mockProduct=new Product();
        mockProduct.setId(0L);
        mockProduct.setName("mock product name");

        return mockProduct;
    }
}

服務引用使用熔斷降級機制

在引用遠程服務接口的註解上,配置假裝者接口的類便可。

@RpcReference(
        maxExecutesCount = 1,
        fallbackServiceClazz = ProductCommentMockService.class
)
private ProductService productService;

測試

故意不啓動服務端,請求接口,此時出現mock數聽說明組件功能正常。

{"id":0,"name":"mock product name"}

待解決問題

因爲熔斷器採用的是新線程執行,因此會影響Rpc上下文傳遞的參數傳遞,後續我再解決。

本文源碼

https://github.com/jiangmin168168/jim-framework

文中代碼是依賴上述項目的,若是有不明白的可下載源碼

相關文章
相關標籤/搜索