Redis實現的分佈式鎖和分佈式限流

  隨着如今分佈式愈來愈廣泛,分佈式鎖也十分經常使用,個人上一篇文章解釋了使用zookeeper實現分佈式鎖(傳送門),本次我們說一下如何用Redis實現分佈式鎖和分佈限流。html

  Redis有個事務鎖,就是以下的命令,這個命令的含義是將一個value設置到一個key中,若是不存在將會賦值而且設置超時時間爲30秒,如何這個key已經存在了,則不進行設置。java

SET key value NX PX 30000

  這個事務鎖很好的解決了兩個單獨的命令,一個設置set key value nx,即該key不存在的話將對其進行設置,另外一個是expire key seconds,設置該key的超時時間。咱們能夠想一下,若是這兩個命令用程序單獨使用會存在什麼問題:git

  1. 若是一個set key的命令設置了key,而後程序異常了,expire時間沒有設置,那麼這個key會一直鎖住。github

  2. 若是一個set key時出現了異常,可是直接執行了expire,過了一下子以後另外一個進行set key,還沒怎麼執行代碼,結果key過時了,別的線程也進入了鎖。web

  還有不少出問題的可能點,這裏咱們就不討論了,下面我們來看看如何實現吧。本文使用的Spring Boot 2.x + Spring data redis + Swagger +lombok + AOP + lua腳本。在實現的過程當中遇到了不少問題,都一一解決實現了。依賴的POM文件以下:redis

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hqs</groupId>
    <artifactId>distributedlock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>distributedlock</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  使用了兩個lua腳本,一個用於執行lock,另外一個執行unlock。我們簡單看一下,lock腳本就是採用Redis事務執行的set nx px命令,其實還有set nx ex命令,這個ex命令是採用秒的方式進行設置過時時間,這個px是採用毫秒的方式設置過時時間。value須要使用一個惟一的值,這個值在解鎖的時候須要判斷是否一致,若是一致的話就進行解鎖。這個也是官方推薦的方法。另外在lock的地方我設置了一個result,用於輸出測試時的結果,這樣就能夠結合程序去進行debug了。spring

local expire = tonumber(ARGV[2])
local ret = redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', expire)
local strret = tostring(ret)
--用於查看結果,我本機獲取鎖成功後程序返回隨機結果"table: 0x7fb4b3700fe0",不然返回"false"
redis.call('set', 'result', strret)
if strret == 'false' then
    return false
else
    return true
end
redis.call('del', 'result')
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end

  來看下代碼,主要寫了兩個方法,一個是用與鎖另一個是用於結解鎖。這塊須要注意的是使用RedisTemplate<String, String>,這塊意味着key和value必定都是String的,我在使用的過程當中就出現了一些錯誤。首先初始化兩個腳本到程序中,而後調用執行腳本。apache

package com.hqs.distributedlock.lock;


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;

@Slf4j
@Component
public class DistributedLock {

    //注意RedisTemplate用的String,String,後續全部用到的key和value都是String的
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    RedisScript<Boolean> lockScript;

    @Autowired
    RedisScript<Long> unlockScript;

    public Boolean distributedLock(String key, String uuid, String secondsToLock) {
        Boolean locked = false;
        try {
            String millSeconds = String.valueOf(Integer.parseInt(secondsToLock) * 1000);
            locked =redisTemplate.execute(lockScript, Collections.singletonList(key), uuid, millSeconds);
            log.info("distributedLock.key{}: - uuid:{}: - timeToLock:{} - locked:{} - millSeconds:{}",
                    key, uuid, secondsToLock, locked, millSeconds);
        } catch (Exception e) {
            log.error("error", e);
        }
        return locked;
    }

    public void distributedUnlock(String key, String uuid) {
        Long unlocked = redisTemplate.execute(unlockScript, Collections.singletonList(key),
                uuid);
        log.info("distributedLock.key{}: - uuid:{}: - unlocked:{}", key, uuid, unlocked);

    }

}

  還有一個就是腳本定義的地方須要注意,返回的結果集必定是Long, Boolean,List, 一個反序列化的值。這塊要注意。app

package com.hqs.distributedlock.config;


