使用Sentinel實現隔離、限流

在18年Hystrix中止更新,Sentinel和Resilience4j逐步成熟,在國內Sentinel的使用企業更加多一些,接下來經過一個實站例子把Sentinel的主要功能使用起來。java

功能對比

  Sentinel Hystrix resilience4j
隔離策略 信號量隔離(併發線程數限流) 線程池隔離/信號量隔離 信號量隔離
熔斷降級策略 基於響應時間、異常比率、異常數 基於異常比率 基於異常比率、響應時間
實時統計實現 滑動窗口(LeapArray) 滑動窗口(基於 RxJava) Ring Bit Buffer
動態規則配置 支持多種數據源 支持多種數據源 有限支持
擴展性 多個擴展點 插件的形式 接口的形式
基於註解的支持 支持 支持 支持
限流 基於 QPS,支持基於調用關係的限流 有限的支持 Rate Limiter
流量整形 支持預熱模式、勻速器模式、預熱排隊模式 不支持 簡單的 Rate Limiter 模式
系統自適應保護 支持 不支持 不支持
控制檯 提供開箱即用的控制檯,可配置規則、查看秒級監控、機器發現等 簡單的監控查看 不提供控制檯,可對接其它監控系統

 

Sentinel 基本概念

資源

資源是 Sentinel 的關鍵概念。它能夠是 Java 應用程序中的任何內容,例如,由應用程序提供的服務,或由應用程序調用的其它應用提供的服務,甚至能夠是一段代碼。在接下來的文檔中,咱們都會用資源來描述代碼塊。git

只要經過 Sentinel API 定義的代碼,就是資源,可以被 Sentinel 保護起來。大部分狀況下,可使用方法簽名,URL,甚至服務名稱做爲資源名來標示資源。github

規則

圍繞資源的實時狀態設定的規則,能夠包括流量控制規則、熔斷降級規則以及系統保護規則。全部規則能夠動態實時調整。web

流量控制

流量控制在網絡傳輸中是一個經常使用的概念,它用於調整網絡包的發送數據。然而,從系統穩定性角度考慮,在處理請求的速度上,也有很是多的講究。任意時間到來的請求每每是隨機不可控的,而系統的處理能力是有限的。咱們須要根據系統的處理能力對流量進行控制。Sentinel 做爲一個調配器,能夠根據須要把隨機的請求調整成合適的形狀。算法

流量控制有如下幾個角度:spring

  • 資源的調用關係,例如資源的調用鏈路,資源和資源之間的關係;
  • 運行指標,例如 QPS、線程池、系統負載等;
  • 控制的效果,例如直接限流、冷啓動、排隊等。

Sentinel 的設計理念是讓您自由選擇控制的角度,並進行靈活組合,從而達到想要的效果。網絡

熔斷降級

什麼是熔斷降級

除了流量控制之外,下降調用鏈路中的不穩定資源也是 Sentinel 的使命之一。因爲調用關係的複雜性,若是調用鏈路中的某個資源出現了不穩定,最終會致使請求發生堆積。這個問題和 Hystrix 裏面描述的問題是同樣的。併發

image

Sentinel 和 Hystrix 的原則是一致的: 當調用鏈路中某個資源出現不穩定,例如,表現爲 timeout,異常比例升高的時候,則對這個資源的調用進行限制,並讓請求快速失敗,避免影響到其它的資源,最終產生雪崩的效果。app

熔斷降級設計理念

在限制的手段上,Sentinel 和 Hystrix 採起了徹底不同的方法。ide

Hystrix 經過線程池的方式,來對依賴(在咱們的概念中對應資源)進行了隔離。這樣作的好處是資源和資源之間作到了最完全的隔離。缺點是除了增長了線程切換的成本,還須要預先給各個資源作線程池大小的分配。

Sentinel 對這個問題採起了兩種手段:

  • 經過併發線程數進行限制

和資源池隔離的方法不一樣,Sentinel 經過限制資源併發線程的數量,來減小不穩定資源對其它資源的影響。這樣不但沒有線程切換的損耗,也不須要您預先分配線程池的大小。當某個資源出現不穩定的狀況下,例如響應時間變長,對資源的直接影響就是會形成線程數的逐步堆積。當線程數在特定資源上堆積到必定的數量以後,對該資源的新請求就會被拒絕。堆積的線程完成任務後纔開始繼續接收請求。

  • 經過響應時間對資源進行降級

