rabbitmq學習總結

  近幾日學習了一下rabbitmq消息中間件,因爲好久好久以前已經學過,並且自己入門難度也不高,因此學習起來較爲簡單,現將全部內容及與spring、springboot整合方法記錄:html

  首先消息中間件定義:java

1.什麼是MQ

    消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
    其主要用途:不一樣進程Process/線程Thread之間通訊。

爲何會產生消息隊列?有幾個緣由:

    不一樣進程(process)之間傳遞消息時,兩個進程之間耦合程度太高,改動一個進程,引起必須修改另外一個進程,爲了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),全部兩進程之間傳遞的消息,都必須經過消息隊列來傳遞,單獨修改某一個進程,不會影響另外一個;

    不一樣進程(process)之間傳遞消息時,爲了實現標準化,將消息的格式規範化了,而且,某一個進程接受的消息太多,一會兒沒法處理完,而且也有前後順序,必須對收到的消息進行排隊,所以誕生了事實上的消息隊列;
 git

我從別人博客中看到了幾篇文章,寫的很是棒 如今將其記錄github

https://mp.weixin.qq.com/s?__biz=MzAxOTc0NzExNg==&mid=2665513507&idx=1&sn=d6db79c1ae03ba9260fb0fb77727bb54&chksm=80d67a60b7a1f376e7ad1e2c3276e8b565f045b1c7e21ef90926f69d99f969557737eb5d8128&mpshare=1&scene=1&srcid=1019awkBx8kaLyFohcuW4Ee7web

這個是經過故事的方式寫出消息中間件究竟是什麼正則表達式

https://mp.weixin.qq.com/s?__biz=MjM5ODYxMDA5OQ==&mid=2651960012&idx=1&sn=c6af5c79ecead98daa4d742e5ad20ce5&chksm=bd2d07108a5a8e0624ae6ad95001c4efe09d7ba695f2ddb672064805d771f3f84bee8123b8a6&mpshare=1&scene=1&srcid=04054h4e90lz5Qc2YKnLNuvYspring

這個是消息中間件的使用場景docker

https://github.com/jasonGeng88/blog/blob/master/201705/MQ.mdjson

這個故事是消息中間件在實際開發中的應用場合緩存

 

還有一個協議須要注意:

AMQP

 

 

 還有rabbitmq的五種隊列

 

 

 如今我將其解釋一下:

第一種.直接給 就是最簡單的一對一(經過隊列發)

第二種:能夠當作是第一種的擴展,就是一對多,你們共享同一個消息

第三種:其實就是將消息發送給交換機,而後交換機在把消息發送給綁定在此交換機的隊列上

第四種 路由模式,你能夠當作是根據具體關鍵字來發  或者說是服務每一個隊列承擔着一個或多個服務,消息會根據服務來發

第五種 topic模式,跟上面的差很少 不過他是至關於多匹配,至關於正則表達式

第六種 好像是遠程調用

 

而後是和安裝和使用,安裝的話我直接用的docker

其他安裝手段的話你必須得先安裝elrang語言,由於rabbitmq就是用這個語言編寫的

接下來是各類模式的使用

稍後我會傳到github去  其實難點很少 主要是幾個方法的使用及各類信道綁定啊之類的 大體流程是 

獲取鏈接------建立通道-------建立通道----發佈信息

獲取鏈接------建立通道------根據知道的通道建立消費者------獲取到達消息

 

獲取鏈接------建立通道------建立交換機----------發佈信息

獲取鏈接------建立通道------申明隊列-------綁定交換機------定義消費者(根據隊列)------獲取到達消息

其他幾種方法無非就是多加了點關鍵字  沒有什麼差異

 

 

重點一:與spring整合

我見到的基本有兩種方式

第一種就是寫配置bean類

 

@Configuration
public class HelloWorldConfiguration {

