RabbitMQ消息隊列總結

AMQP[高級消息隊列協議] 是一個異步消息傳遞所使用的應用層協議規範(是線路層協議)
AMQP 客戶端可以無視消息的來源任意發送和接受信息php

隊列的使用場景:

一、與業務的主要邏輯無關,但又須要執行,就能夠使用隊列異步處理。json

  將與主邏輯無關的操做放入隊列,由於程序要等待所有操做執行完才返回用戶操做結果,就會減低用戶體驗。緩存

二、耗時操做放隊列執行、通常不關心執行結果的(發送郵件)。bash

三、利用隊列的特性,能夠使一些應用更加簡單。服務器

  一、隊列支持多種語言客戶端,能夠實現不一樣語言間的相互通訊。異步

  二、隊列有支持delay特性,能夠實現簡單的定時效果。分佈式

四、兩個(多個)系統間須要經過定時任務來同步數據(數據交換)。spa

五、異構系統的不一樣進程間相互調用、通信。code

基本概念:

Broker:簡單來講就是消息隊列服務器實體。server

  接收客戶端鏈接,實現AMQP消息隊列和路由功能的進程。

Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。

  從鏈接通道(Channel)接收消息,並按照特定的路由規則發送給隊列。接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,ExchangeType有direct、Fanout和Topic三種。

Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。

   當消息不能被正常消費時緩存這些消息。(存儲還未被消費者消費的消息)

Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。

  一、即路由規則,告訴交換機將何種類型的消息發送到某個隊列中。

  二、Exchange在與多個Message Queue發生Binding後會生成一張路由表。

  三、路由表中存儲着Message Queue所需消息的限制條件即Binding Key,當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。

Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。

vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。(每一個virtual Host裏面有若干個Exchange和Queue)

  虛擬機能夠持有多個交換機、隊列和綁定。

producer:消息生產者,就是投遞消息的程序。

consumer:消息消費者,就是接受消息的程序。

channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。

  一、僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。

  二、一個Connection能夠包含多個Channel,須要Channel是由於TCP鏈接的創建和釋放都是十分昂貴的。

message: 消息體,由producer產生,經broker被consumer所消費。

  包括Header,Body兩部分,Header是由Producer添加上的各類屬性集合,控制Message是否被緩存,接收的queue是哪一個等等。Body是真正須要傳送的數據。

Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。

模型:

Publisher --> Broker(消息隊列服務器實體) --> Consumer

client[Publisher Application] --> Message --> Virtual Host[ Exchange-->Message queue ] --> client[Consumer]

RabbitMQ的使用場景:

一、單發送,單接收。

  簡單的發送與接收,沒有特別的處理。

二、單發送多接收。

  一個發送端,多個接收端,如分佈式的任務派發。爲了保證消息發送的可靠性,不丟失消息,使消息持久化了。同時爲了防止接收端在處理消息時down掉,只有在消息處理完成後才發送ack消息。

三、發佈、訂閱模式。

  發佈、訂閱模式,發送端發送廣播消息,多個接收端接收。

四、Routing (按路線發送接收)

  發送端按routing key發送消息,不一樣的接收端按不一樣的routing key接收消息。

五、Topics (按topic發送接收)

  發送端不僅按固定的routing key發送消息,而是按字符串「匹配」發送,接收端一樣如此。

# RabbitMQ的開啓關閉命令:
[root@localhost ~]# /opt/rabbitmq/sbin/rabbitmq-server -detached  start   // 啓動rabbitMQ
[root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl status 			 	  // 查看狀態
[root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl stop				 	  // 關閉rabbitMQ

 

實現的PHP代碼:

消息消費者端:

<?php 

//鏈接RabbitMQ  
$conn_args = array( 
	'host'=>'localhost' , 
	'port'=> '5672', 
	'login'=>'guest' , 
	'password'=> 'guest',
	'vhost' =>'/'
);  
$conn = new AMQPConnection($conn_args);  
$conn->connect();  
//設置queue名稱,使用exchange,綁定routingkey  
$channel = new AMQPChannel($conn);  
$q = new AMQPQueue($channel);  
$q->setName('queue_name');  
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);  
$q->declareQueue();  
$q->bind('direct_exchange_name', 'routingkey_name');     
//消息獲取  
$messages = $q->get(AMQP_AUTOACK) ;  
if ($messages){  
      var_dump(json_decode($messages->getBody(), true ));  
}  
$conn->disconnect();

消息生產者端:

<?php 

//鏈接RabbitMQ  
$conn_args = array( 
	'host'=>'localhost' , 
	'port'=> '5672', 
	'login'=>'guest' ,   
	'password'=> 'guest',
	'vhost' =>'/'
);  
$conn = new AMQPConnection($conn_args);  
$conn->connect();  
//建立exchange名稱和類型  
$channel = new AMQPChannel($conn);  
$ex = new AMQPExchange($channel);  
$ex->setName('direct_exchange_name');  
$ex->setType(AMQP_EX_TYPE_DIRECT);  
$ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);  
$ex->declareExchange();  
//建立queue名稱,使用exchange,綁定routingkey  
$q = new AMQPQueue($channel);  
$q->setName('queue_name');  
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);  
$q->declareQueue();  
$q->bind('direct_exchange_name', 'routingkey_name');  
//消息發佈  
$channel->startTransaction();  
$message = json_encode(array('Hello World!','DIRECT'));  
$ex->publish($message, 'routingkey_name');  
$channel->commitTransaction();  
$conn->disconnect();
相關文章
相關標籤/搜索