Redis+RocketMQ實現併發條件下庫存的扣減/增長(秒殺庫存控制)

前言

前面個人博客介紹了有關分佈式鎖,分佈式事務相關的問題以及解決方案,可是仍是不能解決併發下單,扣減的問題,併發的時候因爲數據庫的隔離級別/樂觀鎖/悲觀鎖...老是會出現一些問題。最近集成了一套方案解決此類型問題,並能夠適用於通常狀況的秒殺方案。歡迎拍磚...html

情景分析

前提條件:
商品 P 庫存數量爲 100
用戶A,用戶B 同時分別想買P,而且A,B分別買一個
數據庫有版本號控制樂觀鎖
指望結果:
用戶A,B都買到商品P,而且商品P的庫存變爲98redis

分析:
1.之前碰到這類問題的時候,咱們說,能夠添加分佈式鎖(也就是悲觀鎖),將商品P的id鎖住,而後A,B提交訂
單的時候分別預扣商品P的庫存就行了(如方案一)。
2.是的,1分析的有道理,咱們的核心思路確實要將並行控制爲串行,可是事與願違。咱們仔細想想:
  若是,A,B想買的不只僅P一個商品,而是P,Q兩個商品呢?咱們該怎麼鎖?一次性將P,Q都鎖住?顯然不是
  很現實,而且實際上提交訂單的時候都是從購物車下單,一個購物車裏包括多個商品去下單也是很常見的,
  而且還有以下邏輯你們仔細思考:
  
  用戶A下單->用戶A讀到庫存爲100
  用戶A預扣庫存,判斷剩餘庫存是否>=0,100-1=99>0,則A預扣
  用戶A的下單流程....
  此時A的事務沒有提交,B下單了:
  用戶B下單->用戶B讀到庫存爲100(這裏讀到100,是由於A還有其餘邏輯在執行,還未提交事務,B讀未提交了!)
  用戶B預扣庫存,判斷剩餘庫存是否>=0,100-1=99>0,則B預扣
  用戶B的下單流程....   
  
  最後不論A/B誰先提交事務,後面提交事務的就不會扣減庫存成功。由於版本號不一致(就算沒有樂觀鎖,
  修改的結果也會錯,並且錯的更離譜)。最終的結局就是庫存是99
  
3.解決方案
  目前控制庫存的方案有不少種,我這邊介紹經過redis預減庫存,經過mq發送消息同步的扣減數據庫庫存的
  方案。

方案一數據庫

@Transactional
public void createOrder(...){
    //1.校驗,扣減庫存
    check(Item item);
    
    //2.建立訂單
    
    //3.建立支付單

}

@RedisLock("pId")
public void check(Item item){
}

解決方案僞代碼緩存

//固然,咱們管理平臺新建商品時須要初始化到redis庫存裏,
  //這裏暫時就不介紹了
  
  
  //下單部分
  @Transactional
    public void createOrder(...){
        //1.校驗,扣減庫存
        check(Item item);
        
        //2.建立訂單
        
        //3.建立支付單
        
        //4.redis扣減庫存
    
    }
    
    //支付回調部分
    @Transactional
    public void wxCall(...){
        //1.校驗訂單狀態
        
        //2.修改訂單,支付單狀態
        
        //3.mq發送全局順序消息 扣減庫存

    }
    
    
    //取消支付部分
    @Transactional
    public void cancelOrder(...){
        //1.校驗訂單狀態
        
        //2.修改訂單,支付單狀態
        
        //3.redis回退庫存

    }
    
    
    //退貨/退款部分
       @Transactional
    public void returnOrder(...){
        //1.校驗訂單狀態
        
        //2.修改訂單,支付單狀態
        
        //3.redis回退庫存
        
        //4.mq發送全局順序消息

    }
代碼部分
  • 實現思路服務器

    • 咱們使用redis的lua腳原本實現扣減庫存
    • 因爲是分佈式環境下因此還須要一個分佈式鎖來控制只能有一個服務去初始化庫存
    • 須要提供一個回調函數,在初始化庫存的時候去調用這個函數獲取初始化庫存
  • 初始化庫存回調函數(IStockCallback )併發

/**
 * 獲取庫存回調
 * create by liuliang
 * on 2019-11-13  10:45
 */
public interface IStockCallback {
    /**
     * 獲取庫存
     * @return
     */
    String getStock();
}
  • 分佈式鎖控制初始化庫存
