Apache Kafka是一個分佈式流媒體平臺。這究竟是什麼意思? 流媒體平臺具備三個關鍵功能: 發佈和訂閱記錄流,相似於消息隊列或企業消息系統。 以容錯的持久方式存儲記錄流。 記錄發生時的處理流。數據庫
Kafka一般用於兩類普遍的應用:apache
1.創建實時流數據管道,在系統或應用之間可靠地獲取數據bootstrap
2.創建對數據流進行轉換或反應的實時流應用程序數組
爲了理解Kafka是如何作到這些的,讓咱們從下到上探究Kafka的能力。bash
首先是幾個概念:服務器
Kafka做爲一個集羣運行在一個或多個能夠跨多個數據中心的服務器上。網絡
Kafka集羣將記錄流存儲在稱爲主題的類別中。 每一個記錄由一個鍵、一個值和一個時間戳組成。session
生產者API(Producer API)容許應用程序將記錄流發佈到一個或多個Kafka主題。多線程
消費者API(Consumer API)容許應用程序訂閱一個或多個主題,並處理產生給它們的記錄流。架構
流API(Streams API)容許應用充當流處理器,消耗來自一個或多個主題的輸入流,並將輸出流生成到一個或多個輸出主題,有效地將輸入流轉換爲輸出流。
鏈接器API(Connector API)容許創建和運行可重用的生產者或消費者,將Kafka主題鏈接到現有的應用程序或數據系統。例如,關係數據庫的鏈接器可能會捕獲表的每個更改。
在Kafka中,客戶端和服務器之間的通訊是用簡單、高性能、語言不可知的TCP協議來完成的。該協議版本化,並保持與舊版本的向後兼容性。咱們爲Kafka提供Java客戶端,但客戶端能夠用多種語言提供。讓咱們先深刻研究核心抽象Kafka爲一條記錄流提供的主題。
主題是記錄發佈的類別或進給名稱。Kafka中的主題老是多用戶的,也就是說,主題能夠有一個或多個訂閱該數據的用戶。
對於每一個主題,Kafka集羣維護一個看起來像這樣的分區日誌:
每個分區都是一個有序的、不可變的記錄序列,它連續地附加到結構化提交日誌。每一個分區中的記錄都被分配一個連續的ID號,稱爲惟一地標識分區內的每一個記錄的偏移量。
不管是否已經使用可配置的保留週期消耗,全部的已發佈記錄都能持久地保存Kafka集羣。例如,若是保留策略設置爲兩天,那麼在發佈記錄後的兩天,它能夠用於消費,以後將被丟棄以釋放空間。Kafka的性能相對於數據大小是有效不變的,因此長時間存儲數據不是問題。
事實上,在每一個消費者基礎上保留的惟一元數據是該用戶在日誌中的偏移或位置。這種偏移是由消費者控制的:正常狀況下,消費者會在讀取記錄時線性地偏移其偏移量,但事實上,因爲該位置是由消費者控制的,因此它能夠按它喜歡的任何順序消耗記錄。例如,消費者能夠重置爲舊的偏移量,以從新處理過去的數據,或者跳過最近的記錄並開始從「如今」開始消費。
這種組合的特徵意味着Kafka的消費者很是便宜,他們能夠來來每每,對集羣或其餘消費者沒有太大的影響。例如,您可使用咱們的命令行工具來「尾隨」任何主題的內容,而不改變任何現有消費者所消耗的內容。
日誌中的分區有多種用途。首先,它們容許日誌超出一個適合於單個服務器的大小。每一個單獨的分區必須適合於承載它的服務器,可是一個主題可能有許多分區,所以它能夠處理任意數量的數據。其次,它們做爲並行的單位更多的是在這一點上。
日誌分區被分佈在Kafka集羣中的服務器上,每一個服務器處理數據並請求共享分區。每一個分區在可配置的多個服務器上覆制以容錯。
每一個分區都有一個服務器充當「領導者」和零個或多個服務器,充當「追隨者」。領導者處理全部的讀寫請求的分區,而追隨者被動複製的領導者。若是領導者失敗,其中的一個追隨者將自動成爲新的領導者。每一個服務器充當一些分區的領導者和其餘人的追隨者,所以集羣內的負載很好地平衡。
Kafka MirrorMaker爲您的集羣提供地理複製支持。使用鏡像機,消息在多個數據中心或雲區域上被複制。能夠在主動/被動場景中使用此備份和恢復;或者在活動/活動場景中,將數據更靠近用戶,或支持數據位置要求。
生產者將數據發佈到他們選擇的主題中。生產者負責選擇要分配給主題內的哪一個分區的記錄。這能夠在循環的方式下完成,只是爲了平衡負載,或者它能夠根據一些語義劃分函數來完成(例如基於記錄中的某些鍵)。
消費者用消費者組名稱來標記本身,而且發佈到主題的每一個記錄被傳遞到每一個訂閱消費者組中的一個消費者實例。消費者實例能夠在單獨的進程中或在單獨的機器上。
若是全部的消費者實例都具備相同的消費羣,那麼記錄將有效地在消費者實例上進行負載均衡。
若是全部的消費者實例都有不一樣的消費羣體,那麼每一個記錄將被廣播到全部的消費過程。
兩個服務器Kafka集羣託管四個分區(P0至P3)與兩個消費羣體。消費者組A有兩個消費者實例,B組有四個。
然而,更常見的是,咱們發現話題有少許的消費羣體,每一個都有一個「邏輯用戶」。每一個組由許多可擴展性和容錯性的消費者實例組成。這僅僅是發佈訂閱語義,其中訂閱服務器是一組消費者而不是單個進程。
在Kafka中實現消費的方法是經過將日誌中的分區除以消費者實例,使得每一個實例在任什麼時候間點都是分區的「公平共享」的惟一消費者。這個組中的成員保持過程是由Kafka協議動態處理的。若是新實例加入組,它們將從組的其餘成員接管一些分區;若是一個實例死亡,它的分區將被分發到其他實例。
Kafka只提供一個分區內的記錄的總順序,而不是在一個主題中的不一樣分區之間。每個分區排序結合能力分區數據的關鍵是足夠的大多數應用程序。可是,若是須要對記錄進行總排序,則能夠用只有一個分區的主題來實現,但這將意味着每一個消費者組只有一個消費者進程。
能夠將Kafka部署爲多租戶解決方案。多租戶是經過配置哪些主題能夠產生或消耗數據來實現的。也有對配額的操做支持。管理員能夠在請求上定義和執行配額,以控制客戶端使用的代理資源。
在高級別Kafka提供如下保證:
生產者發送給特定主題分區的消息將按照發送的順序添加。也就是說,若是記錄M1是由與記錄M2相同的生產者發送的,M1是最早發送的,那麼M1將具備比M2更低的偏移量,而且在日誌中出現得更早。
一個用戶實例以記錄存儲在日誌中的順序查看記錄。
對於具備複製因子N的主題,咱們將容忍多達N-1服務器故障而不丟失提交到日誌的任何記錄。
Kafka能夠保證同一個分區裏的消息是有序的,也就是說生產者按照必定的順序發送消息,broker就會按照這個順序把他們寫入分區,消費者也會按照一樣的順序讀取他們
###### Kafka做爲消息傳遞系統
複製代碼
Kafka的流概念如何與傳統的企業消息系統相比較?
消息傳遞傳統上有兩種模式:排隊和發佈訂閱。在隊列中,消費者池能夠從服務器讀取,而且每一個記錄都轉到其中一個;在發佈訂閱中,記錄被廣播給全部消費者。這兩種模式各有其優勢和缺點。排隊的優勢在於,它容許您在多個消費者實例上劃分數據的處理,這使得您能夠對處理進行縮放。不幸的是,一旦一個進程讀取了它的數據,隊列就再也不是多用戶。發佈訂閱容許您將數據廣播到多個進程,可是因爲每一個消息都流向每一個訂閱服務器,因此沒法進行縮放處理。
Kafka的消費羣體概念歸納了這兩個概念。與隊列同樣,消費者組容許您在進程集合(消費者組的成員)上劃分處理。與發佈訂閱同樣,Kafka容許您向多個用戶組廣播消息。
Kafka模型的優勢是,每一個主題都具備這些屬性,它能夠縮放處理,而且也是多用戶,不須要選擇一個或另外一個。
Kafka比傳統的消息傳遞系統具備更強的排序保證。
傳統隊列在服務器上保留記錄順序,若是多個用戶從隊列中消耗,則服務器按其存儲的順序分發記錄。然而,儘管服務器按順序分發記錄,可是記錄是異步傳送給消費者的,所以它們可能會在不一樣的消費者之間無序地到達。這實際上意味着在並行消耗的存在下記錄的順序丟失。消息傳遞系統常常圍繞着這一點而工做,它有一個「獨佔消費者」的概念,它只容許一個進程從隊列中消耗,但這固然意味着在處理過程當中沒有並行性。
Kafka作得更好。經過在主題中具備並行性分區的概念,Kafka可以在消費者進程池中提供排序保證和負載平衡。這是經過將主題中的分區分配給消費者組中的消費者來實現的,這樣每一個分區就被該組中的一個消費者徹底消耗掉。經過這樣作,咱們確保消費者是該分區的惟一讀取器,並按順序消耗數據。因爲有許多分區,這仍然平衡了許多消費者實例的負載。可是注意,消費者組中的消費者實例不能多於分區。
任何容許發佈消息的消息隊列與它們之間的解耦都有效地充當了飛行消息的存儲系統。Kafka的不一樣之處在於它是一個很是好的存儲系統。
寫入Kafka的數據被寫入磁盤並複製用於容錯。Kafka容許生產商等待確認,這樣寫才被認爲是完整的,直到它被徹底複製,並保證即便服務器寫入失敗也會堅持。
磁盤結構Kafka使用規模井Kafka將執行相同的,不管你有50 KB或50 TB的持久性數據在服務器上。
做爲認真對待存儲並容許客戶端控制其讀取位置的結果,您能夠將Kafka視爲專用於高性能、低延遲提交日誌存儲、複製和傳播的專用分佈式文件系統。
僅僅讀取、寫入和存儲數據流是不夠的,其目的是實現對流的實時處理。
在Kafka中,流處理器是從輸入主題獲取連續數據流的任何東西,對該輸入執行一些處理,併產生連續的數據流到輸出主題。
例如,零售應用程序可能會接收銷售和出貨的輸入流,並輸出從該數據計算的從新排序和價格調整的流。
能夠直接使用生產者和消費者API來進行簡單的處理。然而,對於更復雜的轉換,Kafka提供了一個徹底集成的流API。這容許構建非平凡處理的應用程序,它們能夠計算流中的聚合或一塊兒加入流。
這個工具備助於解決這類應用程序面臨的難題:處理無序數據、從新處理輸入、代碼更改、執行狀態計算等。
流API構建在Kafka提供的核心原語上:它使用生產者和消費者API來進行輸入,使用Kafka進行狀態存儲,並在流處理器實例中使用相同的組機制來容錯。
消息傳遞、存儲和流處理的這種組合看起來是不尋常的,但這對於Kafka做爲流媒體平臺的角色是相當重要的。
分佈式文件系統(如HDFS)容許存儲靜態文件以進行批量處理。實際上,這樣的系統容許存儲和處理過去的歷史數據。
傳統的企業消息系統容許處理訂閱後到達的將來消息。以這種方式構建的應用程序在到達時處理將來的數據。
Kafka結合了這兩種能力,組合對於Kafka做爲流媒體應用平臺以及流數據管道來講都是相當重要的。
經過結合存儲和低延遲訂閱,流式應用程序能夠以相同的方式處理過去和未來的數據。也就是說,單個應用程序能夠處理歷史、存儲的數據,而不是在到達最後記錄時結束,它能夠在未來的數據到達時保持處理。這是包含批處理和消息驅動應用程序的流處理的通常概念。
一樣,對於流式數據管道,訂閱到實時事件使得使用Kafka用於很是低延遲的流水線是可能的;可是,可靠地存儲數據的能力使得它能夠用於關鍵數據,其中必須保證數據的傳遞或整合。n具備離線系統,只週期性地加載數據,或者能夠在長時間內進行維護。流處理設施使得在到達時轉換數據成爲可能。
生產者歸納
Kafka發送消息的主要步驟 咱們從建立一個ProducerRecord對象開始,ProducerRecord對象須要包含目標主題和要發送的內容.咱們還能夠指定鍵分區,在發送ProducerRecord 對象時,生產者要先把鍵和值對象序列化成字節數組,這樣他們才能在網絡上傳輸. 接下來,數據被傳給分區器,若是以前在ProducerRecord對象裏指定了分區,那麼分區器就不會再作任何事情,直接把指定的分區返回.若是沒有指定分區,那麼分區器就會根據ProducerRecord對象的鍵來選擇一個分區.選好分區之後,生產者就知道該往哪一個主題和分區發送這條消息了. 緊接着,這條記錄被添加到一個記錄批次裏,這個批次裏的全部消息會被髮送到相同的主題和分區上.有一個獨立的線程負責把這些記錄批次發送到相應的broker上. 服務器在收到這些消息時會返回一個響應.若是消息成功寫入Kafka,就返回一個RecordMetaData對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量.若是寫入失敗,則返回一個錯誤.生產者在收到錯誤以後會嘗試從新發送消息,幾回以後若是仍是失敗,就返回錯誤信息.
要往Kafka寫入消息,首先要建立一個生產者對象,並設置一些屬性,Kafka生產者有3個必選屬性(什麼意思不做說明)
bootstrap.servers key.serializer value.serializer 其餘默認屬性: acks buffer.memory compression.type retries batch.size linger.ms client.id max.in.flight.requests.oer.connection timeout.ms,request.timeout.ms和metadata.fetch.timeout.ms max.block.ms max.request.size resive.buffer.bytes和send.buffer.bytes
下面代碼片斷演示瞭如何建立一個新生產者,這裏只指定了必要的屬性,其餘使用默認設置
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
複製代碼
上述代碼片斷主要意思是 先新建一個Properties對象 由於咱們打算把鍵和值定義成字符串類型,因此使用內置的StringSerializer 在這裏咱們建立一個新的生產者對象,併爲鍵和值設置了恰當的類型,而後把Properties對象傳給他
實例化生產者對象後,接下來就能夠開始發送消息了.發送消息主要有以3種方式,
發送並忘記(fire-and-forger): 咱們把消息發送給服務器,但並不關心他是否正常到達.大多數狀況下消息會正常到達,由於Kafaka是高可用的,並且生產者會自動嘗試重發,不過使用這種方式有時候也會丟失一些消息 同步發送send(): 咱們使用send()方式發送消息,他會返回一個Future對象,調用get()方法進行等待,就能夠知道消息是否發送成功 異步發送send(): 咱們使用send()方式發送消息,並指定一個回調函數,服務器在返回響應時調用該函數
上面的例子使用的都是單線程,但其實生產者是可使用多線程來發送消息的剛開始的時候可使用單個消費者和單個線程.若是須要更高的吞吐量,能夠在生產者數量不變的狀況下增長線程數量.若是這樣還不夠,能夠增長生產者數量
最簡單消息發送方式以下所示
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry":"Precision Products","France");
try{
proucer.send(record);
} catch (Exception e){
e.printStackTrance();
}
複製代碼
生產者的send() 方法將ProducerRecord 對象做爲參數,因此咱們要先建立一個ProducerRecord對象.ProducerRecord有多個構造函數,這裏只使用其中的一個,他須要目標主題的名字 和要發送的鍵和值對象 ,他們都是字符串.鍵和值對象的了新鮮感必須與序列化器的生產者對象相匹配
咱們使用生產者的send()方法發送ProducerRecord對象.從生產者的架構圖能夠看到,消息是先被放進緩衝區,而後使用單獨的線程發送到服務器端.send()方法會返回一個包含RecordMetadata的Future對象,不過咱們會忽略返回值,因此沒法知道消息是否發送成功,若是不關心發送結果,那麼可使用這種發送方式.好比 記錄Twitter消息日誌,或記錄不過重要的應用程序日誌
最簡單的同步發送消息方式以下所示
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry","Precision Products","France");
try{
producer.send(record).get();
} catch (Exception e){
e.printStackTrance();
}
複製代碼
在這裏 producer.send() 方法先返回一個Future對象,而後調用Future對象的get()方法等待Kafka響應,若是服務器返回錯誤,get()方法會拋異常.若是沒有發生錯誤,咱們會獲得一個RecordMetadata對象 能夠用他來獲取消息的偏移量 若是在發送數據以前或在發送過程當中發生了任何錯誤.好比broker返回一個不容許重發消息的異常或者已經超過了重發的次數,那麼就會拋出異常.咱們只是簡單的把異常信息打印出來
假設消息在應用程序和Kafka集羣之間一個來回須要10ms. 若是在發送完每一個消息後都等待迴應,那麼發送100個消息須要1秒 但若是隻發送消息而不等待響應,那麼發送100個消息所須要的時間會少不少.大多數時候,咱們並不須要等待響應,儘管Kafka會把目標主題,分區信息和消息的偏移量發送回來,但對於發送端的應用程序來講不是必需的.不過在遇到消息發送失敗時,咱們須要拋出異常,記錄錯誤日誌,或把消息寫入錯誤消息文件
爲了在異步發送消息的同時可以對異常狀況進行處理,生產者提供了回調支持,下面是使用回調的一個例子
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata,Exception e){
if (e != null){
e.prinStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry","Biomedical Materials","USA");
producer.send(record,new DemoProducerCallback());
複製代碼
爲了使用回調,須要實現org.apach.kafka.clients.producer.Callback接口的類,這個接口只有一個onCompletion方法 若是 Kafka返回一個錯誤 onCompletion方法會拋一個非空(non null)異常 在發送消息時傳進去一個回調對象
在瞭解如何從Kafaka讀取消息以前,咱們先先了解一下消費者和消費者羣組的概念 假設咱們有一個應用程序要從一個Kafka主題讀取消息並驗證這些消息,而後把他們存儲起來,應用程序須要建立一個消費者對象,訂閱主題並開始接收消息,而後驗證消息並保存結果,過了一陣子 生產者讓主題寫入消息的速度超過了應用程序驗證數據的速度,這個時候該怎麼辦?若是隻使用單個消費者處理消息,應用程序永遠跟不上消息的生成速度,這個時候就須要像多個生產者能夠向相同的主題寫入消息同樣,咱們也須要使用多個消費者從同一個主題讀取消息,對消息進行分流.Kafka 消費者從屬於消費者羣組,一個羣組裏的消費者訂閱的是同一個主題,每一個消費者接收主題一部分分區的消息,分區的一個主題消息會被不一樣消費組訂閱,一個消息只能被每一個消費者羣組中的一個消費者接收.
羣組裏的消費者共同讀取主題的分區,一個新的消費者加入羣組時,他讀取的是本來由其餘消費者讀取的消息.當一個消費者被關閉或者發生崩潰時,他就離開羣組,本來由他讀取的分區將有羣組裏的其餘消費者來讀取,在主題發生變化時,好比管理員添加新的分區,會發生分區重分配.分區的全部權從一個消費者轉移到另一個消費者這樣的行爲稱做再均衡 程序如何觸發再均衡? 消費者經過向羣組協調器的broker發送心跳來維持他們和羣組的從屬關係以及他們對分區的全部權關係,只要消費者能夠以正常的的時間間隔發送心跳就被認爲是活躍的說明他還在讀取分區的消息.消費者會在輪詢消息或者提交偏移量時發送心跳.若是消費者中止發送心跳的時間足夠長,會話過時,羣組協調器認爲他已經死了,就會觸發一次再均衡.
{
Properties props = new Properties();
props.put("bootstrap.servers","broker1:9092,broker2:9092");
props.put("group.id","CountryCounter");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
}
複製代碼
consumer.subscribe(Collection.singletonList("customerCountries"));//主題名"customerCountries"
複製代碼
fetch.min.bytes fetch.max.wait.ms max.partition.fetch.bytes session.timeout.ms auto.offset.reset enable.auto.commit partition.assignment.strategy client.id max.poll.records receive.buffer.bytes/send.buffer.bytes
到這裏Kafka的基礎內容已經介紹完了 ,若是想深刻了解這些是遠遠不夠的 在這裏能夠推薦幾本書給你們 若是想對Kafka的總體有深入的認識能夠讀<<Kafka權威指南>>必讀 其次就是 <<Kafka技術內幕>> 這兩本書讀完 幾本就OK了 最後也能夠讀<<Apache Kafka 源碼剖析>>不建議讀 固然官網讀英文文檔最好了