小明在一家在線購物商城工做,最近來了一個新需求,須要他負責開發一個商品秒殺模塊,並且需求很緊急,老闆要求必須儘快上線。git
小明一開始是這麼作的,直接用數據庫鎖進行控制,獲取秒殺商品數量並加鎖,若是數量大於零則成功,不然秒殺失敗。redis
@Override @Transactional public Result startSeckilDBPCC_ONE(long seckillId, long userId) { //獲取秒殺商品數量並加鎖 String nativeSql = "SELECT number FROM seckill WHERE seckill_id=? FOR UPDATE"; Object object = dynamicQuery.nativeQueryObject(nativeSql, new Object[]{seckillId}); Long number = ((Number) object).longValue(); if(number>0){ nativeSql = "UPDATE seckill SET number=number-1 WHERE seckill_id=?"; dynamicQuery.nativeExecuteUpdate(nativeSql, new Object[]{seckillId}); SuccessKilled killed = new SuccessKilled(); killed.setSeckillId(seckillId); killed.setUserId(userId); killed.setState((short)0); killed.setCreateTime(new Timestamp(new Date().getTime())); dynamicQuery.save(killed); return Result.ok(SeckillStatEnum.SUCCESS); }else{ return Result.error(SeckillStatEnum.END); } }
寫了併發線程,跑了一下,沒問題,搞定!可是,小明轉頭一想,老闆曾經說過,此次活動宣傳力度很大,有可能會有不少用戶參與活動。剛好項目中使用了 Redis
做爲緩存,何不借用一下 Redis
的發佈訂閱功能,實現秒殺隊列,從而減輕後端數據庫的訪問壓力,提高服務性能!這但是個升職加薪,當上總經理,出任CTO,迎娶白富美的好機會。說幹就幹,複製、黏貼一把擼,很快小明就把消息隊列方案搞定了。spring
開發、測試、上線一條龍,活動開始了,秒殺商品是 100 部蘋果手機,活動結束之後,竟然產生了 106 個訂單!老闆很生氣,後果很嚴重,這個鍋必須有人得背,嚇得小明趕忙仔細複查複製粘貼的代碼。數據庫
監聽配置 RedisSubListenerConfig
:後端
@Configuration public class RedisSubListenerConfig { //初始化監聽器 @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("seckill")); return container; } //利用反射來建立監聽到消息以後的執行方法 @Bean MessageListenerAdapter listenerAdapter(RedisConsumer redisReceiver) { return new MessageListenerAdapter(redisReceiver, "receiveMessage"); } //使用默認的工廠初始化redis操做模板 @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
生產者 RedisSender:
緩存
/** * 生產者 * @author 爪哇筆記 By https://blog.52itstyle.vip */ @Service public class RedisSender { @Autowired private StringRedisTemplate stringRedisTemplate; public void sendChannelMess(String channel, String message) { stringRedisTemplate.convertAndSend(channel, message); } }
消費者 RedisConsumer:
多線程
/** * 消費者 * @author 爪哇筆記 By https://blog.52itstyle.vip */ @Service public class RedisConsumer { @Autowired private ISeckillService seckillService; @Autowired private RedisUtil redisUtil; public void receiveMessage(String message) { //收到通道的消息以後執行秒殺操做 String[] array = message.split(";"); if(redisUtil.getValue(array[0])==null){//control層已經判斷了,其實這裏不須要再判斷了 Result result = seckillService.startSeckilDBPCC_TWO(Long.parseLong(array[0]), Long.parseLong(array[1])); if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){ WebSocketServer.sendInfo(array[0], "秒殺成功");//推送給前臺 }else{ WebSocketServer.sendInfo(array[0], "秒殺失敗");//推送給前臺 redisUtil.cacheValue(array[0], "ok");//秒殺結束 } }else{ WebSocketServer.sendInfo(array[0], "秒殺失敗");//推送給前臺 } } }
數據層代碼:併發
@Override @Transactional public Result startSeckil(long seckillId,long userId) { //因爲使用了隊列,小明這裏沒用數據庫鎖 String nativeSql = "SELECT number FROM seckill WHERE seckill_id=?"; Object object = dynamicQuery.nativeQueryObject(nativeSql, new Object[]{seckillId}); Long number = ((Number) object).longValue(); if(number>0){ //扣庫存 nativeSql = "UPDATE seckill SET number=number-1 WHERE seckill_id=?"; dynamicQuery.nativeExecuteUpdate(nativeSql, new Object[]{seckillId}); //建立訂單 SuccessKilled killed = new SuccessKilled(); killed.setSeckillId(seckillId); killed.setUserId(userId); killed.setState((short)0); Timestamp createTime = new Timestamp(new Date().getTime()); killed.setCreateTime(createTime); dynamicQuery.save(killed); //支付 return Result.ok(SeckillStatEnum.SUCCESS); }else{ return Result.error(SeckillStatEnum.END); } }
小明從新審讀了代碼,一開始小明以爲既然使用了隊列,數據庫層面就不必用數據庫鎖了,而後去掉了 for update
,很顯然問題就出在這裏。致使超賣的因素只有一個,那就是多線程併發搶佔資源,若是業務邏輯沒有作相應的措施,頗有可能致使超賣。分佈式
回到代碼來看,雖然秒殺用戶進入了隊列,可是 RedisConsumer
端有多是多線程處理隊列數據,小明爲了驗證想法,在消費端加入瞭如下代碼來打印線程名稱。ide
Thread th=Thread.currentThread(); System.out.println("Tread name:"+th.getName());
再次運行任務,果不其然,每一個秒殺用戶都開啓了一個線程處理任務:
Tread name:container-1 Tread name:container-2 Tread name:container-3 Tread name:container-4 Tread name:container-5 Tread name:container-6 ......
各位看官到這裏,線索已經很明確了,咱們只須要把消費端改形成單線程處理,問題就迎刃而解了。
使用 Redis
消息隊列,出現超賣問題是由於RedisMessageListenerContainer
的默認使用線程池是SimpleAsyncTaskExecutor
,每次消費都會建立一個線程來處理,這樣就會有大量的新線程被建立。有興趣的小夥伴能夠跟進源碼,瞭解更多詳細內容。
監聽配置 RedisSubListenerConfig
改造爲 :
@Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("seckill")); /** * 若是不定義線程池,每一次消費都會建立一個線程,若是業務層面不作限制,就會致使秒殺超賣。 * 此處感謝網友 DIscord */ ThreadFactory factory = new ThreadFactoryBuilder() .setNameFormat("redis-listener-pool-%d").build(); Executor executor = new ThreadPoolExecutor( 1, 1, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), factory); container.setTaskExecutor(executor); return container; }
而後測試改造效果:
Tread name:redis-listener-pool-0 Tread name:redis-listener-pool-0 Tread name:redis-listener-pool-0 ......
那麼問題來了,這個鍋到底誰來背,開發、測試仍是產品?這麼好的宣傳機會,直接上頭條"XX 電商系統 bug 超賣,虧損超 10W 仍堅持發貨,稱不能虧了消費者"而後超的錢相關責任人擔一部分, perfect~。本故事純屬虛構,誰也不怪,若有雷同,純屬巧合。