PHP高級編程之消息隊列

PHP高級編程之消息隊列

http://netkiller.github.io/journal/php.mq.html

Mr. Neo Chen (陳景峯), netkiller, BG7NYT


中國廣東省深圳市龍華新區民治街道溪山美地
518131
+86 13113668890
+86 755 29812080
<netkiller@msn.com> php

版權聲明 html

轉載請與做者聯繫,轉載時請務必標明文章原始出處和做者信息及本聲明。 python

文檔出處:
http://netkiller.github.io
http://netkiller.sourceforge.net

微信掃描二維碼進入 Netkiller 微信訂閱號 mysql

QQ羣:128659835 請註明「讀者」 linux

2015-10-19 git

摘要
個人系列文檔
Netkiller Architect 手札 Netkiller Developer 手札 Netkiller PHP 手札 Netkiller Python 手札 Netkiller Testing 手札
Netkiller Cryptography 手札 Netkiller Linux 手札 Netkiller Debian 手札 Netkiller CentOS 手札 Netkiller FreeBSD 手札
Netkiller Shell 手札 Netkiller Security 手札 Netkiller Web 手札 Netkiller Monitoring 手札 Netkiller Storage 手札
Netkiller Mail 手札 Netkiller Docbook 手札 Netkiller Project 手札 Netkiller Database 手札 Netkiller PostgreSQL 手札
Netkiller MySQL 手札 Netkiller NoSQL 手札 Netkiller LDAP 手札 Netkiller Network 手札 Netkiller Cisco IOS 手札
Netkiller H3C 手札 Netkiller Multimedia 手札 Netkiller Perl 手札 Netkiller Amateur Radio 手札 Netkiller DevOps 手札

您可使用iBook閱讀當前文檔 github


1. 什麼是消息隊列

消息隊列(英語:Message queue)是一種進程間通訊或同一進程的不一樣線程間的通訊方式 sql

2. 爲何使用消息隊列

消息隊列技術是分佈式應用間交換信息的一種技術。消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀出。經過消息隊列,應用程序可獨立地執行,它們不須要知道彼此的位置、或在繼續執行前不須要等待接收程序接收此消息。 shell

3. 什麼場合使用消息隊列

你首先須要弄清楚,消息隊列與遠程過程調用的區別,在不少讀者諮詢個人時候,我發現他們須要的是RPC(遠程過程調用),而不是消息隊列。 編程

消息隊列有同步或異步實現方式,一般咱們採用異步方式使用消息隊列,遠程過程調用多采用同步方式。

MQ與RPC有什麼不一樣? MQ一般傳遞無規則協議,這個協議由用戶定義而且實現存儲轉發;而RPC一般是專用協議,調用過程返回結果。

4. 何時使用消息隊列

同步需求,遠程過程調用(PRC)更適合你。

異步需求,消息隊列更適合你。

目前不少消息隊列軟件同時支持RPC功能,不少RPC系統也能異步調用。

消息隊列用來實現下列需求
  1. 存儲轉發

  2. 分佈式事務

  3. 發佈訂閱

  4. 基於內容的路由

  5. 點對點鏈接

5. 誰負責處理消息隊列

一般的作法,若是小的項目團隊能夠有一我的實現,包括消息的推送,接收處理。若是大型團隊,一般是定義好消息協議,而後各自開發各自的部分, 例如一個團隊負責寫推送協議部分,另外一個團隊負責寫接收與處理部分。

那麼爲何咱們不講消息隊列框架化呢?

框架化有幾個好處:
  1. 開發者不用學習消息隊列接口
  2. 開發者不須要關心消息推送與接收
  3. 開發者經過統一的API推送消息
  4. 開發者的重點是實現業務邏輯功能

6. 怎麼實現消息隊列框架

下面是做者開發的一個SOA框架,該框架提供了三種接口,分別是SOAP,RESTful,AMQP(RabbitMQ),理解了該框架思想,你很容易進一步擴展,例如增長XML-RPC, ZeroMQ等等支持。

https://github.com/netkiller/SOA

本文只講消息隊列框架部分。

6.1. 守護進程

消息隊列框架是本地應用程序(命令行程序),咱們爲了讓他在後臺運行,須要實現守護進程。

https://github.com/netkiller/SOA/blob/master/bin/rabbitmq.php

每一個實例處理一組隊列,實例化須要提供三個參數,$queueName = '隊列名', $exchangeName = '交換名', $routeKey = '路由'

$daemon = new \framework\RabbitDaemon($queueName = 'email', $exchangeName = 'email', $routeKey = 'email');

守護進程須要使用root用戶運行,運行後會切換到普通用戶,同事建立進程ID文件,一邊進程中止的時候使用。

守護進程核心代碼https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php

6.2. 消息隊列協議

消息協議是一個數組,將數組序列化或者轉爲JSON推送到消息隊列服務器,這裏使用json格式的協議。

$msg = array(
	'Namespace'=>'namespace',
	"Class"=>"Email",
	"Method"=>"smtp",
	"Param" => array(
		$mail, $subject, $message, null
	)
);

序列化後的協議

{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}

使用json格式是考慮到通用性,這樣推送端可使用任何語言。若是不考慮兼容,建議使用二進制序列化,例如msgpack效率更好。

6.3. 消息隊列處理

消息隊列處理核心代碼

https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php

因此消息的處理在下面一段代碼中進行

$this->queue->consume(function($envelope, $queue) {

	$speed = microtime(true);
	
	$msg = $envelope->getBody();
	$result = $this->loader($msg);
	$queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答

	//$this->logging->info(''.$msg.' '.$result)
	$this->logging->debug('Protocol: '.$msg.' ');
	$this->logging->debug('Result: '. $result.' ');
	$this->logging->debug('Time: '. (microtime(true) - $speed) .'');
});

public function loader($msg = null) 負責拆解協議,而後載入對應的類文件,傳遞參數,運行方法,反饋結果。

Time 能夠輸出程序運行所花費的時間,對於後期優化十分有用。

提示

loader() 能夠進一步優化,使用多線程每次調用loader將任務提交到線程池中,這樣即可以多線程處理消息隊列。

6.4. 測試

測試代碼 https://github.com/netkiller/SOA/blob/master/test/queue/email.php

<?php
$queueName = 'example';
$exchangeName = 'email';
$routeKey = 'email';
$mail = $argv[1];
$subject = $argv[2];
$message = empty($argv[3]) ? 'Hello World!' : ' '.$argv[3];

 
$connection = new AMQPConnection(array(
	'host' => '192.168.4.1', 
	'port' => '5672', 
	'vhost' => '/', 
	'login' => 'guest', 
	'password' => 'guest'
	));
$connection->connect() or die("Cannot connect to the broker!\n");
 
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();

$msg = array(
	'Namespace'=>'namespace',
	"Class"=>"Email",
	"Method"=>"smtp",
	"Param" => array(
		$mail, $subject, $message, null
	)
);

$exchange->publish(json_encode($msg), $routeKey);
printf("[x] Sent %s \r\n", json_encode($msg));


$connection->disconnect();

這裏只給出了少許測試與演示程序,若有疑問請到瀆者羣,或者公衆號詢問。

7. 總結

該消息隊列框架還比較簡陋,但在生產環境已經運行很長一段時間,效果仍是不錯的。同時下降了消息隊列的開發難度,開發者更多的時間是考慮業務邏輯的實現,而不用操心消息隊列自己的使用。

相關文章
相關標籤/搜索