Kafka是由LinkedIn開發的一個分佈式的消息系統,使用Scala編寫,它以可水平擴展和高吞吐率而被普遍使用。目前愈來愈多的開源分佈式處理系統如Cloudera、Apache Storm、Spark都支持與Kafka集成。InfoQ一直在緊密關注Kafka的應用以及發展,「Kafka剖析」專欄將會從架構設計、實現、應用場景、性能等方面深度解析Kafka。git
Kafka是一個消息系統,本來開發自LinkedIn,用做LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。如今它已被多家不一樣類型的公司 做爲多種類型的數據管道和消息系統使用。github
活動流數據是幾乎全部站點在對其網站使用狀況作報表時都要用到的數據中最常規的部分。活動數據包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索狀況等內容。這種數據一般的處理方式是先把各類活動以日誌的形式寫入某種文件,而後週期性地對這些文件進行統計分析。運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日誌等等數據)。運營數據的統計方法種類繁多。web
近年來,活動和運營數據處理已經成爲了網站軟件產品特性中一個相當重要的組成部分,這就須要一套稍微更加複雜的基礎設施對其提供支持。瀏覽器
Kafka是一種分佈式的,基於發佈/訂閱的消息系統。主要設計目標以下:安全
在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息系統在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。服務器
有些狀況下,處理數據的過程會失敗。除非數據被持久化,不然將形成丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。微信
由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。
在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。
在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行———寫入隊列的處理會盡量的快速。該緩衝有助於控制和優化數據流通過系統的速度。
不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
RabbitMQ
RabbitMQ是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量級,更適合於企業級的開發。同時實現了Broker構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。
Redis
Redis是一個基於Key-Value對的NoSQL數據庫,開發維護很活躍。雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底能夠當作一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。
ZeroMQ
ZeroMQ號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZeroMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的應用程序將扮演這個服務器角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是宕機,數據將會丟失。其中,Twitter的Storm 0.9.0之前的版本中默認使用ZeroMQ做爲數據流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty做爲傳輸模塊)。
ActiveMQ
ActiveMQ是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。
Kafka/Jafka
Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式發佈/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制統一了在線和離線的消息處理。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。
Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker
每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)
Parition是物理上的概念,每一個Topic包含一個或多個Partition.
負責發佈消息到Kafka broker
消息消費者,向Kafka broker讀取消息的客戶端。
每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。
如上圖所示,一個典型的Kafka集羣中包含若干Producer(能夠是web前端產生的Page View,或者是服務器日誌,系統CPU、Memory等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干Consumer Group,以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發佈到broker,Consumer使用pull模式從broker訂閱並消費消息。
Topic在邏輯上能夠被認爲是一個queue,每條消費都必須指定它的Topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲了使得Kafka的吞吐率能夠線性提升,物理上把Topic分紅一個或多個Partition,每一個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的全部消息和索引文件。若建立topic1和topic2兩個topic,且分別有13個和19個分區,則整個集羣上會相應會生成共32個文件夾(本文所用集羣共8個節點,此處topic1和topic2 replication-factor均爲1),以下圖所示。
每一個日誌文件都是一個log entrie序列,每一個log entrie包含一個4字節整型數值(值爲N+5),1個字節的"magic value",4個字節的CRC校驗碼,其後跟N個字節的消息體。每條消息都有一個當前Partition下惟一的64字節的offset,它指明瞭這條消息的起始位置。磁盤上存儲的消息格式以下:
message length : 4 bytes (value: 1+4+n) "magic" value : 1 byte crc : 4 bytes payload : n bytes
這個log entries並不是由一個文件構成,而是分紅多個segment,每一個segment以該segment第一條消息的offset命名並以「.kafka」爲後綴。另外會有一個索引文件,它標明瞭每一個segment下包含的log entry的offset範圍,以下圖所示。
由於每條消息都被append到該Partition中,屬於順序寫磁盤,所以效率很是高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
對於傳統的message queue而言,通常會刪除已經被消費的消息,而Kafka集羣會保留全部的消息,不管其被消費與否。固然,由於磁盤限制,不可能永久保留全部數據(實際上也不必),所以Kafka提供兩種策略刪除舊數據。一是基於時間,二是基於Partition文件大小。例如能夠經過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一週前的數據,也可在Partition文件超過1GB時刪除舊數據,配置以下所示。
# The minimum age of a log file to be eligible for deletion log.retention.hours=168 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according to the retention policies log.retention.check.interval.ms=300000 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false
這裏要注意,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除過時文件與提升Kafka性能無關。選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會爲每個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset。這個offset由Consumer控制。正常狀況下Consumer會在消費完一條消息後遞增該offset。固然,Consumer也可將offset設成一個較小的值,從新消費一些消息。由於offet由Consumer控制,因此Kafka broker是無狀態的,它不須要標記哪些消息被哪些消費過,也不須要經過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,所以也就不須要鎖機制,這也爲Kafka的高吞吐率提供了有力保障。
Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪個Partition。若是Partition機制設置合理,全部消息能夠均勻分佈到不一樣的Partition裏,這樣就實現了負載均衡。若是一個Topic對應一個文件,那這個文件所在的機器I/O將會成爲這個Topic的性能瓶頸,而有了Partition後,不一樣的消息能夠並行寫入不一樣broker的不一樣Partition裏,極大的提升了吞吐率。能夠在$KAFKA_HOME/config/server.properties中經過配置項num.partitions來指定新建Topic的默認Partition數量,也可在建立Topic時經過參數指定,同時也能夠在Topic建立以後經過Kafka提供的工具修改。
在發送一條消息時,能夠指定這條消息的key,Producer根據這個key和Partition機制來判斷應該將這條消息發送到哪一個Parition。Paritition機制能夠經過指定Producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中若是key能夠被解析爲整數則將對應的整數與Partition總數取餘,該消息會被髮送到該數對應的Partition。(每一個Parition都會有個序號,序號從0開始)
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class JasonPartitioner<T> implements Partitioner { public JasonPartitioner(VerifiableProperties verifiableProperties) {} @Override public int partition(Object key, int numPartitions) { try { int partitionNum = Integer.parseInt((String) key); return Math.abs(Integer.parseInt((String) key) % numPartitions); } catch (Exception e) { return Math.abs(key.hashCode() % numPartitions); } } }
若是將上例中的類做爲partition.class,並經過以下代碼發送20條消息(key分別爲0,1,2,3)至topic3(包含4個Partition)。
public void sendMessage() throws InterruptedException{ for(int i = 1; i <= 5; i++){ List messageList = new ArrayList<KeyedMessage<String, String>>(); for(int j = 0; j < 4; j++){ messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j)); } producer.send(messageList); } producer.close(); }
則key相同的消息會被髮送並存儲到同一個partition裏,並且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是經過Java程序調用Consumer後打印出的消息列表。
(本節全部描述都是基於Consumer hight level API而非low level API)。
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
這是Kafka用來實現一個Topic消息的廣播(發給全部的Consumer)和單播(發給某一個Consumer)的手段。一個Topic能夠對應多個Consumer Group。若是須要實現廣播,只要每一個Consumer有一個獨立的Group就能夠了。要實現單播只要全部的Consumer在同一個Group裏。用Consumer Group還能夠將Consumer進行自由的分組而不須要屢次發送消息到不一樣的Topic。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還能夠同時將數據實時備份到另外一個數據中心,只須要保證這三個操做所使用的Consumer屬於不一樣的Consumer Group便可。下圖是Kafka在Linkedin的一種簡化部署示意圖。
下面這個例子更清晰地展現了Kafka Consumer Group的特性。首先建立一個Topic (名爲topic1,包含3個Partition),而後建立一個屬於group1的Consumer實例,並建立三個屬於group2的Consumer實例,最後經過Producer向topic1發送key分別爲1,2,3的消息。結果發現屬於group1的Consumer收到了全部的這三條消息,同時group2中的3個Consumer分別收到了key爲1,2,3的消息。以下圖所示。
做爲一個消息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push消息並由Consumer從broker pull消息。一些logging-centric system,好比Facebook的Scribe和Cloudera的Flume,採用push模式。事實上,push模式和pull模式各有優劣。
push模式很難適應消費速率不一樣的消費者,由於消息發送速率是由broker決定的。push模式的目標是儘量以最快速度傳遞消息,可是這樣很容易形成Consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則能夠根據Consumer的消費能力以適當的速率消費消息。
對於Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。
有這麼幾種可能的delivery guarantee:
Exactly once 每條消息確定會被傳輸一次且僅傳輸一次,不少時候這是用戶所想要的。
當Producer向broker發送消息時,一旦這條消息被commit,因數replication的存在,它就不會丟。可是若是Producer發送數據給broker後,遇到網絡問題而形成通訊中斷,那Producer就沒法判斷該條消息是否已經commit。雖然Kafka沒法肯定網絡故障期間發生了什麼,可是Producer能夠生成一種相似於主鍵的東西,發生故障時冪等性的重試屢次,這樣就作到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還並未實現,有但願在Kafka將來的版本中實現。(因此目前默認狀況下一條消息從Producer到broker是確保了At least once,可經過設置Producer異步發送實現At most once)。
接下來討論的是消息從broker到Consumer的delivery guarantee語義。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息後,能夠選擇commit,該操做會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit以後的開始位置相同。固然能夠將Consumer設置爲autocommit,即Consumer一旦讀到數據當即自動commit。若是隻討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際使用中應用程序並不是在Consumer讀取完數據就結束了,而是要進行進一步處理,而數據處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
讀完消息先commit再處理消息。這種模式下,若是Consumer在commit後還沒來得及處理消息就crash了,下次從新開始工做後就沒法讀到剛剛已提交而未處理的消息,這就對應於At most once
讀完消息先處理再commit。這種模式下,若是在處理完消息以後commit以前Consumer crash了,下次從新開始工做時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應於At least once。在不少使用場景下,消息都有一個主鍵,因此消息的處理每每具備冪等性,即屢次處理這一條消息跟只處理一次是等效的,那就能夠認爲是Exactly once。(筆者認爲這種說法比較牽強,畢竟它不是Kafka自己提供的機制,主鍵自己也並不能徹底保證操做的冪等性。並且實際上咱們說delivery guarantee 語義是討論被處理多少次,而非處理結果怎樣,由於處理方式多種多樣,咱們不該該把處理過程的特性——如是否冪等性,當成Kafka自己的Feature)
若是必定要作到Exactly once,就須要協調offset和實際操做的輸出。精典的作法是引入兩階段提交。若是能讓offset和操做輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,由於許多輸出系統可能不支持兩階段提交。好比,Consumer拿到數據後可能把數據放到HDFS,若是把最新的offset和數據自己一塊兒寫到HDFS,那就能夠保證數據的輸出和offset的更新要麼都完成,要麼都不完成,間接實現Exactly once。(目前就high level API而言,offset是存於Zookeeper中的,沒法存於HDFS,而low level API的offset是由本身去維護的,能夠將之存於HDFS中)
總之,Kafka默認保證At least once,而且容許經過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協做,幸運的是Kafka提供的offset能夠很是直接很是容易得使用這種方式。
郭俊(Jason),碩士,從事大數據平臺研發工做,精通Kafka等分佈式消息系統及Storm等流式處理系統。
聯繫方式:新浪微博: 郭俊_Jason 微信:habren 博客:http://www.jasongj.com
下一篇將深刻講解Kafka是如何作Replication和Leader Election的。在Kafka0.8之前的版本中,若是某個broker宕機,或者磁盤出現問題,則該broker上全部partition的數據都會丟失。而Kafka0.8之後加入了Replication機制,能夠將每一個Partition的數據備份多份,即便某些broker宕機也能保證系統的可用性和數據的完整性。
感謝郭蕾對本文的策劃和審校。
給InfoQ中文站投稿或者參與內容翻譯工做,請郵件至editors@cn.infoq.com。也歡迎你們經過新浪微博(@InfoQ)或者騰訊微博(@InfoQ)關注咱們,並與咱們的編輯和其餘讀者朋友交流。