消息中間件簡單介紹java
可靠性(Reliability) | 使用了一些機制來保證可靠性,如持久、傳輸確認、發佈確認 |
靈活性(Flexible Routing) | 在消息進入隊列以前,經過Exchange來路由消息,能夠將多個Exchange綁定一塊兒 |
消息集羣(Clustering) | 多個RabbitMQ服務器開以組成一個集羣,造成一個邏輯Broker |
高可用(Highly Available Queues) | 隊列能夠在集羣中機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍能夠用 |
多中協議(Multi-protocol) | 支持多種消息隊列協議,好比STOMP、MQTT等等 |
多語言客戶端(Many Clients) | 幾乎支持全部經常使用語言,好比java、.NET、Ruby等等 |
管理界面(Management UI) | 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息Broker的許多方面 |
跟蹤機制(Tracing) | 若是消息異常,RabbitMQ提供了消息跟蹤機制,使用者能夠找出發生了什麼 |
插件機制(Plugin System) | 提供了許多插件,從多方面進行擴展,也可編寫本身的插件 |
架構圖redis
主要概念spring
rabbitmq‐plugins enable rabbitmq_management
|
(4)從新啓動服務瀏覽器
(5)打開瀏覽器,地址欄輸入http://127.0.0.1:15672 ,便可看到管理界面的登錄頁服務器
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring‐rabbit</artifactId> <version>2.1.4.RELEASE</version> </dependency>
<?xml version="1.0" encoding="UTF‐8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring‐beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring‐rabbit.xsd"> <!‐‐鏈接工廠‐‐> <rabbit:connection‐factory id="connectionFactory"
host="127.0.0.1"
port="5672"
username="guest"
password="guest" /> <rabbit:template id="rabbitTemplate" connection‐ factory="connectionFactory" />
</beans>
1 @Test
public void test() { 2 //解析配置文件,從中獲取RabbitTemplate對象 3 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml"); 4 RabbitTemplate rabbitTemplate= (RabbitTemplate)context.getBean("rabbitTemplate"); 5 //發送消息(queue.test這個隊列名稱須要在RabbitMQ中手動建立) 6 rabbitTemplate.convertAndSend("","queue.test","直接模式"); 7 System.out.println("消息發送成功"); 8 //關閉對象 9 ((ClassPathXmlApplicationContext) context).close(); 10 }
代碼實現、消息消費者架構
1 public class MessageConsumer implements MessageListener { 2 public void onMessage(Message message) { 3 System.out.println("接收到消息:" +new String(message.getBody()) ); 4 } 5 }
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--鏈接工廠--> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"/> <!--隊列名稱--> <rabbit:queue name="queue.test" /> <!--消費者監聽類class爲監聽類路徑--> <bean id="messageConsumer" class="cn.itcast.demo.MessageConsumer"> </bean> <!--設置監聽容器--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queue-names="queue.test" ref="messageConsumer"/> </rabbit:listener-container> </beans>
@Test public void test2(){ //運行此方法會自動調用消息監聽類,並打印監聽到的消息 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-consumer.xml"); }
運行結果app
接收到消息:直接模式
rabbitTemplate.convertAndSend("exchange.fanout_test","","分列模式走起");
修改消息、消費者配置文件(applicationContext-rabbitmq-consumer.xml)異步
<!--隊列名稱--> <rabbit:queue name="queue.test" /> <rabbit:queue name="queue.test1" /> <!--消費者監聽類class爲監聽類路徑--> <bean id="messageConsumer" class="cn.itcast.demo.MessageConsumer"></bean> <bean id="messageConsumer1" class="cn.itcast.demo.MessageConsumer1"> </bean> <!--設置監聽容器--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queue-names="queue.test" ref="messageConsumer"/> <rabbit:listener queue-names="queue.test1" ref="messageConsumer1"/> </rabbit:listener-container>
在增長一個消息監聽者類,而後運行測試代碼Test2便可分佈式
運行結果性能
我是Message、接收到消息:分列模式走起
我是Message一、接收到消息:分列模式走起
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--配置connection-factory,指定鏈接rabbit server參數 --> <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true"/> <rabbit:admin connection-factory="connectionFactory"></rabbit:admin> <!--建立交換機(過時隊列的交換機)--> <rabbit:direct-exchange id="exchange.delay.order.begin" name="exchange.delay.order.begin" durable="false" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue.delay.order.begin" key="delay"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 延時隊列(過時隊列) --> <rabbit:queue name="queue.delay.order.begin" durable="false"> <rabbit:queue-arguments> <!-- 設置隊列過時時間爲1分鐘 --> <entry key="x-message-ttl" value="60000" value-type="java.lang.Long"/> <entry key="x-dead-letter-exchange" value="exchange.delay.order.done"/> <entry key="x-dead-letter-routing-key" value="delay"/> </rabbit:queue-arguments> </rabbit:queue> <!--死信交換機定義--> <rabbit:direct-exchange id="exchange.delay.order.done" name="exchange.delay.order.done" durable="false" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue.delay.order.done" key="delay"/> <!-- binding key 相同爲 【delay】exchange轉發消息到多個隊列 --> <!--<rabbit:binding queue="queue.delay.order.done.two" key="delay" />--> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:queue name="queue.delay.order.done" durable="false"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!-- 消息接收者 --> <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false"> <rabbit:listener queues="queue.delay.order.done" ref="orderMessageListener"/> </rabbit:listener-container> <!--建立一個消息監聽對象--> <bean class="cn.itcast.demo.MessageConsumer" id="orderMessageListener"></bean> </beans>
建立測試類消息生產者(發送消息)
@Test public void test1() { //解析配置文件,從中獲取RabbitTemplate對象 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-mq.xml"); RabbitTemplate rabbitTemplate= (RabbitTemplate)context.getBean("rabbitTemplate"); rabbitTemplate.convertAndSend("exchange.delay.order.begin","delay","延時隊列模式走起"); System.out.println("消息發送成功"); //關閉對象 ((ClassPathXmlApplicationContext) context).close(); }
建立消息監聽器(MessageConsumer)
public class MessageConsumer implements MessageListener { public void onMessage(Message message) { System.out.println("我是Message、接收到消息:" +new String(message.getBody()) ); } }
建立測試類消息消費者
@Test public void test2(){ //運行此方法會自動調用消息監聽類,並打印監聽到的消息 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-mq.xml"); }
運行結果(因爲死信隊列時間設置爲1分鐘,因此須要一分鐘後纔會監聽到消息)
我是Message、接收到消息:延時隊列模式走起
如何解決消息重複消費
讓生產者發送每條數據的時候,裏面加一個全局惟一的id,相似訂單id之類的東西,而後你這裏消費到了以後,先根據這個id去好比redis裏查一下,以前消費過嗎?若是沒有消費過,你就處理,而後這個id寫redis。若是消費過了,那你就別處理了,保證別重複處理相同的消息便可。