你好,Kafka

你們好,我是 kafka, 可能不少人都據說過我,知道我是 2011 年出生在 LinkedIn 的, 從那會兒到如今個人功能愈加強大了。做爲一個優秀而又完整的平臺,你能夠在我上面冗餘地存儲巨大的數據量,我有一個具備高吞吐量 (數百萬 / 秒) 的消息總線,你能夠在這上面對通過個人數據進行實時流處理。 若是你認爲我就只有上面的這些特色的話,那麼你真的是太膚淺了。html

上面雖說的很好,可是並未觸及到個人核心,這裏我給你幾個關鍵字:分佈式,水平可擴展,容錯,提交日誌。java

上面這些抽象的詞語,我會一一解釋它們的含義,並告訴大家我是如何工做的。git

心裏獨白: 原本我是想要以第一人稱來寫這篇文章的,可是我發現我只能寫出上面的了,再多的我就憋不出來了,因而我決定不要爲難本身,仍是用用第三人稱寫吧 (寫做的功底仍然須要鍛鍊)github

分佈式

分佈式系統由多個運行的計算機系統組成,全部這些計算機在一個集羣中一塊兒工做,對終端用戶來說只是一個單一節點。 redis

分佈式系統

kafka也是分佈式的,由於它在不一樣的節點(又被稱爲broker)上存儲,接受以及發送消息,這樣作的好處是具備很高的可擴展性和容錯性。sql

水平可擴展性

在這以前,先看看什麼是垂直可擴展,好比你有一個傳統的數據庫服務器,它開始過分負載,解決這個問題的辦法就是給服務器加配置(cpu,內存,SSD),這就叫作垂直擴展。可是這種方式存在兩個巨大的劣勢數據庫

  1. 硬件存在限制,不可能無限的添加機器配置
  2. 它須要停機時間,一般這是不少公司沒法容忍的

水平可擴展就是經過添加更多的機器來解決一樣的問題,添加新機器不須要停機,並且集羣中也不會對機器的數量有任何的限制。問題在於並不是全部系統都支持水平可伸縮性,由於它們不是設計用於集羣中(集羣中工做更加複雜)。apache

容錯性

非分佈式系統中容易最致命的問題就是單點失敗,若是你惟一的服務器掛掉了,那麼我相信你會很崩潰。數組

而分佈式系統的設計方式就是能夠以配置的方式來允許失敗。在5個節點的kafka集羣中,你仍然能夠繼續工做即便其中兩個節點掛掉了。 須要注意的是,容錯與性能直接相關,你的系統容錯程度越高,性能就越差。緩存

提交日誌(commit log)

提交日誌(也被稱爲預寫日誌或者事物日誌)是僅支持附加的持久有序數據結構,你沒法修改或者刪除記錄,它從左往右讀而且保證日誌的順序。

commit log
是否是以爲kafka的數據結構如此簡單?

是的,從不少方面來說,這個數據結構就是kafka的核心。這個數據結構的記錄是有序的,而有序的數據能夠確保咱們的處理流程。這兩個在分佈式系統中都是及其重要的問題。

kafka實際上將全部消息存儲到磁盤並在數據結構中對它們進行排序,以便利用順序磁盤讀取

  1. 讀取和寫入都是常量時間O(1)(當肯定了record id),與磁盤上其餘結構的O(log N)操做相比是一個巨大的優點,由於每一個磁盤搜索都很耗時。
  2. 讀取和寫入不會相互影響,寫不會鎖住讀,反之亦然。

這兩點有着巨大的優點, 由於數據大小與性能徹底分離。不管你的服務器上有100KB仍是100TB的數據,Kafka都具備相同的性能

如何工做

應用程序(producer)發送消息(record)到kafka服務器(broker),這些消息會被其餘應用程序(consumer)所處理,這些消息存儲在主題(topic)中,而且消費者訂閱該主題以接收新消息。是否是感受很像你平時寫的代碼——生產者消費者模式。

工做模式

隨着主題變得很是大,它們會分紅更小的分區(partition),以得到更好的性能和可伸縮性(好比存儲了用戶相互發送的消息,你能夠根據用戶名的第一個字母來進行拆分)。Kafka保證分區內的全部消息都按照它們的順序排序,區分特定消息的方式是經過其偏移量(offset),你能夠將其視爲普通數組索引,即爲分區中的每一個新消息遞增的序列號。

分區

kafka遵照着愚蠢的broker和聰明的consumer的準則。這意味着kafka不會跟蹤消費者讀取了哪些記錄並刪除它們,而是會將它們存儲必定的時間(好比1天,以log.retention開頭的來決定日誌保留時間),直到達到某個閾值。消費者本身輪詢kafka的新消息而且告訴它本身想要讀取哪些記錄。這容許它們按照本身的意願遞增/遞減它們所處的偏移量,從而可以重放和從新處理事件。

