[喵咪KafKa(3)]PHP拓展See-KafKa

#[喵咪KafKa(3)]PHP拓展See-KafKaphp

##前言git

(Simple 簡單 easy 容易 expand 的拓展)github

KafKa是由Apache基金會維護的一個分佈式訂閱分發系統,KafKa它最初的目的是爲了解決,統一,高效低延時,高通量(同時能傳輸的數據量)而且高可用一個消息平臺,它是分佈式消息隊列,分佈式日誌,數據傳輸通道的不二之選,可是惋惜的時PHP的拓展實在不是很好用(php-kafka拓展已經長期不維護存在很是多的問題,rdkafkaC底層編寫不利於使用),但願能夠更加方便的來使用KafKa這塊肥肉因而基於rdKafKa封裝的一個簡單溫馨KafKa拓展誕生了!vim

附上:數組

GitHub地址:https://github.com/wenzhenxi/See-KafKa分佈式

rdkafka PHP拓展地址:https://github.com/arnaud-lb/php-rdkafka日誌

服務底層依賴:https://github.com/edenhill/librdkafkacode

做者博客:http://w-blog.cnblog

##1. 安裝隊列

(See-KafKa支持0.9~0.10版本,對0.8版本以及之前的版本協議不支持)

首先須要安裝配置好zookeeper+KafKa:能夠參考做者博客下的KafKa模塊下的介紹安裝,做者博客介紹是對於0.8.2.2的安裝方式,可是和0.9和0.10的安裝並無區別,只須要去下載0.9和0.10的包便可

在使用以前須要按照順序先安裝librdkafka,在安裝php-rdkafka:

# 安裝librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install
# 安裝php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
make install
# 在php.ini加入以下信息
vim /usr/local/php/etc/php.ini
extension=rdkafka.so

這個時候使用php -m 能夠看到拓展列表內存在 rdkafka這項證實拓展已經安裝成功

##2. 使用

See-KafKa完美支持PhalApi,只須要把去拓展庫中獲取kafka拓展便可,固然不是PhalApi的也可使用只須要include文件下的kafka.php便可使用

2.1 Producer

KafKa最基礎的兩個角色其中一個就是Producer(能夠參考做者博客介紹)

向KafKa中的一個Topic寫入一條消息,須要寫入多條能夠屢次使用setMassage

<?php
/**
 * See-kafka Producer例子
 * 循環寫入1w條數據15毫秒
 */

// 配置KafKa集羣(默認端口9092)經過逗號分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 設置一個Topic
$KafKa_Lite->setTopic("test");
// 單次寫入效率ok  寫入1w條15 毫秒
$Producer = $KafKa_Lite->newProducer();
// 參數分別是partition,消息內容,消息key(可選)
// partition:能夠設置爲KAFKA_PARTITION_UA會自動分配,好比有6個分區寫入時會隨機選擇Partition
$Producer->setMessage(0, "hello");

2.2 Consumer

對於Consumer來講支持4種從offset的獲取方式分別爲:

  • KAFKA_OFFSET_STORED #經過group來獲取消息的offset(必須設置group)
  • KAFKA_OFFSET_END #獲取尾部的offset
  • KAFKA_OFFSET_BEGINNING #獲取頭部的offset
  • 手動指定offset開始值

2.2.1 例子1

此例子適合獲取一段數據就結束的場景,每一次getMassage都會創建鏈接而後關閉鏈接,當循環使用getMassage會形成相對嚴重的效率問題

<?php
/**
 * See-kafka Consumer例子1
 */

// 配置KafKa集羣(默認端口9092)經過逗號分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 設置一個Topic
$KafKa_Lite->setTopic("test");
// 設置Consumer的Group分組(不使用自動offset的時候能夠不設置)
$KafKa_Lite->setGroup("test");
// 獲取Consumer實例
$consumer = $KafKa_Lite->newConsumer();

