brew install rabbitmq ## 進入安裝目錄 cd /usr/local/Cellar/rabbitmq/3.7.5 # 啓動 brew services start rabbitmq # 當前窗口啓動 rabbitmq-server
<!-- more -->java
啓動控制檯以前須要先開啓插件git
./rabbitmq-plugins enable rabbitmq_management
進入控制檯: http://localhost:15672/github
用戶名和密碼:guest,guest
ide
首先是得啓動mq學習
## 添加帳號 ./rabbitmqctl add_user admin admin ## 添加訪問權限 ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" ## 設置超級權限 ./rabbitmqctl set_user_tags admin administrator
pom引入依賴測試
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> </dependency>
開始寫代碼編碼
public class RabbitMqTest { //消息隊列名稱 private final static String QUEUE_NAME = "hello"; @Test public void send() throws java.io.IOException, TimeoutException { //建立鏈接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //建立鏈接 Connection connection = factory.newConnection(); //建立消息通道 Channel channel = connection.createChannel(); //生成一個消息隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 10; i++) { String message = "Hello World RabbitMQ count: " + i; //發佈消息,第一個參數表示路由(Exchange名稱),未""則表示使用默認消息路由 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //關閉消息通道和鏈接 channel.close(); connection.close(); } @Test public void consumer() throws java.io.IOException, java.lang.InterruptedException, TimeoutException { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //建立鏈接 Connection connection = factory.newConnection(); //建立消息信道 Channel channel = connection.createChannel(); //消息隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("[*] Waiting for message. To exist press CTRL+C"); AtomicInteger count = new AtomicInteger(0); //消費者用於獲取消息信道綁定的消息隊列中的信息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [x] Received '" + message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(1000 * 60); } }
須要注意的一點是:插件
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
首先執行塞入數據,執行完畢以後,能夠到控制檯進行查看:日誌
能夠看到多出了一個Queue,對列名爲hello,總共有10條數據
接下來就是消費數據了,執行consumer方法,輸出日誌
[*] Waiting for message. To exist press CTRL+C [x] Received 'Hello World RabbitMQ count: 0 [x] Done [x] Received 'Hello World RabbitMQ count: 1 [x] Done [x] Received 'Hello World RabbitMQ count: 2 [x] Done [x] Received 'Hello World RabbitMQ count: 3 [x] Done [x] Received 'Hello World RabbitMQ count: 4 [x] Done [x] Received 'Hello World RabbitMQ count: 5 [x] Done [x] Received 'Hello World RabbitMQ count: 6 [x] Done [x] Received 'Hello World RabbitMQ count: 7 [x] Done [x] Received 'Hello World RabbitMQ count: 8 [x] Done [x] Received 'Hello World RabbitMQ count: 9 [x] Done
回頭去查看queue,發現總得數據量爲0了
對於ack的問題,若是在消費數據的時候,出現異常,而我不但願數據丟失,這個時候就須要考慮手動ack的機制來保證了
首先須要設置手動ack
// 設置autoAck爲false channel.basicConsume(QUEUE_NAME, false, consumer);
其次在消費數據完畢以後,主動ack/nack
if (success) { channel.basicAck(envelope.getDeliveryTag(), false); } else { channel.basicNack(envelope.getDeliveryTag(), false, false); }
一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛
盡信書則不如,以上內容,純屬一家之言,因我的能力有限,如發現bug或者有更好的建議,隨時歡迎批評指正