/**
 *
 * Redis分佈式鎖
 * 使用 SET resource-name anystring NX EX max-lock-time 實現
 * <p>
 * 該方案在 Redis 官方 SET 命令頁有詳細介紹。
 * http://doc.redisfans.com/string/set.html
 * <p>
 * 在介紹該分佈式鎖設計以前,咱們先來看一下在從 Redis 2.6.12 開始 SET 提供的新特性,
 * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中:
 * <p>
 * EX seconds — 以秒爲單位設置 key 的過時時間;
 * PX milliseconds — 以毫秒爲單位設置 key 的過時時間;
 * NX — 將key 的值設爲value ,當且僅當key 不存在,等效於 SETNX。
 * XX — 將key 的值設爲value ,當且僅當key 存在,等效於 SETEX。
 * <p>
 * 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現鎖的簡單方法。
 * <p>
 * 客戶端執行以上的命令:
 * <p>
 * 若是服務器返回 OK ,那麼這個客戶端得到鎖。
 * 若是服務器返回 NIL ,那麼客戶端獲取鎖失敗,能夠在稍後再重試。
 *
 *
 * create by liuliang
 * on 2019-11-13  10:49
 */

public class RedisStockLock {

    private static Logger logger = LoggerFactory.getLogger(RedisStockLock.class);

    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 將key 的值設爲value ,當且僅當key 不存在,等效於 SETNX。
     */
    public static final String NX = "NX";

    /**
     * seconds — 以秒爲單位設置 key 的過時時間,等效於EXPIRE key seconds
     */
    public static final String EX = "EX";

    /**
     * 調用set後的返回值
     */
    public static final String OK = "OK";

    /**
     * 默認請求鎖的超時時間(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默認鎖的有效時間(s)
     */
    public static final int EXPIRE = 60;