// 獲取一組消息參數分別爲:Partition,maxsize最大返回條數,offset(可選)默認KAFKA_OFFSET_STORED
$rs = $consumer->getMassage(0,100);
//返回結果是一個數組,數組元素類型爲Kafka_Message

2.2.1 例子2

例子2適合腳本隊列任務

<?php
/**
 * See-kafka Consumer例子1
 * 889 毫秒 獲取1w條
 */

// 配置KafKa集羣(默認端口9092)經過逗號分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 設置一個Topic
$KafKa_Lite->setTopic("test");
// 設置Consumer的Group分組(不使用自動offset的時候能夠不設置)
$KafKa_Lite->setGroup("test");

// 此項設置決定 在使用一個新的group時  是從 最小的一個開始 仍是從最大的一個開始  默認是最大的(或尾部)
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');
// 此項配置決定在獲取數據後回自動做爲一家消費 成功 無需在 必定要 stop以後纔會 提交 可是也是有限制的
// 時間越小提交的時間越快,時間越大提交的間隔也就越大 當獲取一條數據以後就拋出異常時 更具獲取以後的時間來計算是否算做處理完成
// 時間小於這個時間時拋出異常 則不會更新offset 若是大於這個時間則會直接更新offset 建議設置爲 100~1000之間
$KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000);

// 獲取Consumer實例
$consumer = $KafKa_Lite->newConsumer();

// 開啓Consumer獲取,參數分別爲partition(默認:0),offset(默認:KAFKA_OFFSET_STORED)
$consumer->consumerStart(0);

for ($i = 0; $i < 100; $i++) {
    // 當獲取不到數據時會阻塞默認10秒能夠經過$consumer->setTimeout()進行設置
    // 阻塞後由數據可以獲取會當即返回,超過10秒回返回null,正常返回格式爲Kafka_Message
    $message = $consumer->consume();
}

// 關閉Consumer(不關閉程序不會中止)
$consumer->consumerStop();

3. 配置文件

See-kafka提供兩種配置文件的配置,分別傳入key和value,具體配置項已經做用參看以下地址:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

配置文件說明:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

$KafKa_Lite->setTopicConf();
$KafKa_Lite->setKafkaConf();

在使用Consumer的Group(KAFKA_OFFSET_STORED)中須要注意如下配置項,不然你在使用一個新的group會從當前開始計算offset(根據場景):

// 此項設置決定 在使用一個新的group時  是從 最小的一個開始 仍是從最大的一個開始  默認是最大的(或尾部)
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');

Consumer獲取以後是須要提交告訴KafKa獲取成功而且更新offset,可是若是中途報錯沒有提交offset則下次仍是會從頭獲取,此項配置設置一個自動提交時間,當失敗後以前處理的也會吧offset提交到KafKa:

// 此項配置決定在獲取數據後回自動做爲一家消費 成功 無需在 必定要 stop以後纔會 提交 可是也是有限制的
// 時間越小提交的時間越快,時間越大提交的間隔也就越大 當獲取一條數據以後就拋出異常時 更具獲取以後的時間來計算是否算做處理完成
// 時間小於這個時間時拋出異常 則不會更新offset 若是大於這個時間則會直接更新offset 建議設置爲 100~1000之間
$KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000);

4. 異常

在初始化KafKa_Lite會對集羣端口進行驗證,若是無任何一個可用的則會拋出一個No can use KafKa異常,也能夠主動觸發ping操做檢查集羣是否有有可用機器

當獲取Consumer異常了會拋出一個KafKa_Exception_Base異常,異常有一個code號可參考,Exception/err.php文件,推薦使用try-catch進行處理

5. 總結

See-KafKa的宗旨是爲了更加方便把KafKa和PHP相結合,而且可以方便的進行使用,若是你們感興趣可使用看看,有問題能夠進行反饋,此拓展做者會長期維護下去!

官方交流羣: 438882880

相關文章
相關標籤/搜索