RabbitMQ-Spring AMQP

上篇文章RabbitMQ基礎入門學習了rabbitMQ一些基礎的api,固然spring也在原生代碼的基礎上作了更多的封裝,這篇文章就基於spring-rabbit,學習一下spring的實現。html

引入jar:java

<dependency>spring

    <groupId>org.springframework.amqp</groupId>api

    <artifactId>spring-rabbit</artifactId>spring-mvc

    <version>1.5.0.RELEASE</version>mvc

</dependency>post

上篇文章沒有測試到Topic exchange,下面就使用Topic exchange作測試。學習

1.基於springframework.amqp.rabbit java寫法。測試

 1 import org.springframework.amqp.core.BindingBuilder;
 2 import org.springframework.amqp.core.Queue;
 3 import org.springframework.amqp.core.TopicExchange;
 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 5 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 6 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 7 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
 9 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
10 
11 @RabbitListener(queues = "hello")
12 public class Tut1Java {
13 
14     public static void main(final String... args) throws Exception {
15 
16         CachingConnectionFactory cf = new CachingConnectionFactory();
17         cf.setAddresses("192.168.1.7:5672");
18         cf.setUsername("admin");
19         cf.setPassword("admin");
20         
21         // set up the queue, exchange, binding on the broker
22         RabbitAdmin admin = new RabbitAdmin(cf);
23         Queue queue = new Queue("myQueue");
24         admin.declareQueue(queue);
25         TopicExchange exchange = new TopicExchange("myExchange");
26         admin.declareExchange(exchange);
27         admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));
28 
29         // set up the listener and container
30         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
31         Object listener = new Object() {
32             // 接受到消息時,會執行此方法
33             public void handleMessage(String foo) {
34                 System.out.println("Tut1Java " + foo);
35             }
36         };
37         MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
38         container.setMessageListener(adapter);
39         container.setQueueNames("myQueue");
40         container.start();
41 
42         // send something
43         RabbitTemplate template = new RabbitTemplate(cf);
44         // 只有routingKey符合foo.*規則的纔會被接受處理
45         template.convertAndSend("myExchange", "foo.bar", "Hello, world!");
46         container.stop();
47     }
48 }

 整塊代碼能夠簡單的實現了發送接收消息,主要分爲四個部分。ui

16~19行 初始化一個CachingConnectionFactory,其實底層也是原生的ConnectionFactory。

22~27行 主要是設置queue和exchange,並把它們按照"foo.*"的路由規則綁定起來。

  new Queue("myQueue"),建立一個本地持久話名字叫myQueue的隊列。

  declareQueue(queue),聲明一個隊列。

  new TopicExchange("myExchange"),建立一個topic exchange,看名字也知道exchange類型是topic,咱們只要傳遞參數就行了。固然也會有FanoutExchange、DirectExchange、HeadersExchange。

  BindingBuilder.bind(queue).to(exchange).with("foo.*"),將queue綁定到exchange上,並以"foo.*"做爲他們之間的路由規則。

30~40行 主要是經過SimpleMessageListenerContainer去監聽消息,而且能夠設置特定類的方法去執行處理接受到的消息。

Object listener = new Object() {
    // 接受到消息時,會執行此方法
    public void handleMessage(String foo) {
        System.out.println("Tut1Java " + foo    );
    }
    public void handleMessage2(String foo) {
        System.out.println("Tut1Java2 " + foo);
    }
};
 MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
 adapter.setDefaultListenerMethod("handleMessage2");
 container.setMessageListener(adapter);    

若是消息監聽寫成上面這樣,那麼將執行handleMessage2(String foo) 。

43~46行 利用RabbitTemplate發送消息,三個參數依次爲exchange、routingKey和發送的消息,

首先exchange名稱要和admin聲明的一致,routingKey要符合當前topic exchange的路由規則,不然消息不會發送到當前隊列中。

再看一下topic exchange的模型:

最後運行一下,能夠發現消息被順利打印出來了。

2.基於spring配置寫法

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">

<rabbit:connection-factory id="connectionFactory" username="admin" password="admin" host="192.168.1.7" port="5672" virtual-host="/" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
    exchange="myExchange" routing-key="foo.bar"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="myQueue" />

<rabbit:topic-exchange name="myExchange">
    <rabbit:bindings>
        <rabbit:binding queue="myQueue" pattern="foo.*" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
</rabbit:listener-container>

<bean id="foo" class="rabbitMQ.springAMQP.Foo" />

</beans>  

在context.xml中能夠看到不少熟悉的rabbitMQ的對象,這種寫法只不過從上面new出來的對象,改成使用spring的xml去聲明各類bean對象。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Tut1Spring {

    public static void main(final String... args) throws Exception {

        AbstractApplicationContext ctx =
            new ClassPathXmlApplicationContext("spring/context.xml");
        RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
        template.convertAndSend("Tut1Spring " + "Hello, world!");
        Thread.sleep(1000);
        ctx.destroy();
    }
}
public class Foo {

    public void listen(String foo) {
        System.out.println("Foo=" + foo);
    }
}

Foo爲處理接受並處理消息的類,在xml中也指定了執行的方法爲listen。

最後運行一下,能夠發現消息被順利打印出來了。

 

其實不管是哪一種方式的實現,內部本質仍是基於最原生的api,咱們只要理解最基礎的部分,這些理解起來仍是比較容易的。 

相關文章
相關標籤/搜索