MQ全稱爲Message Queue,消息隊列是應用程序和應用程序之間的通訊方法。html
先引入一下常見的通信方案。java
爲何使用MQ?mysql
在項目中,可將一些無需即時返回且耗時的操做提取出來,進行異步處理,而這種異步處理的方式大大的節省服務器的請求響應時間,從而提升了系統的吞吐量。linux
開發中消息隊列一般有以下應用場景:sql
應用解耦、異步處理(提升系統響應速度)、流量削峯(高峯堆積消息,峯後繼續處理消息)、日誌處理(分佈式日誌,通常使用kafka)、純粹通信。apache
AMQP高級消息隊列協議,是一個進程間傳遞異步消息的網絡協議,更準確的說是一種binary wire-level protocol(連接協議)。這是其和JMS的本質差異,AMQP不從API層進行限定,而是直接定義網絡交換的數據格式。vim
JMS即Java消息服務(JavaMessage Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。windows
JMS是定義了統一的接口,來對消息操做進行統一;AMQP是經過規定協議來統一數據交互的格式服務器
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,所以是跨語言的。網絡
JMS規定了兩種消息模式;而AMQP的消息模式更加豐富。
消息隊列產品:目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ,本文主要介紹RabbitMQ使用。
使用Erlang(語言)編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將消息直接發送給隊列,消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)、數據持久化都有很好的支持。多用於進行企業級的ESB整合。
RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通訊方法,消息隊列在分佈式系統開發中應用很是普遍。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式:簡單模式,work工做隊列(集羣)模式,Publish/Subscribe發佈與訂閱(交換機的廣播)模式,Routing(交換機的定向)路由模式,Topics主題(路由靈活)模式,RPC遠程調用模式(遠程調用,不太算MQ;不做介紹);//括號內的是本身的理解方式僅供參考。詳細能夠去看官方介紹。
官網對應模式介紹:https://www.rabbitmq.com/getstarted.html
安裝RabbirMQ
兩種方式:windows環境與Linux環境(這裏跳過)
我是LinuxCenOS6.7安裝的3.6.10版本
啓動成功參考以下兩張圖
角色說明: Tags
一、超級管理員(administrator)
可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。
二、監控者(monitoring)
可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)
三、策略制定者(policymaker) :可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。
四、普通管理者(management):僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。
五、其餘 : 沒法登錄管理控制檯,一般就是普通的生產者和消費者。
在RabbitMQ中能夠虛擬消息服務器Virtual Host,每一個Virtual Hosts至關於一個相對獨立的RabbitMQ服務器,每一個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。 至關於mysql的db。Virtual Name通常以/開頭。
添加隊列,這裏須要將上下兩張圖結合起來看
需改用戶的密碼
查看默認的交換機
常見的端口
目標:入門案例將使用RabbitMQ的簡單模式實現通信過程。
1.建立Maven工程,先在pom.xml添加依賴。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 <groupId>com.jxjdemo</groupId> 7 <artifactId>rabbitmq1_demo</artifactId> 8 <version>1.0-SNAPSHOT</version> 9 10 <dependencies> 11 <dependency> <!--rabbitmq的依賴--> 12 <groupId>com.rabbitmq</groupId> 13 <artifactId>amqp-client</artifactId> 14 <version>5.6.0</version> 15 </dependency> 16 </dependencies> 17 </project>
2.新建生產者類,生產發送消息
1 package com.jxjdemo.mq.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 public class SimpleProducer { 8 public static void main(String args[]) throws Exception{ 9 //一、建立連接工廠對象-factory=newConnectionFactory()。建立連接用 10 ConnectionFactory factory = new ConnectionFactory(); 11 12 //二、設置RabbitMQ服務主機地址,默認localhost-factory.setHost("localhost") 13 factory.setHost("192.168.211.128"); 14 //三、設置RabbitMQ服務端口,默認-1-factory.setPort(5672) 15 factory.setPort(5672); 16 //四、設置虛擬主機名字,默認/-factory.setVirtualHost("szitheima") 17 factory.setVirtualHost("shujuku1122"); 18 //五、設置用戶鏈接名,默認guest-factory.setUsername("admin") 19 factory.setUsername("admin"); 20 //六、設置連接密碼,默認guest-factory.setPassword("admin") 21 factory.setPassword("123456"); 22 // factory.setConnectionTimeout(5000); 23 // factory.setWorkPoolTimeout(5000); 24 // factory.setHandshakeTimeout(5000); 25 //七、建立連接-connection=factory.newConnection() 26 Connection connection = factory.newConnection(); //報錯,拋異常 27 //八、建立頻道-channel=connection.createChannel() 28 Channel channel = connection.createChannel(); 29 //九、聲明隊列-channel.queueDeclare(名稱,是否持久化(true先存硬盤,讀完再刪),是否獨佔本鏈接,是否自動刪除(false讀完再刪),附加參數) 30 channel.queueDeclare("simplequeue", true, false, false, null); 31 //十、建立消息-Stringm=xxx 32 String msg = "這是咱們第一次發送 MQ消息"; 33 //十一、消息發送-channel.basicPublish(交換機[默認DefaultExchage],路由key[簡單模式能夠傳遞隊列名稱],消息其它屬性,消息內容) 34 channel.basicPublish("", "simplequeue", null, msg.getBytes("utf-8")); 35 //十二、關閉資源-channel.close();connection.close() 36 channel.close(); 37 connection.close(); 38 } 39 }
執行後發個消息,沒看到異常。
擴展:這裏遇到的異常有,時間超時
解決方法一:
發送不成功報錯,就先重啓MQ,在重啓【管理員的方式啓動】IDE,通常都是MQ的問題。
發送消息爲空,消息不能有空格。注意庫名字。
解決方法二:
咱們安裝系統會給系統起個名字致使:修改後的主機名並無在linux系統的hosts文件中,所以解析的時候,沒法直接從該文件中獲取,須要多重解析,才能解析該主機名。
不一樣的linux版本,這個配置文件也可能不一樣vim /etc/hosts
繼續說發送成功的事情。
3.建立消費者,接收消息。
1 package com.jxjdemo.mq.simple;
2 3 import com.rabbitmq.client.*; 4 5 import javax.security.auth.callback.Callback; 6 import java.io.IOException; 7 import java.util.concurrent.TimeoutException; 8
//這裏刪除了文檔註釋
16 public class SimpleConsumer { 17 public static void main(String args[]) throws IOException, TimeoutException { 18 //一、建立連接工廠對象-factory=newConnectionFactory() 19 ConnectionFactory factory = new ConnectionFactory(); 20 //二、設置RabbitMQ服務主機地址,默認localhost-factory.setHost("localhost") 21 factory.setHost("192.168.211.128"); 22 //三、設置RabbitMQ服務端口,默認-1-factory.setPort(5672) 23 factory.setPort(5672); 24 //四、設置虛擬主機名字,默認/-factory.setVirtualHost("szitheima") 25 factory.setVirtualHost("shujuku1122"); 26 //五、設置用戶鏈接名,默認guest-factory.setUsername("admin") 27 factory.setUsername("admin"); 28 //六、設置連接密碼,默認guest-factory.setPassword("admin") 29 factory.setPassword("123456"); 30 //七、建立連接-connection=factory.newConnection() 31 Connection connection = factory.newConnection(); 32 //八、建立頻道-channel=connection.createChannel() 33 Channel channel = connection.createChannel(); 34 //九、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨佔本鏈接,是否自動刪除,附加參數) 35 channel.queueDeclare("simplequeue",true ,false , false,null ); 36 //10接收消息 37 Consumer callback = new DefaultConsumer(channel){ 38 /** 39 * @param consumerTag 消費者標籤,在channel.basicConsume時候能夠指定 40 * @param envelope 信封,消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌(收到消息失敗後是否須要從新發送) 41 * @param properties 屬性信息(生產者的發送時指定) 42 * @param body 消息內容 43 * @throws IOException 44 */ 45 @Override 46 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 47 Long deliveryTag = envelope.getDeliveryTag(); //消息ID 48 String exchange = envelope.getExchange(); 49 String routingKey = envelope.getRoutingKey(); //路由KEY 50 //消息內容 51 String msg = new String(body,"utf-8"); 52 System.out.println( 53 "routingKey:" + routingKey + 54 "routingKey:" + routingKey + 55 ",exchange:" + exchange + 56 ",deliveryTag:" + deliveryTag + 57 ",message:" + msg); 58 } 59 }; 60 channel.basicConsume("simplequeue", callback); 61 //不關閉,繼續接受消息 62 } 63 }
執行後看到一下結果
當你的代碼運行到這裏,那麼恭喜你入門成功。
此次暫時先到這裏結束。欲知其餘4種模式且看下回慢慢分解。