RabbitMQ的簡單模式快速入門與超時異常的處理方法

本文適合JAVA新人,想了解RabbitMQ又不想去看官網文檔的人(英語水看的頭疼(◎﹏◎),但建議有能力仍是去看官網文檔)。

消息隊列MQ(一)

MQ全稱爲Message Queue,消息隊列是應用程序和應用程序之間的通訊方法。html

先引入一下常見的通信方案。java

爲何使用MQmysql

在項目中,可將一些無需即時返回且耗時的操做提取出來,進行異步處理,而這種異步處理的方式大大的節省服務器的請求響應時間,從而提升了系統的吞吐量linux

 開發中消息隊列一般有以下應用場景:sql

應用解耦、異步處理(提升系統響應速度)、流量削峯(高峯堆積消息,峯後繼續處理消息)、日誌處理(分佈式日誌,通常使用kafka)、純粹通信。apache

 

AMQP 和 JMS

MQ是消息通訊的模型;實現MQ的大體有兩種主流方式:AMQP、JMS。

AMQP

AMQP高級消息隊列協議,是一個進程間傳遞異步消息的網絡協議,更準確的說是一種binary wire-level protocol(連接協議)。這是其和JMS的本質差異,AMQP不從API層進行限定,而是直接定義網絡交換的數據格式。vim

JMS

JMS即Java消息服務(JavaMessage Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。windows

 AMQP 與 JMS 區別

JMS是定義了統一的接口,來對消息操做進行統一;AMQP是經過規定協議來統一數據交互的格式服務器

JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,所以是跨語言的。網絡

JMS規定了兩種消息模式;而AMQP的消息模式更加豐富。

消息隊列產品:目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ,本文主要介紹RabbitMQ使用。

使用Erlang(語言)編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將消息直接發送給隊列,消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)、數據持久化都有很好的支持。多用於進行企業級的ESB整合。

RabbitMQ介紹

RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通訊方法,消息隊列在分佈式系統開發中應用很是普遍。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6種模式:簡單模式,work工做隊列(集羣)模式,Publish/Subscribe發佈與訂閱(交換機的廣播)模式,Routing(交換機的定向)路由模式,Topics主題(路由靈活)模式,RPC遠程調用模式(遠程調用,不太算MQ;不做介紹);//括號內的是本身的理解方式僅供參考。詳細能夠去看官方介紹。

官網對應模式介紹:https://www.rabbitmq.com/getstarted.html

安裝RabbirMQ

兩種方式:windows環境與Linux環境(這裏跳過)

我是LinuxCenOS6.7安裝的3.6.10版本

啓動成功參考以下兩張圖

先在WEB頁面管理用戶

角色說明: Tags

一、超級管理員(administrator)

可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。

二、監控者(monitoring)

可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)

三、策略制定者(policymaker) :可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。

四、普通管理者(management):僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。

五、其餘 :  沒法登錄管理控制檯,一般就是普通的生產者和消費者。

Virtual Hosts配置

在RabbitMQ中能夠虛擬消息服務器Virtual Host,每一個Virtual Hosts至關於一個相對獨立的RabbitMQ服務器,每一個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。 至關於mysql的db。Virtual Name通常以/開頭。

添加隊列,這裏須要將上下兩張圖結合起來看

需改用戶的密碼

查看默認的交換機

常見的端口

RabbitMQ入門

目標:入門案例將使用RabbitMQ的簡單模式實現通信過程。

1.建立Maven工程,先在pom.xml添加依賴。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.jxjdemo</groupId>
 7     <artifactId>rabbitmq1_demo</artifactId>
 8     <version>1.0-SNAPSHOT</version>
 9 
10     <dependencies>
11         <dependency> <!--rabbitmq的依賴-->
12             <groupId>com.rabbitmq</groupId>
13             <artifactId>amqp-client</artifactId>
14             <version>5.6.0</version>
15         </dependency>
16     </dependencies>
17 </project>