除了對併發線程數進行控制之外,Sentinel 還能夠經過響應時間來快速降級不穩定的資源。當依賴的資源出現響應時間過長後,全部對該資源的訪問都會被直接拒絕,直到過了指定的時間窗口以後才從新恢復。

 

實戰

假如咱們有一個高併發場景,須要作搶購秒殺,須要獲取用戶資料,服務類的接口須要限流保護。

實體類

package com.xin.sentinel.demo.entity;

public class User {
    int id;
    String name;
    int age;
    int level;
    String address;
    String password;
    String phone;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public User(String name) {
        this.id = -1;
        this.name = name;
    }

    public User() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getLevel() {
        return level;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", level=" + level +
                ", address='" + address + '\'' +
                ", password='" + password + '\'' +
                ", phone='" + phone + '\'' +
                '}';
    }
}
View Code

服務類

package com.xin.sentinel.demo.service;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.xin.sentinel.demo.entity.User;
import com.xin.sentinel.demo.dao.DB;
import org.springframework.stereotype.Service;

import java.util.Collections;

@Service
public class UserService {

    public static final String USER_RES = "userResource";

    public UserService(){
        // 定義熱點限流的規則,對第一個參數設置 qps 限流模式,閾值爲5
        FlowRule rule = new FlowRule();
        rule.setResource(USER_RES);
        // 限流類型,qps
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        // 設置閾值
        rule.setCount(4);
        // 限制哪一個調用方
        rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
        // 基於調用關係的流量控制
        rule.setStrategy(RuleConstant.STRATEGY_DIRECT);
        // 流控策略
        rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        FlowRuleManager.loadRules(Collections.singletonList(rule));
    }

    /**
     * SphU 包含了 try-catch 風格的 API。用這種方式,當資源發生了限流以後會拋出 BlockException。
     * 這個時候能夠捕捉異常,進行限流以後的邏輯處理.
     * @param uid
     * @return
     */
    public User getUser(int uid){
        Entry entry = null;
        // 資源名可以使用任意有業務語義的字符串,好比方法名、接口名或其它可惟一標識的字符串。
        try {
            // 流控代碼
            entry = SphU.entry(USER_RES);
            // 業務代碼
            User user = new User();
            user.setId(uid);
            user.setName("user-" + uid);
            DB.InsertUser(user); //長耗時的工做
            return user;
        }catch(BlockException e){
            // 被限流了
            System.out.println("[getUser] has been protected! Time="+System.currentTimeMillis());
        }finally {
            if(entry!=null){
                entry.exit();
            }
        }
        return null;
    }


    /**
     * 經過 @SentinelResource 註解定義資源並配置 blockHandler 和 fallback 函數來進行限流以後的處理
     * @param id
     * @return
     */
    @SentinelResource(blockHandler = "blockHandlerForGetUser")
    public User getUserById(String id) {
        throw new RuntimeException("getUserById command failed");
    }

    // blockHandler 函數,原方法調用被限流/降級/系統保護的時候調用
    public User blockHandlerForGetUser(String id, BlockException ex) {
        return new User("admin");
    }


}
View Code

控制器

package com.xin.sentinel.demo.controller;

import com.xin.sentinel.demo.entity.User;
import com.xin.sentinel.demo.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class Demo {

    @Autowired
    private UserService userService;
    /**
     * 獲取用戶信息
     */
    @GetMapping("/getUser")
    public @ResponseBody User getUser(@RequestParam("id") int id) {
        return userService.getUser(id);
    }


}
View Code

 

流量規則的定義

重要屬性:

Field 說明 默認值
resource 資源名,資源名是限流規則的做用對象  
count 限流閾值  
grade 限流閾值類型,QPS 或線程數模式 QPS 模式
limitApp 流控針對的調用來源 default,表明不區分調用來源
strategy 調用關係限流策略:直接、鏈路、關聯 根據資源自己(直接)
controlBehavior 流控效果(直接拒絕 / 排隊等待 / 慢啓動模式),不支持按調用關係限流 直接拒絕

同一個資源能夠同時有多個限流規則。

經過代碼定義流量控制規則

理解上面規則的定義以後,咱們能夠經過調用 FlowRuleManager.loadRules() 方法來用硬編碼的方式定義流量控制規則,好比:

