在項目中rabbitmq獲得了普遍的時候,這裏對rabbitmq的常規功能作了一個簡單的總結,並封裝成了composer包,composer包地址、github地址,歡迎fork,因爲水平有限,不免存在bug,歡迎提出寶貴意見php
對php-amqplib/php-amqplib包的二次封裝,爲常見功能提供一套開箱即用的生產解決方案
。目前支持的功能列表以下:html
若是還有其它場景,歡迎繼續補充,隨後進行迭代!!git
composer require maweibinguo/easyrabbitmq
在這裏咱們推薦php腳本+supervisor結合使用,用以保證消費進程的可靠性、加強worker的消費能力! 若是你尚未據說過supervisor,能夠點擊這裏瞭解.github
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //延遲消息,30 秒中後纔會到達指定的交換機 $instance->pushToDirect( $msg = time(), //消息體內容 $exchange = "easy_direct_exchange", //交換機名稱 $routingKey = "direct_test_queue", //消息的routingkey,consume方法到bingdingkey 要和routingkey保持一致 $delaySec = 30 //延遲秒數 ); //無延遲,推入到指定到直鏈交換機 $instance->pushToDirect( $msg = time(), //消息體內容 $exchange = "easy_direct_exchange", //交換機名稱 $routingKey = "direct_test_queue", //消息的routingkey,consume方法到bingdingkey 要和routingkey保持一致 );
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //延遲消息,30 秒中後纔會到達指定的交換機 $instance->pushToFanout( $msg = time(), //消息體內容 $exchange = "easy_fanout_exchange", //交換機名稱 $delaySec = 30 //延遲秒數 ); //無延遲,推入到指定到直鏈交換機 $instance->pushToFanout( $msg = time(), //消息體內容 $exchange = "easy_fanout_exchange" //交換機名稱 );
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //延遲消息,30 秒中後纔會到達指定的交換機 $instance->pushToTopic( $msg = time(), //消息體內容 $exchange = "easy_topic_exchange", //交換機名稱 $routingKey = "", $delaySec = 30 //延遲秒數 ); //無延遲,推入到指定到直鏈交換機 $instance->pushToTopic( $msg = time(), //消息體內容 $exchange = "easy_topic_exchange", //交換機名稱 $routingKey = "easy.topic.queue" //routingKey 要同consum的bindingKey相匹配 //bindingKey支持兩種特殊的字符"*"、「#」,用做模糊匹配, 其中"*"用於匹配一個單詞、「#」用於匹配多個單詞(也能夠是0個) //不管是bindingKey 仍是routingKey, 被"."分隔開的每一段獨立的字符串就是一個單詞, easy.topic.queue, 包含三個單詞easy、topic、queue );
消費支持自動重試,最多嘗試重試5次,每次消費失敗後該消息將會被從新投入到消費隊列中。從新的時間將會隨着失敗的次數增多逐漸推移,本客戶端支持的推移策略以下:
失敗1次(1秒鐘後會再被投遞), 失敗2次(2秒鐘後會再被投遞), 失敗3次(4秒鐘後會再被投遞), 失敗4次(8秒鐘後會再被投遞), 失敗5次(16秒鐘後會再被投遞)php7
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); $instance->consume( $queueName = "direct_test_queue",//訂閱的隊列名稱 $consumerTag = "c1",//消費標記 $exchange = "easy_direct_exchange",//交換機名稱 $bindingKey = "direct_test_queue",//bindingkey,若是是直鏈交換機須要同routingKey保持一致 $callback = function($msg){ $body = $msg->body; file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND); //若是返回結果不絕對等於(===)true,那麼將觸發消息重試機制 return false; }, //5次消費消費失敗後,失敗消息將會投遞到的隊列名稱 $failedQueue = "easymq@failed" );
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); $instance->get( $queue = "get_queue", $exchange = "easy_fanout_exchange", $bindingKey = "", $callback = function($msg){ $body = $msg->body; file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND); //若是返回結果不絕對等於(===)true,那麼將觸發消息重試機制 return false; }, //5次消費消費失敗後,失敗消息將會投遞到的隊列名稱 $failedQueue = 'easymq@failed' );