1 MQ 簡介安全
消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。對於消息中間件,常見角色大體也就有 Producer(生產者)、Consumer(消費者)。服務器
常見的消息中間件產品:異步
1). ActiveMQ分佈式
ActiveMQ 是 Apache 出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持 JMS1.1 和 J2EE 1.4 規範的 JMS Provider 實現。咱們在本次課程中介紹 ActiveMQ 的使用。ide
2). RabbitMQspa
AMQP 協議的領導實現,支持多種場景。淘寶的 MySQL 集羣內部有使用它進行通信,OpenStack 開源雲平臺的通訊組件,最早在金融行業獲得運用。3d
3). ZeroMQserver
史上最快的消息隊列系統中間件
4). Kafka對象
Apache 下的一個子項目 。特色:高吞吐,在一臺普通的服務器上既能夠達到 10W/s的吞吐速率;徹底的分佈式系統。適合處理海量數據
2 MQ 做用
1). 解耦 :中間件中的生產者只管發送消息 , 消費者只要從隊列當中獲取消息進行消費 就能夠 , 從而來實現業務的解耦 .
2). 冗餘存儲 : 有些狀況下,處理數據的過程會失敗。消息中間件能夠把數據進行持久化 直 到它們已經被徹底處理,經過這一方式規避了數據丟失風險。在把一個消息從消息中間 件中刪 除以前,須要你的處理系統明確地指出該消息己經被處理完成,從而確保你的數據 被安全地保 存直到你使用完畢。
3).可恢復性: 當系統一部分組件失效時,不會影響到整個系統 。 消息中間件下降了進程間的 稿合度,因此即便一個處理消息的進程掛掉,加入消息中間件中的消息仍然能夠在系統恢復後 進行處理 。
4). 順序保證: 在大多數使用場景下,數據處理的順序很重要,大部分消息中間件支持必定程 度上的順序性。
5). 緩衝: 在任何重要的系統中,都會存在須要不一樣處理時間的元素。消息中間件經過一個緩 衝層來幫助任務最高效率地執行,寫入消息中間件的處理會盡量快速 。
6). 異步通訊: 在不少時候應用不想也不須要當即處理消息 。 消息中間件提供了異步處理機制,容許應用把一些消息放入消息中間件中,但並不當即處理它,在以後須要的時候再慢慢處理 。
3 RabbitMQ 安裝及啓動
3.1 安裝依賴環境
rpm -ivh erlang-20.3.8.6-1.el6.x86_64.rpm
yum -y install epel-release
yum -y install socat
3.2 安裝 rabbitMQ
rpm -ivh rabbitmq-server-3.7.7-1.el6.noarch.rpm
3.3 添加用戶
默認狀況下管理界面只能在 Linux 系統本機能夠訪問, 若是想其餘的主機也能訪問,須要
配置:
rabbitmq-plugins enable rabbitmq_management
添加訪問用戶:
rabbitmqctl add_user admin admin
3.4 RabbitMQ 啓動/中止
啓動 : service rabbitmq-server start
中止: service rabbitmq-server stop
查看狀態: service rabbitmq-server status
4 Rabbit MQ 管理界面訪問
4.1 Overview 概要
該欄目主要展現的是 MQ 的概要信息 , 如消息的數量, Connection , Channel,Exchange , Queue , Consumer 的數量.
4.2 Exchange 交換器
該欄目主要展現的是當前虛擬主機下的交換器,也能夠在此添加一個新的交換器, 而且配置對應的交換器的規則屬性 。
4.3 Queues 隊列
該欄目展現的是消息隊列的信息,裏面有各個隊列的概要信息, 也能夠在此欄目添加隊列Queue
4.4 Admin 系統管理
該欄目展現的是用戶管理的信息, 包含用戶列表的展現,添加用戶,添加虛擬主機等信息
5 RabbitMQ 的相關概念
5.1 生產者與消費者
5.1.1 生產者
Producer: 生產者,就是投遞消息的一方。
生產者建立消息,而後發佈到 RabbitMQ 中。消息通常能夠包含 2 個部分:消息體和標籤 (Label)。消息體也能夠稱之爲 payload ,在實際應用中,消 息體通常是一個帶有業務邏輯結構 的數據,好比一個 JSON 字符串。固然能夠進一步對這個消息體進行序列化操做。消息的標籤用來表述這條消息 , 好比 一個交換器的名稱和一個路由鍵 。 生產者把消息交由 RabbitMQ , RabbitMQ 以後會根據標籤把消息發送給感興趣的消費者(Consumer ) 。
5.1.2 消費者
Consumer: 消費者 ,就是接收消息的一方。
消費者鏈接到 RabbitMQ 服務器,並訂閱到隊列上 。當消費者消費一 條消息時 ,只是消費消息的消息體(payload )。在消息路由的過程當中 ,消息的標籤會丟棄 ,存入到隊列中的消息只有消息體,消費者也只會消費到消息體 ,也就不知道消息的生產者是誰,固然消費者也不須要知道 。
5.2 隊列
Queue: 隊列,是 RabbitMQ 的內部對象,用 於存儲消息。
5.3 交換器, 路由鍵, 綁定
5.3.1 交換器
Exchange: 交換器。在上圖中咱們暫時能夠理解成生產者將消息投遞到隊列中,實際上 這個在 RabbitMQ 中不會發生。真實狀況是,生產者將消息發送到 Exchange (交換器),由交換器將消息路由到一個或者多個隊列中。若是路由不到,或 許會返回給生產者,或許直接丟棄。這裏能夠將 RabbitMQ 中的交換器看做一個簡單的實體。
RabbitMQ 中的 交換器有四種類型,四種類型分別是 fanout、direct、topic 、headers,不一樣的類型有着不 同的路由策略。
5.3.2 路由鍵
RoutingKey : 路由鍵 。
生產者將消息發給交換器 的時候, 通常會指定 一個 RoutingKey ,用 來指定這個消息的路由規則,而這個 RoutingKey 須要與交換器類型和綁定鍵 (BindingKey) 聯合使用才能最終生效。
在交換器類型和綁定鍵 (BindingKey) 固定的狀況下,生產者能夠在發送消息給交換器時, 經過指定 RoutingKey 來決定消息流向哪裏。
5.3.3 綁定
Binding: 綁定。RabbitMQ 中經過綁定將交換器與隊列關聯起來,在綁定的時候通常會指定一個綁定鍵(BindingKey) ,這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了。
5.4 交換器類型
1).fanout : 它會把全部發送到該交換器的消息路由到全部與該交換器綁定的隊列中。
2).direct: 該類型的交換器路由規則也很簡單,它會把消息路由到那些BindingKey 和 RoutingKey 徹底匹配的隊列中。
3).topic : 前面講到 direct 類型的交換器路由規則是徹底匹配 BindingKey和RoutingKey,可是這種嚴格的匹配方式在不少狀況下不能知足實際業務的需求。topic類型的交換器在匹配規則上進行了擴展,它與 direct 類型的交換器類似,也是將消息路由到 BindingKey 和 RoutingKey 相匹配的隊 列中,但這裏的匹配規則有些不一樣,它約定:
RoutingKey 爲一個點號"." 分割的字符串 , 如 : com.itcast.client ,com.itheima.exam。
BindingKey 與 RoutingKey 同樣也是點號"." 分割的字符串。
BindingKey 中能夠存在兩種特殊的字符串 "*" 和 "#" , 用於模糊匹配,其中
"#"用於匹配一個單詞, "*"用於匹配多個單個(能夠是零個)。
4). headers : 該類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送的消息內容中 的 headers 屬性進行匹配。
6 生產者發送消息
6.1 隊列綁定
6.1.1 建立隊列
在 RabbitMQ 的後臺管理界面中建立一個隊列 , 指定隊列名稱。
6.1.2 建立交換器 Exchange
在 RabbitMQ 的後臺管理界面中建立一個交換器,指定交換器的名稱, 而且指定交換器類型。
6.1.3 綁定隊列與交換器
在交換器列表點擊對應的交換器 , 進入到綁定界面 , 指定隊列名稱 queue , 指 定 RoutingKey,經過該 RoutingKey 來綁定該隊列與交換器 Exchange 。
以後,在發送消息時, 指定了 Exchange ,及 RoutingKey, 就能夠將該消息路由 到該隊列 queue 中。
6.2 發送消息邏輯代碼
6.2.1 引入依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
6.2.2 發送消息
@Test
public void test1() throws Exception{
//指定往哪個交換器中發送消息
String exchangeName = "itcast.v0.topic";
//指定消息的路由RoutingKey
String routingKey = "itcast.item.add";
//建立一個鏈接工廠 , 指定 主機,端口, 訪問的虛擬主機, 用戶名, 密碼
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.142.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//建立一個連接
Connection connection = connectionFactory.newConnection();
//建立一個通道Channel
Channel channel = connection.createChannel();
//調用basicPublish循環發送50條消息 , 每條消息之間,間隔1秒
for (int i = 0; i < 50; i++) {
channel.basicPublish(exchangeName , routingKey ,
MessageProperties.TEXT_PLAIN , ("生產者生產的消息
083100"+i).getBytes());
TimeUnit.SECONDS.sleep(1);
}
}
6.3 發送消息平臺監測
7 消費者接受消息
7.1 引入依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
7.2 接收消息
@Test
public void test1() throws Exception{
//指定隊列名稱
String queueName = "itcast_item_add_queue";
//獲取鏈接工廠 , 指定主機 , 端口, 虛擬主機 , 用戶名 , 密碼
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.142.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//建立鏈接
Connection connection = connectionFactory.newConnection();
//建立通道Channel
Channel channel = connection.createChannel();
//設置客戶端最多接收未被ack的消息的個數
channel.basicQos(10);
//構造消息者, 進行消息的消費
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag : " + consumerTag);
System.out.println("properties : " +
JSON.toJSONString(properties));
System.out.println("envelope : " + JSON.toJSONString(envelope));
System.out.println("receive Message : " + new String(body) );
System.out.println("----------------------------------------");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//經過消息已經接受到
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, consumer);
System.in.read();//讓程序等待在這裏, 一直監聽該消息隊列
channel.close();
connection.close();
}
7.3 結果輸出
其中:consumerTag : 消息消費者的標籤properties : 消息內容的頭信息數據envelope : 消息體的數據包,其中包含消息發送時指定的exchange, routingKey等信息.