RabbitMQ實戰二(消峯限流補充)

Hello,我是一名在互聯網撿破爛的程序員,最近破爛還挺好撿的,天天都是東逛逛西逛逛,收了不少的破爛呢。。。java

收廢鐵了,十塊一斤。快拿來賣哦,什麼爛電冰箱,爛電視機,無論什麼破爛我都要。。。程序員

天天騎着個人爛三輪車,天天都是活的苟且偷生的,我好可憐。。。web

嗚嗚嗚嗚redis

無論有錢木錢,都進來看一看瞧一瞧哦。。spring

好了~~~數據庫

今天咱們來接着講,若是你是直接來閱讀的這一期的話,那你要去看上一期的內容哦,這樣咱們才能夠銜接起來的。apache

限流操做

咱們就直接從Controller層開始講解了哈json

1. 修改咱們的OrderController
OrderController
package com.example.rabbitmq.controller;
​
import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
​
​
/**
 * 訂單Controller
 */
@RestController
@RequestMapping("order")
public class OrderController {
​
​
 @Autowired(required = false)
 private OrderService orderService;
​
 private static Integer count = 0;
​
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
​
 /**
 * 使用RabbitMQ限流建立訂單
 * @return
 */
 @PostMapping("create/{goodsId}")
 public ApiResponse create(@PathVariable("goodsId") Long goodsId){
​
 ApiResponse apiResponse = this.orderService.create(goodsId);
​
 LOGGER.info("流量請求:" + count++);
​
 return apiResponse;
 }
​
 /**
 * 無RabbitMQ建立訂單
 * @return
 */
 @PostMapping("/save/{goodsId}")
 public ApiResponse save(@PathVariable("goodsId") Long goodsId){
​
 ApiResponse apiResponse = this.orderService.save(goodsId);
​
 LOGGER.info("流量請求:" + count++);
​
 return apiResponse;
 }
}

其中咱們只添加了一個使用限流操做的接口,和普通的接口同樣同樣的windows

2. 修改咱們的訂單Service
OrderService
package com.example.rabbitmq.service;
​
import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;
​
import java.util.List;
​
public interface OrderService {
​
 /**
 * 使用RabbitMQ限流建立訂單
 * @return
 */
 ApiResponse create(Long goodsId);
​
 /**
 * 無RabbitMQ消峯限流
 * @return
 */
 ApiResponse save(Long goodsId);
​
 /**
 * 建立訂單
 * @param testOrder
 */
 void createOrder(TestOrder testOrder);
}
​

其中的create方法主要是將咱們的請求所有接到消息隊列中api

真正建立訂單的方法是createOrder

3. 修改咱們的訂單ServiceImpl
OrderServiceImpl
package com.example.rabbitmq.service.impl;
​
import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.mapper.GoodsMapper;
import com.example.rabbitmq.mapper.OrderMapper;
import com.example.rabbitmq.service.OrderService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
​
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;
​
/**
 * OrderService
 */
@Service
public class OrderServiceImpl implements OrderService {
​
 @Autowired(required = false)
 private AmqpTemplate amqpTemplate;
​
 @Autowired(required = false)
 private OrderMapper orderMapper;
​
 @Autowired(required = false)
 private GoodsMapper goodsMapper;
​
 /**
 * 使用RabbitMQ限流建立訂單
 *
 * @return
 */
 @Override
 public ApiResponse create(Long goodsId) {
​
 try {
​
 // 判斷參數
 if (goodsId == null){
​
 return new ApiResponse().code(444).msg("參數錯誤");
 }
​
 // 發送消息
 this.amqpTemplate.convertAndSend("order.create",goodsId);
​
 return new ApiResponse().code(200).msg("下單中,請稍後");
​
 }catch (Exception e){
​
 return new ApiResponse().code(500).msg("服務器錯誤");
 }
​
 }
​
​
 /**
 * 使用RabbitMQ限流建立訂單
 *
 * @param testOrder
 * @return
 */
 @Override
 public void createOrder(TestOrder testOrder) {
​
 this.orderMapper.create(testOrder);
 }
​
 /**
 * 無RabbitMQ消峯限流
 *
 * @return
 */
 @Override
 public ApiResponse save(Long goodsId) {
​
 if (goodsId == null){
​
 return new ApiResponse().code(400).msg("參數錯誤");
 }
​
 // 根據商品Id查詢商品
 TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);
​
 // 商品不存在或者商品庫存爲0
 if (testGoods == null || testGoods.getGoodsStock() <= 0){
​
 return new ApiResponse().code(400).msg("商品不存在或者庫存爲0");
 }
​
 // 直接添加
 // 建立訂單
 TestOrder testOrder = new TestOrder();
​
 // 設置參數
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderDate(new Date());
​
 this.orderMapper.save(testOrder);
​
 // 更新庫存
 this.goodsMapper.updateGoodsStock(goodsId);
​
 return new ApiResponse().code(200).msg("訂單建立成功");
 }
}

