初識rabbitmq

rabbitmq作爲如今最流行的消息中間件之一,我居然還沒用過,不可原諒,因此本身查看下資料在本地本身用一下php

介紹:
RabbitMQ 是一個在AMQP基礎上實現的企業級消息系統。它接受並轉發消息。你能夠將其視爲郵局:當你將要發佈的郵件放在郵箱中時,您能夠確信 Postman 先生最終會將郵件發送給收件人。在這個比喻中,RabbitMQ 是一個郵箱,郵局和郵遞員,即一端不斷往郵局中收取消息,而另外一端則能夠讀取或者訂閱隊列中的發送消息。html

RabbitMQ 和郵局之間的主要區別在於它不處理紙張,而是接受,存儲和轉發二進制數據塊的消息。mysql

常見術語:
RabbitMQ 使用一些術語:生產者(publisher)、隊列(queue)、消費者(consumer)。sql

關係:
生產者(業務系統 ,發送消息的程序)-->消息隊列(類比一個郵箱,存在於RabbitMQ)--->消費者(隊列處理系統,等待消息而後處理的程序)
注意,隊列類比一個郵箱,存在於RabbitMQ,然而信息流經過RabbitMQ和您的應用程序,他們只能存儲在一個隊列。隊列只受主機內存和磁盤限制的約束,它本質上是一個很大的消息緩衝區。會有許多生產者能夠發送到一個隊列的消息,許多消費者能夠嘗試從一個隊列接收數據。shell

好處:數據庫

在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步操做,而這種異步處理的方式大大的節省了服務器的請求時間,從而提升了系統的吞吐量。並且不影響服務器作其餘相應,不獨佔服務器資源。json

如:註冊用戶這種服務,它可能解耦成好幾種獨立的服務(帳號驗證,郵箱驗證碼,手機短信碼等)。它們做爲消費者,等待用戶輸入數據,在前臺數據提交以後會通過分解併發送到各個服務所在的url,分發的那個角色就至關於生產者。消費者在獲取數據時候有可能一次不能處理完,那麼它們各自有一個請求隊列,那就是內存緩衝區了。作這項工做的框架叫作消息隊列。服務器

因此消息隊列適用於如下幾種經典場景:併發

1).數據冗餘時:如電商系統中的訂單處理系統,傳統處理模式是:下訂單的時候,訂單系統可能會調用庫存系統的接口,這樣兩個系統之間存在一個嚴重依賴關係,若是庫存系統宕機,那麼整個流程都會受到影響。如今大多公司的處理方法是:引入消息隊列,下完訂單,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。composer

即便在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。

2).解耦:對庫存系統來講,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。這樣實現了兩個系統間的解耦。

3).流量削峯:流量劇增狀況下,如搶紅包、秒殺等。

4).異步通信:異步操做下的狀況都適用。

5).擴展性:如已經有了財務系統,後續須要配貨系統,屆時只須要配貨系統訂閱消息隊列就能夠了。

6).排序保證:隊列自己就能夠作成單進單出的單線程的系統,從而保證數據按照順序進行處理。

可用於隊列的幾種介質:

Mysql:可靠性高、易實現,速度慢

Redis:速度快,單條大消息包時效率低

消息系統:專業性強、可靠,學習成本高,如rabbitmq

消息處理觸發機制:

死循環方式讀取:易實現,故障時沒法及時修復

定時任務:壓力均分,有處理量的上限

守護進程:相似於PHP-FPM和PHP-CG,須要shell基礎

解耦案例:隊列處理訂單系統和配送系統

訂單系統接受用戶訂單->訂單對列表(mysql)->配送系統+crontab(定時腳本shell)+標記配送結果

流量削峯案例:Redis的List類型實現秒殺(基於內存,速度快;mysql須向硬盤寫入,並且其餘業務也使用mysql,瞬間的大流量會形成壓力)

秒殺業務程序(檢查Redis已存放數據的長度,超出上限直接丟棄)->Redis->入庫程序(由於秒殺時間短,因此死循環處理Redis的數據)+數據庫

簡單實現:

rabbitmq須要一系列依賴,具體安裝步驟省略,能夠查看另外一篇博文  http://www.javashuo.com/article/p-qeesbccu-ez.html

這裏代碼須要安裝composer依賴

文件夾下新建composer.json,並添加

{
    "require": {
        "php-amqplib/php-amqplib": ">=2.6.1"
    }
}

執行composer install,就會自動在文件夾下生成vendor目錄

新建生產者send.php:

<?php 

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$q_name = 'task_queues';
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');
$channel = $connection->channel();

$channel->queue_declare($q_name, false, true, false, false);

for($i=0;$i<10;$i++){
    sleep(1);
    $data = $i.' '.date('Y-m-d H:i:s',time());
    $msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', $q_name);

echo 'Rabbitmq Send Message: ', $data, "\n";
}


$channel->close();
$connection->close();

?>

新建消費者run.php:

<?php 

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$q_name = 'task_queues';
//建立鏈接和信道
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');
$channel = $connection->channel();

//建立隊列
$channel->queue_declare($q_name, false, true, false, false);

echo "Rabbitmq Waiting for messages. queue:".$q_name."\n";

$callback = function ($msg) {
    echo 'Received Message: '. $msg->body. "\n";
    sleep(substr_count($msg->body, '.'));
    echo "Done \n";
    //肯定此消息已經處理完成,不然不確認的話,queue會將此消息交給其餘consumer處理哦
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

//同時最多處理1條信息
$channel->basic_qos(null, 1, null);
//回調
$channel->basic_consume($q_name, '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

打開三個xshell窗口:

第一個消費者窗口:

第二個消費者窗口:

第三個生產者窗口:

最好仍是直接腳本執行,不然框架是很容易504的(我在我本身的框架就是這樣)。

腳本執行,sleep 1秒下會很清晰的模擬出消息隊列。

好了,初次使用就到這裏了。

 

附錄:

 官網教程:https://www.rabbitmq.com/getstarted.html

相關文章
相關標籤/搜索