1.一、rabbitMQ的優勢(適用範圍)
1. 基於erlang語言開發具備高可用高併發的優勢,適合集羣服務器。
2. 健壯、穩定、易用、跨平臺、支持多種語言、文檔齊全。
3. 有消息確認機制和持久化機制,可靠性高。
4. 開源
其餘MQ的優點:
1. Apache ActiveMQ曝光率最高,可是可能會丟消息。
2. ZeroMQ延遲很低、支持靈活拓撲,可是不支持消息持久化和崩潰恢復。java
1.二、幾個概念說明
producer&consumer
producer指的是消息生產者,consumer消息的消費者。
Queue
消息隊列,提供了FIFO的處理機制,具備緩存消息的能力。rabbitmq中,隊列消息能夠設置爲持久化,臨時或者自動刪除。
設置爲持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失
設置爲臨時隊列,queue中的數據在系統重啓以後就會丟失
設置爲自動刪除的隊列,當不存在用戶鏈接到server,隊列中的數據會被自動刪除Exchangegit
Exchange相似於數據通訊網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是經過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange能夠和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue同樣,Exchange也可設置爲持久化,臨時或者自動刪除。
Exchange有4種類型:direct(默認),fanout, topic, 和headers,不一樣類型的Exchange轉發消息的策略有所區別:
Direct
直接交換器,工做方式相似於單播,Exchange會將消息發送徹底匹配ROUTING_KEY的Queue
fanout
廣播是式交換器,無論消息的ROUTING_KEY設置爲何,Exchange都會將消息轉發給全部綁定的Queue。
topic
主題交換器,工做方式相似於組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的全部隊列,好比,ROUTING_KEY爲user.stock的Message會轉發給綁定匹配模式爲 * .stock,user.stock, * . * 和#.user.stock.#的隊列。( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)
headers
消息體的header匹配(ignore)
Binding
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定能夠是多對多的關係。
virtual host
在rabbitmq server上能夠建立多個虛擬的message broker,又叫作virtual hosts (vhosts)。每個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost至關於物理的server,能夠爲不一樣app提供邊界隔離,使得應用安全的運行在不一樣的vhost實例上,相互之間不會干擾。producer和consumer鏈接rabbit server須要指定一個vhost。github
1. MessageProducer算法
1 MessageProducerpackage com.hjp.rabbitmq.rabbitmq.samples3; 2 3 import javax.annotation.Resource; 4 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.core.AmqpTemplate; 8 import org.springframework.stereotype.Service; 9 10 @Service 11 public class MessageProducer { 12 13 private Logger logger = LoggerFactory.getLogger(MessageProducer.class); 14 15 @Resource 16 private AmqpTemplate amqpTemplate; 17 18 public void sendMessage(Object message) { 19 20 logger.info("to send message:{}", message); 21 amqpTemplate.convertAndSend("queueTestKey", message); 22 23 } 24 }
2.MessageConsumerspring
package com.hjp.rabbitmq.rabbitmq.samples3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class MessageConsumer implements MessageListener { private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); @Override public void onMessage(Message message) { logger.info("receive message:{}",message); } }
3.rabbitmq.xmlsql
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!--配置connection-factory,指定鏈接rabbit server參數 --> <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="localhost" port="5672" /> <!--定義rabbit template用於數據的接收和發送 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest" /> <!--經過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--定義queue --> <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" /> <!-- 定義direct exchange,綁定queueTest --> <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="messageReceiver" class="com.hjp.rabbitmq.rabbitmq.samples3.MessageConsumer"></bean> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queueTest" ref="messageReceiver"/> </rabbit:listener-container> </beans>
4.application.xmlapache
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <import resource="classpath*:rabbitmq.xml" /> <!-- 掃描指定package下全部帶有如@controller,@services,@resource,@ods並把所註釋的註冊爲Spring Beans --> <context:component-scan base-package="com.hjp.rabbitmq.rabbitmq.samples3" /> <!-- 激活annotation功能 --> <context:annotation-config /> <!-- 激活annotation功能 --> <context:spring-configured /> </beans>
5.log4j配置緩存
log4j.rootLogger=DEBUG,Console,Stdout
#Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
log4j.logger.java.sql.ResultSet=INFO
log4j.logger.org.apache=INFO
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG
log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender
#log4j.appender.Stdout.File = E://logs/log.log
log4j.appender.Stdout.Append = true
log4j.appender.Stdout.Threshold = DEBUG
log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
代碼下載:https://github.com/jianpingh/rabbitmq-samples安全