在以前的一篇博客RabbitMQ入門:認識並安裝RabbitMQ(以Windows系統爲例)中,咱們安裝了RabbitMQ而且對其也有的初步的認識,今天就來寫個入門小例子來加深概念理解並瞭解代碼怎麼實現。html
本篇博客圍繞下面幾個方面展開:java
Now, Let's begin !數組
1、代碼前的理論熱身服務器
咱們來看張圖:app
Publisher(生產者)生成消息,而後publish(發佈)消息到exchange(路由器,也有資料翻譯成交換機),而後根據路由規則將消息傳遞到Queue(隊列),最終交由Consumer(消費者)進行消費處理。async
這裏的生產者和消費者都是咱們的應用,所以咱們的代碼中要實現這兩個部分。ide
中間的節點就是RabbitMQ 提供的內容,須要再生產者和消費者裏面調用其接口來定義和使用這些節點。oop
2、代碼實例:Hello RabbitMQ測試
package com.sam.hello_rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Provider { //定義隊列名 static String QUEUE_NAME = "helloRabbit"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { //1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); //2.爲通道聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //3.發佈消息 String msg = " hello rabbitmq, welcome to sam's blog."; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("provider send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //4.關閉鏈接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
在第2步中,channel.queueDeclare 用來建立隊列,有5個參數:String queue, 隊列名; boolean durable, 該隊列是否須要持久化; boolean exclusive,該隊列是否爲該通道獨佔的(其餘通道是否能夠消費該隊列); boolean autoDelete,該隊列再也不使用的時候,是否讓RabbitMQ服務器自動刪除掉; Map<String, Object> arguments 其餘參數。第3步中,channel.basicPublish 發佈消息(用在生產者),有4個參數:String exchange, 路由器(有的資料翻譯成交換機)的名字,即將消息發到哪一個路由器; String routingKey, 路由鍵,即發佈消息時,該消息的路由鍵是什麼; BasicProperties props, 指定消息的基本屬性; byte[] body 消息體,也就是消息的內容,是字節數組。 可能你會疑惑,爲何沒有exchange呢?由於若是聲明瞭隊列,能夠不聲明路由器。ui
package com.sam.hello_rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class HelloConsumer { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.爲通道聲明隊列 channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null); System.out.println(" **** keep alive ,waiting for messages, and then deal them"); // 3.經過回調生成消費者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { //獲取消息內容而後處理 String msg = new String(body, "UTF-8"); System.out.println("*********** HelloConsumer" + " get message :[" + msg +"]"); } }; //4.消費消息 channel.basicConsume(Provider.QUEUE_NAME, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
在第4步中,channel.basicConsume 用來接收消息,用在消費者,有3個參數:String queue, 隊列名字,即要從哪一個隊列中接收消息; boolean autoAck, 是否自動確認,默認true; Consumer callback 消費者,即誰接收消息。
3、運行代碼並調試問題
代碼寫好了,接下來進行測試,
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:952) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) at com.sam.hello_rabbitmq.Provider.main(Provider.java:36) Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ... 3 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:509) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643) at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) at java.lang.Thread.run(Thread.java:745) Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10) at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:345) at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:286) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:600) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:527) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68) at com.sam.hello_rabbitmq.Provider.main(Provider.java:60)
關鍵堆棧信息是:inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true',說是helloRabbit這個隊列durable(是否須要持久化)
參數已經設定成了true 可是代碼中指定的是false,衝突了,納尼?訪問RabbitMQ管理頁面:http://localhost:15672/#/queues 發現已經存在一個隊列helloRabbit,
點helloRabbit的連接,發現隊列的durable屬性確實是true。哦,原來我以前在作別的練習的時候,建立過一個叫這個名字的隊列了,並且屬性值恰好爲true.
那麼接下來刪掉這個既存的隊列
再去執行Provider.java,後臺打印了內容,而且隊列中有了一條ready的消息。
問題解決!
結果符合預期。
到此,所有工做完美結束。