RabbitMQ-從基礎到實戰(1)— Hello RabbitMQhtml
RabbitMQ-從基礎到實戰(2)— 防止消息丟失java
RabbitMQ-從基礎到實戰(3)— 消息的交換(上)spring
RabbitMQ-從基礎到實戰(4)— 消息的交換(中)app
RabbitMQ-從基礎到實戰(5)— 消息的交換(下)ide
原計劃這章應該講RabbitMQ的RPC調用的,後來想一想,這個場景應該用的很少,如今比較火的微服務,要麼用dubbo,要麼用spring cloud,用RabbitMQ作RPC比較少見,因此就先跳過了,有須要再補充。微服務
其實網上RabbitMQ和Spring集成的教程有很多,我也大體看了看,大部分都是言簡意賅,代碼配置一貼,而後就能夠用了,而我但願個人教程能多和你們一塊兒探討一些「爲何」。ui
Spring AMQP中有兩個單詞,Spring都知道,那AMQP是什麼?spa
中文意思是,高級消息隊列協議,而後用蹩腳的英語猜一下,advance message queue protocol,差很少了,advance變成形容詞高級的-advanced,queue變成queuing(排隊論,學術一點),因此,AMQP就是Advanced Message Queuing Protocal。翻譯
AMQP 0-9-1 是RabbitMQ支持的協議之一,0-9-1是個版本號,正常狀況下推薦使用,它所表現出的形式,就是前面幾張介紹的內容debug
RabbitMQ還支持其餘版本的協議,具體能夠參考這裏
Spring AMQP的定義以下
The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container". These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration
意外的發現谷歌翻譯的很通順
Spring AMQP項目將核心Spring概念應用於基於AMQP的消息傳遞解決方案的開發。 它提供了一個「模板」做爲發送和接收消息的高級抽象。 它還經過「偵聽器容器」爲消息驅動的POJO提供支持。 這些庫促進AMQP資源的管理,同時促進使用依賴注入和聲明性配置
先不用Spring,只在Main方法中集成Spring AMQP,方便分析。
在pom文件中加入以下配置,不須要再引入RabbitMQ的包,Spring AMQP已經包含了4.0.x的客戶端
Spring AMQP now uses the new 4.0.x version of the
amqp-client
library provided by the RabbitMQ team. This client has auto recovery configured by default
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency>
新建一個類,
1 package com.liyang.ticktock.rabbitmq; 2 3 import org.springframework.amqp.core.BindingBuilder; 4 import org.springframework.amqp.core.Queue; 5 import org.springframework.amqp.core.TopicExchange; 6 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 7 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 8 import org.springframework.amqp.rabbit.core.RabbitAdmin; 9 import org.springframework.amqp.rabbit.core.RabbitTemplate; 10 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 11 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; 12 13 14 public class App 15 { 16 public static void main( String[] args ) throws InterruptedException 17 { 18 //獲取一個鏈接工廠,用戶默認是guest/guest(只能使用部署在本機的RabbitMQ) 19 //是Spring實現的對com.rabbitmq.client.Connection的包裝 20 ConnectionFactory cf = new CachingConnectionFactory("localhost"); 21 22 //對AMQP 0-9-1的實現 23 RabbitAdmin admin = new RabbitAdmin(cf); 24 //聲明一個隊列 25 Queue queue = new Queue("myQueue"); 26 admin.declareQueue(queue); 27 //聲明一個exchange 28 TopicExchange exchange = new TopicExchange("myExchange"); 29 admin.declareExchange(exchange); 30 //綁定隊列到exchange,加上routingKey foo.* 31 admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*")); 32 33 //監聽容器 34 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf); 35 //監聽者對象 36 Object listener = new Object() { 37 @SuppressWarnings("unused") 38 public void handleMessage(String foo) { 39 System.out.println(foo); 40 } 41 }; 42 //經過這個適配器代理listener 43 MessageListenerAdapter adapter = new MessageListenerAdapter(listener); 44 //把適配器(listener)設置給Container 45 container.setMessageListener(adapter); 46 //設置該容器監聽的隊列名,能夠傳多個,public void setQueueNames(String... queueName) { 47 container.setQueueNames("myQueue"); 48 //開始監聽 49 container.start(); 50 51 //發送模版,設置上鍊接工廠 52 RabbitTemplate template = new RabbitTemplate(cf); 53 //發送消息 54 template.convertAndSend("myExchange", "foo.bar", "Hello, world!"); 55 56 Thread.sleep(1000); 57 container.stop(); 58 } 59 }
運行結果以下:
以上代碼中
ConnectionFactory包裝了全部物理鏈接信息,而後傳遞給RabbitAdmin建立了RabbitMQ支持協議的鏈接(AMQP 0-9-1);
聲明隊列和交換中心後,經過BindingBuilder把隊列的綁定關係聲明到admin上;
建立一個消息處理類,用一個適配器(MessageListenerAdapter)包裝它,並註冊到監聽容器中,啓動監聽;
最後經過鏈接信息建立一個Rabbit模板,調用發送方法。
這裏引伸出一個問題,爲何要用監聽容器(SimpleMessageListenerContainer),咱們點開它的outline,以下,能夠看到,它是對監聽這個動做的抽象,一個容器能夠有多個Consumer,而且能夠控制如超時時間等配置。
能夠看出,比起直接使用RabbitMQ客戶端,以上代碼已經簡化了一部分,最明顯的部分就是,不須要手動去關Channel、Connection了,對以上概念的介紹都穿插在前幾章,這裏再也不贅述。
這些代碼用Spring的方式注入將會更加簡潔
首先,POM文件中要把Spring引進來
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.7.RELEASE</version> </dependency>
而後增長applicatioContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd "> <context:annotation-config/> <context:component-scan base-package="com.liyang.ticktock.rabbitmq.listener"/> <!-- 配置ConnectionFactory --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="guest" password="guest" port="5672" /> <!-- 等同new一個RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 聲明一個隊列 --> <rabbit:queue name="myQueue" /> <!-- 聲明一個topic類型的exchange,並把上面聲明的隊列綁定在上面,routingKey="foo.*" --> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*" /> <!-- 這裏還能夠繼續綁定其餘隊列 --> </rabbit:bindings> </rabbit:topic-exchange> <!-- 聲明一個rabbitTemplate,指定鏈接信息,發送消息到myExchange上,routingKey在程序中設置,此處的配置在程序中能夠用set修改 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange"/> <!-- 配置監聽容器,指定消息處理類,處理方法,還能夠配置自動確認等--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="myListener" method="listen" queue-names="myQueue" /> <!-- 能夠繼續註冊監聽 --> </rabbit:listener-container> </beans>
這個配置文件中,咱們使用了context的自動掃描裝配,不用配bean了,還使用了rabbit的命名空間 xmlns:rabbit
這個東西可能有些童鞋不知道怎麼配的,其實沒什麼神祕的,咱們打開根路徑看看裏面有什麼「http://www.springframework.org/schema/」
和本地目錄同樣,就是個存東西的地方,rabbit命名空間,在這個路徑後面加上rabbit對應的目錄就好了
再進入rabbit目錄,發現裏面是一些不一樣版本的xsd文件,因此,在schemaLocation中,配上使用的具體版本的xsd就能夠了
言歸正傳,上面的配置文件,把大量原來在代碼中實現的東西挪到了xml中,一目瞭然,方便修改。注意註釋加粗的三處,如今咱們能夠用Spring實現咱們原來用原生RabbitMQ客戶端實現的全部功能了
在發送消息以前,還須要寫一個類用來處理消息,而且這個類要有linsten方法,和配置文件中聲明的 method="listen" 對應
1 package com.liyang.ticktock.rabbitmq.listener; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.stereotype.Component; 6 7 @Component 8 public class MyListener { 9 Logger logger = LoggerFactory.getLogger(MyListener.class); 10 11 public void listen(String message){ 12 logger.debug("received:"+message); 13 } 14 15 }
剩下在java中的代碼就很是簡潔了,監聽在Spring環境啓動後就自動開始了,咱們只須要發消息就行,代碼以下
直接一個main方法,省的還要捉一隻湯姆貓
1 public static void main(String[] args) throws InterruptedException { 2 //啓動Spring環境 3 AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml"); 4 //僞裝是Autowired的 5 RabbitTemplate template = ctx.getBean(RabbitTemplate.class); 6 //設置routingKey 7 template.setRoutingKey("foo.bar"); 8 //發送,exchange,routingKey什麼的都配好了 9 template.convertAndSend("Hello, world!"); 10 11 //關掉環境 12 Thread.sleep(1000); 13 ctx.destroy(); 14 }
運行結果以下:
不是很重要,先欠着,有空補上
最近實在太忙,這篇博客利用工做的碎片時間,寫了有一個多星期,囧。
在寫這篇系列教程的同時,我也獲益良多,後面會繼續爲你們介紹一些RabbitMQ實用的高級特性,請多多支持