在這裏咱們能夠清楚的看到了,咱們是怎麼將請求所有接入到咱們的消息隊列中的

咱們這樣作的思想就是,咱們須要有一箇中間商來幫助咱們接收消息,那麼這個中間商要比咱們的持久層要厲害些,能夠接收不少的請求,咱們再慢慢的消費這些消息

4. 消費者
OrderListener
package com.example.rabbitmq.listener;
​
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.GoodsService;
import com.example.rabbitmq.service.OrderService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.Date;
import java.util.List;
​
/**
 * 訂單請求消息生產者
 */
@Component
public class OrderListener {
​
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
​
 @Autowired(required = false)
 private OrderService orderService;
​
 @Autowired
 private AmqpTemplate amqpTemplate;
​
 @Autowired(required = false)
 private GoodsService goodsService;
​
 /**
 * 建立訂單消息監聽
 * @param goodsId
 */
 @RabbitListener(bindings = @QueueBinding(
 value = @Queue(value = "ORDER.QUEUE", durable = "true"),
​
 arguments = {@Argument(name = "x-max-length", value = "10"),
 @Argument(name = "dead-letter-exchange",value = "reject-publish")
 },
​
 exchange = @Exchange(value = "ORDER.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
​
 key = {"order.create"}
 ))
 public void create(Long goodsId, Channel channel, Message message) throws IOException {
​
 try {
​
 if (goodsId == null) {
​
 return;
 }
​
 // 先根據商品Id查詢商品庫存
 TestGoods goods = this.goodsService.selectGoodsById(goodsId);
​
 if (goods == null || goods.getGoodsStock() <= 0){
​
 return;
​
 }
​
 // 建立訂單
 TestOrder testOrder = new TestOrder();
​
 // 設置參數
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderDate(new Date());
​
 // 執行添加
 this.orderService.createOrder(testOrder);
​
 // 更新庫存
 this.goodsService.updateGoodsStock(goodsId);
​
 LOGGER.info("消費成功");
​
 } catch (Exception e) {
​
 LOGGER.error("消費失敗");
​
 }finally {
​
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 }
 }
}

這個就不用我多說了吧。這裏面的細枝末節,須要自行查閱。

也能夠去參考 RabbitMQ實踐應用一

這樣就能夠簡單的實現咱們的消峯限流啦。。。。

咱們開始咱們的測試吧

咱們首先仍是用postman來測試咱們的接口是否可用

咱們能夠在控制檯看見

這樣咱們就簡單的消費成功了,庫存和訂單也都更新和添加成功啦

固然這樣咱們也是不可能的,那咱們仍是要去用壓力測試來試試能不能頂住呢

仍是用咱們的jmeter來作壓力測試,只須要修改接口就行

這樣看咱們的就是有條理的執行啦

可是這樣咱們的請求就沒有所有打在數據庫上,這樣咱們就能夠實現限流啦。。。

咱們再來看咱們的數據庫呢?有木有像沒有限流的出現超賣的狀況呀

喲喲喲,盡然沒有出現超賣的狀況,那這就算是實現了限流操做

咱們再來看訂單是否是100個呢

咱們是從115開始的,看看是否是214結束呢

哇哦,果真是預想的同樣。。。。。

咱們再看時間,是在同一時間下的訂單。。。

那就證實了咱們的限流操做

OK,這裏咱們的消峯限流就所有完成了,可能不是那麼完善,也有不少的漏洞。

就算庫存爲0,流量仍是會分批打到咱們的數據上面,有木有辦法,直接捨棄這些流量呢?

這就須要本身思考了,要化爲本身的東西纔算是真正理解。。。。

總結

從某種意義上說,消費者的限流策略有助於那麼處理消息效率高的消費者多消費一些消息,效率低一些的消費者少推送一些消息,從而能夠達到能者多勞的目的,儘量發揮消費者處理消息的能力。在項目中,爲了緩解生產者和消費者兩邊效率不平衡的影響,一般會對消費者進行限流處理,保證消費者端正常消費消息,儘量避免服務器崩潰以及宕機現象。

加入Redis緩存實現限流操做

1、爲何要加入Redis緩存

咱們在前面實現了簡單的限流操做,對用戶下訂單有了很好的維護及在併發狀況下能夠撐住。

可是,咱們在前面留了一個問題,當咱們庫存爲0時,可是咱們的流量還存在不少,雖然是分批打入咱們的數據庫,這樣對咱們來講是很不友好的。數據庫中根本沒有庫存了,就不該該將剩餘的流量打入到咱們的數據層。

那麼我咱們應該怎麼解決呢?

這裏咱們就加入了 Redis緩存技術,相信你在使用這個技術以前就很瞭解這個Redis了吧。。。。

2、流程圖

流程圖不標準,根據本身的理解畫的。。。

將就一下吧。。。。

3、代碼實現

咱們只須要修改部分的代碼

3.1 引入新的依賴

相信你猜就知道了吧

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
3.2 修改咱們的application.yml
#redis配置
 redis:
 host: 192.168.2.4 #主機地址
 database: 4

使用前須要安裝Redis,咱們能夠在Linux上安裝也能夠在windows上安裝

推薦在Linux上安裝,咱們的大部分都是部署在Linux上的

Redis的使用操做

3.3 修改咱們的OrderListener
package com.example.rabbitmq.listener;
​
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
​
import com.example.rabbitmq.service.GoodsService;
import com.example.rabbitmq.service.OrderService;
import com.example.rabbitmq.utils.JsonUtils;
import com.rabbitmq.client.Channel;
​
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.locks.ReentrantLock;
​
​
/**
 * 訂單請求消息生產者
 */
@Component
public class OrderListener {
​
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
​
 @Autowired(required = false)
 private OrderService orderService;
​
 @Autowired(required = false)
 private GoodsService goodsService;
​
 private static String GOODS_NAME = "goods:id:";
​
 @Autowired(required = false)
 private StringRedisTemplate stringRedisTemplate;
​
 private static ReentrantLock lock = new ReentrantLock();
​
 /**
 * 建立訂單消息監聽
 *
 * @param goodsId
 */
 @RabbitListener(bindings = @QueueBinding(
 value = @Queue(value = "ORDER.QUEUE", durable = "true"),
​
 arguments = {@Argument(name = "x-max-length", value = "10"),
 @Argument(name = "dead-letter-exchange", value = "reject-publish")
 },
​
 exchange = @Exchange(value = "ORDER.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
​
 key = {"order.create"}
 ))
 public void create(Long goodsId, Channel channel, Message message) throws IOException {
​
 try {
​
 if (goodsId == null) {
​
 return;
 }
​
 String json = this.getRedisData(goodsId);
​
 // Redis緩存中沒有命中
 if (StringUtils.isBlank(json)){
​
 /**
 * 上鎖
 */
 if (lock.tryLock()){
​
 // 從數據庫庫中拿到數據
 TestGoods testGoods = this.getDBData(goodsId);
​
 if (testGoods == null || testGoods.getGoodsStock() <= 0){
​
 return;
 }
​
 // 建立訂單
 TestOrder testOrder = new TestOrder();
​
 // 設置參數
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderDate(new Date());
​
 // 執行添加
 this.orderService.createOrder(testOrder);
​
 // 更新庫存
 testGoods.setGoodsStock(testGoods.getGoodsStock() - 1);
​
 json = JsonUtils.serialize(testGoods);
​
 this.setRedisDate(goodsId, json);
​
 // 釋放鎖
 lock.unlock();
 }
 } else {
​
 Thread.sleep(100L);
​
 json = this.getRedisData(goodsId);
​
 // 將Json轉化爲對象
 TestGoods testGoods = JsonUtils.parse(json, TestGoods.class);
​
 if (testGoods == null || testGoods.getGoodsStock() <= 0){
​
 return;
 }
​
 // 建立訂單
 TestOrder testOrder = new TestOrder();
​
 // 設置參數
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderDate(new Date());
​
 // 執行添加
 this.orderService.createOrder(testOrder);
​
 // 更新庫存
 testGoods.setGoodsStock(testGoods.getGoodsStock() - 1);
​
 json = JsonUtils.serialize(testGoods);
​
 this.setRedisDate(goodsId, json);
 }
​
 } catch (Exception e) {
​
 LOGGER.error("消費失敗");
​
 } finally {
​
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 }
 }
​
 /**
 * 從Redis中獲取數據
 *
 * @param goodsId
 */
 private String getRedisData(Long goodsId) {
​
 // 從緩存中獲取數據
 String json = this.stringRedisTemplate.opsForValue().get(GOODS_NAME + goodsId);
​
 return json;
 }
​
 /**
 * 設置Redis緩存數據
 * @param goodsId
 * @param json
 */
 private void setRedisDate(Long goodsId, String json){
​
 this.stringRedisTemplate.opsForValue().set(GOODS_NAME + goodsId, json);
 }
​
 /**
 * 從數據庫中取出數據
 * @param goodsId
 * @return
 */
 private TestGoods getDBData(Long goodsId){
​
 TestGoods testGoods = this.goodsService.selectGoodsById(goodsId);
​
 return testGoods;
 }
}

主要代碼解釋

其中寫的三個方法就不用說了吧

getRedisData:從Redis中獲取數據

setRedisData:更新Redis中的數據

getDBData:從數據庫中獲取數據

