流控機制是咱們在使用RabbitMQ最頭疼的問題,一旦併發激增時,消費者消費隊列消息就像滴水同樣慢。服務器
如今咱們下單後,須要給通知中心發送消息,讓通知中心通知服務商收取訂單,並確認提供服務。併發
咱們先給Order接口添加一個發送消息的方法。app
public interface Order { public void makeOrder(Order order); public OrderSuccessResult getResult(Order order); public void postOrder(Order order); }
實現類實現該方法dom
@Data @AllArgsConstructor @NoArgsConstructor @ServiceOrderVersion(value = 1) @RequiredArgsConstructor public class ServiceOrder extends AbstractOrder { private Long id; @NonNull private String code; @NonNull private Store store; @NonNull private ProviderService service; @NonNull private Car car; @NonNull private Date serviceDate; @NonNull private String contact; @NonNull private String contactTel; private AppUser user; @NonNull private String content; private int status; private Date createDate; @Override public void makeOrder(Order order) { ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ServiceOrder)order).setId(idService.genId()); ((ServiceOrder)order).setCode(getCodeInfo(idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); user.setUsername(loginAppUser.getUsername()); ((ServiceOrder)order).setUser(user); ((ServiceOrder)order).setStatus(1); ((ServiceOrder)order).setCreateDate(new Date()); serviceOrderDao.save((ServiceOrder) order); } @Override public OrderSuccessResult getResult(Order order) { ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult(); return this.orderSuccessResult.getResult(order); } @Override public void postOrder(Order order) { MessageSender sender = SpringBootUtil.getBean(MessageSender.class); CompletableFuture.runAsync(() -> sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order) ); } private String getCodeInfo(IdService idService) { String flow = String.valueOf(idService.genId()); flow = flow.substring(14,flow.length()); String pre = DateUtils.format(new Date(), DateUtils.pattern9); return pre + flow; } }
其中咱們定義了這麼一組隊列名,交換機,和路由異步
public interface OwnerCarCenterMq { /** * 隊列名 */ String ORDER_QUEUE = "order"; /** * 服務系統exchange名 */ String MQ_EXCHANGE_ORDER = "order.topic.exchange"; /** * 服務添加routing key */ String ROUTING_KEY_ORDER = "post.order"; }
爲了不流控,咱們定義了10個隊列,並所有綁定到一個交換機上。ide
@Configuration public class RabbitmqConfig { @Bean public List<Queue> orderQueues() { List<Queue> queues = new ArrayList<>(); for (int i = 1;i < 11;i++) { Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i); queues.add(queue); } return queues; } @Bean public TopicExchange orderExchange() { return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER); } @Bean public List<Binding> bindingOrders() { List<Binding> bindings = new ArrayList<>(); for (int i = 1;i < 11;i++) { Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange()) .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i); bindings.add(binding); } return bindings; } }
從新封裝消息提供者,每次發送都隨機選取一個路由來進行發送。post
@Slf4j @Component public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) { log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); ThreadLocalRandom random = ThreadLocalRandom.current(); this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content)); } /** * 確認後回調: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.info("send ack fail, cause = " + cause); } else { log.info("send ack success"); } } /** * 失敗後return回調: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); } /** * 對消息對象進行二進制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); } }
咱們能夠看到在ServiceOrder裏,咱們是經過異步來進行發送到。測試
Controller以下ui
@Slf4j @RestController public class OrderController { private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>(); private ThreadLocal<Order> orderService = new ThreadLocal<>(); @Autowired private OrderBean orderBean; @Transactional @SuppressWarnings("unchecked") @PostMapping("/makeeorder") public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) { log.info(orderStr); Order order = setOrderFactory(orderStr,type); orderService.get().makeOrder(order); orderService.get().postOrder(order); return Result.success(orderService.get().getResult(order)); } /** * 判斷是哪種類型的訂單來獲取哪種類型的具體訂單工廠 * @param orderStr * @return */ private Order setOrderFactory(String orderStr,String type) { Class<?> classType = orderBean.getOrderMap().get(type); Object order = JSONObject.parseObject(orderStr, classType); // if (orderStr.contains("service")) { // order = JSON.parseObject(orderStr, ServiceOrder.class); // }else if (orderStr.contains("product")) { // order = JSON.parseObject(orderStr, ProductOrder.class); // } Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory"); this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType)); // if (order instanceof ServiceOrder) { // this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class)); // }else if (order instanceof ProductOrder) { // this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class)); // } orderService.set(orderFactory.get().getOrder()); return (Order) order; } }
最後是在咱們的通知中心模塊接收消息,同時對這10個隊列實行監控this
@Slf4j @Component @RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1, OwnerCarCenterMq.ORDER_QUEUE + "_" + 2, OwnerCarCenterMq.ORDER_QUEUE + "_" + 3, OwnerCarCenterMq.ORDER_QUEUE + "_" + 4, OwnerCarCenterMq.ORDER_QUEUE + "_" + 5, OwnerCarCenterMq.ORDER_QUEUE + "_" + 6, OwnerCarCenterMq.ORDER_QUEUE + "_" + 7, OwnerCarCenterMq.ORDER_QUEUE + "_" + 8, OwnerCarCenterMq.ORDER_QUEUE + "_" + 9, OwnerCarCenterMq.ORDER_QUEUE + "_" + 10}) public class ServiceOrderConsummer { @Getter private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>(); @RabbitHandler public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException { try { //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉;不然消息服務器覺得這條消息沒處理掉 後續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); ServiceOrder order = unSerialize(data); this.serviceOrders.add(order); log.info(String.valueOf(order)); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } } /** * 反序列化 * @param data * @return */ private ServiceOrder unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,ServiceOrder.class); } finally { input.close(); } } }
項目啓動後,咱們能夠看到rabbitmq的狀況以下
現咱們來對其進行壓測,啓動Jmeter,咱們使用1000線程來進行壓測測試。各配置以下
保存文件上傳服務器,由於本人是華爲雲的服務器,故在服務器上進行壓測,不進行遠程壓測
在服務器的jmeter的bin目錄下輸入
./jmeter -n -t model/rabbit.jmx -l log.jtl
這裏-n爲不啓動圖形界面,-t使用咱們上傳的配置文件,-l記錄日誌
壓測結果以下
咱們在壓測過程當中來看一下rabbitmq的UI界面
消費基本上是實時的,沒有出現流控積壓現象。