什麼是kafka?php
Kakfa起初是由LinkedIn公司開發的一個分佈式的消息系統,後成爲Apache的一部分,它使用Scala編寫,以可水平擴展和高吞吐率而被普遍使用。目前愈來愈多的開源分佈式處理系統如Cloudera、Apache Storm、Spark等都支持與Kafka集成。html
活動流數據是幾乎全部站點在對其網站使用狀況作報表時都要用到的數據中最常規的部分。活動數據包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索狀況等內容。這種數據一般的處理方式是先把各類活動以日誌的形式寫入某種文件,而後週期性地對這些文件進行統計分析。運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日誌等等數據)。運營數據的統計方法種類繁多。
近年來,活動和運營數據處理已經成爲了網站軟件產品特性中一個相當重要的組成部分,這就須要一套稍微更加複雜的基礎設施對其提供支持。如今最新版本已經到2.x.x前端
kafka的架構java
一個典型的Kafka體系架構包括若干Producer(能夠是服務器日誌,業務數據,頁面前端產生的page view等等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干Consumer (Group),以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在consumer group發生變化時進行rebalance。Producer使用push(推)模式將消息發佈到broker,Consumer使用pull(拉)模式從broker訂閱並消費消息。web
kafka的優勢數據庫
Kafka與傳統MQ區別:api
爲什麼要用消息系統數組
在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息系統在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。緩存
有些狀況下,處理數據的過程會失敗。除非數據被持久化,不然將形成丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。安全
由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。
在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。
在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行———寫入隊列的處理會盡量的快速。該緩衝有助於控制和優化數據流通過系統的速度。
不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
RabbitMQ是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量級,更適合於企業級的開發。同時實現了Broker構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。
Redis是一個基於Key-Value對的NoSQL數據庫,開發維護很活躍。雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底能夠當作一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。
ZeroMQ號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的應用程序將扮演這個服務器角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是宕機,數據將會丟失。其中,Twitter的Storm 0.9.0之前的版本中默認使用ZeroMQ做爲數據流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty做爲傳輸模塊)。
ActiveMQ是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。
Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式發佈/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制統一了在線和離線的消息處理。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。
關鍵概念
Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker。每一個broker一般就是一臺物理機器,在上面運行kafka server的一個實例,全部這些broker實例組成kafka的服務器集羣。每一個broker會給本身分配一個惟一的broker id。broker集羣是經過zookeeper集羣來管理的。每一個broker都會註冊到zookeeper上,有某個機器掛了,有新的機器加入,zookeeper都會收到通知。在0.9.0中,producer/consumer已經不會依賴Zookeeper來獲取集羣的配置信息,而是經過任意一個broker來獲取整個集羣的配置信息。
每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic,Topic這是一個邏輯上的概念,而Partition是物理上的概念。(物理上,不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)。因此咱們在談論topic每每和partition一塊兒討論,二者有緊密的聯繫:
每一個topic將被分紅多個partition(區),每一個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱爲offset(偏移量),offset爲一個long型數字,它是惟一標記一條消息。它惟一的標記一條消息。kafka並無提供其餘額外的索引機制來存儲offset,由於在kafka中幾乎不容許對消息進行「隨機讀寫」。
Parition是物理上的概念,每一個Topic包含一個或多個Partition,不一樣Partition可位於不一樣節點(kafka server實例)。同時每一個Partition在物理上對應一個本地文件夾,每一個Partition包含一個或多個Segment,每一個Segment包含一個數據文件和一個與之對應的索引文件。在邏輯上,能夠把一個Partition看成一個很是長的數組,可經過這個「數組」的索引(offset)去訪問其數據。
每一條消息被髮送到broker中,會根據partition規則(有默認規則,固然也能夠自定義規則)選擇被存儲到哪個partition。若是partition規則設置的合理,全部消息能夠均勻分佈到不一樣的partition裏,這樣就實現了水平擴展。(若是一個topic對應一個文件,那這個文件所在的機器I/O將會成爲這個topic的性能瓶頸,而partition解決了這個問題)。在建立topic時能夠在$KAFKA_HOME/config/server.properties中指定這個partition的數量,固然能夠在topic建立以後去修改partition的數量。
在發送一條消息時,能夠指定這個消息的key,producer根據這個key和partition機制來判斷這個消息發送到哪一個partition。partition機制能夠經過指定producer的partition.class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。
一方面,因爲不一樣Partition可位於不一樣機器,所以能夠充分利用集羣優點,實現機器間的並行處理。另外一方面,因爲Partition在物理上對應一個文件夾,即便多個Partition位於同一個節點,也可經過配置讓同一節點上的不一樣Partition置於不一樣的disk drive上,從而實現磁盤間的並行處理,充分發揮多磁盤的優點。
利用多磁盤的具體方法是,將不一樣磁盤mount到不一樣目錄,而後在server.properties中,將log.dirs設置爲多目錄(用逗號分隔)。Kafka會自動將全部Partition儘量均勻分配到不一樣目錄也即不一樣目錄(也即不一樣disk)上。注:雖然物理上最小單位是Segment,但Kafka並不提供同一Partition內不一樣Segment間的並行處理。由於對於寫而言,每次只會寫Partition內的一個Segment,而對於讀而言,也只會順序讀取同一Partition內的不一樣Segment。
負責發佈消息到Kafka broker,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。使用的更多的是第二種。
消息消費者,向Kafka broker讀取消息的客戶端。在消費消息時只須要指明topic名稱便可,不須要關心具體的消息存在了哪一個borker上。注意,一個topic只能被一個group組中的一個cusmer來消費,可是能夠被多個group組來同時消費;consumer只能修改被commit狀態的消息;
發佈消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。隊列模式中,consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到;發佈-訂閱模式中消息被廣播到全部的consumer中。Consumers能夠加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。更常見的是,每一個topic都有若干數量的consumer組,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組由若干consumer組成。這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。
由兩個機器組成的集羣擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個
相比傳統的消息系統,Kafka能夠很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。
在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。
Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。
每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。
每一個topic的partion的全部消息,都不是隻存1份,而是在多個broker上冗餘存儲,從而提升系統的可靠性。這多臺機器就叫一個replica集合。
在這個replica集合中,須要選出1個leader,剩下的是follower。也就是master/slave。
發送消息的時候,只會發送給leader,而後leader再把消息同步給followers(以pull的方式,followers去leader上pull,而不是leader push給followers)。
那這裏面就有一個問題:leader收到消息以後,是直接返回給producer呢,仍是等全部followers都寫完消息以後,再返回? 關於這個請看下面博客中的kafka機制內容
關鍵點:這裏replica/leader/follower都是邏輯概念,而且是相對」partion」來說的,而不是」topic」。也就說,同一個topic的不一樣partion,對於的replica集合能夠是不同的。
好比 :
「abc-0」 <1,3,5> //abc_0的replica集合是borker 1, 3, 5, leader是1, follower是3, 5
「abc-1」 <1,3,7> //abc_1的replica集合是broker 1, 3, 7,leader是1, follower是3, 7
「abc_2」 <3,7,9>
「abc_3」 <1,7,9>
「abc_4」 <1,3,5>
持久化文件刪除策略
什麼是數據分片
複製備份
kafka將每一個partition數據複製到多個server上,任何一個partition有一個leader和多個follower(能夠沒有);備份的個數能夠經過broker配置文件來設定.leader處理全部的read-write請求,follower須要和leader保持同步.Follower和consumer同樣,消費消息並保存在本地日誌中;leader負責跟蹤全部的follower狀態,若是follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當全部的follower都將一條消息保存成功,此消息才被認爲是"committed",那麼此時consumer才能消費它.即便只有一個replicas實例存活,仍然能夠保證消息的正常發送和接收,只要zookeeper集羣存活便可.(不一樣於其餘分佈式存儲,好比hbase須要"多數派"存活才行)
leader選舉
使用場景
對於消息隊列的使用,表面上看起來很簡單,一端往裏面放,一端從裏面取。但就在這一放一取中,存在着諸多策略。
所謂ACK,是指服務器收到消息以後,是存下來以後,再給客戶端返回,仍是直接返回。很顯然,是否ACK,是影響性能的一個重要指標。在kafka中,request.required.acks有3個取值,分別對應3種策略:
request.required.acks
//0: 不等服務器ack就返回了,性能最高,可能丟數據
//1. leader確認消息存下來了,再返回
//all: leader和當前ISR中全部replica都確認消息存下來了,再返回(這種方式最可靠)
備註:在0.9.0之前的版本,是用-1表示all
所謂異步發送,就是指客戶端有個本地緩衝區,消息先存放到本地緩衝區,而後有後臺線程來發送。
在0.8.2和0.8.2以前的版本中,同步發送和異步發送是分開實現的,用的Scala語言。從0.8.2開始,引入了1套新的Java版的client api。在這套api中,同步其實是用異步間接實現的:
在異步發送下,有如下4個參數須要配置:
(1)隊列的最大長度
buffer.memory //缺省爲33554432, 即32M
(2)隊列滿了,客戶端是阻塞,仍是拋異常出來(缺省是true)
block.on.buffer.full
//true: 阻塞消息
//false:拋異常
(3)發送的時候,能夠批量發送的數據量
batch.size //缺省16384字節,即16K
(4)最長等多長時間,批量發送
linger.ms //缺省是0
//相似TCP/IP協議中的linger algorithm,> 0 表示發送的請求,會在隊列中積攥,而後批量發送。
很顯然,異步發送能夠提升發送的性能,但一旦客戶端掛了,就可能丟數據。
對於RabbitMQ, ActiveMQ,他們都強調可靠性,所以不容許非ACK的發送,也沒有異步發送模式。Kafka提供了這個靈活性,容許使用者在性能與可靠性之間作權衡。
(5)消息的最大長度
max.request.size //缺省是1048576,即1M
這個參數會影響batch的大小,若是單個消息的大小 > batch的最大值(16k),那麼batch會相應的增大
全部的消息隊列都要面對一個問題,是broker把消息Push給消費者呢,仍是消費者主動去broker Pull消息?
kafka選擇了pull的方式,爲何呢? 由於pull的方式更靈活:消息發送頻率應該如何,消息是否能夠延遲而後batch發送,這些信息只有消費者本身最清楚!
所以把控制權交給消費者,消費者本身控制消費的速率,當消費者處理消息很慢時,它能夠選擇減緩消費速率;當處理消息很快時,它能夠選擇加快消費速率。而在push的方式下,要實現這種靈活的控制策略,就須要額外的協議,讓消費者告訴broker,要減緩仍是加快消費速率,這增長了實現的複雜性。
另外pull的方式下,消費者能夠很容易的自適應控制消息是batch的發送,仍是最低限度的減小延遲,每來1個就發送1個。
在消費端,全部消息隊列都要解決的一個問題就是「消費確認問題」:消費者拿到一個消息,而後處理這個消息的時候掛了,若是這個時候broker認爲這個消息已經消費了,那這條消息就丟失了。
一個解決辦法就是,消費者在消費完以後,再往broker發個confirm消息。broker收到confirm消息以後,再把消息刪除。
要實現這個,broker就要維護每一個消息的狀態,已發送/已消費,很顯然,這會增大broker的實現難度。同時,這還有另一個問題,就是消費者消費完消息,發送confirm的時候,掛了。這個時候會出現重複消費的問題。
kafka沒有直接解決這個問題,而是引入offset回退機制,變相解決了這個問題。在kafka裏面,消息會存放一個星期,纔會被刪除。而且在一個partion裏面,消息是按序號遞增的順序存放的,所以消費者能夠回退到某一個歷史的offset,進行從新消費。
固然,對於重複消費的問題,須要消費者去解決。
在某些業務場景下,須要消息的順序不能亂:發送順序和消費順序要嚴格一致。而在kafka中,同一個topic,被分紅了多個partition,這多個partition之間是互相獨立的。
之因此要分紅多個partition,是爲了提升併發度,多個partition並行的進行發送/消費,但這卻沒有辦法保證消息的順序問題。
一個解決辦法是,一個topic只用一個partition,但這樣很顯然限制了靈活性。
還有一個辦法就是,全部發送的消息,用同一個key,這樣一樣的key會落在一個partition裏面。
咱們都知道,操做系統自己是有page cache的。即便咱們用無緩衝的io,消息也不會當即落到磁盤上,而是在操做系統的page cache裏面。操做系統會控制page cache裏面的內容,何時寫回到磁盤。在應用層,對應的就是fsync函數。
咱們能夠指定每條消息都調用一次fsync存盤,但這會較低性能,也增大了磁盤IO。也可讓操做系統去控制存盤。
一個完美的消息隊列,應該作到消息的「不重不漏」,這裏麪包含了4重語義:
消息不會重複存儲;
消息不會重複消費;
消息不會丟失存儲;
消息不會丟失消費。
先說第1個:重複存儲。發送者發送一個消息以後,服務器返回超時了。那請問,這條消息是存儲成功了,仍是沒有呢?
要解決這個問題:發送者須要給每條消息增長一個primary key,同時服務器要記錄全部發送過的消息,用於判重。很顯然,要實現這個,代價很大
重複消費:上面說過了,要避免這個,消費者須要消息confirm。但一樣,會引入其餘一些問題,好比消費完了,發送confirm的時候,掛了怎麼辦? 一個消息一直處於已發送,但沒有confirm狀態怎麼辦?
丟失存儲:這個已經解決
丟失消費:同丟失存儲同樣,須要confirm。
總結一下:真正作到不重不漏,exactly once,是很難的。這個須要broker、producer、consumer和業務方的協調配合。
在kafka裏面,是保證消息不漏,也就是at least once。至於重複消費問題,須要業務本身去保證,好比業務加判重表。
常遇到的問題
因爲一些博客,在kafka介紹方面已經很是的完善,因此這裏只收集一些有內涵的文章來供你們學習參考:
學習連接