rabbitmq常見功能封裝 - php版本

在項目中rabbitmq獲得了普遍的時候,這裏對rabbitmq的常規功能作了一個簡單的總結,並封裝成了composer包,composer包地址github地址,歡迎fork,因爲水平有限,不免存在bug,歡迎提出寶貴意見php

easy-rabbitmq 包簡介

對php-amqplib/php-amqplib包的二次封裝,爲常見功能提供一套開箱即用的生產解決方案
。目前支持的功能列表以下:html

  • 推送消息到直連交換機(含延遲消息)
  • 推送消息到扇形交換機(含延遲消息)
  • 推送消息到主題交換機(含延遲消息)
  • 訂閱模式下的可靠消費, 消費者消費失敗後將會嘗試繼續消費,最多嘗試5次。
  • 拉取模式下的可靠消費, 消費者消費失敗後將會嘗試繼續消費,最多嘗試5次。

若是還有其它場景,歡迎繼續補充,隨後進行迭代!!git

要求

  • 安裝包對PHP版本對要求主要取決於php-amqplib/php-amqplib包自己對要求,這裏爲了兼顧php5.0的使用者,咱們使用了php-amqplib/php-amqplib包V2.9.0的版本。
    具體的要求參照這裏
    不過筆者推薦使用php7.0及其以上版本, 這個開發包也是在7.0這個版本上面開發完成的!

安裝

composer require maweibinguo/easyrabbitmq

使用

在這裏咱們推薦php腳本+supervisor結合使用,用以保證消費進程的可靠性、加強worker的消費能力! 若是你尚未據說過supervisor,能夠點擊這裏瞭解.github

一、推送消息

1-一、推送消息到直連交換機

$config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    
    //延遲消息,30 秒中後纔會到達指定的交換機
    $instance->pushToDirect(
                $msg = time(), //消息體內容
                $exchange = "easy_direct_exchange", //交換機名稱
                $routingKey = "direct_test_queue", //消息的routingkey,consume方法到bingdingkey 要和routingkey保持一致
                $delaySec = 30 //延遲秒數
    );

    //無延遲,推入到指定到直鏈交換機
    $instance->pushToDirect(
                $msg = time(), //消息體內容
                $exchange = "easy_direct_exchange", //交換機名稱
                $routingKey = "direct_test_queue", //消息的routingkey,consume方法到bingdingkey 要和routingkey保持一致
    );

1-二、推送消息到扇形交換機

$config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    
    //延遲消息,30 秒中後纔會到達指定的交換機
    $instance->pushToFanout(
                $msg = time(), //消息體內容
                $exchange = "easy_fanout_exchange", //交換機名稱
                $delaySec = 30 //延遲秒數
    );

    //無延遲,推入到指定到直鏈交換機
    $instance->pushToFanout(
                $msg = time(), //消息體內容
                $exchange = "easy_fanout_exchange" //交換機名稱
    );

1-三、推送消息到主題交換機

$config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    
    //延遲消息,30 秒中後纔會到達指定的交換機
    $instance->pushToTopic(
                $msg = time(), //消息體內容
                $exchange = "easy_topic_exchange", //交換機名稱
                $routingKey = "",
                $delaySec = 30 //延遲秒數
    );

    //無延遲,推入到指定到直鏈交換機
    $instance->pushToTopic(
                $msg = time(), //消息體內容
                $exchange = "easy_topic_exchange", //交換機名稱
                $routingKey = "easy.topic.queue" //routingKey 要同consum的bindingKey相匹配
                                 //bindingKey支持兩種特殊的字符"*"、「#」,用做模糊匹配, 其中"*"用於匹配一個單詞、「#」用於匹配多個單詞(也能夠是0個)
                                 //不管是bindingKey 仍是routingKey, 被"."分隔開的每一段獨立的字符串就是一個單詞, easy.topic.queue, 包含三個單詞easy、topic、queue
    );

二、消費消息

消費支持自動重試,最多嘗試重試5次,每次消費失敗後該消息將會被從新投入到消費隊列中。從新的時間將會隨着失敗的次數增多逐漸推移,本客戶端支持的推移策略以下:
失敗1次(1秒鐘後會再被投遞), 失敗2次(2秒鐘後會再被投遞), 失敗3次(4秒鐘後會再被投遞), 失敗4次(8秒鐘後會再被投遞), 失敗5次(16秒鐘後會再被投遞)php7

2-一、訂閱模式

訂閱模式下的可靠消費
$config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    $instance->consume(
        $queueName = "direct_test_queue",//訂閱的隊列名稱
        $consumerTag = "c1",//消費標記
        $exchange = "easy_direct_exchange",//交換機名稱
        $bindingKey = "direct_test_queue",//bindingkey,若是是直鏈交換機須要同routingKey保持一致
        $callback = function($msg){
            $body = $msg->body;
            file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
            //若是返回結果不絕對等於(===)true,那麼將觸發消息重試機制
            return false;
        },
        //5次消費消費失敗後,失敗消息將會投遞到的隊列名稱
        $failedQueue = "easymq@failed"
    );

2-二、拉取模式

拉取模式下的可靠消費
$config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    $instance->get(
        $queue = "get_queue",
        $exchange = "easy_fanout_exchange",
        $bindingKey = "",
        $callback = function($msg){
            $body = $msg->body;
            file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
            //若是返回結果不絕對等於(===)true,那麼將觸發消息重試機制
            return false;
        },
        //5次消費消費失敗後,失敗消息將會投遞到的隊列名稱
        $failedQueue = 'easymq@failed'
        );
相關文章
相關標籤/搜索