    protected final String helloWorldQueueName = "hello.world.queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.13.132");
        connectionFactory.setUsername("wujifu");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/vhose_1");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        //The routing key is set to the name of the queue by the broker for the default exchange.
        template.setRoutingKey(this.helloWorldQueueName);
        //Where we will synchronously receive messages from
        template.setDefaultReceiveQueue(this.helloWorldQueueName);
        return template;
    }

    @Bean
    // Every queue is bound to the default direct exchange
    public Queue helloWorldQueue() {
        return new Queue(this.helloWorldQueueName);
    }

    /*
    @Bean
    public Binding binding() {
        return declare(new Binding(helloWorldQueue(), defaultDirectExchange()));
    }*/

    /*
    @Bean
    public TopicExchange helloExchange() {
        return declare(new TopicExchange("hello.world.exchange"));
    }*/

    /*
    public Queue declareUniqueQueue(String namePrefix) {
        Queue queue = new Queue(namePrefix + "-" + UUID.randomUUID());
        rabbitAdminTemplate().declareQueue(queue);
        return queue;
    }

    // if the default exchange isn't configured to your liking....
    @Bean Binding declareP2PBinding(Queue queue, DirectExchange exchange) {
        return declare(new Binding(queue, exchange, queue.getName()));
    }

    @Bean Binding declarePubSubBinding(String queuePrefix, FanoutExchange exchange) {
        return declare(new Binding(declareUniqueQueue(queuePrefix), exchange));
    }

    @Bean Binding declarePubSubBinding(UniqueQueue uniqueQueue, TopicExchange exchange) {
        return declare(new Binding(uniqueQueue, exchange));
    }

    @Bean Binding declarePubSubBinding(String queuePrefix, TopicExchange exchange, String routingKey) {
        return declare(new Binding(declareUniqueQueue(queuePrefix), exchange, routingKey));
    }*/

}

 

還有一種是配置文件的方法

<!-- 公共部分 -->
<!-- 建立鏈接類 鏈接安裝好的 rabbitmq -->
<bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="localhost" />    
    <!-- username,訪問RabbitMQ服務器的帳戶,默認是guest -->
    <property name="username" value="${rmq.manager.user}" />
    <!-- username,訪問RabbitMQ服務器的密碼,默認是guest -->   
    <property name="password" value="${rmq.manager.password}" />
    <!-- host,RabbitMQ服務器地址,默認值"localhost" -->   
    <property name="host" value="${rmq.ip}" />   
    <!-- port,RabbitMQ服務端口,默認值爲5672 -->
    <property name="port" value="${rmq.port}" />
    <!-- channel-cache-size,channel的緩存數量,默認值爲25 -->
    <property name="channel-cache-size" value="50" />
    <!-- cache-mode,緩存鏈接模式,默認值爲CHANNEL(單個connection鏈接,鏈接以後關閉,自動銷燬) -->
    <property name="cache-mode" value="CHANNEL" />   
</bean>
<!--或者這樣配置,connection-factory元素實際就是註冊一個org.springframework.amqp.rabbit.connection.CachingConnectionFactory實例
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定義消息隊列,durable:是否持久化,若是想在RabbitMQ退出或崩潰的時候,不會失去全部的queue和消息,須要同時標誌隊列(queue)和交換機(exchange)是持久化的,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而消息(message)默認是持久化的能夠看類org.springframework.amqp.core.MessageProperties中的屬性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除;auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列 -->
<rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" />

<!--綁定隊列,rabbitmq的exchangeType經常使用的三種模式:direct,fanout,topic三種,咱們用direct模式,即rabbit:direct-exchange標籤,Direct交換器很簡單,若是是Direct類型,就會將消息中的RoutingKey與該Exchange關聯的全部Binding中的BindingKey進行比較,若是相等,則發送到該Binding對應的Queue中。有一個須要注意的地方:若是找不到指定的exchange,就會報錯。但routing key找不到的話,不會報錯,這條消息會直接丟失,因此此處要當心,auto-delete:自動刪除,若是爲Yes,則該交換機全部隊列queue刪除後,自動刪除交換機,默認爲false -->
<rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding>
        <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding>
        <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- 生產者部分 -->