    /**
     * 解鎖的lua腳本
     */
    public static final String UNLOCK_LUA;

    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    }

    /**
     * 鎖標誌對應的key
     */
    private String lockKey;

    /**
     * 記錄到日誌的鎖標誌對應的key
     */
    private String lockKeyLog = "";

    /**
     * 鎖對應的值
     */
    private String lockValue;

    /**
     * 鎖的有效時間(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 請求鎖的超時時間(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 鎖標記
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用默認的鎖過時時間和請求鎖的超時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     */
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用默認的請求鎖的超時時間,指定鎖的過時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過時時間(單位:秒)
     */
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用默認的鎖的過時時間,指定請求鎖的超時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param timeOut       請求鎖的超時時間(單位:毫秒)
     */
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 鎖的過時時間和請求鎖的超時時間都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過時時間(單位:秒)
     * @param timeOut       請求鎖的超時時間(單位:毫秒)
     */
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * 嘗試獲取鎖 超時返回
     *
     * @return
     */
    public boolean tryLock() {
        // 生成隨機key
        lockValue = UUID.randomUUID().toString();
        // 請求鎖超時時間,納秒
        long timeout = timeOut * 1000000;
        // 系統當前時間,納秒
        long nowTime = System.nanoTime();
        while ((System.nanoTime() - nowTime) < timeout) {
            if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
                locked = true;
                // 上鎖成功結束請求
                return locked;
            }

            // 每次請求等待一段時間
            seleep(10, 50000);
        }
        return locked;
    }

    /**
     * 嘗試獲取鎖 當即返回
     *
     * @return 是否成功得到鎖
     */
    public boolean lock() {
        lockValue = UUID.randomUUID().toString();
        //不存在則添加 且設置過時時間(單位ms)
        String result = set(lockKey, lockValue, expireTime);
        locked = OK.equalsIgnoreCase(result);
        return locked;
    }

    /**
     * 以阻塞方式的獲取鎖
     *
     * @return 是否成功得到鎖
     */
    public boolean lockBlock() {
        lockValue = UUID.randomUUID().toString();
        while (true) {
            //不存在則添加 且設置過時時間(單位ms)
            String result = set(lockKey, lockValue, expireTime);
            if (OK.equalsIgnoreCase(result)) {
                locked = true;
                return locked;
            }

            // 每次請求等待一段時間
            seleep(10, 50000);
        }
    }

    /**
     * 解鎖
     * <p>
     * 能夠經過如下修改,讓這個鎖實現更健壯:
     * <p>
     * 不使用固定的字符串做爲鍵的值,而是設置一個不可猜想(non-guessable)的長隨機字符串,做爲口令串(token)。
     * 不使用 DEL 命令來釋放鎖,而是發送一個 Lua 腳本,這個腳本只在客戶端傳入的值和鍵的口令串相匹配時,纔對鍵進行刪除。
     * 這兩個改動能夠防止持有過時鎖的客戶端誤刪現有鎖的狀況出現。
     */
    public Boolean unlock() {
        // 只有加鎖成功而且鎖還有效纔去釋放鎖
        // 只有加鎖成功而且鎖還有效纔去釋放鎖
        if (locked) {
            return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    Long result = 0L;

                    List<String> keys = new ArrayList<>();
                    keys.add(lockKey);
                    List<String> values = new ArrayList<>();
                    values.add(lockValue);

                    // 集羣模式
                    if (nativeConnection instanceof JedisCluster) {
                        result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }

                    // 單機模式
                    if (nativeConnection instanceof Jedis) {
                        result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }

                    if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {
                        logger.info("Redis分佈式鎖,解鎖{}失敗!解鎖時間:{}", lockKeyLog, System.currentTimeMillis());
                    }

                    locked = result == 0;
                    return result == 1;
                }
            });
        }

        return true;
    }

    /**
     * 獲取鎖狀態
     * @Title: isLock
     * @Description: TODO
     * @return
     * @author yuhao.wang
     */
    public boolean isLock() {

        return locked;
    }

    /**
     * 重寫redisTemplate的set方法
     * <p>
     * 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現鎖的簡單方法。
     * <p>
     * 客戶端執行以上的命令:
     * <p>
     * 若是服務器返回 OK ,那麼這個客戶端得到鎖。
     * 若是服務器返回 NIL ,那麼客戶端獲取鎖失敗,能夠在稍後再重試。
     *
     * @param key     鎖的Key
     * @param value   鎖裏面的值
     * @param seconds 過去時間(秒)
     * @return
     */
    private String set(final String key, final String value, final long seconds) {
        Assert.isTrue(!StringUtils.isEmpty(key), "key不能爲空");
        return (String) redisTemplate.execute(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                String result = null;
                if (nativeConnection instanceof JedisCommands) {
                    result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds);
                }

                if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {
                    logger.info("獲取鎖{}的時間:{}", lockKeyLog, System.currentTimeMillis());
                }

                return result;
            }
        });
    }

    /**
     * @param millis 毫秒
     * @param nanos  納秒
     * @Title: seleep
     * @Description: 線程等待時間
     * @author yuhao.wang
     */
    private void seleep(long millis, int nanos) {
        try {
            Thread.sleep(millis, random.nextInt(nanos));
        } catch (InterruptedException e) {
            logger.info("獲取分佈式鎖休眠被中斷:", e);
        }
    }

    public String getLockKeyLog() {
        return lockKeyLog;
    }

    public void setLockKeyLog(String lockKeyLog) {
        this.lockKeyLog = lockKeyLog;
    }

    public int getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(int expireTime) {
        this.expireTime = expireTime;
    }

    public long getTimeOut() {
        return timeOut;
    }

    public void setTimeOut(long timeOut) {
        this.timeOut = timeOut;
    }


}
  • 庫存扣減服務
/**
 * 扣庫存
 * create by liuliang
 * on 2019-11-13  10:46
 */
@Service
public class StockComponent {

    Logger logger = LoggerFactory.getLogger(StockComponent.class);

    /**
     * 不限庫存
     */
    public static final long UNINITIALIZED_STOCK = -3L;

    /**
     * Redis 客戶端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 執行扣庫存的腳本
     */
    public static final String STOCK_LUA;




    static {
        /**
         *
         * @desc 扣減庫存Lua腳本
         * 庫存(stock)-1:表示不限庫存
         * 庫存(stock)0:表示沒有庫存
         * 庫存(stock)大於0:表示剩餘庫存
         *
         * @params 庫存key
         * @return
         *         -3:庫存未初始化
         *         -2:庫存不足
         *         -1:不限庫存
         *         大於等於0:剩餘庫存(扣減以後剩餘的庫存)
         *         redis緩存的庫存(value)是-1表示不限庫存,直接返回-1
         */
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }

    /**
     * @param key           庫存key
     * @param expire        庫存有效時間,單位秒
     * @param num           扣減數量
     * @param stockCallback 初始化庫存回調函數
     * @return -2:庫存不足; -1:不限庫存; 大於等於0:扣減庫存以後的剩餘庫存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化庫存
        if (stock == UNINITIALIZED_STOCK) {
            RedisStockLock redisLock = new RedisStockLock(redisTemplate, key);
            try {
                // 獲取鎖
                if (redisLock.tryLock()) {
                    // 雙重驗證,避免併發時重複回源到數據庫
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 獲取初始化庫存
                        final String initStock = stockCallback.getStock();
                        // 將庫存設置到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 調一次扣庫存的操做
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }

        }
        return stock;
    }

    /**
     * 加庫存(還原庫存)
     *
     * @param key    庫存key
     * @param num    庫存數量
     * @return
     */
    public long addStock(String key, int num) {

        return addStock(key, null, num);
    }