import com.sun.org.apache.xpath.internal.operations.Bool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scripting.ScriptSource;
import org.springframework.scripting.support.ResourceScriptSource;


@Configuration
@Slf4j
public class BeanConfiguration {

    /**
     * The script resultType should be one of
     * Long, Boolean, List, or a deserialized value type. It can also be null if the script returns
     * a throw-away status (specifically, OK).
     * @return
     */
    @Bean
    public RedisScript<Long> limitScript() {
        RedisScript redisScript = null;
        try {
            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua"));
//            log.info("script:{}", scriptSource.getScriptAsString());
            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);
        } catch (Exception e) {
            log.error("error", e);
        }
        return redisScript;

    }

    @Bean
    public RedisScript<Boolean> lockScript() {
        RedisScript<Boolean> redisScript = null;
        try {
            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/lock.lua"));
            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Boolean.class);
        } catch (Exception e) {
            log.error("error" , e);
        }
        return redisScript;
    }

    @Bean
    public RedisScript<Long> unlockScript() {
        RedisScript<Long> redisScript = null;
        try {
            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/unlock.lua"));
            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);
        } catch (Exception e) {
            log.error("error" , e);
        }
        return redisScript;
    }


    @Bean
    public RedisScript<Long> limitAnother() {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua")));
        redisScript.setResultType(Long.class);
        return redisScript;
    }

}

  好了,這塊就寫好了,而後寫好controller類準備測試。dom

  

   @PostMapping("/distributedLock")
    @ResponseBody
    public String distributedLock(String key, String uuid, String secondsToLock, String userId) throws Exception{
//        String uuid = UUID.randomUUID().toString();
        Boolean locked = false;
        try {
            locked = lock.distributedLock(key, uuid, secondsToLock);
            if(locked) {
                log.info("userId:{} is locked - uuid:{}", userId, uuid);
                log.info("do business logic");
                TimeUnit.MICROSECONDS.sleep(3000);
            } else {
                log.info("userId:{} is not locked - uuid:{}", userId, uuid);
            }
        } catch (Exception e) {
            log.error("error", e);
        } finally {
            if(locked) {
                lock.distributedUnlock(key, uuid);
            }
        }

        return "ok";
    }

  我也寫了一個測試類,用於測試和輸出結果, 使用100個線程,而後鎖的時間設置10秒,controller裏邊須要休眠3秒模擬業務執行。

    @Test
    public void distrubtedLock() {
        String url = "http://localhost:8080/distributedLock";
        String uuid = "abcdefg";
//        log.info("uuid:{}", uuid);
        String key = "redisLock";
        String secondsToLive = "10";

        for(int i = 0; i < 100; i++) {
            final int userId = i;
            new Thread(() -> {
                MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
                params.add("uuid", uuid);
                params.add("key", key);
                params.add("secondsToLock", secondsToLive);
                params.add("userId", String.valueOf(userId));
                String result = testRestTemplate.postForObject(url, params, String.class);
                System.out.println("-------------" + result);
            }
            ).start();
        }

    }

  獲取鎖的地方就會執行do business logic, 而後會有部分線程獲取到鎖並執行業務,執行完業務的就會釋放鎖。

  分佈式鎖就實現好了,接下來實現分佈式限流。先看一下limit的lua腳本,須要給腳本傳兩個值,一個值是限流的key,一個值是限流的數量。獲取當前key,而後判斷其值是否爲nil,若是爲nil的話須要賦值爲0,而後進行加1而且和limit進行比對,若是大於limt即返回0,說明限流了,若是小於limit則須要使用Redis的INCRBY key 1,就是將key進行加1命令。而且設置超時時間,超時時間是秒,而且若是有須要的話這個秒也是能夠用參數進行設置。

--lua 下標從 1 開始
-- 限流 key
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 獲取當前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")

if curentLimit + 1 > limit then
    -- 達到限流大小 返回
    return 0;
else
    -- 沒有達到閾值 value + 1
    redis.call("INCRBY", key, 1)
    -- EXPIRE後邊的單位是秒
    redis.call("EXPIRE", key, 10)
    return curentLimit + 1