<!-- 發送消息的producer類,也就是生產者 -->
<bean id="msgProducer" class="com.asdf.sdf.ClassA">
    <!-- value中的值就是producer中的的routingKey,也就是隊列名稱,它與上面的rabbit:bindings標籤中的key必須相同 -->
    <property name="queueName" value="{alert.queue.1}"/>
</bean>

<!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換爲json存入消息隊列,因爲fastjson的速度快於jackson,這裏替換爲fastjson的一個實現 -->
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
<!-- 或者配置jackson -->
<!--
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
-->

<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

<!-- 消費者部分 -->
<!-- 自定義接口類 -->
<bean id="testHandler" class="com.rabbit.TestHandler"></bean>

<!-- 用於消息的監聽的代理類MessageListenerAdapter -->
<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
        <!-- 類名 -->
    <constructor-arg ref="testHandler" />
        <!-- 方法名 -->
    <property name="defaultListenerMethod" value="handlerTest"></property>
                <property name="messageConverter" ref="jsonMessageConverter"></property>
</bean>

<!-- 配置監聽acknowledeg="manual"設置手動應答,它可以保證即便在一個worker處理消息的時候用CTRL+C來殺掉這個worker,或者一個consumer掛了(channel關閉了、connection關閉了或者TCP鏈接斷了),也不會丟失消息。由於RabbitMQ知道沒發送ack確認消息致使這個消息沒有被徹底處理,將會對這條消息作re-queue處理。若是此時有另外一個consumer鏈接,消息會被從新發送至另外一個consumer會一直重發,直到消息處理成功,監聽容器acknowledge="auto" concurrency="30"設置發送次數,最多發送30次 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
        <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" />
    <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
    <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
</rabbit:listener-container>

 

其他幾種exchange

fanOut:

<!-- Fanout 扇出,顧名思義,就是像風扇吹麪粉同樣,吹獲得處都是。若是使用fanout類型的exchange,那麼routing key就不重要了。由於凡是綁定到這個exchange的queue,都會受到消息。 -->
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">  
        <rabbit:bindings>  
            <rabbit:binding queue="test_delay_queue"/>  
        </rabbit:bindings>  
</rabbit:fanout-exchange>
<!-- 發送端不是按固定的routing key發送消息,而是按字符串「匹配」發送,接收端一樣如此 -->
<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="Q1" pattern="error.*.log" />
            <rabbit:binding queue="Q2" pattern="error.level1.log" />
            <rabbit:binding queue="Q3" pattern="error.level2.log" />
        </rabbit:bindings>
</rabbit:topic-exchange>

還有相關的消費者和提供者

@Resource  
private RabbitTemplate rabbitTemplate;
private String queueName;  
public void sendMessage(CommonMessage msg){
         try {  
              logger.error("發送信息開始");
              System.out.println(rabbitTemplate.getConnectionFactory().getHost());  
             //發送信息  queueName交換機,就是上面的routingKey msg.getSource() 爲 test_key 
             rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg);
             //若是是普通字符串消息須要先序列化,再發送消息
             //rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));
             logger.error("發送信息結束");
         } catch (Exception e) {  
             e.printStackTrace();
         }
    }

public void setQueueName(String queueName) {
    this.queueName = queueName;
}
public class TestHandler  {
    @Override
    public void handlerTest(CommonMessage commonMessage) {
        System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
    }
}

如今說說我我的的理解

其實思路與其餘幾個工具或者說方法 的用法都是同樣的 在配置文件(配置類中配置好類工廠 模板 而後再函數中直接拿bean或者模板直接用就行了)

 

接下來是與springboot的整合

與springboot的整合我用到了父子模塊的用法 如今記錄一下,否則會忘!就是建立一個空的springboot項目,而後右擊添加新項目 新項目的存儲位置必定要處於父項目的目錄下

而後具體整合也和spring差很少 甚至能夠說是比spring還要簡單一點

1.在配置文件中寫上屬性

