AMQP[高級消息隊列協議] 是一個異步消息傳遞所使用的應用層協議規範(是線路層協議)
AMQP 客戶端可以無視消息的來源任意發送和接受信息php
一、與業務的主要邏輯無關,但又須要執行,就能夠使用隊列異步處理。json
將與主邏輯無關的操做放入隊列,由於程序要等待所有操做執行完才返回用戶操做結果,就會減低用戶體驗。緩存
二、耗時操做放隊列執行、通常不關心執行結果的(發送郵件)。bash
三、利用隊列的特性,能夠使一些應用更加簡單。服務器
一、隊列支持多種語言客戶端,能夠實現不一樣語言間的相互通訊。異步
二、隊列有支持delay特性,能夠實現簡單的定時效果。分佈式
四、兩個(多個)系統間須要經過定時任務來同步數據(數據交換)。spa
五、異構系統的不一樣進程間相互調用、通信。code
Broker:簡單來講就是消息隊列服務器實體。server
接收客戶端鏈接,實現AMQP消息隊列和路由功能的進程。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
從鏈接通道(Channel)接收消息,並按照特定的路由規則發送給隊列。接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,ExchangeType有direct、Fanout和Topic三種。
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
當消息不能被正常消費時緩存這些消息。(存儲還未被消費者消費的消息)
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
一、即路由規則,告訴交換機將何種類型的消息發送到某個隊列中。
二、Exchange在與多個Message Queue發生Binding後會生成一張路由表。
三、路由表中存儲着Message Queue所需消息的限制條件即Binding Key,當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。(每一個virtual Host裏面有若干個Exchange和Queue)
虛擬機能夠持有多個交換機、隊列和綁定。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
一、僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。
二、一個Connection能夠包含多個Channel,須要Channel是由於TCP鏈接的創建和釋放都是十分昂貴的。
message: 消息體,由producer產生,經broker被consumer所消費。
包括Header,Body兩部分,Header是由Producer添加上的各類屬性集合,控制Message是否被緩存,接收的queue是哪一個等等。Body是真正須要傳送的數據。
Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。
Publisher --> Broker(消息隊列服務器實體) --> Consumer
client[Publisher Application] --> Message --> Virtual Host[ Exchange-->Message queue ] --> client[Consumer]
RabbitMQ的使用場景:
一、單發送,單接收。
簡單的發送與接收,沒有特別的處理。
二、單發送多接收。
一個發送端,多個接收端,如分佈式的任務派發。爲了保證消息發送的可靠性,不丟失消息,使消息持久化了。同時爲了防止接收端在處理消息時down掉,只有在消息處理完成後才發送ack消息。
三、發佈、訂閱模式。
發佈、訂閱模式,發送端發送廣播消息,多個接收端接收。
四、Routing (按路線發送接收)
發送端按routing key發送消息,不一樣的接收端按不一樣的routing key接收消息。
五、Topics (按topic發送接收)
發送端不僅按固定的routing key發送消息,而是按字符串「匹配」發送,接收端一樣如此。
# RabbitMQ的開啓關閉命令: [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmq-server -detached start // 啓動rabbitMQ [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl status // 查看狀態 [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl stop // 關閉rabbitMQ
<?php //鏈接RabbitMQ $conn_args = array( 'host'=>'localhost' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest', 'vhost' =>'/' ); $conn = new AMQPConnection($conn_args); $conn->connect(); //設置queue名稱,使用exchange,綁定routingkey $channel = new AMQPChannel($conn); $q = new AMQPQueue($channel); $q->setName('queue_name'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $q->declareQueue(); $q->bind('direct_exchange_name', 'routingkey_name'); //消息獲取 $messages = $q->get(AMQP_AUTOACK) ; if ($messages){ var_dump(json_decode($messages->getBody(), true )); } $conn->disconnect();
<?php //鏈接RabbitMQ $conn_args = array( 'host'=>'localhost' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest', 'vhost' =>'/' ); $conn = new AMQPConnection($conn_args); $conn->connect(); //建立exchange名稱和類型 $channel = new AMQPChannel($conn); $ex = new AMQPExchange($channel); $ex->setName('direct_exchange_name'); $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $ex->declareExchange(); //建立queue名稱,使用exchange,綁定routingkey $q = new AMQPQueue($channel); $q->setName('queue_name'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $q->declareQueue(); $q->bind('direct_exchange_name', 'routingkey_name'); //消息發佈 $channel->startTransaction(); $message = json_encode(array('Hello World!','DIRECT')); $ex->publish($message, 'routingkey_name'); $channel->commitTransaction(); $conn->disconnect();