中國廣東省深圳市龍華新區民治街道溪山美地
518131
+86 13113668890
+86 755 29812080
<netkiller@msn.com> php
版權聲明 html
轉載請與做者聯繫,轉載時請務必標明文章原始出處和做者信息及本聲明。 python
|
|
|
微信掃描二維碼進入 Netkiller 微信訂閱號 mysql QQ羣:128659835 請註明「讀者」 linux |
2015-10-19 git
您可使用iBook閱讀當前文檔 github
消息隊列(英語:Message queue)是一種進程間通訊或同一進程的不一樣線程間的通訊方式 sql
消息隊列技術是分佈式應用間交換信息的一種技術。消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀出。經過消息隊列,應用程序可獨立地執行,它們不須要知道彼此的位置、或在繼續執行前不須要等待接收程序接收此消息。 shell
你首先須要弄清楚,消息隊列與遠程過程調用的區別,在不少讀者諮詢個人時候,我發現他們須要的是RPC(遠程過程調用),而不是消息隊列。 編程
消息隊列有同步或異步實現方式,一般咱們採用異步方式使用消息隊列,遠程過程調用多采用同步方式。
MQ與RPC有什麼不一樣? MQ一般傳遞無規則協議,這個協議由用戶定義而且實現存儲轉發;而RPC一般是專用協議,調用過程返回結果。
同步需求,遠程過程調用(PRC)更適合你。
異步需求,消息隊列更適合你。
目前不少消息隊列軟件同時支持RPC功能,不少RPC系統也能異步調用。
存儲轉發
分佈式事務
發佈訂閱
基於內容的路由
點對點鏈接
一般的作法,若是小的項目團隊能夠有一我的實現,包括消息的推送,接收處理。若是大型團隊,一般是定義好消息協議,而後各自開發各自的部分, 例如一個團隊負責寫推送協議部分,另外一個團隊負責寫接收與處理部分。
那麼爲何咱們不講消息隊列框架化呢?
下面是做者開發的一個SOA框架,該框架提供了三種接口,分別是SOAP,RESTful,AMQP(RabbitMQ),理解了該框架思想,你很容易進一步擴展,例如增長XML-RPC, ZeroMQ等等支持。
https://github.com/netkiller/SOA
本文只講消息隊列框架部分。
消息隊列框架是本地應用程序(命令行程序),咱們爲了讓他在後臺運行,須要實現守護進程。
https://github.com/netkiller/SOA/blob/master/bin/rabbitmq.php
每一個實例處理一組隊列,實例化須要提供三個參數,$queueName = '隊列名', $exchangeName = '交換名', $routeKey = '路由'
$daemon = new \framework\RabbitDaemon($queueName = 'email', $exchangeName = 'email', $routeKey = 'email');
守護進程須要使用root用戶運行,運行後會切換到普通用戶,同事建立進程ID文件,一邊進程中止的時候使用。
守護進程核心代碼https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php
消息協議是一個數組,將數組序列化或者轉爲JSON推送到消息隊列服務器,這裏使用json格式的協議。
$msg = array( 'Namespace'=>'namespace', "Class"=>"Email", "Method"=>"smtp", "Param" => array( $mail, $subject, $message, null ) );
序列化後的協議
{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}
使用json格式是考慮到通用性,這樣推送端可使用任何語言。若是不考慮兼容,建議使用二進制序列化,例如msgpack效率更好。
消息隊列處理核心代碼
https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php
因此消息的處理在下面一段代碼中進行
$this->queue->consume(function($envelope, $queue) { $speed = microtime(true); $msg = $envelope->getBody(); $result = $this->loader($msg); $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答 //$this->logging->info(''.$msg.' '.$result) $this->logging->debug('Protocol: '.$msg.' '); $this->logging->debug('Result: '. $result.' '); $this->logging->debug('Time: '. (microtime(true) - $speed) .''); });
public function loader($msg = null) 負責拆解協議,而後載入對應的類文件,傳遞參數,運行方法,反饋結果。
Time 能夠輸出程序運行所花費的時間,對於後期優化十分有用。
loader() 能夠進一步優化,使用多線程每次調用loader將任務提交到線程池中,這樣即可以多線程處理消息隊列。
測試代碼 https://github.com/netkiller/SOA/blob/master/test/queue/email.php
<?php $queueName = 'example'; $exchangeName = 'email'; $routeKey = 'email'; $mail = $argv[1]; $subject = $argv[2]; $message = empty($argv[3]) ? 'Hello World!' : ' '.$argv[3]; $connection = new AMQPConnection(array( 'host' => '192.168.4.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' )); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $msg = array( 'Namespace'=>'namespace', "Class"=>"Email", "Method"=>"smtp", "Param" => array( $mail, $subject, $message, null ) ); $exchange->publish(json_encode($msg), $routeKey); printf("[x] Sent %s \r\n", json_encode($msg)); $connection->disconnect();
這裏只給出了少許測試與演示程序,若有疑問請到瀆者羣,或者公衆號詢問。
該消息隊列框架還比較簡陋,但在生產環境已經運行很長一段時間,效果仍是不錯的。同時下降了消息隊列的開發難度,開發者更多的時間是考慮業務邏輯的實現,而不用操心消息隊列自己的使用。