RabbitMQ使用多路由,多隊列來破除流控

流控機制是咱們在使用RabbitMQ最頭疼的問題,一旦併發激增時,消費者消費隊列消息就像滴水同樣慢。服務器

如今咱們下單後,須要給通知中心發送消息,讓通知中心通知服務商收取訂單,並確認提供服務。併發

咱們先給Order接口添加一個發送消息的方法。app

public interface Order {
    public void makeOrder(Order order);
    public OrderSuccessResult getResult(Order order);
    public void postOrder(Order order);
}

實現類實現該方法dom

@Data
@AllArgsConstructor
@NoArgsConstructor
@ServiceOrderVersion(value = 1)
@RequiredArgsConstructor
public class ServiceOrder extends AbstractOrder {
    private Long id;
    @NonNull
    private String code;
    @NonNull
    private Store store;
    @NonNull
    private ProviderService service;
    @NonNull
    private Car car;
    @NonNull
    private Date serviceDate;
    @NonNull
    private String contact;
    @NonNull
    private String contactTel;
    private AppUser user;
    @NonNull
    private String content;
    private int status;
    private Date createDate;


    @Override
    public void makeOrder(Order order) {
        ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class);
        IdService idService = SpringBootUtil.getBean(IdService.class);
        ((ServiceOrder)order).setId(idService.genId());
        ((ServiceOrder)order).setCode(getCodeInfo(idService));
        AppUser loginAppUser = AppUserUtil.getLoginAppUser();
        AppUser user = new AppUser();
        user.setId(loginAppUser.getId());
        user.setUsername(loginAppUser.getUsername());
        ((ServiceOrder)order).setUser(user);
        ((ServiceOrder)order).setStatus(1);
        ((ServiceOrder)order).setCreateDate(new Date());
        serviceOrderDao.save((ServiceOrder) order);
    }

    @Override
    public OrderSuccessResult getResult(Order order) {
        ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class);
        this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult();
        return this.orderSuccessResult.getResult(order);
    }

    @Override
    public void postOrder(Order order) {
        MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
        CompletableFuture.runAsync(() ->
                sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER,
                        OwnerCarCenterMq.ROUTING_KEY_ORDER,
                        order)
        );
    }

    private String getCodeInfo(IdService idService) {
        String flow = String.valueOf(idService.genId());
        flow = flow.substring(14,flow.length());
        String pre = DateUtils.format(new Date(), DateUtils.pattern9);
        return pre + flow;
    }
}

其中咱們定義了這麼一組隊列名,交換機,和路由異步

public interface OwnerCarCenterMq {
    /**
     * 隊列名
     */
    String ORDER_QUEUE = "order";
    /**
     * 服務系統exchange名
     */
    String MQ_EXCHANGE_ORDER = "order.topic.exchange";

    /**
     * 服務添加routing key
     */
    String ROUTING_KEY_ORDER = "post.order";
}

爲了不流控,咱們定義了10個隊列,並所有綁定到一個交換機上。ide

@Configuration
public class RabbitmqConfig {

   @Bean
   public List<Queue> orderQueues() {
      List<Queue> queues = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i);
         queues.add(queue);
      }
      return queues;
   }

   @Bean
   public TopicExchange orderExchange() {
      return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);
   }


   @Bean
   public List<Binding> bindingOrders() {
      List<Binding> bindings = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange())
               .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i);
         bindings.add(binding);
      }
      return bindings;
   }
}

從新封裝消息提供者,每次發送都隨機選取一個路由來進行發送。post

@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content));
    }

    /**
     * 確認後回調:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失敗後return回調:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 對消息對象進行二進制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

咱們能夠看到在ServiceOrder裏,咱們是經過異步來進行發送到。測試

Controller以下ui

@Slf4j
@RestController
public class OrderController {
    private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>();
    private ThreadLocal<Order> orderService = new ThreadLocal<>();
    @Autowired
    private OrderBean orderBean;

    @Transactional
    @SuppressWarnings("unchecked")
    @PostMapping("/makeeorder")
    public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {
        log.info(orderStr);
        Order order = setOrderFactory(orderStr,type);
        orderService.get().makeOrder(order);
        orderService.get().postOrder(order);
        return Result.success(orderService.get().getResult(order));
    }

    /**
     * 判斷是哪種類型的訂單來獲取哪種類型的具體訂單工廠
     * @param orderStr
     * @return
     */
    private Order setOrderFactory(String orderStr,String type) {
        Class<?> classType = orderBean.getOrderMap().get(type);
        Object order = JSONObject.parseObject(orderStr, classType);
//        if (orderStr.contains("service")) {
//            order = JSON.parseObject(orderStr, ServiceOrder.class);
//        }else if (orderStr.contains("product")) {
//            order = JSON.parseObject(orderStr, ProductOrder.class);
//        }
        Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory");
        this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));
//        if (order instanceof ServiceOrder) {
//            this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));
//        }else if (order instanceof ProductOrder) {
//            this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));
//        }
        orderService.set(orderFactory.get().getOrder());
        return (Order) order;
    }
}

最後是在咱們的通知中心模塊接收消息,同時對這10個隊列實行監控this

@Slf4j
@Component
@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 2,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 3,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 4,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 5,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 6,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 7,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 8,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 9,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})
public class ServiceOrderConsummer {
    @Getter
    private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();
    @RabbitHandler
    public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {
        try {
            //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉;不然消息服務器覺得這條消息沒處理掉 後續還會在發
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            ServiceOrder order = unSerialize(data);
            this.serviceOrders.add(order);
            log.info(String.valueOf(order));
        } catch (IOException e) {
            e.printStackTrace();
            //丟棄這條消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            log.info("receiver fail");
        }
    }

    /**
     * 反序列化
     * @param data
     * @return
     */
    private ServiceOrder unSerialize(byte[] data) {
        Input input = null;
        try {
            Kryo kryo = new Kryo();
            input = new Input(new ByteArrayInputStream(data));
            return kryo.readObject(input,ServiceOrder.class);
        }
        finally {
            input.close();
        }
    }
}

項目啓動後,咱們能夠看到rabbitmq的狀況以下

現咱們來對其進行壓測,啓動Jmeter,咱們使用1000線程來進行壓測測試。各配置以下

保存文件上傳服務器,由於本人是華爲雲的服務器,故在服務器上進行壓測,不進行遠程壓測

在服務器的jmeter的bin目錄下輸入

./jmeter -n -t model/rabbit.jmx -l log.jtl

這裏-n爲不啓動圖形界面,-t使用咱們上傳的配置文件,-l記錄日誌

壓測結果以下

咱們在壓測過程當中來看一下rabbitmq的UI界面

消費基本上是實時的,沒有出現流控積壓現象。

相關文章
相關標籤/搜索