Redis分佈式鎖的使用與實現原理

模擬一個電商裏面下單減庫存的場景。
1.首先在redis里加入商品庫存數量。
java

2.新建一個Spring Boot項目,在pom裏面引入相關的依賴。web

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

3.接下來,在application.yml配置redis屬性和指定應用的端口號:redis

server:
  port: 8090

spring:
  redis:
    host: 192.168.0.60
    port: 6379

4.新建一個Controller類,扣減庫存初版代碼:spring

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Objects;

@RestController
public class StockController {

    private static final Logger logger = LoggerFactory.getLogger(StockController.class);

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/reduceStock")
    public String reduceStock() {
        // 從redis中獲取庫存數量
        int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
        if (stock > 0) {
            // 減庫存
            int restStock = stock - 1;
            // 剩餘庫存再從新設置到redis中
            stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
            logger.info("扣減成功,剩餘庫存:{}", restStock);
        } else {
            logger.info("庫存不足,扣減失敗。");
        }

        return "success";
    }
}

上面初版的代碼存在什麼問題:超賣。假如多個線程同時調用獲取庫存數量的代碼,那麼每一個線程拿到的都是100,判斷庫存都大於0,均可以執行減庫存的操做。假如兩個線程都作減庫存更新緩存,那麼緩存的庫存變成99,但實際上,應該是減掉2個庫存。
那麼不少人的第一個想法是加synchronized同步代碼塊,由於獲取數量和減庫存不是原子性操做,有多個線程來執行代碼的時候,只容許一個線程執行代碼塊裏的代碼。那麼改完的第二版的代碼以下:緩存

@RequestMapping("/reduceStock")
    public String reduceStock() {
        synchronized (this) {
            // 從redis中獲取庫存數量
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // 減庫存
                int restStock = stock - 1;
                // 剩餘庫存再從新設置到redis中
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("扣減成功,剩餘庫存:{}", restStock);
            } else {
                logger.info("庫存不足,扣減失敗。");
            }
        }

        return "success";
    }

但使用synchronize存在的問題,就是隻能保證單機環境運行時沒有問題的。但如今的軟件公司裏,基本上都是集羣架構,是多實例,前面使用Nginx作負載均衡,大概架構以下:
網絡

Nginx分發請求,把請求發送到不一樣的Tomcat容器,而synchronize只能保證一個應用是沒有問題的。架構

那麼代碼改進第三版,就是引入redis分佈式鎖,具體代碼以下:併發

@RequestMapping("/reduceStock")
    public String reduceStock() {
        String lockKey = "stockKey";
        try {
            boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
            if (!result) {
                return "errorCode";
            }
            // 從redis中獲取庫存數量
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // 減庫存
                int restStock = stock - 1;
                // 剩餘庫存再從新設置到redis中
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("扣減成功,剩餘庫存:{}", restStock);
            } else {
                logger.info("庫存不足,扣減失敗。");
            }
        } finally {
            stringRedisTemplate.delete(lockKey)
        }
        return "success";
    }

若是有一個線程拿到鎖,那麼其餘的線程就會等待。必定要記得在finally裏面把使用完的鎖要刪除掉。不然一旦拋出異常,只有一個線程會一直持有鎖,其餘線程沒有機會獲取。
但若是在執行if (stock > 0) {代碼塊裏的代碼,由於宕機或重啓沒有執行完,也會一直持有鎖,因此,這裏須要把鎖加一個超時時間:app

boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
   stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

但若是上面兩行代碼在中間執行出問題了,設置超時時間的代碼還沒執行,也會出現鎖不能釋放的問題。好在有對應的方法:就是把上面兩行代碼設置成一個原子操做:負載均衡

// 這裏默認設置超時時間爲10秒
   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);

到此爲止,若是併發量不是很大的話,基本上是沒有問題的。

可是,若是請求的併發量很大,就會出現新的問題:有種比較特殊的狀況,第一個線程執行了15秒,可是執行到10秒鐘的時候,鎖已經失效釋放了,那麼在高併發場景下,第二個線程發現鎖已經失效,那麼它就能夠拿到這把鎖進行加鎖,
假設第二個線程執行須要8秒,它執行到5秒鐘後,此時第一個線程已經執行完了,執行完那一刻,進行了刪除key的操做,可是此時的鎖是第二個線程加的,這樣第一個線程把第二個線程加的鎖刪掉了。
那意味着第三個線程又能夠拿到鎖,第三個線程執行了3秒鐘,此時第二個線程執行完畢,那麼第二個線程把第三個線程的鎖又刪除了。致使鎖失效。
那麼解決的思路就是,我本身加的鎖,不要被別人刪掉。那麼能夠爲每一個進來的請求生成一個惟一的id,做爲分佈式鎖的值,而後在釋放時,判斷一下當前線程的id,是否是和緩存裏的id是否相等。