須要注意的是消費者是屬於消費者組的,消費者組有一個或多個消費者。爲了不兩個進程讀取一樣的消息兩次,每一個partition只能被一個消費者組中的一個消費者訪問。

Kafka消費數據

持久化到硬盤

正如我以前提到的,kafka其實是將全部記錄存儲到硬盤而不在RAM中保存任何內容。你想知道這個如何作出這個選擇的,其實這背後有不少優化使得這個方案可行。

  1. kafka有一個將消息分組的協議,這容許網絡請求將消息組合在一塊兒並減小網絡開銷,服務器反過來一次性保留大量消息,消費者一次獲取大量線性塊。
  2. 磁盤上線性讀寫很是快,現代磁盤很是慢的概念是因爲大量磁盤尋址,可是在大量的線性操做中不是問題。
  3. 操做系統對線性操做進行了大量優化,經過預讀(預取大塊屢次)和後寫(將小型邏輯寫入組成大型物理寫入)技術。
  4. 操做系統將磁盤文件緩存在空閒RAM中。這稱爲pagecache,而kafka的讀寫都大量使用了pagecahce
    1. 寫消息的時候消息先從java到page cache,而後異步線程刷盤,消息從page cache刷入磁盤
    2. 讀消息的時候先從page cache找,有就直接轉入socket,沒有就先從磁盤load到page cache,而後直接從socket發出去
  5. 因爲Kafka在整個流程(producer - >broker - >consumer)中以未經修改的標準化二進制格式存儲消息,所以它可使用零拷貝優化。那時操做系統將數據從pagecache直接複製到socket,有效地徹底繞過了Kafka broker。

全部這些優化都使Kafka可以以接近網絡的速度傳遞消息。

數據分發和複製

咱們來談談Kafka如何實現容錯以及它如何在節點之間分配數據。

爲了使得一個borker掛掉的時候,數據還能得以保留,分區(partition)數據在多個broker中複製。

在任什麼時候候,一個broker擁有一個partition,應用程序讀取/寫入都要經過這個節點,這個節點叫作----partition leader。它將收到的數據複製到N個其餘broker,這些接收數據的broker叫作follower,follower也存儲數據,一旦leader節點死掉的時候,它們就準備競爭上崗成爲leader。

這能夠保證你成功發佈的消息不會丟失,經過選擇更改複製因子,你能夠根據數據的重要性來交換性能以得到更強的持久性保證

4個Kafka broker,副本因子是3
可是你可能會問:producer或者consumer怎麼知道partition leader是誰?

對生產者/消費者對分區的寫/讀請求,它們須要知道分區的leader是哪個,對吧?這個信息確定是能夠獲取到的,Kafka使用zookeeper來存儲這些元數據。

什麼是ZooKeeper

Zookeeper是一個分佈式鍵值存儲。它針對讀取進行了高度優化,但寫入速度較慢。它最經常使用於存儲元數據和處理羣集的機制(心跳,分發更新/配置等)。

它容許服務的客戶(Kafka broker)訂閱並在發生變動後發送給他們,這就是Kafka如何知道什麼時候切換分區領導者。ZooKeeper自己維護了一個集羣,因此它就有很高的容錯性,固然它也應該具備,畢竟Kafka很大程度上是依賴於它的。

zookeeper用於存儲全部的元數據信息,包括但不限於以下幾項:

  • 消費者組每一個分區的偏移量(如今客戶端在單獨的kafka topic上存儲偏移量)
  • ACL —— 權限控制
  • 生產者/消費者的流量控制——每秒生產/消費的數據大小。能夠參考Kafka-流量控制Quota功能
  • partition leader以及它們的健康信息

那麼produer/consumer是如何知道誰是partition leader的呢?

生產者和消費者之前經常直接鏈接ZooKeeper來獲取這些信息,可是Kafka從0.8和0.9版本開始移除了這種強耦合關係。客戶端直接從kafka broker直接獲取這些元數據,而讓kafka broker從zookeeper那裏獲取這些元數據。

獲取leader

更多zookeeper的講解能夠參考:漫畫:什麼是ZooKeeper?

流式處理(Streaming)

在Kafka中,流處理器是指從輸入主題獲取連續數據流,對此輸入執行某些處理並生成數據流以輸出到其餘主題(或者外部服務,數據庫,容器等等).

什麼是數據流呢?首先,數據流是無邊界數據集的抽象表示。無邊界意味着無限和持續增加。無邊界數據集之因此是無限的,是由於隨着時間推移,新的記錄會不斷加入進來。好比信用卡交易,股票交易等事件均可以用來表示數據流

咱們可使用producer/consumer的API直接進行簡單處理,可是對於更加複雜的轉換好比將流鏈接到一塊兒,kafka提供了集成Stream API

這個API是在你本身的代碼中使用的,它並非運行在broker上,它的工做原理和consumer API相似,可幫助你在多個應用程序(相似於消費者組)上擴展流處理工做。

