開發Kafka通用數據平臺中間件前端
(含本次項目所有代碼及資源)web
目錄:數據庫
一. Kafka概述數組
二. Kafka啓動命令tomcat
三.咱們爲何使用Kafka安全
四. Kafka數據平臺中間件設計及代碼解析服務器
五.將來Kafka開發任務併發
一. Kafka概述負載均衡
Kafka是Linkedin於2010年12月份建立的開源消息系統,它主要用於處理活躍的流式數據。活躍的流式數據在web網站應用中很是常見,這些活動數據包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索狀況等內容。 這些數據一般以日誌的形式記錄下來,而後每隔一段時間進行一次統計分析。框架
傳統的日誌分析系統是一種離線處理日誌信息的方式,但若要進行實時處理,一般會有較大延遲。而現有的消息隊列系統可以很好的處理實時或者近似實時的應用,但未處理的數據一般不會寫到磁盤上,這對於Hadoop之類,間隔時間較長的離線應用而言,在數據安全上會出現問題。Kafka正是爲了解決以上問題而設計的,它可以很好地進行離線和在線應用。
1.1 Kfka部署結構:
(圖1)
1.2 Kafka關鍵字:
•Broker : Kafka消息服務器,消息中心。一個Broker能夠容納多個Topic。
•Producer :消息生產者,就是向Kafka broker發消息的客戶端。
•Consumer :消息消費者,向Kafka broker取消息的客戶端。
•Zookeeper :管理Producer,Broker,Consumer的動態加入與離開。
•Topic :能夠爲各類消息劃分爲多個不一樣的主題,Topic就是主題名稱。Producer能夠針對某個主題進行生產,Consumer能夠針對某個主題進行訂閱。
•Consumer Group: Kafka採用廣播的方式進行消息分發,而Consumer集羣在消費某Topic時, Zookeeper會爲該集羣創建Offset消費偏移量,最新Consumer加入並消費該主題時,能夠從最新的Offset點開始消費。
•Partition:Kafka採用對數據文件切片(Partition)的方式能夠將一個Topic能夠分佈存儲到多個Broker上,一個Topic能夠分爲多個Partition。在多個Consumer併發訪問一個partition會有同步鎖控制。
(圖2)
1.3 消息收發流程:
•啓動Zookeeper及Broker.
•Producer鏈接Broker後,將消息發佈到Broker中指定Topic上(能夠指定Patition)。
•Broker集羣接收到Producer發過來的消息後,將其持久化到硬盤,並將消息該保留指定時長(可配置),而不關注消息是否被消費。
•Consumer鏈接到Broker後,啓動消息泵對Broker進行偵聽,當有消息到來時,會觸發消息泵循環獲取消息,獲取消息後Zookeeper將記錄該Consumer的消息Offset。
1.4 Kafka特性:
•高吞吐量
•負載均衡:經過zookeeper對Producer,Broker,Consumer的動態加入與離開進行管理。
•拉取系統:因爲kafka broker會持久化數據,broker沒有內存壓力,所以,consumer很是適合採起pull的方式消費數據
•動態擴展:當須要增長broker結點時,新增的broker會向zookeeper註冊,而producer及consumer會經過zookeeper感知這些變化,並及時做出調整。
•消息刪除策略:數據文件將會根據broker中的配置要求,保留必定的時間以後刪除。kafka經過這種簡單的手段,來釋放磁盤空間。
二. Kafka啓動命令:
啓動Zookeeper服務:
zookeeper-server-start.bat ../../config/zookeeper.properties
啓動Broker服務:
kafka-server-start.bat ../../config/server.properties
經過Zookeeper的協調在Broker中建立一個Topic(主題)
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
查詢當前Broker中某個指定主題的配置信息
kafka-run-class.bat kafka.admin.TopicCommand --describe --zookeeper localhost:2181 --topic testTopic
啓動一個數據生產者Producer
kafka-console-producer.bat --broker-list localhost:9092 --topic testTopic
啓動一個數據消費者Consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic testTopic --from-beginning
Zookeeper配置文件,zookeeper.properties配置片斷
Broker配置文件,server.properties配置片斷
關於kafka收發消息相關的配置項
1.在Broker Server中屬性(這些屬性須要在Server啓動時加載):
//每次Broker Server可以接收的最大包大小,該參數要與consumer的fetch.message.max.bytes屬性進行匹配使用
* message.max.bytes 1000000(默認)
//Broker Server中針對Producer發送方的數據緩衝區。Broker Server會利用該緩衝區循環接收來至Producer的數據 包,緩衝區太小會致使對該數據包的分段數量增長,但不會影響數據包尺寸限制問題。
socket.send.buffer.bytes 100 * 1024(默認)
//Broker Server中針對Consumer接收方的數據緩衝區。意思同上。
socket.receive.buffer.bytes 100 * 1024(默認)
//Broker Server中針對每次請求最大的緩衝區尺寸,包括Prodcuer和Consumer雙方。該值必須大於 message.max.bytes屬性
* socket.request.max.bytes 100 * 1024 * 1024(默認)
2.在Consumer中的屬性(這些屬性須要在程序中配置Consumer時設置)
//Consumer用於接收來自Broker的數據緩衝區,意思同socket.send.buffer.bytes。
socket.receive.buffer.bytes 64 * 1024(默認)
//Consumer用於每次接收消息包的最大尺寸,該屬性須要與Broker中的message.max.bytes屬性配對使用
* fetch.message.max.bytes 1024 * 1024(默認)
3.在Producer中的屬性(這些屬性須要在程序中配置Consumer時設置)
//Producer用於發送數據緩衝區,意思同socket.send.buffer.bytes。
send.buffer.bytes 100 * 1024(默認)
三. 咱們爲何使用Kafka
當前項目中,咱們更但願從企業得到儘量多的有價值數據。最直接獲取大數據的方式是採用寫應用直連目標企業數據庫來得到數據。但這種方式在實際應用中,會因爲企業擔憂開放本地數據庫而致使的安全隱患很難實施。另外,這種方式會與企業本地數據庫結構耦合度太高,會出現多家企業多個應用的狀況,缺乏統一的數據交互平臺,致使後期維護困難。
3.1 Kafka在當前項目中問題:
當前案例,咱們想把某企業的本地數據實時同步到數據中心中,以後對這些數據進行二次分析處理。咱們的目標是創建統一的數據同步平臺,便於在往後的多企業多系統中能有統一的實施標準,因此選用了Kafka消息系統做爲支撐。
Producer(數據發送方)以獨立線程方式常駐某企業內部應用中,依靠必定的時間週期,從本地數據庫得到數據並推送至Broker中。而Consumer(快銷組數據接收方)也是獨立與WEB框架常駐內存,得到數據消息後保存至數據中心中。
但目前Kafka在實施中面臨如下問題:
1.Producer/Consumer均獨立於Web框架,Producer依靠消息片輪詢檢索/發送最新數據,執行效率低。
2.Producer會直接針對某企業內部數據庫表結構操做,致使代碼與企業業務耦合度太高,而沒法平滑移植到其餘企業系統中。
3.因爲Producer/Consumer是獨立於Web框架的,在外圍負責數據的採集及推送,與Web項目主程序無切合度。
4.目前針對Kafka的數據傳輸異常處理比較簡陋,當Broker或 Zookeeper等出現異常時,有可能會致使數據安全性問題。
3.2實現目標:
針對以上問題,咱們要實現以下目標:
1.把Producer/Consumer的數據推送/獲取的過程封裝成Class或者Jar包的形式,供Java Web框架調用,從而造成與企業內部Web應用或計算中心數據分析Web應用融合一體。
2.數據的推送/獲取只針對Java Object對象,不要針對數據庫表結構,不能與企業特有數據耦合度太高,造成通用的數據接口。Producer須要對Object進行序列化,Consumer須要對序列化後的二進制信息進行反序列化重建Object返回給調用者。
3.消息的推送/獲取的整個生命週期中,要把重要事件通知給外部調用者,好比:Broker,Zookeeper是否有異常,數據推送/獲取是否成功,若是失敗須要保留失敗記錄便於進行後期數據恢復等。(須要在中間件中創建回調機制通知調用者)
4.可對多企業多應用進行平滑移植,移植過程當中儘量保持總體Kafka數據平臺結構的零修改。
四. Kafka數據平臺中間件設計
4.1解決方案:
基於以上待完成目標,咱們有了如下解決方案。
3.2 實現要點:
KfkProducer(數據生產者)
•KfkProducer對象須要在Web框架中的Application_OnStart()中啓動,常駐進程,只與Broker鏈接一次,數據發送過程不能與Broker創建鏈接。(實踐中發現Kafka的 Broker若是有異常,重啓Broker後Producer不用再次鏈接便可發送)
•Web框架能夠隨時調用推送接口將對象(Object)推送至Broker.
•Object序列化後造成二進制信息,而且要保證在Consumer所處框架中能順利還原.
•可發送多種對象(Object,File ,Byte[]等),簡化外圍框架針對待發送數據所作操做,簡化調用接口。
•數據發送使用Kafka中最新的異步式數據發送API,不能因爲發送時間過長或Broker異常等問題阻塞調用者。
•須要對整個發送生命期進行跟蹤反饋異常信息,若發送失敗,須要將待發送數據使用回調機制通知到框架調用者。
•詳細測試Broker,或Zookeeper產生異常時,Producer可能會出現的狀況。
•在針對多企業多應用中,可依靠Topic進行區分數據主題,這樣可實現多應用部署時框架零修改問題。
KfkConsumer(數據消費者)
•KfkConsumer須要在計算中心內部Web框架中的Application_OnStart()中啓動,常駐進程,只與Broker鏈接一次,並啓動消息泵等待消息到來。(實踐中發現Kafka的 Broker若是有異常,重啓Broker後Consumer不用再次鏈接便可正常獲取消息)
•須要定義回調接口,該回調接口由外圍框架程序註冊處理程序,當數據消息到來時,Consumer須要把數據發送至該接口,以後由調用者處理。
•調用者須要註冊所接受的對象類型,由於Broker中同一Topic下會有各類數據對象(UserInfo,CompanyInfo,ProductInfo...)存在,因此必須提供接收對象的註冊接口,以方便調用者有針對性的獲取。
•數據到來時,要針對發送方序列化的二進制信息進行反序列化操做,並能準確還原成原始對象。
•須要對整個接收生命期進行跟蹤反饋異常信息,若消息泵中止或異常,須要通知到框架調用者。
實現以上要點後,須要將KfkProducer及KfkConsumer對象打包成Jar包的形式,更靈活的部署到企業本地Web框架及計算中心內部Web框架中。
3.3 代碼實現及分析:
3.3.1 KfkProducer 對象:數據生產者對象,封裝了關於數據發送的相關功能。
接口函數/子對象 |
說明 |
KfkProducer () |
構造函數中須要調用者提供Broker集羣的Ip,Port等信息。 Kafka支持Broker集羣列表。(127.0.0.1:9092,127.0.0.1:9093)
|
Connect() |
該函數須要完成對Broker集羣的鏈接。
|
Send() |
該函數入口爲Object對象,須要對該對象進行Serialize操做,根據待發送數據構造KfkMsg對象,並取得由KfkMsg序列化後的Byte[]數組,以後調用Kafka的異步發送方式及掛接回調處理函數。
要實現多個Send()接口,須要提供對Object,File ,Byte[]等多種數據類型的支持,方便調用者操做。
|
Close() |
該函數完成對Broker鏈接進行關閉。
|
SendCallback發送回調對象 onCompletion()發送回調接口 |
在kafka異步發送函數send()中註冊,在收到Broker返回的發送是否成功信息後,會觸發該函數,並調用ProducerEvent對象的onSendMsg()函數,向調用者發送成功與否結果。
成功則返回調用者RecordMetadata信息(BrokerServer中的數據offset,Partition位置ID,Topic主題)
失敗者返回調用者原始數據信息,便於往後恢復。
|
ProducerEvent接口對象 onSendMsg() |
爲調用者提供的回調接口,調用者在註冊後,便可重寫onSendMsg()函數,以便接到通知後,處理當前事件(發送數據成功與否)狀態。
|
3.3.2 KfkConsumer對象:數據消費者對象,封裝了關於數據接收的相關功能。
接口函數/子對象 |
說明 |
KfkConsumer() |
構造函數中須要調用者提供Zookeeper集羣的Ip,Port等信息。(即將推出的Kafka0.9.X版本將支持直連Broker集羣的機制)
該對象繼承至Thread對象,爲線程對象。 |
connect() |
配置Zookeeper鏈接相關屬性,並鏈接Zookeeper服務器。
|
run() |
線程主函數,該函數將啓動Kafka消息泵等待Broker的消息到來。
消息到來後,將調用KfkMsg對象對二進制序列化信息進行還原對象操做(KfkMsg將對序列化數據進行反序列化操做,並從新還原原始對象操做)。
對象還原後,將調用調用者註冊的回調接口,將對象傳出。
|
close() |
關閉Consumer與Broker,Zookeeper的Socket鏈接。
|
ConsumerEvent接收回調對象 onRecvMsg()接收回調函數 |
爲調用者提供的回調接口,調用者在註冊後,便可重寫onRecvMsg()函數,以便接到通知後,收取對象或處理當前事件。
|
3.3.3 KfkMsg對象:數據消息對象,封裝了數據對象的序列化/反序列化操做,構造多種類型的發送對象,封裝發送協議等操做。
接口函數/子對象 |
說明 |
MsgBase對象 |
消息包基類,能夠在Consumer接到數據消息後,造成多種對象的反序列化多態性。
|
MsgObject對象 serializeMsg()序列化函數 deserializeMsg()反序列化函數 |
針對Object數據的序列化和反序列化操做,及消息體封裝,通信協議構造等操做。
|
MsgByteArr對象 serializeMsg()序列化函數 deserializeMsg()反序列化函數 |
針對Byte[]數據的序列化和反序列化操做,及消息體封裝,通信協議構造等操做。
|
MsgFile對象 serializeMsg()序列化函數 deserializeMsg()反序列化函數 |
針對二進制文件的序列化和反序列化操做,及消息體封裝,通信協議構造等操做。
|
getMsgType()函數
|
負責對Consumer接收的序列化信息進行首次協議解析,判斷對象類型(Object,File,byte[])以後構造對應的MsgXXX對象,以便使調用者進行反序列化多態功能。
|
3.3.4 SerializeUtils對象:序列化操做工具類,完成在Jar包內部對外部對象的序列化/反序列化基礎從操做。
接口函數/子對象 |
說明 |
deserialize()函數 |
將序列化後的二進制數組byte[]還原成原始Object.
因爲若是使用默認的ObjectInputStream對象進行反序列化操做,在Jar內將沒法找到外部調用者定義的對象名,也即沒法反序列化成功,報沒法找到外部對象的異常。
因此必須重寫resolveClass()函數,加載當前線程範圍內的Class上下文。
|
Serialize()函數 |
將Object序列化成二進制數組,byte[]。
|
3.3.5 調用者Web框架部署:
KfkProducer部署:
部署要點 |
說明 |
1.註冊發送消息回調函數 |
在WEB框架中的Application_OnStart()事件中向Jar註冊發送消息回調函數。並重寫onSendMsg()回調接口,用於接受發送成功/失敗消息,發送失敗後,能夠在Web框架中針對返回的原始數據信息作備份/恢復處理。
|
2.創建與Broker之間的鏈接 |
在WEB框架中的 Application_OnStart()事件中調用KfkProducer connect()函數,鏈接遠程Broker。
|
3.將KfkProducer傳入框架 |
通過前兩步操做後,咱們已經順利創建KfkProducer對象,如今咱們須要把該對象傳入Web框架中後續頁面處理類中,以方便調用其send()函數進行數據發送。
在Play中咱們使用了cache對象機制,能夠在Play Web App全生命期內得到KfkProducer對象實例。
|
4.關閉與Broker之間的鏈接 |
在WEB框架中的Application_OnStop()事件中調用KfkProducer的close()函數,關閉遠程Broker鏈接。
|
KfkConsumer部署:
部署要點 |
說明 |
1.註冊發送消息回調函數 |
在WEB框架中的Application_OnStart()事件中向Jar註冊消息接收回調函數。並重寫onRecvMsg()回調接口,用於接受來自Broker的數據信息。
在onRecvMsg()函數中,還需針對傳入的Object對象進行instanceof比對操做,區分特定對象。
|
2.註冊須要接收的Object類型 |
向Jar包中註冊須要接收的對象類型,好比本應用須要接收(UserInfo,CompanyInfo,ProdcutInfo等對象)。 註冊後,來自Broker的廣播消息將被Jar包過濾,只返回調用者所需的對象數據。
|
3.創建與Zookeeper(Broker)之間的鏈接 |
在WEB框架中的 Application_OnStart()事件中調用KfkConsumer connect()函數,鏈接遠程Zookeeper/Broker。
|
4.啓動消息泵線程 |
通過前兩步操做後,咱們已經順利創建與Zookeeper/Broker創建鏈接。
咱們須要啓動消息泵來收聽消息的到來,這裏須要調用KfkConsumer對象的start()函數啓動消息泵線程常駐內存。
|
4.關閉與Zookeeper之間的鏈接 |
在WEB框架中的Application_OnStop()事件中調用KfkConsumer的close()函數,關閉遠程Zookeeper/Broker鏈接。
|
五. 將來Kafka中間件
目前該中間件只完成了初級階段功能,不少功能都不完善不深刻,隨着應用業務的拓展及Kafka將來版本功能支持,。以Kafka消息中間件爲中心的大數據處理平臺還有不少任務去實現。
通常在互聯網中所流動的數據由如下幾種類型:
•須要實時響應的交易數據,用戶提交一個表單,輸入一段內容,這種數據最後是存放在關係數據庫(Oracle, MySQL)中的,有些須要事務支持。
•活動流數據,準實時的,例如頁面訪問量、用戶行爲、搜索狀況等。咱們能夠針對這些數據廣播、排序、個性化推薦、運營監控等。這種數據通常是前端服務器先寫文件,而後經過批量的方式把文件倒到Hadoop(離線數據分析平臺)這種大數據分析器裏面,進行慢慢的分析。
•各個層面程序產生的日誌,例如http的日誌、tomcat的日誌、其餘各類程序產生的日誌。這種數據一個是用來監控報警,還有就是用來作分析。
謝謝觀賞!
注:基於全球開源共享理念,本人會分享更多原創及譯文,讓更多的IT人從中受益,與你們一塊兒進步!
基因Cloud 原創,轉發請註明出處
1738387@qq.com (工做繁忙,有事發郵件,QQ不加,非要事勿擾,多謝!)
2015 / 06 / 14