分佈式限流

前言

本文接着上文應用限流進行討論。java

以前談到的限流方案只能針對於單個 JVM 有效,也就是單機應用。而對於如今廣泛的分佈式應用也得有一個分佈式限流的方案。git

基於此嘗試寫了這個組件:github

https://github.com/crossoverJie/distributed-redis-toolredis

DEMO

如下采用的是算法

https://github.com/crossoverJie/springboot-cloudspring

來作演示。安全

在 Order 應用提供的接口中採起了限流。首先是配置了限流工具的 Bean:springboot

@Configuration
public class RedisLimitConfig {


    @Value("${redis.limit}")
    private int limit;


    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    @Bean
    public RedisLimit build() {
        RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection();
        JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();
        RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
                .limit(limit)
                .build();

        return redisLimit;
    }
}

接着在 Controller 使用組件:併發

@Autowired
    private RedisLimit redisLimit ;

    @Override
    @CheckReqNo
    public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {
        BaseResponse<OrderNoResVO> res = new BaseResponse();

        //限流
        boolean limit = redisLimit.limit();
        if (!limit){
            res.setCode(StatusEnum.REQUEST_LIMIT.getCode());
            res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());
            return res ;
        }

        res.setReqNo(orderNoReq.getReqNo());
        if (null == orderNoReq.getAppId()){
            throw new SBCException(StatusEnum.FAIL);
        }
        OrderNoResVO orderNoRes = new OrderNoResVO() ;
        orderNoRes.setOrderId(DateUtil.getLongTime());
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        res.setDataBody(orderNoRes);
        return res ;
    }

爲了方便使用,也提供了註解:app

@Override
    @ControllerLimit
    public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {
        BaseResponse<OrderNoResVO> res = new BaseResponse();
        // 業務邏輯
        return res ;
    }

該註解攔截了 http 請求,會再請求達到閾值時直接返回。

普通方法也可以使用:

@CommonLimit
public void doSomething(){}

會在調用達到閾值時拋出異常。

爲了模擬併發,在 User 應用中開啓了 10 個線程調用 Order(限流次數爲5) 接口(也可以使用專業的併發測試工具 JMeter 等)。

@Override
    public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {
        //調用遠程服務
        OrderNoReqVO vo = new OrderNoReqVO();
        vo.setAppId(1L);
        vo.setReqNo(userReq.getReqNo());

        for (int i = 0; i < 10; i++) {
            executorService.execute(new Worker(vo, orderServiceClient));
        }

        UserRes userRes = new UserRes();
        userRes.setUserId(123);
        userRes.setUserName("張三");

        userRes.setReqNo(userReq.getReqNo());
        userRes.setCode(StatusEnum.SUCCESS.getCode());
        userRes.setMessage("成功");

        return userRes;
    }
    

    private static class Worker implements Runnable {

        private OrderNoReqVO vo;
        private OrderServiceClient orderServiceClient;

        public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) {
            this.vo = vo;
            this.orderServiceClient = orderServiceClient;
        }

        @Override
        public void run() {

            BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);
            logger.info("遠程返回:" + JSON.toJSONString(orderNo));

        }
    }
爲了驗證分佈式效果啓動了兩個 Order 應用。

效果以下:

實現原理

實現原理其實很簡單。既然要達到分佈式全侷限流的效果,那天然須要一個第三方組件來記錄請求的次數。

其中 Redis 就很是適合這樣的場景。

  • 每次請求時將當前時間(精確到秒)做爲 Key 寫入到 Redis 中,超時時間設置爲 2 秒,Redis 將該 Key 的值進行自增。
  • 當達到閾值時返回錯誤。
  • 寫入 Redis 的操做用 Lua 腳原本完成,利用 Redis 的單線程機制能夠保證每一個 Redis 請求的原子性。

Lua 腳本以下:

--lua 下標從 1 開始
-- 限流 key
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 獲取當前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")

if curentLimit + 1 > limit then
    -- 達到限流大小 返回
    return 0;
else
    -- 沒有達到閾值 value + 1
    redis.call("INCRBY", key, 1)
    redis.call("EXPIRE", key, 2)
    return curentLimit + 1
end

Java 中的調用邏輯:

