rabbitmq作爲如今最流行的消息中間件之一,我居然還沒用過,不可原諒,因此本身查看下資料在本地本身用一下php
介紹:
RabbitMQ 是一個在AMQP基礎上實現的企業級消息系統。它接受並轉發消息。你能夠將其視爲郵局:當你將要發佈的郵件放在郵箱中時,您能夠確信 Postman 先生最終會將郵件發送給收件人。在這個比喻中,RabbitMQ 是一個郵箱,郵局和郵遞員,即一端不斷往郵局中收取消息,而另外一端則能夠讀取或者訂閱隊列中的發送消息。html
RabbitMQ 和郵局之間的主要區別在於它不處理紙張,而是接受,存儲和轉發二進制數據塊的消息。mysql
常見術語:
RabbitMQ 使用一些術語:生產者(publisher)、隊列(queue)、消費者(consumer)。sql
關係:
生產者(業務系統 ,發送消息的程序)-->消息隊列(類比一個郵箱,存在於RabbitMQ)--->消費者(隊列處理系統,等待消息而後處理的程序)
注意,隊列類比一個郵箱,存在於RabbitMQ,然而信息流經過RabbitMQ和您的應用程序,他們只能存儲在一個隊列。隊列只受主機內存和磁盤限制的約束,它本質上是一個很大的消息緩衝區。會有許多生產者能夠發送到一個隊列的消息,許多消費者能夠嘗試從一個隊列接收數據。shell
好處:數據庫
在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步操做,而這種異步處理的方式大大的節省了服務器的請求時間,從而提升了系統的吞吐量。並且不影響服務器作其餘相應,不獨佔服務器資源。json
如:註冊用戶這種服務,它可能解耦成好幾種獨立的服務(帳號驗證,郵箱驗證碼,手機短信碼等)。它們做爲消費者,等待用戶輸入數據,在前臺數據提交以後會通過分解併發送到各個服務所在的url,分發的那個角色就至關於生產者。消費者在獲取數據時候有可能一次不能處理完,那麼它們各自有一個請求隊列,那就是內存緩衝區了。作這項工做的框架叫作消息隊列。服務器
因此消息隊列適用於如下幾種經典場景:併發
1).數據冗餘時:如電商系統中的訂單處理系統,傳統處理模式是:下訂單的時候,訂單系統可能會調用庫存系統的接口,這樣兩個系統之間存在一個嚴重依賴關係,若是庫存系統宕機,那麼整個流程都會受到影響。如今大多公司的處理方法是:引入消息隊列,下完訂單,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。composer
即便在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。
2).解耦:對庫存系統來講,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。這樣實現了兩個系統間的解耦。
3).流量削峯:流量劇增狀況下,如搶紅包、秒殺等。
4).異步通信:異步操做下的狀況都適用。
5).擴展性:如已經有了財務系統,後續須要配貨系統,屆時只須要配貨系統訂閱消息隊列就能夠了。
6).排序保證:隊列自己就能夠作成單進單出的單線程的系統,從而保證數據按照順序進行處理。
可用於隊列的幾種介質:
Mysql:可靠性高、易實現,速度慢
Redis:速度快,單條大消息包時效率低
消息系統:專業性強、可靠,學習成本高,如rabbitmq
消息處理觸發機制:
死循環方式讀取:易實現,故障時沒法及時修復
定時任務:壓力均分,有處理量的上限
守護進程:相似於PHP-FPM和PHP-CG,須要shell基礎
解耦案例:隊列處理訂單系統和配送系統
訂單系統接受用戶訂單->訂單對列表(mysql)->配送系統+crontab(定時腳本shell)+標記配送結果
流量削峯案例:Redis的List類型實現秒殺(基於內存,速度快;mysql須向硬盤寫入,並且其餘業務也使用mysql,瞬間的大流量會形成壓力)
秒殺業務程序(檢查Redis已存放數據的長度,超出上限直接丟棄)->Redis->入庫程序(由於秒殺時間短,因此死循環處理Redis的數據)+數據庫
簡單實現:
rabbitmq須要一系列依賴,具體安裝步驟省略,能夠查看另外一篇博文 http://www.javashuo.com/article/p-qeesbccu-ez.html
這裏代碼須要安裝composer依賴
文件夾下新建composer.json,並添加
{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
執行composer install,就會自動在文件夾下生成vendor目錄
新建生產者send.php:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $q_name = 'task_queues'; $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel(); $channel->queue_declare($q_name, false, true, false, false); for($i=0;$i<10;$i++){ sleep(1); $data = $i.' '.date('Y-m-d H:i:s',time()); $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', $q_name); echo 'Rabbitmq Send Message: ', $data, "\n"; } $channel->close(); $connection->close(); ?>
新建消費者run.php:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $q_name = 'task_queues'; //建立鏈接和信道 $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel(); //建立隊列 $channel->queue_declare($q_name, false, true, false, false); echo "Rabbitmq Waiting for messages. queue:".$q_name."\n"; $callback = function ($msg) { echo 'Received Message: '. $msg->body. "\n"; sleep(substr_count($msg->body, '.')); echo "Done \n"; //肯定此消息已經處理完成,不然不確認的話,queue會將此消息交給其餘consumer處理哦 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; //同時最多處理1條信息 $channel->basic_qos(null, 1, null); //回調 $channel->basic_consume($q_name, '', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
打開三個xshell窗口:
第一個消費者窗口:
第二個消費者窗口:
第三個生產者窗口:
最好仍是直接腳本執行,不然框架是很容易504的(我在我本身的框架就是這樣)。
腳本執行,sleep 1秒下會很清晰的模擬出消息隊列。
好了,初次使用就到這裏了。
附錄:
官網教程:https://www.rabbitmq.com/getstarted.html