前言
前面個人博客介紹了有關分佈式鎖,分佈式事務相關的問題以及解決方案,可是仍是不能解決併發下單,扣減的問題,併發的時候因爲數據庫的隔離級別/樂觀鎖/悲觀鎖...老是會出現一些問題。最近集成了一套方案解決此類型問題,並能夠適用於通常狀況的秒殺方案。歡迎拍磚...html
情景分析
前提條件:
商品 P 庫存數量爲 100
用戶A,用戶B 同時分別想買P,而且A,B分別買一個
數據庫有版本號控制樂觀鎖
指望結果:
用戶A,B都買到商品P,而且商品P的庫存變爲98redis
分析: 1.之前碰到這類問題的時候,咱們說,能夠添加分佈式鎖(也就是悲觀鎖),將商品P的id鎖住,而後A,B提交訂 單的時候分別預扣商品P的庫存就行了(如方案一)。 2.是的,1分析的有道理,咱們的核心思路確實要將並行控制爲串行,可是事與願違。咱們仔細想想: 若是,A,B想買的不只僅P一個商品,而是P,Q兩個商品呢?咱們該怎麼鎖?一次性將P,Q都鎖住?顯然不是 很現實,而且實際上提交訂單的時候都是從購物車下單,一個購物車裏包括多個商品去下單也是很常見的, 而且還有以下邏輯你們仔細思考: 用戶A下單->用戶A讀到庫存爲100 用戶A預扣庫存,判斷剩餘庫存是否>=0,100-1=99>0,則A預扣 用戶A的下單流程.... 此時A的事務沒有提交,B下單了: 用戶B下單->用戶B讀到庫存爲100(這裏讀到100,是由於A還有其餘邏輯在執行,還未提交事務,B讀未提交了!) 用戶B預扣庫存,判斷剩餘庫存是否>=0,100-1=99>0,則B預扣 用戶B的下單流程.... 最後不論A/B誰先提交事務,後面提交事務的就不會扣減庫存成功。由於版本號不一致(就算沒有樂觀鎖, 修改的結果也會錯,並且錯的更離譜)。最終的結局就是庫存是99 3.解決方案 目前控制庫存的方案有不少種,我這邊介紹經過redis預減庫存,經過mq發送消息同步的扣減數據庫庫存的 方案。
方案一數據庫
@Transactional public void createOrder(...){ //1.校驗,扣減庫存 check(Item item); //2.建立訂單 //3.建立支付單 } @RedisLock("pId") public void check(Item item){ }
解決方案僞代碼緩存
//固然,咱們管理平臺新建商品時須要初始化到redis庫存裏, //這裏暫時就不介紹了 //下單部分 @Transactional public void createOrder(...){ //1.校驗,扣減庫存 check(Item item); //2.建立訂單 //3.建立支付單 //4.redis扣減庫存 } //支付回調部分 @Transactional public void wxCall(...){ //1.校驗訂單狀態 //2.修改訂單,支付單狀態 //3.mq發送全局順序消息 扣減庫存 } //取消支付部分 @Transactional public void cancelOrder(...){ //1.校驗訂單狀態 //2.修改訂單,支付單狀態 //3.redis回退庫存 } //退貨/退款部分 @Transactional public void returnOrder(...){ //1.校驗訂單狀態 //2.修改訂單,支付單狀態 //3.redis回退庫存 //4.mq發送全局順序消息 }
代碼部分
實現思路服務器
初始化庫存回調函數(IStockCallback )併發
/** * 獲取庫存回調 * create by liuliang * on 2019-11-13 10:45 */ public interface IStockCallback { /** * 獲取庫存 * @return */ String getStock(); }
/** * * Redis分佈式鎖 * 使用 SET resource-name anystring NX EX max-lock-time 實現 * <p> * 該方案在 Redis 官方 SET 命令頁有詳細介紹。 * http://doc.redisfans.com/string/set.html * <p> * 在介紹該分佈式鎖設計以前,咱們先來看一下在從 Redis 2.6.12 開始 SET 提供的新特性, * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中: * <p> * EX seconds — 以秒爲單位設置 key 的過時時間; * PX milliseconds — 以毫秒爲單位設置 key 的過時時間; * NX — 將key 的值設爲value ,當且僅當key 不存在,等效於 SETNX。 * XX — 將key 的值設爲value ,當且僅當key 存在,等效於 SETEX。 * <p> * 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現鎖的簡單方法。 * <p> * 客戶端執行以上的命令: * <p> * 若是服務器返回 OK ,那麼這個客戶端得到鎖。 * 若是服務器返回 NIL ,那麼客戶端獲取鎖失敗,能夠在稍後再重試。 * * * create by liuliang * on 2019-11-13 10:49 */ public class RedisStockLock { private static Logger logger = LoggerFactory.getLogger(RedisStockLock.class); private RedisTemplate<String, Object> redisTemplate; /** * 將key 的值設爲value ,當且僅當key 不存在,等效於 SETNX。 */ public static final String NX = "NX"; /** * seconds — 以秒爲單位設置 key 的過時時間,等效於EXPIRE key seconds */ public static final String EX = "EX"; /** * 調用set後的返回值 */ public static final String OK = "OK"; /** * 默認請求鎖的超時時間(ms 毫秒) */ private static final long TIME_OUT = 100; /** * 默認鎖的有效時間(s) */ public static final int EXPIRE = 60; /** * 解鎖的lua腳本 */ public static final String UNLOCK_LUA; static { StringBuilder sb = new StringBuilder(); sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] "); sb.append("then "); sb.append(" return redis.call(\"del\",KEYS[1]) "); sb.append("else "); sb.append(" return 0 "); sb.append("end "); UNLOCK_LUA = sb.toString(); } /** * 鎖標誌對應的key */ private String lockKey; /** * 記錄到日誌的鎖標誌對應的key */ private String lockKeyLog = ""; /** * 鎖對應的值 */ private String lockValue; /** * 鎖的有效時間(s) */ private int expireTime = EXPIRE; /** * 請求鎖的超時時間(ms) */ private long timeOut = TIME_OUT; /** * 鎖標記 */ private volatile boolean locked = false; final Random random = new Random(); /** * 使用默認的鎖過時時間和請求鎖的超時時間 * * @param redisTemplate * @param lockKey 鎖的key(Redis的Key) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey) { this.redisTemplate = redisTemplate; this.lockKey = lockKey + "_lock"; } /** * 使用默認的請求鎖的超時時間,指定鎖的過時時間 * * @param redisTemplate * @param lockKey 鎖的key(Redis的Key) * @param expireTime 鎖的過時時間(單位:秒) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) { this(redisTemplate, lockKey); this.expireTime = expireTime; } /** * 使用默認的鎖的過時時間,指定請求鎖的超時時間 * * @param redisTemplate * @param lockKey 鎖的key(Redis的Key) * @param timeOut 請求鎖的超時時間(單位:毫秒) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) { this(redisTemplate, lockKey); this.timeOut = timeOut; } /** * 鎖的過時時間和請求鎖的超時時間都是用指定的值 * * @param redisTemplate * @param lockKey 鎖的key(Redis的Key) * @param expireTime 鎖的過時時間(單位:秒) * @param timeOut 請求鎖的超時時間(單位:毫秒) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) { this(redisTemplate, lockKey, expireTime); this.timeOut = timeOut; } /** * 嘗試獲取鎖 超時返回 * * @return */ public boolean tryLock() { // 生成隨機key lockValue = UUID.randomUUID().toString(); // 請求鎖超時時間,納秒 long timeout = timeOut * 1000000; // 系統當前時間,納秒 long nowTime = System.nanoTime(); while ((System.nanoTime() - nowTime) < timeout) { if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) { locked = true; // 上鎖成功結束請求 return locked; } // 每次請求等待一段時間 seleep(10, 50000); } return locked; } /** * 嘗試獲取鎖 當即返回 * * @return 是否成功得到鎖 */ public boolean lock() { lockValue = UUID.randomUUID().toString(); //不存在則添加 且設置過時時間(單位ms) String result = set(lockKey, lockValue, expireTime); locked = OK.equalsIgnoreCase(result); return locked; } /** * 以阻塞方式的獲取鎖 * * @return 是否成功得到鎖 */ public boolean lockBlock() { lockValue = UUID.randomUUID().toString(); while (true) { //不存在則添加 且設置過時時間(單位ms) String result = set(lockKey, lockValue, expireTime); if (OK.equalsIgnoreCase(result)) { locked = true; return locked; } // 每次請求等待一段時間 seleep(10, 50000); } } /** * 解鎖 * <p> * 能夠經過如下修改,讓這個鎖實現更健壯: * <p> * 不使用固定的字符串做爲鍵的值,而是設置一個不可猜想(non-guessable)的長隨機字符串,做爲口令串(token)。 * 不使用 DEL 命令來釋放鎖,而是發送一個 Lua 腳本,這個腳本只在客戶端傳入的值和鍵的口令串相匹配時,纔對鍵進行刪除。 * 這兩個改動能夠防止持有過時鎖的客戶端誤刪現有鎖的狀況出現。 */ public Boolean unlock() { // 只有加鎖成功而且鎖還有效纔去釋放鎖 // 只有加鎖成功而且鎖還有效纔去釋放鎖 if (locked) { return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); Long result = 0L; List<String> keys = new ArrayList<>(); keys.add(lockKey); List<String> values = new ArrayList<>(); values.add(lockValue); // 集羣模式 if (nativeConnection instanceof JedisCluster) { result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values); } // 單機模式 if (nativeConnection instanceof Jedis) { result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values); } if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) { logger.info("Redis分佈式鎖,解鎖{}失敗!解鎖時間:{}", lockKeyLog, System.currentTimeMillis()); } locked = result == 0; return result == 1; } }); } return true; } /** * 獲取鎖狀態 * @Title: isLock * @Description: TODO * @return * @author yuhao.wang */ public boolean isLock() { return locked; } /** * 重寫redisTemplate的set方法 * <p> * 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現鎖的簡單方法。 * <p> * 客戶端執行以上的命令: * <p> * 若是服務器返回 OK ,那麼這個客戶端得到鎖。 * 若是服務器返回 NIL ,那麼客戶端獲取鎖失敗,能夠在稍後再重試。 * * @param key 鎖的Key * @param value 鎖裏面的值 * @param seconds 過去時間(秒) * @return */ private String set(final String key, final String value, final long seconds) { Assert.isTrue(!StringUtils.isEmpty(key), "key不能爲空"); return (String) redisTemplate.execute(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); String result = null; if (nativeConnection instanceof JedisCommands) { result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds); } if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) { logger.info("獲取鎖{}的時間:{}", lockKeyLog, System.currentTimeMillis()); } return result; } }); } /** * @param millis 毫秒 * @param nanos 納秒 * @Title: seleep * @Description: 線程等待時間 * @author yuhao.wang */ private void seleep(long millis, int nanos) { try { Thread.sleep(millis, random.nextInt(nanos)); } catch (InterruptedException e) { logger.info("獲取分佈式鎖休眠被中斷:", e); } } public String getLockKeyLog() { return lockKeyLog; } public void setLockKeyLog(String lockKeyLog) { this.lockKeyLog = lockKeyLog; } public int getExpireTime() { return expireTime; } public void setExpireTime(int expireTime) { this.expireTime = expireTime; } public long getTimeOut() { return timeOut; } public void setTimeOut(long timeOut) { this.timeOut = timeOut; } }
/** * 扣庫存 * create by liuliang * on 2019-11-13 10:46 */ @Service public class StockComponent { Logger logger = LoggerFactory.getLogger(StockComponent.class); /** * 不限庫存 */ public static final long UNINITIALIZED_STOCK = -3L; /** * Redis 客戶端 */ @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 執行扣庫存的腳本 */ public static final String STOCK_LUA; static { /** * * @desc 扣減庫存Lua腳本 * 庫存(stock)-1:表示不限庫存 * 庫存(stock)0:表示沒有庫存 * 庫存(stock)大於0:表示剩餘庫存 * * @params 庫存key * @return * -3:庫存未初始化 * -2:庫存不足 * -1:不限庫存 * 大於等於0:剩餘庫存(扣減以後剩餘的庫存) * redis緩存的庫存(value)是-1表示不限庫存,直接返回-1 */ StringBuilder sb = new StringBuilder(); sb.append("if (redis.call('exists', KEYS[1]) == 1) then"); sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));"); sb.append(" local num = tonumber(ARGV[1]);"); sb.append(" if (stock == -1) then"); sb.append(" return -1;"); sb.append(" end;"); sb.append(" if (stock >= num) then"); sb.append(" return redis.call('incrby', KEYS[1], 0 - num);"); sb.append(" end;"); sb.append(" return -2;"); sb.append("end;"); sb.append("return -3;"); STOCK_LUA = sb.toString(); } /** * @param key 庫存key * @param expire 庫存有效時間,單位秒 * @param num 扣減數量 * @param stockCallback 初始化庫存回調函數 * @return -2:庫存不足; -1:不限庫存; 大於等於0:扣減庫存以後的剩餘庫存 */ public long stock(String key, long expire, int num, IStockCallback stockCallback) { long stock = stock(key, num); // 初始化庫存 if (stock == UNINITIALIZED_STOCK) { RedisStockLock redisLock = new RedisStockLock(redisTemplate, key); try { // 獲取鎖 if (redisLock.tryLock()) { // 雙重驗證,避免併發時重複回源到數據庫 stock = stock(key, num); if (stock == UNINITIALIZED_STOCK) { // 獲取初始化庫存 final String initStock = stockCallback.getStock(); // 將庫存設置到redis redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS); // 調一次扣庫存的操做 stock = stock(key, num); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } } return stock; } /** * 加庫存(還原庫存) * * @param key 庫存key * @param num 庫存數量 * @return */ public long addStock(String key, int num) { return addStock(key, null, num); } /** * 加庫存 * * @param key 庫存key * @param expire 過時時間(秒) * @param num 庫存數量 * @return */ public long addStock(String key, Long expire, int num) { boolean hasKey = redisTemplate.hasKey(key); // 判斷key是否存在,存在就直接更新 if (hasKey) { return redisTemplate.opsForValue().increment(key, num); } Assert.notNull(expire,"初始化庫存失敗,庫存過時時間不能爲null"); RedisStockLock redisLock = new RedisStockLock(redisTemplate, key); try { if (redisLock.tryLock()) { // 獲取到鎖後再次判斷一下是否有key hasKey = redisTemplate.hasKey(key); if (!hasKey) { // 初始化庫存 redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } return num; } /** * 獲取庫存 * * @param key 庫存key * @return -1:不限庫存; 大於等於0:剩餘庫存 */ public int getStock(String key) { Integer stock = (Integer) redisTemplate.opsForValue().get(key); return stock == null ? -1 : stock; } /** * 扣庫存 * * @param key 庫存key * @param num 扣減庫存數量 * @return 扣減以後剩餘的庫存【-3:庫存未初始化; -2:庫存不足; -1:不限庫存; 大於等於0:扣減庫存以後的剩餘庫存】 */ private Long stock(String key, int num) { // 腳本里的KEYS參數 List<String> keys = new ArrayList<>(); keys.add(key); // 腳本里的ARGV參數 List<String> args = new ArrayList<>(); args.add(Integer.toString(num)); long result = redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); // 集羣模式和單機模式雖然執行腳本的方法同樣,可是沒有共同的接口,因此只能分開執行 // 集羣模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args); } // 單機模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args); } return UNINITIALIZED_STOCK; } }); return result; } }
/** * 庫存操做對外接口 * * create by liuliang * on 2019-11-13 11:00 */ @Slf4j @Service public class StockService { @Autowired private StockComponent stockComponent; @Autowired private ProductSkuMapper skuMapper; @Autowired private ProductMapper productMapper; @Autowired private RocketMQConfig rocketMQConfig; @Autowired private PresentRocketProducer presentRocketProducer; private static final String REDIS_STOCK_KEY="redis_key:stock:"; /** * 扣減庫存 * @param skuId * @param num * @return */ public Boolean stock(String skuId,Integer num) { // 庫存ID String redisKey = REDIS_STOCK_KEY + skuId; long stock = stockComponent.stock(redisKey, 60 * 60, num, () -> initStock(skuId)); if(stock < 0){//異常,庫存不足 log.info("庫存不足........"); ProductSku productSku = skuMapper.selectById(skuId); throw new MallException(MsRespCode.STOCK_NUMBER_ERROR,new Object[]{productMapper.selectById(productSku.getProductId()).getTitle()}); } return stock >= 0 ; } /** * 添加redis - sku庫存數量 * @param skuId * @param num * @return */ public Long addStock(String skuId ,Integer num) { // 庫存ID String redisKey = REDIS_STOCK_KEY + skuId; long l = stockComponent.addStock(redisKey, num); return l; } /** * 獲取初始的庫存 * * @return */ private String initStock(String skuId) { //初始化庫存 ProductSku productSku = skuMapper.selectById(skuId); return productSku.getStockNumber()+""; } /** * 獲取sku庫存 * @param skuId * @return */ public Integer getStock(String skuId) { // 庫存ID String redisKey = REDIS_STOCK_KEY + skuId; return stockComponent.getStock(redisKey); } }
redis序列化app
/** * create by liuliang * on 2019-11-13 11:29 */ @Configuration public class RedisConfig { /** * 重寫Redis序列化方式,使用Json方式: * 當咱們的數據存儲到Redis的時候,咱們的鍵(key)和值(value)都是經過Spring提供的Serializer序列化到數據庫的。RedisTemplate默認使用的是JdkSerializationRedisSerializer,StringRedisTemplate默認使用的是StringRedisSerializer。 * Spring Data JPA爲咱們提供了下面的Serializer: * GenericToStringSerializer、Jackson2JsonRedisSerializer、JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。 * 在此咱們將本身配置RedisTemplate並定義Serializer。 * * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 設置值(value)的序列化採用Jackson2JsonRedisSerializer。 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); // 設置鍵(key)的序列化採用StringRedisSerializer。 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }