原文:https://juejin.im/entry/5bd491c85188255ac2629bef?utm_source=coffeephp.comphp
在分佈式領域,咱們不免會遇到併發量突增,對後端服務形成高壓力,嚴重甚至會致使系統宕機。爲避免這種問題,咱們一般會爲接口添加限流、降級、熔斷等能力,從而使接口更爲健壯。Java領域常見的開源組件有Netflix的hystrix,阿里系開源的sentinel等,都是蠻不錯的限流熔斷框架。前端
今天咱們就基於Redis組件的特性,實現一個分佈式限流組件,名字就定爲shield-ratelimiter。
java
首先解釋下爲什麼採用Redis做爲限流組件的核心。node
通俗地講,假設一個用戶(用IP判斷)每秒訪問某服務接口的次數不能超過10次,那麼咱們能夠在Redis中建立一個鍵,並設置鍵的過時時間爲60秒。git
當一個用戶對此服務接口發起一次訪問就把鍵值加1,在單位時間(此處爲1s)內當鍵值增長到10的時候,就禁止訪問服務接口。PS:在某種場景中添加訪問時間間隔仍是頗有必要的。咱們本次不考慮間隔時間,只關注單位時間內的訪問次數。github
原理已經講過了,說下需求。redis
基於上述需求,咱們決定基於註解方式進行核心功能開發,基於Spring-boot-starter做爲基礎環境,從而可以很好的適配Spring環境。spring
另外,在本次開發中,咱們不經過簡單的調用Redis的java類庫API實現對Redis的incr操做。後端
緣由在於,咱們要保證整個限流的操做是原子性的,若是用Java代碼去作操做及判斷,會有併發問題。這裏我決定採用Lua腳本進行核心邏輯的定義。springboot
在正式開發前,我簡單介紹下對Redis的操做中,爲什麼推薦使用Lua腳本。
Redis添加了對Lua的支持,可以很好的知足原子性、事務性的支持,讓咱們免去了不少的異常邏輯處理。對於Lua的語法不是本文的主要內容,感興趣的能夠自行查找資料。
到這裏,咱們正式開始手寫限流組件的進程。
項目基於maven構建,主要依賴Spring-boot-starter,咱們主要在springboot上進行開發,所以自定義的開發包能夠直接依賴下面這個座標,方便進行包管理。版本號自行選擇穩定版。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>1.4.2.RELEASE</version> </dependency>
因爲咱們是基於Redis進行的限流操做,所以須要整合Redis的類庫,上面已經講到,咱們是基於Springboot進行的開發,所以這裏能夠直接整合RedisTemplate。
這裏咱們引入spring-boot-starter-redis的依賴。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.4.2.RELEASE</version> </dependency>
新建一個Redis的配置類,命名爲RedisCacheConfig,使用javaconfig形式注入CacheManager及RedisTemplate。爲了操做方便,咱們採用了Jackson進行序列化。代碼以下
@Configuration @EnableCaching public class RedisCacheConfig { private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheConfig.class); @Bean public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) { CacheManager cacheManager = new RedisCacheManager(redisTemplate); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Springboot Redis cacheManager 加載完成"); } return cacheManager; } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(默認使用JDK的序列化方式) Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer來序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); LOGGER.info("Springboot RedisTemplate 加載完成"); return template; } }
注意 要使用 @Configuration 標註此類爲一個配置類,固然你可使用 @Component, 可是不推薦,緣由在於 @Component 註解雖然也能夠看成配置類,可是並不會爲其生成CGLIB代理Class,而使用@Configuration,CGLIB會爲其生成代理類,進行性能的提高。
咱們的包開發完畢以後,調用方的application.properties須要進行相關配置以下:
#單機模式redis spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.pool.maxActive=8 spring.redis.pool.maxWait=-1 spring.redis.pool.maxIdle=8 spring.redis.pool.minIdle=0 spring.redis.timeout=10000 spring.redis.password=
若是有密碼的話,配置password便可。
這裏爲單機配置,若是須要支持哨兵集羣,則配置以下,Java代碼不須要改動,只須要變更配置便可。注意 兩種配置不能共存!
#哨兵集羣模式 # database name spring.redis.database=0 # server password 密碼,若是沒有設置可不配 spring.redis.password= # pool settings ...池配置 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 # name of Redis server 哨兵監聽的Redis server的名稱 spring.redis.sentinel.master=mymaster # comma-separated list of host:port pairs 哨兵的配置列表 spring.redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579
爲了調用方便,咱們定義一個名爲RateLimiter 的註解,內容以下
/** * @author snowalker * @version 1.0 * @date 2018/10/27 1:25 * @className RateLimiter * @desc 限流注解 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RateLimiter { /** * 限流key * @return */ String key() default "rate:limiter"; /** * 單位時間限制經過請求數 * @return */ long limit() default 10; /** * 過時時間,單位秒 * @return */ long expire() default 1; }
該註解明確只用於方法,主要有三個屬性。
expire–incr的值的過時時間,業務中表示限流的單位時間。
定義好註解後,須要開發註解使用的切面,這裏咱們直接使用aspectj進行切面的開發。先看代碼
@Aspect @Component public class RateLimterHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RateLimterHandler.class); @Autowired RedisTemplate redisTemplate; private DefaultRedisScript<Long> getRedisScript; @PostConstruct public void init() { getRedisScript = new DefaultRedisScript<>(); getRedisScript.setResultType(Long.class); getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimter.lua"))); LOGGER.info("RateLimterHandler[分佈式限流處理器]腳本加載完成"); }
這裏是注入了RedisTemplate,使用其API進行Lua腳本的調用。
init() 方法在應用啓動時會初始化DefaultRedisScript,並加載Lua腳本,方便進行調用。
PS: Lua腳本放置在classpath下,經過ClassPathResource進行加載。
@Pointcut("@annotation(com.snowalker.shield.ratelimiter.core.annotation.RateLimiter)") public void rateLimiter() {}
這裏咱們定義了一個切點,表示只要註解了 @RateLimiter 的方法,都可以觸發限流操做。
@Around("@annotation(rateLimiter)") public Object around(ProceedingJoinPoint proceedingJoinPoint, RateLimiter rateLimiter) throws Throwable { if (LOGGER.isDebugEnabled()) { LOGGER.debug("RateLimterHandler[分佈式限流處理器]開始執行限流操做"); } Signature signature = proceedingJoinPoint.getSignature(); if (!(signature instanceof MethodSignature)) { throw new IllegalArgumentException("the Annotation @RateLimter must used on method!"); } /** * 獲取註解參數 */ // 限流模塊key String limitKey = rateLimiter.key(); Preconditions.checkNotNull(limitKey); // 限流閾值 long limitTimes = rateLimiter.limit(); // 限流超時時間 long expireTime = rateLimiter.expire(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("RateLimterHandler[分佈式限流處理器]參數值爲-limitTimes={},limitTimeout={}", limitTimes, expireTime); } /** * 執行Lua腳本 */ List<String> keyList = new ArrayList(); // 設置key值爲註解中的值 keyList.add(limitKey); /** * 調用腳本並執行 */ Long result = (Long) redisTemplate.execute(getRedisScript, keyList, expireTime, limitTimes); if (result == 0) { String msg = "因爲超過單位時間=" + expireTime + "-容許的請求次數=" + limitTimes + "[觸發限流]"; LOGGER.debug(msg); return "false"; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("RateLimterHandler[分佈式限流處理器]限流執行結果-result={},請求[正常]響應", result); } return proceedingJoinPoint.proceed(); } }
這段代碼的邏輯爲,獲取 @RateLimiter 註解配置的屬性:key、limit、expire,並經過 redisTemplate.execute(RedisScript script, List keys, Object… args) 方法傳遞給Lua腳本進行限流相關操做,邏輯很清晰。
這裏咱們定義若是腳本返回狀態爲0則爲觸發限流,1表示正常請求。
這裏是咱們整個限流操做的核心,經過執行一個Lua腳本進行限流的操做。腳本內容以下
--獲取KEY local key1 = KEYS[1] local val = redis.call('incr', key1) local ttl = redis.call('ttl', key1) --獲取ARGV內的參數並打印 local expire = ARGV[1] local times = ARGV[2] redis.log(redis.LOG_DEBUG,tostring(times)) redis.log(redis.LOG_DEBUG,tostring(expire)) redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val); if val == 1 then redis.call('expire', key1, tonumber(expire)) else if ttl == -1 then redis.call('expire', key1, tonumber(expire)) end end if val > tonumber(times) then return 0 end return 1
邏輯很通俗,我簡單介紹下。
當過時後,又是新的一輪循環,整個過程是一個原子性的操做,可以保證單位時間不會超過咱們預設的請求閾值。
到這裏咱們即可以在項目中進行測試。
這裏我貼一下核心代碼,咱們定義一個接口,並註解 @RateLimiter(key = 「ratedemo:1.0.0」, limit = 5, expire = 100) 表示模塊ratedemo:sendPayment:1.0.0
在100s內容許經過5個請求,這裏的參數設置是爲了方便看結果。實際中,咱們一般會設置1s內容許經過的次數。
@Controller public class TestController { private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class); @ResponseBody @RequestMapping("ratelimiter") @RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100) public String sendPayment(HttpServletRequest request) throws Exception { return "正常請求"; } }
咱們經過RestClient請求接口,日誌返回以下:
2018-10-28 00:00:00.602 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:00.688 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]限流執行結果-result=1,請求[正常]響應 2018-10-28 00:00:00.860 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:01.183 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:01.520 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]限流執行結果-result=1,請求[正常]響應 2018-10-28 00:00:01.521 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]限流執行結果-result=1,請求[正常]響應 2018-10-28 00:00:01.557 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:01.558 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]限流執行結果-result=1,請求[正常]響應 2018-10-28 00:00:01.774 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:02.111 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始 2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]限流執行結果-result=1,請求[正常]響應 2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler : 因爲超過單位時間=100-容許的請求次數=5[觸發限流] 2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]參數值爲-limitTimes=5,limitTimeout=100 2018-10-28 00:00:02.278 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler : 因爲超過單位時間=100-容許的請求次數=5[觸發限流] 2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]參數值爲-limitTimes=5,limitTimeout=100 2018-10-28 00:00:02.446 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler : 因爲超過單位時間=100-容許的請求次數=5[觸發限流] 2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]開始執行限流操做 2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分佈式限流處理器]參數值爲-limitTimes=5,limitTimeout=100 2018-10-28 00:00:02.629 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : 因爲超過單位時間=100-容許的請求次數=5[觸發限流]
根據日誌可以看到,正常請求5次後,返回限流觸發,說明咱們的邏輯生效,對前端而言也是能夠看到false標記,代表咱們的Lua腳本限流邏輯是正確的,這裏具體返回什麼標記須要調用方進行明確的定義。
咱們經過Redis的incr及expire功能特性,開發定義了一套基於註解的分佈式限流操做,核心邏輯基於Lua保證了原子性。達到了很好的限流的目的,生產上,能夠基於該特色進行定製本身的限流組件,固然你能夠參考本文的代碼,相信你寫的必定比個人demo更好!