Spring Cloud Gateway 結合配置中心限流

前言

上篇文章《Spring Cloud Gateway 限流操做》我講過複雜的限流場景能夠經過擴展RedisRateLimiter來實現本身的限流策略。
假設你領導給你安排了一個任務,具體需求以下:
• 針對具體的接口作限流
• 不一樣接口限流的力度能夠不一樣
• 能夠動態調整限流配置,實時生效
若是你接到上面的任務,你會怎麼去設計+實現呢?
每一個人看待問題的角度不一樣,天然思考出來的方案也不一樣,正所謂條條大路通羅馬,能到達畝的地的路那就是一條好路。git

如何分析需求

下面我給出個人實現方式,僅供各位參考,大牛請忽略。
具體問題具體分析,針對需求點,分別去作分析。
需求一 「如何針對具體的接口作限流」 這個在上篇文章中也有講過,只須要讓KeyResolver返回的是接口的URI便可,這樣限流的維度那就是對這個接口進行限流。
需求二 「不一樣接口限流的力度能夠不一樣」 這個經過配置的方式明顯實現不了,配置中的replenishRate和burstCapacity都是配置死的,若是要作成動態的那麼必須的本身經過擴展RedisRateLimiter來實現。
前提是必須有一個配置列表,這個配置列表就是每一個接口對應的限流數值。有了這個配置咱們就能夠經過請求的接口獲取這個接口對應的限流值。
需求三「能夠動態調整限流配置,實時生效」 這個的話也比較容易,不管你是存文件,存數據庫,存緩存只要每次都去讀取,必然是實時生效的,可是性能問題咱們不得不考慮啊。
存文件,讀取文件,耗IO,主要是不方便修改
存數據庫,能夠經過web界面去修改,也能夠直接改數據庫,每次都要查詢,性能不行
存分佈式緩存(redis),性能比數據庫有提升
對比下來確定是緩存是最優的方案,還有更好的方案嗎?
有,結合配置中心來作,我這邊用本身的配置中心(https://github.com/yinjihuan/smconf)來說解,換成其餘的配置中心也是同樣的思路
配置中心的優勢在於它原本就是用來存儲配置的,配置在項目啓動時加載完畢,當有修改時推送更新,每次讀取都在本地對象中,性能好。
具體方案有了以後咱們就能夠開始擼代碼了,可是你有想過這麼多接口的限流值怎麼初始化嗎?手動一個個去加?
不一樣的服務維護的小組不一樣,固然也有多是一個小組維護,從設計者的角度來思考,應該把設置的權利交給用戶,交給咱們的接口開發者,每一個接口可以承受多少併發讓用戶來定,你的職責就是在網關進行限流。固然在公司中具體的限制量也不必定會由開發人員來定哈,這個得根據壓測結果,作最好的調整。github

話很少說-開始擼碼

首先咱們定義本身的RedisRateLimiter,複製源碼稍微改造下便可, 這邊只貼核心代碼。web

public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config>
        implements ApplicationContextAware {

    public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
    public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
    public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
    public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
    public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";

    public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
            Validator validator) {
        super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
        this.redisTemplate = redisTemplate;
        this.script = script;
        initialized.compareAndSet(false, true);
    }

    public CustomRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
        super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
        this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);
    }

    // 限流配置
    private RateLimitConf rateLimitConf;

    @Override
    @SuppressWarnings("unchecked")
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        // 加載配置
        this.rateLimitConf = context.getBean(RateLimitConf.class);  
    }

    /**
     * This uses a basic token bucket algorithm and relies on the fact that
     * Redis scripts execute atomically. No other operations can run between
     * fetching the count and writing the new count.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Mono<Response> isAllowed(String routeId, String id) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("RedisRateLimiter is not initialized");
        }

        //Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);

        if (rateLimitConf == null) {
            throw new IllegalArgumentException("No Configuration found for route " + routeId);
        }
        Map<String,Integer> routeConfig = rateLimitConf.getLimitMap();

        // Key的格式:服務名稱.接口URI.類型
        String replenishRateKey = routeId + "." + id + ".replenishRate";
        int replenishRate = routeConfig.get(replenishRateKey) == null ? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);

        String burstCapacityKey = routeId + "." + id + ".burstCapacity";
        int burstCapacity = routeConfig.get(burstCapacityKey) == null ? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);

        try {
            List<String> keys = getKeys(id);

            // The arguments to the LUA script. time() returns unixtime in
            // seconds.
            List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
                    Instant.now().getEpochSecond() + "", "1");
            // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
            Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
            // .log("redisratelimiter", Level.FINER);
            return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
                    .reduce(new ArrayList<Long>(), (longs, l) -> {
                        longs.addAll(l);
                        return longs;
                    }).map(results -> {
                        boolean allowed = results.get(0) == 1L;
                        Long tokensLeft = results.get(1);

                        Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));

                        if (log.isDebugEnabled()) {
                            log.debug("response: " + response);
                        }
                        return response;
                    });
        } catch (Exception e) {
            /*
             * We don't want a hard dependency on Redis to allow traffic. Make
             * sure to set an alert so you know if this is happening too much.
             * Stripe's observed failure rate is 0.01%.
             */
            log.error("Error determining if user allowed from redis", e);
        }
        return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
    }

    public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
        HashMap<String, String> headers = new HashMap<>();
        headers.put(this.remainingHeader, tokensLeft.toString());
        headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
        headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
        return headers;
    }

}