    /**
     * 加庫存
     *
     * @param key    庫存key
     * @param expire 過時時間(秒)
     * @param num    庫存數量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判斷key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }

        Assert.notNull(expire,"初始化庫存失敗,庫存過時時間不能爲null");
        RedisStockLock redisLock = new RedisStockLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 獲取到鎖後再次判斷一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化庫存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }

        return num;
    }

    /**
     * 獲取庫存
     *
     * @param key 庫存key
     * @return -1:不限庫存; 大於等於0:剩餘庫存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }

    /**
     * 扣庫存
     *
     * @param key 庫存key
     * @param num 扣減庫存數量
     * @return 扣減以後剩餘的庫存【-3:庫存未初始化; -2:庫存不足; -1:不限庫存; 大於等於0:扣減庫存以後的剩餘庫存】
     */
    private Long stock(String key, int num) {
        // 腳本里的KEYS參數
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 腳本里的ARGV參數
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));

        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集羣模式和單機模式雖然執行腳本的方法同樣,可是沒有共同的接口,因此只能分開執行
                // 集羣模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }

                // 單機模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
}
  • 庫存操做對外接口
/**
 *  庫存操做對外接口
 *
 * create by liuliang
 * on 2019-11-13  11:00
 */
@Slf4j
@Service
public class StockService {

    @Autowired
    private StockComponent stockComponent;

    @Autowired
    private ProductSkuMapper skuMapper;

    @Autowired
    private ProductMapper productMapper;

    @Autowired
    private RocketMQConfig rocketMQConfig;

    @Autowired
    private PresentRocketProducer presentRocketProducer;

    private static final String REDIS_STOCK_KEY="redis_key:stock:";

    /**
     * 扣減庫存
     * @param skuId
     * @param num
     * @return
     */
    public Boolean stock(String skuId,Integer num) {
        // 庫存ID
        String redisKey = REDIS_STOCK_KEY + skuId;
        long stock = stockComponent.stock(redisKey, 60 * 60, num, () -> initStock(skuId));
        if(stock < 0){//異常,庫存不足
            log.info("庫存不足........");
            ProductSku productSku = skuMapper.selectById(skuId);
            throw new MallException(MsRespCode.STOCK_NUMBER_ERROR,new Object[]{productMapper.selectById(productSku.getProductId()).getTitle()});
        }
        return stock >= 0 ;
    }



    /**
     * 添加redis - sku庫存數量
     * @param skuId
     * @param num
     * @return
     */
    public Long addStock(String skuId ,Integer num) {
        // 庫存ID
        String redisKey = REDIS_STOCK_KEY + skuId;
        long l = stockComponent.addStock(redisKey, num);
        return l;
    }


    /**
     * 獲取初始的庫存
     *
     * @return
     */
    private String initStock(String skuId) {
        //初始化庫存
        ProductSku productSku = skuMapper.selectById(skuId);
        return productSku.getStockNumber()+"";
    }

    /**
     * 獲取sku庫存
     * @param skuId
     * @return
     */
    public Integer getStock(String skuId) {

        // 庫存ID
        String redisKey = REDIS_STOCK_KEY + skuId;

        return stockComponent.getStock(redisKey);
    }
}

redis序列化app

/**
 * create by liuliang
 * on 2019-11-13  11:29
 */
@Configuration
public class RedisConfig {
    /**
     * 重寫Redis序列化方式,使用Json方式:
     * 當咱們的數據存儲到Redis的時候,咱們的鍵(key)和值(value)都是經過Spring提供的Serializer序列化到數據庫的。RedisTemplate默認使用的是JdkSerializationRedisSerializer,StringRedisTemplate默認使用的是StringRedisSerializer。
     * Spring Data JPA爲咱們提供了下面的Serializer:
     * GenericToStringSerializer、Jackson2JsonRedisSerializer、JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。
     * 在此咱們將本身配置RedisTemplate並定義Serializer。
     *
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // 設置值(value)的序列化採用Jackson2JsonRedisSerializer。
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 設置鍵(key)的序列化採用StringRedisSerializer。
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
相關文章
相關標籤/搜索