KafkaBridge 封裝了對Kafka集羣的讀寫操做,接口極少,簡單易用,穩定可靠,支持c++/c、php、python、golang等多種語言,並特別針對php-fpm場景中做了長鏈接複用的優化,已在360公司內部普遍使用。php
前言python
衆所周知,Kafka是近幾年來大數據領域最流行的分佈式流處理平臺。它最初由LinkedIn公司開發, 已於2010年貢獻給了Apache基金會併成爲頂級開源項目, 本質上是一種低延時的、可擴展的、設計內在就是分佈式的,分區的和可複製的消息系統;c++
Kafka在360公司內部也有至關普遍的使用,業務覆蓋搜索,商業廣告,IOT, 視頻,安全, 遊戲等幾乎全部核心業務,天天的寫入流量近1.2PB,讀取流量近2.4PB;git
Kafka官方提供了Java版本的客戶端SDK, 但因360公司內部產品線衆多,語言幾乎囊括目前全部主流語言,因此咱們研發了Kafka客戶端SDK —— KafkaBridge;github
簡介golang
KafkaBridge 底層基於 librdkafka, 與之相比封裝了大量的使用細節,簡單易用,使用者無需瞭解過多的Kafka系統細節,只需調用極少許的接口,就可完成消息的生產和消費;安全
針對使用者比較關心的消息生產的可靠性,做了近一步的提高;異步
開源地址:分佈式
https://github.com/Qihoo360/kafkabridgeide
特色
支持多種語言:c++/c、php、python、golang, 且各語言接口徹底統一;
接口少,簡單易用;
針對高級用戶,支持經過配置文件調整全部的librdkafka的配置;
在非按key寫入數據的狀況下,盡最大努力將消息成功寫入;
支持同步和異步兩種數據寫入方式;
在消費時,除默認自動提交offset外,容許用戶經過配置手動提交offset;
在php-fpm場景中,複用長鏈接生產消息,避免頻繁建立斷開鏈接的開銷;
編譯
編譯依賴於librdkafka, liblog4cplus, boost
(僅依賴於若干個頭文件);
對於C++/C使用 CMake 編譯;
對於Python, Php, Golang使用 swig 編譯;
每種語言都提供了自動編譯腳本,方便使用者自行編譯。
使用
數據寫入
在非按key寫入的狀況下,sdk盡最大努力提交每一條消息,只要Kafka集羣存有一臺broker正常,就會重試發送;
每次寫入數據只須要調用produce接口,在異步發送的場景下,經過返回值能夠判斷髮送隊列是否填滿,發送隊列可經過配置文件調整;
在同步發送的場景中,produce接口返回當前消息是否寫入成功,可是寫入性能會有所降低,CPU使用率會有所上升,推薦仍是使用異步寫入方式;
咱們來簡單看一下寫入kafka所涉及到的全部接口:
//初始化接口
bool QbusProducer::init(const string& broker_list, const string& log_path, const string& config_path, const string& topic)
//寫入數據接口
bool QbusProducer::produce(const char* data, size_t data_len, const std::string& key)
//再也不須要寫入數據時,須要調用的清理接口,必須調用
void QbusProducer::uninit()
具體使用能夠參考源碼中的實例;
數據消費
消費只需調用subscribeOne訂閱topic(也支持同時訂閱多個topic),而後執行start就開始消費,當前進程非阻塞,每條消息經過callback接口回調給使用者;
sdk還支持用戶手動提交offset方式,用戶能夠經過callback中返回的消息體,在代碼其餘邏輯中進行提交。
下面是消費接口,以c++爲例:
//初始化接口
bool QbusConsumer::init(const string& string broker_list, const string& string log_path, const string& string config_path, QbusConsumerCallback& callback)
//訂閱須要消費的消息
bool QbusConsumer::subscribeOne(const string& string group, const string& string topic)
//開始消費
bool QbusConsumer::start()
//中止消費
void QbusConsumer::stop()
性能測試
kafka 集羣三臺broker, 除測試用topic外,無其餘topic的讀寫操做;
測試用topic有3個partition;
Producer單實例,單線程;
Topic無複本下測試:
單條消息 100 byte, 發送 1百萬 條消息,耗時 1.7 秒;
單條消息 1024 byte, 發送 1百萬 條消息,耗時 13 秒;
Topic有2複本下測試:
單條消息 100 byte, 發送 1百萬 條消息,耗時 1.7 秒;
單條消息 1024 byte, 發送 1百萬 條消息,耗時 14 秒;
寫在最後
KafkaBridge 一直在360公司內部使用,如今已經開源,有疏漏之處,歡迎廣大使用者批評指正,也歡迎更多的使用者加入到 KafkaBridge 的持續改進中。
開源地址:KafkaBridge:
https://github.com/Qihoo360/kafkabridge