  • 首先請求過來會帶着一個Id,先判斷這個Id是否爲空
if (goodsId == null) {
 return;
 }

若爲空,消息直接丟棄

  • 先從 Redis中獲取數據
String json = this.getRedisData(goodsId);
  • 判斷獲取的數據是否爲空,也就是緩存中是否有數據,若沒有數據,則上鎖,從數據庫中取出數據
/**
 * 上鎖
 */
  if (lock.tryLock()){
 // 從數據庫庫中拿到數據
 TestGoods testGoods = this.getDBData(goodsId);
  • 從數據庫中拿到數據後,將其放入緩存中,並釋放鎖
json = JsonUtils.serialize(testGoods);

this.setRedisDate(goodsId, json);

// 釋放鎖
lock.unlock();
  • 若從緩存中命中數據,那麼讓其休眠
Thread.sleep(100L);

json = this.getRedisData(goodsId);

OK,就這樣簡單的修改好咱們的代碼了,是否是很簡單呢。

4、截圖驗證

首先來看看咱們的數據庫中商品的庫存信息

再來看看咱們的訂單表

接下來,就直接上咱們的壓力測試,此次要使用6000個請求啦,看你還抵不抵得住

首先看咱們的控制檯

咱們的消息隊列直接將所有的瞬時流量收入囊中

而後咱們在分批處理咱們的流量,也就是下訂單啦

OK,咱們來看看咱們的Redis緩存是否正確呢?預判庫存爲0

哇哦,那還能夠,基本實現了

下面纔是咱們的重頭戲

看看咱們的數據庫是否有錯呢?

先來看庫存,預判爲100

爲何呢?咱們在下單中,基本上就沒有和數據庫打交道,把它給撇開了

咱們直接是在緩存中進行的,根本沒有更新數據庫的商品庫存,那確定是100啦

哇哦,果然是誒。。。。

那再來看咱們訂單表呢,你有木有超出100條呢

一張圖截不完,咱們是從715開始的,看看是不是814結束呢

哇哦,也是誒,那這就對了嘛。。。

咱們仔細一看,這一次咱們下單的時間就有差異了,沒有同一時間將下單。。。

5、總結

咱們使用Redis緩存,大大減輕了咱們數據庫的壓力,查詢商品只須要訪問一次數據庫,查詢的數據放入緩存。

訂單的下單時間也有所優化。

大家有木有什麼優化的呢?能夠思考哦,其實還有不少方案

相關文章
相關標籤/搜索