end

  執行limit的腳本和執行lock的腳本相似。

package com.hqs.distributedlock.limit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;

/**
 * @author huangqingshi
 * @Date 2019-01-17
 */
@Slf4j
@Component
public class DistributedLimit {

    //注意RedisTemplate用的String,String,後續全部用到的key和value都是String的
    @Autowired
    private RedisTemplate<String, String> redisTemplate;


    @Autowired
    RedisScript<Long> limitScript;

    public Boolean distributedLimit(String key, String limit) {
        Long id = 0L;

        try {
            id = redisTemplate.execute(limitScript, Collections.singletonList(key),
                    limit);
            log.info("id:{}", id);
        } catch (Exception e) {
            log.error("error", e);
        }

        if(id == 0L) {
            return false;
        } else {
            return true;
        }
    }

}

  接下來我們寫一個限流注解,而且設置註解的key和限流的大小:

package com.hqs.distributedlock.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 自定義limit註解
 * @author huangqingshi
 * @Date 2019-01-17
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistriLimitAnno {
    public String limitKey() default "limit";
    public int limit() default 1;
}

  而後對註解進行切面,在切面中判斷是否超過limit,若是超過limit的時候就須要拋出異常exceeded limit,不然正常執行。

package com.hqs.distributedlock.aspect;

import com.hqs.distributedlock.annotation.DistriLimitAnno;
import com.hqs.distributedlock.limit.DistributedLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * @author huangqingshi
 * @Date 2019-01-17
 */
@Slf4j
@Aspect
@Component
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class LimitAspect {

    @Autowired
    DistributedLimit distributedLimit;

    @Pointcut("@annotation(com.hqs.distributedlock.annotation.DistriLimitAnno)")
    public void limit() {};

    @Before("limit()")
    public void beforeLimit(JoinPoint joinPoint) throws Exception {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        DistriLimitAnno distriLimitAnno = method.getAnnotation(DistriLimitAnno.class);
        String key = distriLimitAnno.limitKey();
        int limit = distriLimitAnno.limit();
        Boolean exceededLimit = distributedLimit.distributedLimit(key, String.valueOf(limit));
        if(!exceededLimit) {
            throw new RuntimeException("exceeded limit");
        }
    }

}

  由於有拋出異常,這裏我弄了一個統一的controller錯誤處理,若是controller出現Exception的時候都須要走這塊異常。若是是正常的RunTimeException的時候獲取一下,不然將異常獲取一下而且輸出。

package com.hqs.distributedlock.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.NativeWebRequest;

import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;

/**
 * @author huangqingshi
 * @Date 2019-01-17
 * 統一的controller錯誤處理
 */
@Slf4j
@ControllerAdvice
public class UnifiedErrorHandler {
    private static Map<String, String> res = new HashMap<>(2);

    @ExceptionHandler(value = Exception.class)
    @ResponseStatus(HttpStatus.OK)
    @ResponseBody
    public Object processException(HttpServletRequest req, Exception e) {
        res.put("url", req.getRequestURL().toString());

        if(e instanceof RuntimeException) {
            res.put("mess", e.getMessage());
        } else {
            res.put("mess", "sorry error happens");
        }
        return res;
    }

}

  好了,接下來將註解寫到自定義的controller上,limit的大小爲10,也就是10秒鐘內限制10次訪問。

@PostMapping("/distributedLimit")
    @ResponseBody
    @DistriLimitAnno(limitKey="limit", limit = 10)
    public String distributedLimit(String userId) {
        log.info(userId);
        return "ok";
    }

  也是來一段Test方法來跑,老方式100個線程開始跑,只有10次,其餘的都是limit。沒有問題。

  總結一下,此次實現採用了使用lua腳本和Redis實現了鎖和限流,可是真實使用的時候還須要多測試,另外若是這次Redis也是採用的單機實現方法,使用集羣的時候可能須要改造一下。關於鎖這塊其實Reids他們本身也實現了RedLock, java實現的版本Redission。也有不少公司使用了,功能很是強大。各類場景下都用到了。

  若有問題,歡迎拍磚~

相關文章
相關標籤/搜索