須要在setApplicationContext中加載咱們的配置類,配置類的定義以下:面試

@CxytianDiConf(system="fangjia-gateway")
public class RateLimitConf {
    // 限流配置
    @ConfField(value = "limitMap")
    private Map<String, Integer> limitMap = new HashMap<String, Integer>(){{
        put("default.replenishRate", 100);
        put("default.burstCapacity", 1000);
    }};
    public void setLimitMap(Map<String, Integer> limitMap) {
        this.limitMap = limitMap;
    }
    public Map<String, Integer> getLimitMap() {
        return limitMap;
    }
}

全部的接口對應的限流信息都在map中,有默認值,若是沒有對應的配置就用默認的值對接口進行限流。
isAllowed方法中經過‘服務名稱.接口URI.類型’組成一個Key, 經過這個Key去Map中獲取對應的值。
類型的做用主要是用來區分replenishRate和burstCapacity兩個值。
接下來就是配置CustomRedisRateLimiter:redis

@Bean
@Primary
public CustomRedisRateLimiter customRedisRateLimiter(
                  ReactiveRedisTemplate<String, String> redisTemplate,     
                  @Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME)  RedisScript<List<Long>> redisScript,
                  Validator validator) {
    return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}

網關這邊的邏輯已經實現好了,接下來就是須要在具體的服務中自定義註解,而後將限流的參數初始化到咱們的配置中心就能夠了。
定義註解spring

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ApiRateLimit {

    /**
     * 速率
     * @return
     */
    int replenishRate() default 100;

    /**
     * 容積
     * @return
     */
    int burstCapacity() default 1000;

}

啓動監聽器,讀取註解,初始化配置數據庫

/**
 * 初始化API網關須要進行併發限制的API
 * @author yinjihuan
 *
 */
public class InitGatewayApiLimitRateListener implements ApplicationListener<ApplicationReadyEvent> {

    // Controller包路徑
    private String controllerPath;

    private RateLimitConf rateLimitConf;

    private ConfInit confInit;

    private String applicationName;

    public InitGatewayApiLimitRateListener(String controllerPath) {
        this.controllerPath = controllerPath;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
        this.confInit = event.getApplicationContext().getBean(ConfInit.class);
        this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
        try {
            initLimitRateAPI();
        } catch (Exception e) {
            throw new RuntimeException("初始化須要進行併發限制的API異常", e);
        }
    }

    /**
     * 初始化須要進行併發限制的API
     * @throws IOException
     * @throws ClassNotFoundException
     */
    private void initLimitRateAPI() throws IOException, ClassNotFoundException {
        Map<String, Integer> limitMap = rateLimitConf.getLimitMap();
        ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
        List<String> classList = scan.getFullyQualifiedClassNameList();
        for (String clazz : classList) {
            Class<?> clz = Class.forName(clazz);
            if (!clz.isAnnotationPresent(RestController.class)) {
                continue;
            }
            Method[] methods = clz.getDeclaredMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(ApiRateLimit.class)) {
                    ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
                    String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
                    String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
                    limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
                    limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
                }
            }
        }
        rateLimitConf.setLimitMap(limitMap);
        // 初始化值到配置中心
        confInit.init(rateLimitConf);
    }

     private String getApiUri(Class<?> clz, Method method) {
            StringBuilder uri = new StringBuilder();
            uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
            if (method.isAnnotationPresent(GetMapping.class)) {
                uri.append(method.getAnnotation(GetMapping.class).value()[0]);
            } else if (method.isAnnotationPresent(PostMapping.class)) {
                uri.append(method.getAnnotation(PostMapping.class).value()[0]);
            } else if (method.isAnnotationPresent(RequestMapping.class)) {
                uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
            }
            return uri.toString();
     }
}

配置監聽器api

SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
context = application.run(args);
最後使用就很簡單了,只須要增長註解就能夠了
@ApiRateLimit(replenishRate=10, burstCapacity=100)
@GetMapping("/data")
public HouseInfo getData(@RequestParam("name") String name) {
    return new HouseInfo(1L, "上海", "虹口", "東體小區");
}

我這邊只是給你們提供一種去實現的思路,也許你們還有更好的方案。
我以爲只要不讓每一個開發都去關心這種非業務性質的功能,那就能夠了,都在框架層面處理掉。固然實現原理能夠跟你們分享下,會用很好,既會用又瞭解原理那就更好了。
最後給你們推薦一個朋友的面試寶典,有要換工做或者快要面試的朋友能夠學習下。
Spring Cloud Gateway 結合配置中心限流緩存

猿天地
Spring Cloud Gateway 結合配置中心限流併發

尹吉歡我不差錢啊喜歡做者

相關文章
相關標籤/搜索