public boolean limit() {
        String key = String.valueOf(System.currentTimeMillis() / 1000);
        Object result = null;
        if (jedis instanceof Jedis) {
            result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else if (jedis instanceof JedisCluster) {
            result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else {
            //throw new RuntimeException("instance is error") ;
            return false;
        }

        if (FAIL_CODE != (Long) result) {
            return true;
        } else {
            return false;
        }
    }

因此只須要在須要限流的地方調用該方法對返回值進行判斷便可達到限流的目的。

固然這只是利用 Redis 作了一個粗暴的計數器,若是想實現相似於上文中的令牌桶算法能夠基於 Lua 自行實現。

Builder 構建器

在設計這個組件時想盡可能的提供給使用者清晰、可讀性、不易出錯的 API。

好比第一步,如何構建一個限流對象。

最經常使用的方式天然就是構造函數,若是有多個域則能夠採用重疊構造器的方式:

public A(){}
public A(int a){}
public A(int a,int b){}

缺點也是顯而易見的:若是參數過多會致使難以閱讀,甚至若是參數類型一致的狀況下客戶端顛倒了順序,但不會引發警告從而出現難以預測的結果。

第二種方案能夠採用 JavaBean 模式,利用 setter 方法進行構建:

A a = new A();
a.setA(a);
a.setB(b);

這種方式清晰易讀,但卻容易讓對象處於不一致的狀態,使對象處於線程不安全的狀態。

因此這裏採用了第三種建立對象的方式,構建器:

public class RedisLimit {

    private JedisCommands jedis;
    private int limit = 200;

    private static final int FAIL_CODE = 0;

    /**
     * lua script
     */
    private String script;

    private RedisLimit(Builder builder) {
        this.limit = builder.limit ;
        this.jedis = builder.jedis ;
        buildScript();
    }


    /**
     * limit traffic
     * @return if true
     */
    public boolean limit() {
        String key = String.valueOf(System.currentTimeMillis() / 1000);
        Object result = null;
        if (jedis instanceof Jedis) {
            result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else if (jedis instanceof JedisCluster) {
            result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else {
            //throw new RuntimeException("instance is error") ;
            return false;
        }

        if (FAIL_CODE != (Long) result) {
            return true;
        } else {
            return false;
        }
    }


    /**
     * read lua script
     */
    private void buildScript() {
        script = ScriptUtil.getScript("limit.lua");
    }


    /**
     *  the builder
     * @param <T>
     */
    public static class Builder<T extends JedisCommands>{
        private T jedis = null ;

        private int limit = 200;


        public Builder(T jedis){
            this.jedis = jedis ;
        }

        public Builder limit(int limit){
            this.limit = limit ;
            return this;
        }

        public RedisLimit build(){
            return new RedisLimit(this) ;
        }

    }
}

這樣客戶端在使用時:

RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
                .limit(limit)
                .build();

更加的簡單直接,而且避免了將建立過程分紅了多個子步驟。

這在有多個構造參數,但又不是必選字段時頗有做用。

所以順便將分佈式鎖的構建器方式也一併更新了:

https://github.com/crossoverJie/distributed-redis-tool#features

更多內容能夠參考 Effective Java

API

從上文能夠看出,使用過程就是調用 limit 方法。

//限流
    boolean limit = redisLimit.limit();
    if (!limit){
       //具體限流邏輯
    }

爲了減小侵入性,也爲了簡化客戶端提供了兩種註解方式。

@ControllerLimit

該註解能夠做用於 @RequestMapping 修飾的接口中,並會在限流後提供限流響應。

實現以下:

@Component
public class WebIntercept extends WebMvcConfigurerAdapter {

    private static Logger logger = LoggerFactory.getLogger(WebIntercept.class);


    @Autowired
    private RedisLimit redisLimit;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new CustomInterceptor())
                .addPathPatterns("/**");
    }


    private class CustomInterceptor extends HandlerInterceptorAdapter {
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                                 Object handler) throws Exception {


            if (redisLimit == null) {
                throw new NullPointerException("redisLimit is null");
            }

            if (handler instanceof HandlerMethod) {
                HandlerMethod method = (HandlerMethod) handler;

                ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class);
                if (annotation == null) {
                    //skip
                    return true;
                }

                boolean limit = redisLimit.limit();
                if (!limit) {
                    logger.warn("request has bean limit");
                    response.sendError(500, "request limit");
                    return false;
                }

            }

            return true;

        }
    }
}

其實就是實現了 SpringMVC 中的攔截器,並在攔截過程當中判斷是否有使用註解,從而調用限流邏輯。

前提是應用須要掃描到該類,讓 Spring 進行管理。

@ComponentScan(value = "com.crossoverjie.distributed.intercept")

@CommonLimit

固然也能夠在普通方法中使用。實現原理則是 Spring AOP (SpringMVC 的攔截器本質也是 AOP)。

@Aspect
@Component
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class CommonAspect {

    private static Logger logger = LoggerFactory.getLogger(CommonAspect.class);

    @Autowired
    private RedisLimit redisLimit ;

    @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)")
    private void check(){}

    @Before("check()")
    public void before(JoinPoint joinPoint) throws Exception {

        if (redisLimit == null) {
            throw new NullPointerException("redisLimit is null");
        }

        boolean limit = redisLimit.limit();
        if (!limit) {
            logger.warn("request has bean limit");
            throw new RuntimeException("request has bean limit") ;
        }

    }
}

很簡單,也是在攔截過程當中調用限流。

固然使用時也得掃描到該包:

@ComponentScan(value = "com.crossoverjie.distributed.intercept")

總結

限流在一個高併發大流量的系統中是保護應用的一個利器,成熟的方案也不少,但願對剛瞭解這一塊的朋友提供一些思路。

以上全部的源碼:

感興趣的朋友能夠點個 Star 或是提交 PR。

號外

最近在總結一些 Java 相關的知識點,感興趣的朋友能夠一塊兒維護。

地址: https://github.com/crossoverJie/Java-Interview

圖片描述

相關文章
相關標籤/搜索