無狀態處理

流的無狀態處理是肯定性處理,其不依賴於任何外部條件,對於任何給定的數據,將始終生成與其餘任何內容無關的相同輸出。舉個例子,咱們要作一個簡單的數據轉換----"zhangsan" ---> "Hello,zhangsan"

kafka流處理

流-表二義性

重要的是要認識到流和表實質上是同樣的,流能夠被解釋稱爲表,表也能夠被解釋稱爲流.

流做爲表

流能夠解釋爲數據的一系列更新,聚合後得結果就是表的最終結果,這項技術被稱爲事件溯源(Event Sourcing)

若是你瞭解數據庫備份同步,你就會知道它們得技術實現被稱爲流式複製----將對錶的每一個更改都發送報副本服務器.好比redis中的AOF以及Mysql中的binlog

Kafka流能夠用相同的方式解釋 - 當累積造成最終狀態時的事件。此類流聚合保存在本地RocksDB中(默認狀況下),被稱爲KTable。

Kafka流轉換爲表

表做爲流

能夠將表視爲流中每一個鍵的最新值的快照。以流記錄能夠生成表同樣,表更新能夠生成更改日誌流。

kafka表做爲流

有狀態處理

咱們在java中經常使用的一些操做好比map()或者filter()是沒有狀態的,它不會要求你保留任何原始數據。可是現實中,大多數的操做都是有狀態的(好比count()),由於就須要你存儲當前累計的狀態。

在流處理器上維護狀態的問題是流處理器可能會失敗!你須要在哪裏保持這種狀態才能容錯?

一種簡單的方法是簡單地將全部狀態存儲在遠程數據庫中,並經過網絡鏈接到該存儲,這樣作的問題是大量的網絡帶寬會使得你的應用程序變慢。一個更微妙但重要的問題是你的流處理做業的正常運行時間將與遠程數據庫緊密耦合,而且做業將不是自包含的(其餘team更改數據庫可能會破壞你的處理)。

那麼什麼是更好的辦法呢? 回想一下表和流的二元性。這容許咱們將流轉換爲與咱們的處理位於同一位置的表。它還爲咱們提供了一種處理容錯的機制 - 經過將流存儲在Kafka broker中。

流處理器能夠將其狀態保持在本地表(例如RocksDB)中,該表將從輸入流(可能在某些任意轉換以後)更新。當進程失敗時,它能夠經過重放流來恢復其數據。

你甚至能夠將遠程數據庫做爲流的生產者,有效地廣播用於在本地重建表的更改日誌。

Kafka處理有狀態數據

KSQL

一般,咱們不得不使用JVM語言編寫流處理,由於這是惟一的官方Kafka Streams API客戶端。 2018年4月,KSQL做爲一項新特性被髮布,它容許你使用熟悉的相似SQL的語言編寫簡單的stream jobs。你安裝了KSQL服務器並經過CLI以交互方式查詢以及管理。它使用相同的抽象(KStream和KTable),保證了Streams API的相同優勢(可伸縮性,容錯性),並大大簡化了流的工做。

這聽起來可能不是不少,但在實踐中對於測試內容更有用,甚至容許開發以外的人(例如產品全部者)使用流處理,能夠看看Confluent提供的這篇關於ksql的使用

何時使用kafka

正如咱們已經介紹的那樣,Kafka容許你經過集中式介質獲取大量消息並存儲它們,而沒必要擔憂性能或數據丟失等問題。 這意味着它很是適合用做系統架構的核心,充當鏈接不一樣應用程序的集中式媒體。Kafka能夠成爲事件驅動架構的中心部分,使你能夠真正地將應用程序彼此分離.

何時使用Kafka
Kafka容許你輕鬆地分離不一樣(微)服務之間的通訊。使用Streams API,如今能夠比以往更輕鬆地編寫業務邏輯,從而豐富Kafka主題數據以供服務使用。可能性很大,我懇請你探討公司如何使用Kafka。

總結

Apache Kafka是一個分佈式流媒體平臺,天天可處理數萬億個事件。Kafka提供低延遲,高吞吐量,容錯的發佈和訂閱管道,並可以處理事件流。咱們回顧了它的基本語義(生產者,代理,消費者,主題),瞭解了它的一些優化(pagecache),經過複製數據瞭解了它的容錯能力,並介紹了它不斷增加的強大流媒體功能。Kafka已經在全球數千家公司中大量採用,其中包括財富500強企業中的三分之一。隨着Kafka的積極開發和最近發佈的第一個主要版本1.0(2017年11月1日),有預測這個流媒體平臺將會與關係數據庫同樣,是數據平臺的重要核心。我但願這篇介紹能幫助你熟悉Apache Kafka。

參考文章

tech.meituan.com/2015/01/13/… shiyueqi.github.io/2017/04/27/… kafka.apache.org/documentati… docs.confluent.io/current/

相關文章
相關標籤/搜索