2.寫一個配置類

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class DirectRabbitConfig {
 
    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue",true);
    }
 
    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange");
    }
 
    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
}

簡單接口進行消息的推送

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@RestController
public class SendMessageController {
 
    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法
 
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
 
 
}

 

而後是消費者方的寫法 也是同樣的 此處必須注意兩邊都同時得有配置文件 由於比較分紅了兩個模塊了

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class DirectRabbitConfig {
 
    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue",true);
    }
 
    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange");
    }
 
    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
}

而後建立消費類

@Component
@RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue
public class DirectReceiver {
 
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消費者收到消息  : " + testMessage.toString());
    }
 
}

這裏面消費者是用、

  • 使用 @RabbitListener 註解標記方法,當監聽到隊列 debug 中有消息時則會進行接收並處理
  • 經過 @RabbitListener 的 bindings 屬性聲明 Binding(若 RabbitMQ 中不存在該綁定所須要的 Queue、Exchange、RouteKey 則自動建立,若存在則拋出異常)聲明綁定隊列
  • @EnableRabbit和@Configuration一塊兒使用,能夠加在類或者方法上,這個註解開啓了容器對註冊的bean的@RabbitListener檢查。

  • @RabbitListener 和 @RabbitHandler結合使用,不一樣類型的消息使用不一樣的方法來處理。

 

其他模式的使用也差很少 就是稍微多點配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
 
@Configuration
public class TopicRabbitConfig {
    //綁定鍵
    public final static String man = "topic.man";
    public final static String woman = "topic.woman";
 
    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.man);
    }
 
    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.woman);
    }
 
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }
 
 
    //將firstQueue和topicExchange綁定,並且綁定的鍵值爲topic.man
    //這樣只要是消息攜帶的路由鍵是topic.man,纔會分發到該隊列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
    }
 
    //將secondQueue和topicExchange綁定,並且綁定的鍵值爲用上通配路由鍵規則topic.#
    // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }
 
}

 

 

workqueue方法就是多建立幾個隊列 多綁定幾回就好了

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
 
@Configuration
public class FanoutRabbitConfig {
 
    /**
     *  建立三個隊列 :fanout.A   fanout.B  fanout.C
     *  將三個隊列都綁定在交換機 fanoutExchange 上
     *  由於是扇型交換機, 路由鍵無需配置,配置也不起做用
     */
 
 
    @Bean
    public Queue queueA() {
        return new Queue("fanout.A");
    }
 
    @Bean
    public Queue queueB() {
        return new Queue("fanout.B");
    }
 
    @Bean
    public Queue queueC() {
        return new Queue("fanout.C");
    }
 
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
 
    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }
 
    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }
 
    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

到此 具體的入門級用法都已經講完了 具體的模型我會傳到github上,接下來還有一些重點關於消息確認的知識(能夠用來確保消息不丟失)

一:確認種類

RabbitMQ的消息確認有兩種。

一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程當中,消息是否成功投遞。發送確認分爲兩步,一是確認是否到達交換器,二是確認是否到達隊列。

第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。

二:消息發送確認

(1)ConfirmCallback

經過實現ConfirmCallBack接口,消息發送到交換器Exchange後觸發回調。

 

 

 

使用該功能須要開啓確認,spring-boot中配置以下:

spring.rabbitmq.publisher-confirms = true

(2)ReturnCallback

經過實現ReturnCallback接口,若是消息從交換器發送到對應隊列失敗時觸發(好比根據發送消息時指定的routingKey找不到隊列時會觸發)

 

使用該功能須要開啓確認,spring-boot中配置以下:

spring.rabbitmq.publisher-returns = true

 

 

根據前面的知識(深刻了解RabbitMQ工做原理及簡單使用Rabbit的幾種工做模式介紹與實踐)咱們知道,若是要保證消息的可靠性,須要對消息進行持久化處理,然而消息持久化除了須要代碼的設置以外,還有一個重要步驟是相當重要的,那就是保證你的消息順利進入Broker(代理服務器),如圖所示:

 

 

 