@RequestMapping("/reduceStock")
    public String reduceStock() {
        String lockKey = "stockKey";
        String id = UUID.randomUUID().toString();
        try {
            // 這裏默認設置超時時間爲30秒
            boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS);
            if (!result) {
                return "errorCode";
            }
            // 從redis中獲取庫存數量
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // 減庫存
                int restStock = stock - 1;
                // 剩餘庫存再從新設置到redis中
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("扣減成功,剩餘庫存:{}", restStock);
            } else {
                logger.info("庫存不足,扣減失敗。");
            }
        } finally {
            if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) {
                stringRedisTemplate.delete(lockKey);
            }
        }
        return "success";
    }

到此爲止,一個比較完善的鎖就實現了,能夠應付大部分場景。
固然,上面的代碼還有一個問題,就是一個線程執行時間超過了過時時間,後面的代碼尚未執行完,鎖就已經刪除了,仍是會有些bug存在。解決的方法是給鎖續命的操做。
在當前主線程獲取到鎖之後,能夠fork出一個線程,執行Timer定時器操做,假如默認超時時間爲30秒,那麼定時器每隔10秒去看下這把鎖仍是否存在,存在就說明這個鎖裏的邏輯尚未執行完,那麼就能夠把當前主線程的超時時間從新設置爲30秒;若是不存在,就直接結束掉。

可是上面的邏輯,在高併發場景下,實現比較完善仍是比較困難的。好在如今已經有比較成熟的框架,那就是Redisson。官方地址https://redisson.org。
下面用Redisson來實現分佈式鎖。
首先引入依賴包:

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.6.5</version>
        </dependency>

配置類:

@Configuration
public class RedissonConfig {
    @Bean
    public Redisson redisson() {
        // 單機模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}

接下來用redisson重寫上面的減庫存操做:

@Resource
    private Redisson redisson;
    
    @RequestMapping("/reduceStock")
    public String reduceStock() {
        String lockKey = "stockKey";
        RLock redissonLock = redisson.getLock(lockKey);
        try {
            // 加鎖,鎖續命
            redissonLock.lock();
            // 從redis中獲取庫存數量
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // 減庫存
                int restStock = stock - 1;
                // 剩餘庫存再從新設置到redis中
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("扣減成功,剩餘庫存:{}", restStock);
            } else {
                logger.info("庫存不足,扣減失敗。");
            }
        } finally {
           redissonLock.unlock();
        }
        return "success";
    }

其實就是三個步驟:獲取鎖,加鎖,釋放鎖。

先簡單看下Redisson的實現原理:

這裏先說一下Redis不少操做使用Lua腳原本實現原子性操做,關於Lua語法,能夠去網上找下相關教程。
使用Lua腳本的好處有:
1.減小網絡開銷,多個命令可使用一次請求完成;
2.實現了原子性操做,Redis會把Lua腳本做爲一個總體去執行;
3.實現事務,Redis自帶的事務功能有限,而Lua腳本實現了事務的常規操做,並且還支持回滾。

可是Lua實際上不會使用不少,若是Lua腳本執行時間過長,由於Redis是單線程,所以會致使堵塞。

最後,說下Redisson分佈式鎖的代碼實現,
找到上面的redissonLock.lock();
lock方法點進去,一直點到RedissonLock類裏面的lockInterruptibly方法:

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 獲取線程id
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

重點看下tryAcquire方法,把線程id做爲一個參數傳遞進來,在這個方法裏面,找到tryLockInnerAsync方法點進去,

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

這裏就是一堆Lua腳本,先看第一個if命令,先去判斷 KEYS[1](就是對應的鎖key的名字),若是不存在,在hashmap裏,設置一個屬性爲線程id,值爲1,再把map的過時時間設置爲internalLockLeaseTime,這個值默認是30秒,

上面的操做對應的命令是:

hset keyname id:thread 1
pexpire keyname 30

而後返回nil,至關於null,那程序return了。
另外,Redisson還支持重入鎖,那第二個if就是執行重入鎖的操做,會判斷鎖是否存在,而且傳入的線程id是不是當前線程的id,若果是,支持重複加鎖進行自增操做;
若是是其餘線程調用lock方法,上面兩個if判斷不會走,會返回鎖剩餘過時時間。

接着返回到tryAcquireAsync方法裏面往下看:
其實是加了一個監聽器,在監聽器裏面有個很重要的方法scheduleExpirationRenewal,一看這個名字就能大概猜出是什麼功能,
裏面有個定時任務的輪詢,

private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // 判斷傳遞進來的線程id是不是咱們以前主線程設置的id,若是是,則增長續命,增長30秒。
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

接着推遲10秒鐘(internalLockLeaseTime / 3),再執行續命操做邏輯。

到最後,再回到lockInterruptibly方法,
若是ttl 爲null,說明加鎖成功了,就返回null,那若是其餘線程的話,就會返回剩餘過時時間,那麼就會進入到while死循環裏,一直嘗試加鎖,調用tryAcquire方法,在瑣失效之後,再會嘗試獲取加鎖。

到此爲止,分析完畢。

相關文章
相關標籤/搜索