2.新建生產者類,生產發送消息

 1 package com.jxjdemo.mq.simple;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 public class SimpleProducer {
 8     public static void main(String args[]) throws Exception{
 9 //一、建立連接工廠對象-factory=newConnectionFactory()。建立連接用
10         ConnectionFactory factory = new ConnectionFactory();
11 
12 //二、設置RabbitMQ服務主機地址,默認localhost-factory.setHost("localhost")
13         factory.setHost("192.168.211.128");
14 //三、設置RabbitMQ服務端口,默認-1-factory.setPort(5672)
15         factory.setPort(5672);
16 //四、設置虛擬主機名字,默認/-factory.setVirtualHost("szitheima")
17          factory.setVirtualHost("shujuku1122");
18 //五、設置用戶鏈接名,默認guest-factory.setUsername("admin")
19         factory.setUsername("admin");
20 //六、設置連接密碼,默認guest-factory.setPassword("admin")
21         factory.setPassword("123456");
22  //      factory.setConnectionTimeout(5000);
23 //        factory.setWorkPoolTimeout(5000);
24 //        factory.setHandshakeTimeout(5000);
25 //七、建立連接-connection=factory.newConnection()
26         Connection connection = factory.newConnection(); //報錯,拋異常
27 //八、建立頻道-channel=connection.createChannel()
28         Channel channel = connection.createChannel();
29 //九、聲明隊列-channel.queueDeclare(名稱,是否持久化(true先存硬盤,讀完再刪),是否獨佔本鏈接,是否自動刪除(false讀完再刪),附加參數)
30         channel.queueDeclare("simplequeue", true, false, false, null);
31 //十、建立消息-Stringm=xxx
32         String msg = "這是咱們第一次發送 MQ消息";
33 //十一、消息發送-channel.basicPublish(交換機[默認DefaultExchage],路由key[簡單模式能夠傳遞隊列名稱],消息其它屬性,消息內容)
34         channel.basicPublish("", "simplequeue", null, msg.getBytes("utf-8"));
35 //十二、關閉資源-channel.close();connection.close()
36         channel.close();
37        connection.close();
38     }
39 }

執行後發個消息,沒看到異常。

擴展:這裏遇到的異常有,時間超時

解決方法一:

發送不成功報錯,就先重啓MQ,在重啓【管理員的方式啓動】IDE,通常都是MQ的問題。

發送消息爲空,消息不能有空格。注意庫名字。

解決方法二:

咱們安裝系統會給系統起個名字致使:修改後的主機名並無在linux系統的hosts文件中,所以解析的時候,沒法直接從該文件中獲取,須要多重解析,才能解析該主機名。

 不一樣的linux版本,這個配置文件也可能不一樣vim /etc/hosts

 繼續說發送成功的事情。

3.建立消費者,接收消息。

 1 package com.jxjdemo.mq.simple;

 2 
 3 import com.rabbitmq.client.*;  4 
 5 import javax.security.auth.callback.Callback;  6 import java.io.IOException;  7 import java.util.concurrent.TimeoutException;  8 
//這裏刪除了文檔註釋
16 public class SimpleConsumer { 17 public static void main(String args[]) throws IOException, TimeoutException { 18 //一、建立連接工廠對象-factory=newConnectionFactory() 19 ConnectionFactory factory = new ConnectionFactory(); 20 //二、設置RabbitMQ服務主機地址,默認localhost-factory.setHost("localhost") 21 factory.setHost("192.168.211.128"); 22 //三、設置RabbitMQ服務端口,默認-1-factory.setPort(5672) 23 factory.setPort(5672); 24 //四、設置虛擬主機名字,默認/-factory.setVirtualHost("szitheima") 25 factory.setVirtualHost("shujuku1122"); 26 //五、設置用戶鏈接名,默認guest-factory.setUsername("admin") 27 factory.setUsername("admin"); 28 //六、設置連接密碼,默認guest-factory.setPassword("admin") 29 factory.setPassword("123456"); 30 //七、建立連接-connection=factory.newConnection() 31 Connection connection = factory.newConnection(); 32 //八、建立頻道-channel=connection.createChannel() 33 Channel channel = connection.createChannel(); 34 //九、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨佔本鏈接,是否自動刪除,附加參數) 35 channel.queueDeclare("simplequeue",true ,false , false,null ); 36 //10接收消息 37 Consumer callback = new DefaultConsumer(channel){ 38 /** 39 * @param consumerTag 消費者標籤,在channel.basicConsume時候能夠指定 40 * @param envelope 信封,消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌(收到消息失敗後是否須要從新發送) 41 * @param properties 屬性信息(生產者的發送時指定) 42 * @param body 消息內容 43 * @throws IOException 44 */ 45 @Override 46 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 47 Long deliveryTag = envelope.getDeliveryTag(); //消息ID 48 String exchange = envelope.getExchange(); 49 String routingKey = envelope.getRoutingKey(); //路由KEY 50 //消息內容 51 String msg = new String(body,"utf-8"); 52 System.out.println( 53 "routingKey:" + routingKey + 54 "routingKey:" + routingKey + 55 ",exchange:" + exchange + 56 ",deliveryTag:" + deliveryTag + 57 ",message:" + msg); 58 } 59 }; 60 channel.basicConsume("simplequeue", callback); 61 //不關閉,繼續接受消息 62 } 63 }

執行後看到一下結果

當你的代碼運行到這裏,那麼恭喜你入門成功。

此次暫時先到這裏結束。欲知其餘4種模式且看下回慢慢分解。

相關文章
相關標籤/搜索