#[喵咪KafKa(3)]PHP拓展See-KafKaphp
##前言git
(Simple 簡單 easy 容易 expand 的拓展)github
KafKa是由Apache基金會維護的一個分佈式訂閱分發系統,KafKa它最初的目的是爲了解決,統一,高效低延時,高通量(同時能傳輸的數據量)而且高可用一個消息平臺,它是分佈式消息隊列,分佈式日誌,數據傳輸通道的不二之選,可是惋惜的時PHP的拓展實在不是很好用(php-kafka拓展已經長期不維護存在很是多的問題,rdkafkaC底層編寫不利於使用),但願能夠更加方便的來使用KafKa這塊肥肉因而基於rdKafKa封裝的一個簡單溫馨KafKa拓展誕生了!vim
附上:數組
GitHub地址:https://github.com/wenzhenxi/See-KafKa分佈式
rdkafka PHP拓展地址:https://github.com/arnaud-lb/php-rdkafka日誌
服務底層依賴:https://github.com/edenhill/librdkafkacode
做者博客:http://w-blog.cnblog
##1. 安裝隊列
(See-KafKa支持0.9~0.10版本,對0.8版本以及之前的版本協議不支持)
首先須要安裝配置好zookeeper+KafKa:能夠參考做者博客下的KafKa模塊下的介紹安裝,做者博客介紹是對於0.8.2.2的安裝方式,可是和0.9和0.10的安裝並無區別,只須要去下載0.9和0.10的包便可
在使用以前須要按照順序先安裝librdkafka,在安裝php-rdkafka:
# 安裝librdkafka git clone https://github.com/edenhill/librdkafka.git cd librdkafka ./configure make make install
# 安裝php-rdkafka git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 make install # 在php.ini加入以下信息 vim /usr/local/php/etc/php.ini extension=rdkafka.so
這個時候使用php -m 能夠看到拓展列表內存在 rdkafka這項證實拓展已經安裝成功
##2. 使用
See-KafKa完美支持PhalApi,只須要把去拓展庫中獲取kafka拓展便可,固然不是PhalApi的也可使用只須要include文件下的kafka.php便可使用
KafKa最基礎的兩個角色其中一個就是Producer(能夠參考做者博客介紹)
向KafKa中的一個Topic寫入一條消息,須要寫入多條能夠屢次使用setMassage
<?php /** * See-kafka Producer例子 * 循環寫入1w條數據15毫秒 */ // 配置KafKa集羣(默認端口9092)經過逗號分隔 $KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost"); // 設置一個Topic $KafKa_Lite->setTopic("test"); // 單次寫入效率ok 寫入1w條15 毫秒 $Producer = $KafKa_Lite->newProducer(); // 參數分別是partition,消息內容,消息key(可選) // partition:能夠設置爲KAFKA_PARTITION_UA會自動分配,好比有6個分區寫入時會隨機選擇Partition $Producer->setMessage(0, "hello");
對於Consumer來講支持4種從offset的獲取方式分別爲:
此例子適合獲取一段數據就結束的場景,每一次getMassage都會創建鏈接而後關閉鏈接,當循環使用getMassage會形成相對嚴重的效率問題
<?php /** * See-kafka Consumer例子1 */ // 配置KafKa集羣(默認端口9092)經過逗號分隔 $KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost"); // 設置一個Topic $KafKa_Lite->setTopic("test"); // 設置Consumer的Group分組(不使用自動offset的時候能夠不設置) $KafKa_Lite->setGroup("test"); // 獲取Consumer實例 $consumer = $KafKa_Lite->newConsumer(); // 獲取一組消息參數分別爲:Partition,maxsize最大返回條數,offset(可選)默認KAFKA_OFFSET_STORED $rs = $consumer->getMassage(0,100); //返回結果是一個數組,數組元素類型爲Kafka_Message
例子2適合腳本隊列任務
<?php /** * See-kafka Consumer例子1 * 889 毫秒 獲取1w條 */ // 配置KafKa集羣(默認端口9092)經過逗號分隔 $KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost"); // 設置一個Topic $KafKa_Lite->setTopic("test"); // 設置Consumer的Group分組(不使用自動offset的時候能夠不設置) $KafKa_Lite->setGroup("test"); // 此項設置決定 在使用一個新的group時 是從 最小的一個開始 仍是從最大的一個開始 默認是最大的(或尾部) $KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest'); // 此項配置決定在獲取數據後回自動做爲一家消費 成功 無需在 必定要 stop以後纔會 提交 可是也是有限制的 // 時間越小提交的時間越快,時間越大提交的間隔也就越大 當獲取一條數據以後就拋出異常時 更具獲取以後的時間來計算是否算做處理完成 // 時間小於這個時間時拋出異常 則不會更新offset 若是大於這個時間則會直接更新offset 建議設置爲 100~1000之間 $KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000); // 獲取Consumer實例 $consumer = $KafKa_Lite->newConsumer(); // 開啓Consumer獲取,參數分別爲partition(默認:0),offset(默認:KAFKA_OFFSET_STORED) $consumer->consumerStart(0); for ($i = 0; $i < 100; $i++) { // 當獲取不到數據時會阻塞默認10秒能夠經過$consumer->setTimeout()進行設置 // 阻塞後由數據可以獲取會當即返回,超過10秒回返回null,正常返回格式爲Kafka_Message $message = $consumer->consume(); } // 關閉Consumer(不關閉程序不會中止) $consumer->consumerStop();
See-kafka提供兩種配置文件的配置,分別傳入key和value,具體配置項已經做用參看以下地址:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
配置文件說明:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
$KafKa_Lite->setTopicConf(); $KafKa_Lite->setKafkaConf();
在使用Consumer的Group(KAFKA_OFFSET_STORED)中須要注意如下配置項,不然你在使用一個新的group會從當前開始計算offset(根據場景):
// 此項設置決定 在使用一個新的group時 是從 最小的一個開始 仍是從最大的一個開始 默認是最大的(或尾部) $KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');
Consumer獲取以後是須要提交告訴KafKa獲取成功而且更新offset,可是若是中途報錯沒有提交offset則下次仍是會從頭獲取,此項配置設置一個自動提交時間,當失敗後以前處理的也會吧offset提交到KafKa:
// 此項配置決定在獲取數據後回自動做爲一家消費 成功 無需在 必定要 stop以後纔會 提交 可是也是有限制的 // 時間越小提交的時間越快,時間越大提交的間隔也就越大 當獲取一條數據以後就拋出異常時 更具獲取以後的時間來計算是否算做處理完成 // 時間小於這個時間時拋出異常 則不會更新offset 若是大於這個時間則會直接更新offset 建議設置爲 100~1000之間 $KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000);
在初始化KafKa_Lite會對集羣端口進行驗證,若是無任何一個可用的則會拋出一個No can use KafKa異常,也能夠主動觸發ping操做檢查集羣是否有有可用機器
當獲取Consumer異常了會拋出一個KafKa_Exception_Base異常,異常有一個code號可參考,Exception/err.php文件,推薦使用try-catch進行處理
See-KafKa的宗旨是爲了更加方便把KafKa和PHP相結合,而且可以方便的進行使用,若是你們感興趣可使用看看,有問題能夠進行反饋,此拓展做者會長期維護下去!
官方交流羣: 438882880