private static void initFlowQpsRule() {
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(resource);
    // Set max qps to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

流程圖

運行上面的demo,還有日誌輸出,目錄相似:C:\Users\Administrator\logs\csp

1589891884000|2020-05-19 20:38:04|userResource|1|0|1|0|13|0|0|0

含義分別是:

流量控制主要有兩種統計類型,一種是統計線程數,另一種則是統計 QPS。類型由 FlowRule.grade 字段來定義。其中,0 表明根據併發數量來限流,1 表明根據 QPS 來進行流量控制。

線程控制隔離

線程數限流用於保護業務線程數不被耗盡。例如,當應用所依賴的下游應用因爲某種緣由致使服務不穩定、響應延遲增長,對於調用者來講,意味着吞吐量降低和更多的線程數佔用,極端狀況下甚至致使線程池耗盡。爲應對高線程佔用的狀況,業內有使用隔離的方案,好比經過不一樣業務邏輯使用不一樣線程池來隔離業務自身之間的資源爭搶(線程池隔離),或者使用信號量來控制同時請求的個數(信號量隔離)。這種隔離方案雖然可以控制線程數量,但沒法控制請求排隊時間。當請求過多時排隊也是無益的,直接拒絕可以迅速下降系統壓力。Sentinel線程數限流不負責建立和管理線程池(對,說的就是hystrix),而是簡單統計當前請求上下文的線程個數,若是超出閾值,新的請求會被當即拒絕。

線程隔離的例子

public class FlowThreadDemo {

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();
    private static AtomicInteger activeThread = new AtomicInteger();

    private static volatile boolean stop = false;
    private static final int threadCount = 100;

    private static int seconds = 60 + 40;
    private static volatile int methodBRunningTime = 2000;

    public static void main(String[] args) throws Exception {
        System.out.println(
            "MethodA will call methodB. After running for a while, methodB becomes fast, "
                + "which make methodA also become fast ");
        tick();
        initFlowRule();

        for (int i = 0; i < threadCount; i++) {
            Thread entryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        Entry methodA = null;
                        try {
                            TimeUnit.MILLISECONDS.sleep(5);
                            methodA = SphU.entry("methodA");
                            activeThread.incrementAndGet();
                            Entry methodB = SphU.entry("methodB");
                            TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
                            methodB.exit();
                            pass.addAndGet(1);
                        } catch (BlockException e1) {
                            block.incrementAndGet();
                        } catch (Exception e2) {
                            // biz exception
                        } finally {
                            total.incrementAndGet();
                            if (methodA != null) {
                                methodA.exit();
                                activeThread.decrementAndGet();
                            }
                        }
                    }
                }
            });
            entryThread.setName("working thread");
            entryThread.start();
        }
    }

    private static void initFlowRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource("methodA");
        // set limit concurrent thread for 'methodA' to 20
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        rule1.setLimitApp("default");

        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " total qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock
                    + " activeThread:" + activeThread.get());
                if (seconds-- <= 0) {
                    stop = true;
                }
                if (seconds == 40) {
                    System.out.println("method B is running much faster; more requests are allowed to pass");
                    methodBRunningTime = 20;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                + ", block:" + block.get());
            System.exit(0);
        }
    }
}
View Code

QPS隔離

當 QPS 超過某個閾值的時候,則採起措施進行流量控制。流量控制的手段包括下面 3 種,對應 FlowRule 中的 controlBehavior 字段:

  1. 直接拒絕(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式。該方式是默認的流量控制方式,當QPS超過任意規則的閾值後,新的請求就會被當即拒絕,拒絕方式爲拋出FlowException。這種方式適用於對系統處理能力確切已知的狀況下,好比經過壓測肯定了系統的準確水位時。具體的例子參見 FlowqpsDemo

  2. 冷啓動(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式。該方式主要用於系統長期處於低水位的狀況下,當流量忽然增長時,直接把系統拉昇到高水位可能瞬間把系統壓垮。經過"冷啓動",讓經過的流量緩慢增長,在必定時間內逐漸增長到閾值上限,給冷系統一個預熱的時間,避免冷系統被壓垮的狀況。具體的例子參見 WarmUpFlowDemo

  1. 勻速器(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式。這種方式嚴格控制了請求經過的間隔時間,也便是讓請求以均勻的速度經過,對應的是漏桶算法
相關文章
相關標籤/搜索