正常狀況下,若是消息通過交換器進入隊列就能夠完成消息的持久化,但若是消息在沒有到達broker以前出現意外,那就形成消息丟失,有沒有辦法能夠解決這個問題?

RabbitMQ有兩種方式來解決這個問題:

  1. 經過AMQP提供的事務機制實現;
  2. 使用發送者確認模式實現;

1、事務使用

事務的實現主要是對信道(Channel)的設置,主要的方法有三個:

  1. channel.txSelect()聲明啓動事務模式;

  2. channel.txComment()提交事務;

  3. channel.txRollback()回滾事務;

從上面的能夠看出事務都是以tx開頭的,tx應該是transaction extend(事務擴展模塊)的縮寫,若是有準確的解釋歡迎在博客下留言。

咱們來看具體的代碼實現:

// 建立鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);   
Connection conn = factory.newConnection();
// 建立信道
Channel channel = conn.createChannel();
// 聲明隊列
channel.queueDeclare(_queueName, true, false, false, null);
String message = String.format("時間 => %s", new Date().getTime());
try {
    channel.txSelect(); // 聲明事務
    // 發送消息
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // 提交事務
} catch (Exception e) {
    channel.txRollback();
} finally {
    channel.close();
    conn.close();
}

反正就是提交失敗就回滾 可是效率很低

從上面能夠看出,非事務模式的性能是事務模式的性能高149倍,個人電腦測試是這樣的結果,不一樣的電腦配置略有差別,但結論是同樣的,事務模式的性能要差不少,那有沒有既能保證消息的可靠性又能兼顧性能的解決方案呢?那就是接下來要講的Confirm發送方確認模式。

2、Confirm發送方確認模式

Confirm發送方確認模式使用和事務相似,也是經過設置Channel進行發送方確認的。

Confirm的三種實現方式:

方式一:channel.waitForConfirms()普通發送方確認模式;

方式二:channel.waitForConfirmsOrDie()批量確認模式;

方式三:channel.addConfirmListener()異步監聽發送方確認模式;

 

方式一:普通Confirm模式

// 建立鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 建立信道
Channel channel = conn.createChannel();
// 聲明隊列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 開啓發送方確認模式
channel.confirmSelect();
String message = String.format("時間 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {
    System.out.println("消息發送成功" );
}

看代碼能夠知道,咱們只須要在推送消息以前,channel.confirmSelect()聲明開啓發送方確認模式,再使用channel.waitForConfirms()等待消息被服務器確認便可。

方式二:批量Confirm模式

// 建立鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 建立信道
Channel channel = conn.createChannel();
// 聲明隊列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 開啓發送方確認模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    String message = String.format("時間 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到全部信息都發布,只要有一個未確認就會IOException
System.out.println("所有執行完成");

以上代碼能夠看出來channel.waitForConfirmsOrDie(),使用同步方式等全部的消息發送以後纔會執行後面代碼,只要有一個消息未被確認就會拋出IOException異常。

 

方式三:異步Confirm模式

// 建立鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 建立信道
Channel channel = conn.createChannel();
// 聲明隊列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 開啓發送方確認模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    String message = String.format("時間 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
//異步監聽確認和未確認的消息
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未確認消息,標識:" + deliveryTag);
    }
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple));
    }
});

異步模式的優勢,就是執行效率高,不須要等待消息執行完,只須要監聽消息便可,以上異步返回的信息以下:

能夠看出,代碼是異步執行的,消息確認有多是批量確認的,是否批量確認在於返回的multiple的參數,此參數爲bool值,若是true表示批量執行了deliveryTag這個值之前的全部消息,若是爲false的話表示單條確認。

 

最好補充一點:因爲我本身整理的可能不夠全面 因此我放幾個連接

https://blog.csdn.net/qq_35387940/article/details/100514134   https://www.cnblogs.com/nizuimeiabc1/p/9608763.html 整合篇 

https://blog.csdn.net/u013256816/article/details/55515234 消息確認機制

相關文章
相關標籤/搜索