【伍哥原創】php
1,前言python
RabbitMQ 是由 LShift 提供的一個 Advanced Message Queuing Protocol (AMQP) 的開源實現,由以高性能、健壯以及可伸縮性出名的 Erlang 寫成,所以也是繼承了這些優勢。mysql
AMQP 裏主要要說兩個組件:Exchange 和 Queue (在 AMQP 1.0 裏還會有變更),以下圖所示,綠色的 X 就是 Exchange ,紅色的是 Queue ,這二者都在 Server 端,又稱做 Broker ,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,一般有 Producer 和 Consumer 兩種類型:
RabbitMQ做爲一個很是成熟的消息隊列技術方案,也應用到了豆莢商城項目裏面。
2,郵件服務:將慢動做從請求中分離出來sql
做爲一個商城,天然少不了給用戶發送郵件。好比註冊的時候要發送確認郵件,下單之後發送訂單郵件,推廣信息也須要發送郵件,相似的狀況很是多。
實現郵件發送的一般作法比較簡單,就是在HTTP請求中一併完成郵件發送這個動做。而發送郵件依賴於SMTP服務。在小併發的環境下,一切都工做的很正常。緩存
可是,當併發請求上到必定的程度,問題就來了。HTTP必須等待SMTP這個慢動做,若是你須要帶附件的話,狀況就更糟糕了。
另一個問題來自於SMTP,當請求過於頻密的時候,SMTP就出現超負荷工做的狀況,這樣各類郵件發送的異常狀況就在所不免了。架構
怎麼才能很好的解決這個問題呢?
答案在前面就給出了,就是創建消息隊列機制!
其實原理很是簡單,就是在內存維護一個隊列(queue),若是要發送一封郵件,就往隊列裏面寫一條消息,也就是所謂的信息生產者。
再創建一個進程,處理隊列裏面的郵件發送,就是所謂的信息消費者。併發
因爲商城是用PHP開發的,因此就須要支持amqp的PHP客戶端代碼。這裏用的是php amqplib (http://code.google.com/p/php-amqplib/)。
首先是鏈接rabbitmq,獲取一個通道,而後是發送消息,最後斷開通道和鏈接。下面是代碼示例:eclipse
1
2
3
4
5
6
7
8
9
10
11
|
$queue
=
'mail_queue'
$conn
=
new
AMQPConnection(
$config
[
'host'
],
$config
[
'port'
],
$config
[
'user'
],
$config
[
'pass'
]);
$channel
=
$conn
->channel();
$channel
->queue_declare(
$queue
, false, true, false, false);
$send_data
= serialize(
$send_msg
);
//數據先序列化一下,也可使用JSON格式化
$msg
=
new
AMQPMessage(
$send_data
,
array
(
'delivery_mode'
=> 2)
//讓消息持久化
);
$channel
->basic_publish(
$msg
,
''
,
$queue
);
$channel
->close();
$conn
->close();
|
在項目裏固然不能這樣寫,應該封裝成一個分佈式服務接口,融入到整個系統代碼架構裏面,方便其餘地方,好比controller,model的使用。分佈式
接下來是實現消息的消費程序。這裏用的是python的pika。
首先是鏈接rabbitmq,獲取通道,開始消費隊列裏面的信息。如下的代碼寫在類裏面:memcached
1
2
3
4
5
6
7
|
self
.connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
self
.rmq_host))
self
.channel
=
self
.connection.channel()
self
.channel.queue_declare(queue
=
self
.rmq_queue, durable
=
True
)
self
.channel.basic_qos(prefetch_count
=
1
)
# callback裏面就是具體處理消息的地方
self
.channel.basic_consume(
self
.callback, queue
=
self
.rmq_queue)
self
.channel.start_consuming()
|
callback回調
1
2
3
4
5
6
7
8
9
10
11
|
def
callback(
self
, ch, method, properties, body):
time.sleep(
1
)
#休息一秒才發送郵件
msg
=
phpserialize.loads(body)
#按PHP的格式作反序列化
validateutil
=
ValidateUtil()
if
validateutil.isEmail(msg[
'mail_to'
]):
mail
=
Mailer()
mail.setMailTo(msg[
'mail_to'
])
mail.setMailSubject(msg[
'mail_subject'
])
mail.setMailHtmlBody(msg[
'mail_body'
])
mail.sendEmail()
ch.basic_ack(delivery_tag
=
method.delivery_tag)
|
這裏只是骨幹代碼。應該創建一個python的project,在eclipse(加PyDev)裏面管理起來。
你還須要用到:配置文件以及配置文件解析庫,系統日誌,Mailer,phpserialize,ValidateUtil等等輔助類庫。
關於PyDev請參考:http://www.ibm.com/developerworks/cn/opensource/os-cn-ecl-pydev/
熟悉了郵件的應用,後面擴展到手機短信通知服務、站內通訊消息等等就很是方便了。固然,面對這樣的需求,咱們就須要在實現時考慮使用可擴展的消息隊列的模型了。
3,頁面訪問統計:經過寫緩存減輕DB的負載
對於商城來講,都有商品推薦的功能,好比人氣商品推薦。怎麼定義人氣呢?通常看商品頁面的訪問量。這裏就出現了頁面統計的需求了。統計的數據通常須要持久化到DB。
通常來講,某商品頁面被訪問一次,就應該插入或者更新一次DB記錄。這徹底沒有什麼技術難度。
然而當併發鏈接上到必定水平,DB的性能問題就出來了。由於DB,好比MYSQL,都有必定的鎖機制。當出現頻繁的insert或者update時,select的速度天然就受到很大制約了。並且打開一次頁面就觸發一次統計,也就要操做一次DB,那DB不哭纔怪!
有見及此,咱們就經過消息隊列實現了頁面訪問統計的寫緩存。
何謂寫緩存?對於某些不須要高實時的數據,好比咱們這裏的頁面訪問統計,能夠把更新操做先緩存起來,當累積到必定程度時,才進行一次實際的更新。這樣的好處是顯而易見的,DB操做少了不少,並且也避免的DB鎖機制引起的性能問題。
實現寫緩存的方式有不少,好比經過memcached來實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
$page
=
'goods_100052'
;
$memcache
= memcache_connect(
'192.168.1.100'
, 11711);
// 經過memcached提供的原子加操做,避免併發訪問帶來的統計出錯.
$count
=
$memcache
->increment(
$page
, 1);
if
(!
$count
) {
$memcache
->add(
$page
, 1, false, 0);
exit
;
}
if
(
$count
>= 1000) {
$sql
=
"update `goods_viewlog` set `count` = `count`+{$count} where `page` = $page"
;
$result
=
$mysql
->query(
$sql
);
if
(
$result
) {
// 更新成功後,把緩存統計清零
$memcache
->set(
$page
, 0, false, 0);
}
}
|
咱們這裏採用了消息隊列的實現方式。
消息生產者代碼和消息消費者代碼和上面介紹的郵件是幾乎同樣的。惟一不一樣在於回調函數那裏。這裏就再也不重複說明了。
4,總結
咱們在開發消息隊列應用特別要注意的是要先搞清楚消息隊列的主要概念和機制:好比交換,隊列,綁定,持久化等等。
搞清楚了之後,再根據具體的應用類型,定義好消息隊列模型。
具體能夠參考伍哥前面的文章。