概念:html
消息隊列(Message Queue,簡稱MQ),本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是一些Message
。java
傳統模式spring
使用隊列解耦json
例如:淘寶秒殺活動等。服務器
一個線程負責生產,一個線程負責總隊列裏取出來消費。app
一個線程生產,多個線程取出來消費。異步
一個發佈者發送消息,多個訂閱者能夠同時獲取到發佈的消息ide
生產者發送的消息會發往每一個與其綁定的隊列。學習
AMQP協議: Advanced Message Queuing Protocol 高級消息隊列協議,是一個異步消息傳遞所使用的應用層協議規範。fetch
組成:
服務主機:接收客戶端請求,並做出相應處理
虛擬主機:一個服務器能夠開啓多個Virtual Host,每一個虛擬主機都能提供完整的服務,有本身的權限控制。
生產者:發送消息
消費者:接收消息並處理
交換器:RabbitMQ中,消息不是直接發往隊列,而是要先給交換器,而後交換器按照必定的路由規則發送到相對應的隊列上。
路由Key:發送消息時要指定交換器和路由Key
綁定:將隊列和交換器綁定起來路由Key做爲綁定時的關鍵字
隊列:消息的載體,消費者從隊列中獲取消息,路由器根據路由規則把消息發往對應的隊列。
消息:隊列中存儲的信息單元。
生產者發送消息時指定交換器名和路由Key,而後交換器根據路由Key與綁定信息進行比對,找到對應的隊列後將信息發送出去。
消費者監聽某個隊列,若是有消息就取出來作對應操做,沒有就阻塞。
Direct:固定名稱匹配,只有路由Key與綁定的Key一致纔會將消息發送到該隊列。
Topic:主題模式,路由Key能夠用*#來填充
#能夠匹配任意多個單詞,*只能匹配一個單詞
好比bingdKey x.y x.y.z a.y a.b.z
x.*只能匹配x.y,而x.#能夠匹配x.y 和 x.y.z
Fanout:廣播模式
安裝RabbitMQ,並啓動服務。默認用戶名密碼guest。建立VirtualHost。
publicclass Recv { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey);
//消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } } |
publicclass Send { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola"; //發佈消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close();
} } |
與1:1的相似,只不過兩個消費者共同消費一個隊列內的信息,複製一份消費者便可。
生產者發送的消息,發往每一個訂閱他的消費者那裏。全部消費者均可以獲取相同的信息。
發佈者A
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true);
//發佈消息 byte[] messageBodyBytes = "王力宏發佈的消息:啦啦啦啦啦".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
發佈者B
publicclass Send2 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "趙薇"; channel.exchangeDeclare(exchangeName, "fanout", true);
//發佈消息 byte[] messageBodyBytes = "趙薇發佈的消息:啊啊啊啊".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
訂閱者A1、A2
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, "");
while(true) { //消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } } |
訂閱者B1、B2
publicclass Recv3 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); channel.exchangeDeclare("趙薇", "fanout", true); //聲明交換器 String exchangeName = "趙薇"; //聲明隊列 String queueName = channel.queueDeclare().getQueue(); //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, "");
while(true) { //消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } }
} |
生產者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true);
// String routingKey = "#.B";
//發佈消息 for (inti = 0; i < 3; i++) { channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes()); } //byte[] messageBodyBytes = "匹配消息".getBytes(); //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } } |
消費者A
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true); //聲明隊列 String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();
String routingKey = "X.A"; //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey);
while(true) { //消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body); System.out.println(bodyStr); } }); } } } |
消費者B,routingKey routingKey.A
消費者C,routingKey routingKey.B
7-5、廣播模式,不須要管routingKey和bindingKey是否匹配。
生產者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "fanout-exchange"; channel.exchangeDeclare(exchangeName, "fanout", true);
// String routingKey = "hola";
//發佈消息 byte[] messageBodyBytes = "羣發消息".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close(); } } |
消費者
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "fanout-exchange"; channel.exchangeDeclare(exchangeName, "fanout", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola2"; //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey);
while(true) { //消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body); System.out.println(bodyStr);
} }); } } } |
8-1、添加依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>x.x.x</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>x.x.xRELEASE</version> </dependency> |
8-2、配置文件中加入rabbit服務鏈接配置
mq.host=real_host mq.username=guest mq.password=guest mq.port=5672 mq.vhost=real_vhost |
8-3、新建application-mq.xml文件,添加配置信息
主要用來配置鏈接信息、Producer配置、隊列聲明、交換器聲明、隊列與交換器的綁定、隊列的監聽器配置(即消費者)等。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 全局配置 --> <!-- 定義RabbitMQ的鏈接工廠 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/> <!-- MQ的管理,包括隊列、交換器等 --> <rabbit:admin connection-factory="connectionFactory"/>
<!-- Sender配置 --> <!-- spring template聲明--> <!-- 能夠不指定交換器,在每次發送請求時須要指明發給哪一個交換器 <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> --> <rabbit:template exchange="test" id="amqpTemplate" connection-factory="connectionFactory"/><!-- message-converter="jsonMessageConverter" /> --> <!-- 消息對象json轉換類 --> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> -->
<!--定義queue 說明:durable:是否持久化 exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除 auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列--> <rabbit:queue name="mq.A" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="mq.B" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="mq.C" durable="true" auto-delete="false" exclusive="false" />
<!-- 定義交換機,而且完成隊列和交換機的綁定 --> <rabbit:direct-exchange name="test" durable="true" auto-delete="false" id="test"> <rabbit:bindings> <rabbit:binding queue="mq.A" key="key.A"/> <rabbit:binding queue="mq.B" key="key.B"/> <rabbit:binding queue="mq.C" key="key.C"/> </rabbit:bindings> </rabbit:direct-exchange>
<!-- queues:監聽的隊列,多個的話用逗號(,)分隔 ref:監聽器 --> <!-- 配置監聽 acknowledeg = "manual" 設置手動應答 當消息處理失敗時:會一直重發 直到消息處理成功 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <!-- 配置監聽器 --> <rabbit:listener queues="mq.A" ref="listenerA"/> <rabbit:listener queues="mq.B" ref="listenerB"/> <rabbit:listener queues="mq.C" ref="listenerC"/> </rabbit:listener-container> </beans> |
生產者:Spring提供的AmqpTemplate,使用註解注入便可使用
@Autowired private AmqpTemplate amqpTemplate; publicvoid sendMsg(String msg) { amqpTemplate.convertAndSend("routingKey", "發送了消息A"); } |
監聽器:即收到隊列的消息後做何處理,要實現ChannelAwareMessageListener
例如:監聽器listenerA
@Component publicclass ListenerA implements ChannelAwareMessageListener {
privatefinalstatic Log logger = LogFactory.getLog(ListenerA.class);
@Override publicvoid onMessage(Message message, Channel channel) throws Exception { // TODO Auto-generated method stub String msg = new String(message.getBody()); System.out.println("A received : " + msg); logger.error(msg); } } |
有可能遇到程序崩潰或者Rabbit服務器宕機的狀況,那麼若是沒有持久化機制,全部數據都會丟失。
Durable:是否持久化參數設爲True便可
channel.exchangeDeclare(exchangeName, type, true);
在以前,消息分發給consumer後當即就會被標記爲已消費,這時候若是consumber接到了一個消息可是尚未來的及處理就異常退出,那麼這個消息的狀態是已被消費的,因而就會形成消息丟失的問題。
處理的代碼也很簡單,一共有兩個步驟。第一個把autoAck改爲false
//消費結果須要進行確認
channel.BasicConsume("firstTest", false, consumer);
第二部分就是在咱們消費完成後進行確認
//進行交付,肯定此消息已經處理完成
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
若是沒有進行確認queue會把這個消息交給其它的consumer去處理,若是沒有交付的代碼,那麼這個消息會一直存在。
消息持久化步驟:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException; |
exchange表示exchange的名稱
routingKey表示routingKey的名稱
body表明發送的消息體
MessageProperties.PERSISTENT_TEXT_PLAIN 能夠設置爲持久化,類型爲文本
MessageProperties.PERSISTENT_BASIC 類型爲二進制數據
mandatory當mandatory標誌位設置爲true時,若是exchange沒法找到一個隊列取轉發,就返回給生產者。
immediate當immediate標誌位設置爲true時,若是exchange要轉發的隊列上沒有消費者時,就返回給生產者。
RabbitMQ可能會遇到的一個問題,即生成者不知道消息是否真正到達broker,那麼有沒有高效的解決方式呢?答案是採用Confirm模式。
生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理。
confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息。
在channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被nack一次。可是沒有對消息被 confirm 的快慢作任何保證,而且同一條消息不會既被 confirm又被nack 。
//開啓Procedure確認機制 channel.confirmSelect(); //發佈消息 channel.basicPublish(exchangeName, routingKey,null,message); //消息發送成功的確認,也能夠設置超時時間 if (channel.waitForConfirms([long timeOut]) { System.out.println("send success..."); } else { System.out.println("send failed..."); } |
批量發送消息後再進行確認。
//待確認的序列 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //開啓確認機制 channel.confirmSelect(); //添加處理事件 channel.addConfirmListener(new ConfirmListener() { publicvoid handleAck(longdeliveryTag, booleanmultiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } System.out.println("發送成功..."); } publicvoid handleNack(longdeliveryTag, booleanmultiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } System.err.println("發送失敗..."); } });
//發送消息 for (inti = 0; i < 10; i++) { //獲取下個發送序號 longnextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(exchangeName, routingKey, null, ("車票ID:" + i).getBytes()); //加入待處理集合中 confirmSet.add(nextSeqNo); //休息0.2s Thread.sleep(200); } |
自動確認,默認是自動確認,即獲取消息後,直接確認。
手動確認,給當前消息設置狀態,當手動ack後服務端纔會刪除該消息,若是返回nack,從新入隊。
//手動確認 booleanautoAck = false; channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //…其餘處理操做 longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); } }); |
https://blog.csdn.net/wangshuminjava/article/details/80998992
深刻解讀RabbitMQ工做原理及簡單使用
http://www.javashuo.com/article/p-cabanpbf-gk.html
RabbitMQ深刻學習指導
概念:
消息隊列(Message Queue,簡稱MQ),本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是一些Message
。
傳統模式
使用隊列解耦
例如:淘寶秒殺活動等。
一個線程負責生產,一個線程負責總隊列裏取出來消費。
一個線程生產,多個線程取出來消費。
一個發佈者發送消息,多個訂閱者能夠同時獲取到發佈的消息
生產者發送的消息會發往每一個與其綁定的隊列。
AMQP協議: Advanced Message Queuing Protocol 高級消息隊列協議,是一個異步消息傳遞所使用的應用層協議規範。
組成:
服務主機:接收客戶端請求,並做出相應處理
虛擬主機:一個服務器能夠開啓多個Virtual Host,每一個虛擬主機都能提供完整的服務,有本身的權限控制。
生產者:發送消息
消費者:接收消息並處理
交換器:RabbitMQ中,消息不是直接發往隊列,而是要先給交換器,而後交換器按照必定的路由規則發送到相對應的隊列上。
路由Key:發送消息時要指定交換器和路由Key
綁定:將隊列和交換器綁定起來路由Key做爲綁定時的關鍵字
隊列:消息的載體,消費者從隊列中獲取消息,路由器根據路由規則把消息發往對應的隊列。
消息:隊列中存儲的信息單元。
生產者發送消息時指定交換器名和路由Key,而後交換器根據路由Key與綁定信息進行比對,找到對應的隊列後將信息發送出去。
消費者監聽某個隊列,若是有消息就取出來作對應操做,沒有就阻塞。
Direct:固定名稱匹配,只有路由Key與綁定的Key一致纔會將消息發送到該隊列。
Topic:主題模式,路由Key能夠用*#來填充
#能夠匹配任意多個單詞,*只能匹配一個單詞
好比bingdKey x.y x.y.z a.y a.b.z
x.*只能匹配x.y,而x.#能夠匹配x.y 和 x.y.z
Fanout:廣播模式
安裝RabbitMQ,並啓動服務。默認用戶名密碼guest。建立VirtualHost。
publicclass Recv { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey);
//消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } } |
publicclass Send { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola"; //發佈消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close();
} } |
與1:1的相似,只不過兩個消費者共同消費一個隊列內的信息,複製一份消費者便可。
生產者發送的消息,發往每一個訂閱他的消費者那裏。全部消費者均可以獲取相同的信息。
發佈者A
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true);
//發佈消息 byte[] messageBodyBytes = "王力宏發佈的消息:啦啦啦啦啦".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
發佈者B
publicclass Send2 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "趙薇"; channel.exchangeDeclare(exchangeName, "fanout", true);
//發佈消息 byte[] messageBodyBytes = "趙薇發佈的消息:啊啊啊啊".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
訂閱者A1、A2
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, "");
while(true) { //消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } } |
訂閱者B1、B2
publicclass Recv3 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); channel.exchangeDeclare("趙薇", "fanout", true); //聲明交換器 String exchangeName = "趙薇"; //聲明隊列 String queueName = channel.queueDeclare().getQueue(); //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, "");
while(true) { //消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } }
} |
生產者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true);
// String routingKey = "#.B";
//發佈消息 for (inti = 0; i < 3; i++) { channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes()); } //byte[] messageBodyBytes = "匹配消息".getBytes(); //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } } |
消費者A
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true); //聲明隊列 String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();
String routingKey = "X.A"; //綁定隊列,經過鍵 hola將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey);
while(true) { //消費消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body); System.out.println(bodyStr); } }); } } } |
消費者B,routingKey routingKey.A
消費者C,routingKey routingKey.B
7-5、廣播模式,不須要管routingKey和bindingKey是否匹配。
生產者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "fanout-exchange"; channel.exchangeDeclare(exchangeName, "fanout", true);
// String routingKey = "hola";
//發佈消息 byte[] messageBodyBytes = "羣發消息".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close(); } } |
消費者
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); // |