java B2B2C 仿淘寶電子商城系統-基於Rabbitmq實現延遲消息

  1. 預備知識

1.1 消息傳遞java

首先咱們知道消費者是從隊列中獲取消息的,那麼消息是如何到達隊列的?git

當咱們發送一條消息時,首先會發給交換器(exchange),交換器根據規則(路由鍵:routing key)將會肯定消息投遞到那個隊列(queue)。算法

須要JAVA Spring Cloud大型企業分佈式微服務雲構建的B2B2C電子商務平臺源碼 一零三八七七四六二六spring

帶着這幾個關鍵字:交換器、路由鍵和隊列。json

1.2 交換器類型bash

如以前所說,交換器根據規則決定消息的路由方向。所以,rabbitmq的消息投遞分類即是從交換器開始的,不一樣的交換器實現不一樣的路由算法便實現了不一樣的消息投遞方式。服務器

direct交換器app

direct -> routingKey -> queue,至關一種點對點的消息投遞,若是路由鍵匹配,就直接投遞到相應的隊列分佈式

fanout交換器ide

fanout交換器至關於實現了一(交換器)對多(隊列)的廣播投遞方式

topic交換器

提供一種模式匹配的投遞方式,咱們能夠根據主題來決定消息投遞到哪一個隊列。

1.3 消息延遲

本文想要實現一個可延遲發送的消息機制。消息如何延遲?

ttl (time to live) 消息存活時間

ttl是指一個消息的存活時間。

Per-Queue Message TTL in Queues

引用官方的一句話:

TTL can be set for a given queue by setting the x-message-ttl argument to queue.declare, or by setting the message-ttl policy. A message that has been in the queue for longer than the configured TTL is said to be dead. 咱們能夠經過x-message-ttl設置一個隊列中消息的過時時間,消息一旦過時,將會變成死信(dead-letter),能夠選擇從新路由。

Per-Message TTL in Publishers

引用官方的一句話:

A TTL can be specified on a per-message basis, by setting the expiration field in the basic AMQP class when sending a basic.publish.

The value of the expiration field describes the TTL period in milliseconds. The same constraints as for x-message-ttl apply. Since the expiration field must be a string, the broker will (only) accept the string representation of the number.

咱們能夠經過設置每一條消息的屬性expiration,指定單條消息有效期。消息一旦過時,將會變成死信(dead-letter),能夠選擇從新路由。

從新路由-死信交換機(Dead Letter Exchanges) 引用官方一句話:

Dead Letter Exchanges

Messages from a queue can be ‘dead-lettered’; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false, The TTL for the message expires; or The queue length limit is exceeded. Dead letter exchanges (DLXs) are normal exchanges. They can be any of the usual types and are declared as usual. To set the dead letter exchange for a queue, set the x-dead-letter-exchange argument to the name of the exchange.

咱們能夠經過設置死信交換器(x-dead-letter-exchange)來從新發送消息到另一個隊列,而這個隊列將是最終的消費隊列。

  1. 具體實現

rabbitmq配置

屬性文件-rabbitmq.properties

交換、路由等配置按照以上策略,其中,添加了prefetch參數來根據服務器能力控制消費數量。

鏈接用戶名

mq.user =sms_user

密碼

mq.password =123456

主機

mq.host =192.168.99.100

端口

mq.port =5672

默認virtual-host

mq.vhost =/

the default cache size for channels is 25

mq.channelCacheSize =50

發送消息路由

sms.route.key =sms_route_key

延遲消息隊列

sms.delay.queue =sms_delay_queue

延遲消息交換器

sms.delay.exchange =sms_delay_exchange

消息的消費隊列

sms.queue =sms_queue

消息交換器

sms.exchange =sms_exchange

每秒消費消息數量

sms.prefetch =30

配置rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:property-placeholder location="rabbitmq.properties"/>
    <!--配置connection-factory,指定鏈接rabbit server參數 -->
    <rabbit:connection-factory id="connectionFactory"
                       username="${mq.user}" password="${mq.password}"
                       host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" />

    <!--定義rabbit template用於數據的接收和發送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

    <!--經過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--定義queue -->
    <rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" />
    <!-- 建立延遲,有消息有效期的隊列 -->
    <rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <!-- 隊列默認消息過時時間 -->
                <value type="java.lang.Long">3600000</value>
            </entry>
            <!-- 消息過時根據從新路由 -->
            <entry key="x-dead-letter-exchange" value="${sms.exchange}"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 定義direct exchange,sms_queue -->
    <rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- 延遲消息配置,durable=true 持久化生效 -->
    <rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息接收者 -->
    <bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/>
    <!-- queue litener  觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
    <rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}">
        <rabbit:listener queues="${sms.queue}" ref="messageReceiver"/>
    </rabbit:listener-container>
</beans>

複製代碼

消息發佈者

package git.yampery.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* @decription MsgProducer
* <p>生產者</p>
* @author Yampery
* @date 2018/2/11 11:44
*/
@Component
public class MsgProducer {

   @Resource
   private AmqpTemplate amqpTemplate;
   @Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE;
   @Value("${sms.exchange}") private String SMS_EXCHANGE;
   @Value("${sms.route.key}") private String SMS_ROUTE_KEY;

   /**
    * 延遲消息放入延遲隊列中
    * @param msg
    * @param expiration
    */
   public void publish(String msg, String expiration) {
       amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> {
           // 設置消息屬性-過時時間
           message.getMessageProperties().setExpiration(expiration);
           return message;
       });
   }

   /**
    * 非延遲消息放入待消費隊列
    * @param msg
    */
   public void publish(String msg) {
       amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg);
   }
}

複製代碼

消費者

package git.yampery.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
* @decription MsgConsumer
* <p>消費者</p>
* @author Yampery
* @date 2018/2/11 11:43
*/
public class MsgConsumer implements MessageListener {
   @Override
   public void onMessage(Message message) {
       String msg;
       try {
           // 線程每秒消費一次
           Thread.sleep(1000);
           msg = new String(message.getBody(), "utf-8");
           System.out.println(msg);

       } catch (Exception e) {
           // 這裏並無對服務異常等失敗的消息作處理,直接丟棄了
           // 防止因業務異常致使消息失敗形成unack阻塞再隊列裏
           // 能夠選擇路由到另一個專門處理消費失敗的隊列
           return;
       }
   }
}

複製代碼

測試

package git.yampery.mq;

import com.alibaba.fastjson.JSONObject;
import git.yampery.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
 * @decription TestMq
 * <p>測試</p>
 * @author Yampery
 * @date 2018/2/11 15:03
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMq {

    @Resource
    private MsgProducer producer;

    @Test
    public void testMq() {
        JSONObject jObj = new JSONObject();
        jObj.put("msg", "這是一條短信");
        producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000));
    }
}

複製代碼

java B2B2C 仿淘寶電子商城系統

相關文章
相關標籤/搜索