看了下 Kafka 作了一些隨筆的筆記。先看了第1、3、4、5、十一章,後續章節還會慢慢補上。java
第一章 初識 :生產者和消費者正則表達式
生產者:算法
一個消息會被髮布到一個特定主題上。生產者默認吧消息均衡分不到主題的全部分區上,並不關心特定消息會被寫到那個分區。sql
某些狀況下,生產者會把消息直接寫到指定分區。是經過消息鍵和分區器來實現,分區器爲鍵爲鍵生成一個散列值,並將其映射到指定的分區上。這樣可保證同一個鍵的消息會被寫到同一個分區上。數據庫
消費者:編程
消費者訂閱一個或多個主題,並按照消息生成的順序讀取他們。bootstrap
消費者經過檢查消息的偏移量來區分已經讀取過的消息。設計模式
偏移量 是另外一種元數據,一個不斷遞增的整數值,在建立消息時,Kafka 把它添加到消息裏。數組
在給定分區裏,消息偏移量都是惟一的,消費者把每一個分區最後讀取的消息偏移量保存在 Zookeeper 或 kafka上,若是消費者關閉或重啓,它的讀取狀態不會丟失。[ 這也是爲何 消費者宕機後,可以從宕機前的位置繼續讀取數據。 ]緩存
多消費者 ,消費者是消費者羣組的一部分,多個消費者共通讀取一個主題。羣組保證每一個分區智能被一個消費者使用。消費者與分區之間的映射一般被稱爲消費者對分區的全部權關係。
broker 和集羣
一個獨立的 kafka 服務器被稱爲 broker。
broker 接收來自生產者的消息,爲消息設置偏移量,並提交消息到磁盤保存。broker 爲消費者提供服務,對讀取分區的請求做出相應,返回已經提交到磁盤上的消息。
根據特定硬件及其性能特性,單個 broker 能夠輕鬆處理數千個分區以及每秒百萬級的消息量。
broker 是集羣的組成部分。每一個集羣都有一個 broker 充當集羣控制器的角色(自動從集羣的活躍成員中選舉出來)。
保留消息(在必定期限內)是kafka 一個重要特性。kafka默認消息保留策略 是 :默認 7天,或 默認 1G 。當消息達到這些上限時,舊消息就會過時並被刪除。主題能夠配置本身的保留策略,能夠將消息保留到再也不使用他們爲止。
多集羣
三點緣由: 最好使用多個集羣
數據類型分離
安全需求隔離
多數據中心(災難恢復)
若是使用多個數據中心,就須要在它們之間複製消息。這樣應用程序能夠訪問到多個站點的用戶活動信息。 kafka 的消息複製機制智能在單個集羣裏進行,不能再多個集羣之間進行。
Kafka 提供了一個叫作 MirrorMaker的工具,能夠實現集羣間的消息複製。
爲何選擇 Kafka
多個生產者
Kafka 能夠無縫支持多個生產者,無論客戶端在使用單個主題仍是多個主題。
多個消費者
Kafka 支持多個消費者從一個單獨的消息流上讀取數據,且消息之間互不影響。(這與其餘隊列系統不一樣,其餘隊列系統的消息一旦被一個客戶端讀取,其餘客戶端就沒法讀取它。另外,多個消費者能夠組成一個羣組,他們共享一個消息流,並保證整個羣組對每一個給定的消息只處理一次。)
基於磁盤的數據存儲
Kafka 容許消費者非實時地讀取消息,歸功於 kafka的數據保留特性。
消費者能夠在進行應用程序維護時離線一小段時間,無需擔憂消息丟失或者擁塞在生產者端。
伸縮性
Kafka 從一開始被設計成一個具備靈活伸縮性的系統。開發階段能夠先使用單個 broker,再擴展到包含3個broker的小型開發集羣...,一個包含多個broker的集羣,即便個別broker 失效,仍然能夠持續爲客戶提供服務。
高性能
經過橫向擴展生產者、消費者 和 broker, kafka 能夠處理巨大的消息流。在處理大量數據的同時,它能保證亞秒級的消息延遲。
數據生態系統
Kafka 爲數據生態系統帶來了循環系統,它在基礎設施的各個組件之間傳遞消息,爲全部客戶端提供一直的接口。
使用場景:
活動跟蹤
傳遞消息
默認配置
broker 配置
Kafka 發送包裏自帶配置樣本能夠安裝單機服務,並不能知足大多數安裝場景的要求。
主題配置
主題吞吐量 / 消費者吞吐量 = 分區個數
num.partitions : 默認把分區大小限制在 25 GB 之內,能夠獲得比較理想的效果。
log.retention.ms : 數據被保留時間。默認爲 168小時(一週),還有 log.retention.hours、log.retention.minutes
log.retention.bytes : 經過保留消息字節數來判斷消息是否過時。例:一個包含8個分區的主題,而且 log.retention.bytes 設置爲 1GB , 那麼這個主題最多能夠保留8GB的數據。
log.segment.bytes : 當日志片斷大小達到 log.segment.bytes 指定的上線(默認 1G)時,當前日誌片斷就會被關閉,一個新的日誌片斷被打開。
log.segment.ms : 控制日誌片斷關閉時間
message.max.bytes : broker 經過設置message.max.bytes 參數來限制單個消息的大小。默認值是 1 000 000 ,也就是 1M。若是生產者發送消息超過這個值,消息不會被接收,broker 還會返回 錯誤信息。
硬件性能
(磁盤吞吐量 和 容量、內存、網絡、CPU )
Kafka 集羣
簡單kafka集羣圖:
須要多少 broker ?
一、須要多少磁盤空間來保留數據,以及單個 broker 有多少空間可用。
若是 整個集羣須要保留 10TB 數據,每一個broker 能夠存儲 2TB,至少須要 5 個 broker。若是啓用了數據複製,至少還須要一倍的空間(取決於複製係數是多少)。
二、集羣處理請求的能力。與網絡接口處理客戶端流量能力有關。
broker 配置
要把一個 broker 加入集羣,須要配置兩個參數。
一、全部 broker 都必須配置相同的 zookeeper.connect ,指定了用於保存元數據的 Zookeeper 羣組和路徑。
二、每一個 broker 都必須爲 broker.id 參數設置惟一的值。若是 兩個 broker 使用相同的broker.id,那麼第二個 broker 就沒法啓動。
生產環境注意
垃圾回收器選項
Java 7 有 G1 垃圾回收器,.
MaxGcPauseMillis: 指定每次垃圾回收默認的停頓時間。值不固定,默認 200ms。
InitiatingHeapOccupancyPercent: 指定了G1啓動新一輪垃圾回收以前可使用的堆內存百分比,默認 45。即 堆內存使用率到達 45% 前,G1不會啓動垃圾回收。這個百分比包括 新生代 和 老年代的內存。
(例:若是一臺服務器有 64GB內存,而且使用 5GB 堆內存來運行 Kafka,
參考配置:MaxGcPauseMillis : 20ms;
InitiatingHeapOccupancyPercent : 35 ;
)
kafka 啓動腳本沒有啓用 G1回收器,使用了 Paraller New 和 CMS 垃圾回收器。
數據中心佈局
把集羣的 broker 安裝在不一樣機架上,不能讓他們共享可能出現單點故障的基礎設施。
共享zookeeper
Kafka 使用 zookeeper 來保存 broker、主題和分區的元數據信息。
Kafka 消費者 和 zookeeper
kafka 0.9.0.0 版本前,除 broker 外,消費者會使用 zookeeper 來保存一些信息,
kafka 0.9.0.0 版本後,kafka 引入了一個新的消費者接口,容許 broker 直接維護這些信息。
kafka 對 zookeeper 的延遲和超時比較敏感。
第三章: 生產者
概覽
Kafka 發送消息主要步驟,從建立一個 ProducerRecord 對象開始,對象須要包含目標主題和要發送的內容。還能夠指定鍵或分區。
發送 ProducerRecord 對象時,生產者先把鍵和值對象序列化成字節數組,這樣纔可在網絡上傳輸。
數據傳給分區器。如指定了分區,分區器直接把指定分區返回。如未指定,分區器會根據 ProducerRecord 對象的鍵來選擇一個分區。
分區後,生產者知道往哪一個分區發送信息,這條信息被添加到一個記錄批次裏,這個批次全部消息會被髮送到相同的主題和分區上。有獨立線程負責把這些記錄批次發送到相應 broker 上。
服務器收到消息會返回一個相應。若是消息成功寫入 Kafka ,就返回一個包含了 主題、分區信息、分區裏偏移量 的 RecordMetaData 對象。寫入失敗,返回錯誤。生產者收到錯誤後會嘗試從新發送,幾回後還失敗,就返回錯誤信息。
建立生產者
kafka寫消息,先要建立生產者對象,並設置一些屬性。有三個必選屬性:
bootstrap.servers :broker 地址清單,格式爲 host:port 。清單中不用包含全部 broker地址,生產者會從給定的broker裏查找到其餘broker信息。建議至少兩個,一旦其中一個宕機,生產者仍能鏈接到集羣上。
key.serializer : broker 接收的消息都是字節數組,生產者須要知道如何把這些java對象轉換成字節數組。key.serializer 必須被設置爲一個實現了 Serializer 接口的類。Kafka客戶端默認提供了 ByteArraySerializer、StringSerializer、IntegerSerializer。
value.serializer :與 key.serializer 同樣,value.serializer 指定類會將值序列化。
實例化生產者對象後,發送消息主要有三種方式:
發送並忘記 : 消息發給服務器,不關心是否正常到達。kafka高可用,生產者會自動嘗試重發。有時會丟失一些消息。
同步發送 :send() 方法發送消息,返回 Futrue 對象,調用 get() 方法等待。
kafkaProducer 通常會發生兩類錯誤。
1:可重試錯誤,可經過重發消息解決。例:鏈接錯誤、無主(no leader)錯誤。kafka 能夠配置成自動重試,若是屢次重試後仍沒法解決,程序會受到一個重試異常。
二、另類錯誤,沒法經過重試解決。例:「消息太大」 異常,不進行重試,一直拋出異常
異步發送 : send() 方法發送消息,指定一個回調函數,服務器在返回響應時調用該函數。
生產者配置
acks : 指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入時成功的。
acks = 0 :生產者在成功寫入消息前不會等待任何來自服務器的響應。
能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量。
acks = 1 : 只要集羣首領節點收到消息,生產者就會收到一個來自服務器的成功響應。
吞吐量取決於使用的是同步發送仍是異步發送。
acks = all :只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。模式最安全。延遲最高。
buffer.memory : 設置生產者內存緩衝區大小。緩衝要發送到服務器的消息。(0.9.0.0 版本替換爲 max.block.ms)
compression.type :默認狀況,消息發送時不會被壓縮。該參數能夠設置爲 snappy、gzip、lz4。指定了消息發送給 broker以前使用哪一種壓縮算法。
snappy 壓縮佔用較少CPU,提供較好性能和壓縮比。比較關注 性能和網絡帶寬可用。
gzip 佔用較多CPU,若是網絡帶寬比較有限 可用。
retries :生產者能夠重發消息的次數。默認:生產者每次重試之間等待 100ms( 能夠經過 retry.backoff.ms 參數來改變這個時間間隔 ),建議設置 總的重試時間比kafka集羣從崩潰中恢復的時間長。通常狀況,生產者會自動進行重試,代碼中可不處理 可重試錯誤,只處理 不可重試錯誤 或 重試次數超出上限的狀況。
batch.size :當有多個消息須要被髮送到同一個分區時,生產者會把他們放在同一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算。
批次填滿即發送,半滿或一條消息也可能被髮送。批次大小設置很大,不會形成延遲,只會佔用更多內存。設置過小,生產者須要更頻繁地發送消息,會增長一些額外的開銷。
linger.ms :指定了生產者在發送批次以前等待更多消息加入批次的時間。
把 linger.ms 設置成比 0 大的數,雖然會增長延遲,但也會提高吞吐量。
client.id :能夠是任意字符串,服務器用它來識別消息來源,還能夠用在日誌和配額指標裏。
max.in.flight.requests.per.connection :指定了生產者在收到服務器響應以前能夠發送多少個消息。值越高,佔用越多內存,提高吞吐量。設爲 1 ,能夠保證消息是按照發送的順序寫入服務器的,即便發送了重試。
timeout.ms 、 request.timeout.ms 、metadata.fetch.timeout.ms :
request.timeout.ms : 生產者在發送數據時等待服務器返回響應的時間。
metadata.fetch.timeout.ms :生產者在獲取元數據時等待服務器返回響應的時間。若是等待響應超時,生產者要麼重試發送數據,要麼返回一個錯誤。
timeout.ms : 指定了broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配。若是指定時間內沒有收到同步副本的確認,那麼broker 就會返回一個錯誤。
max.block.ms :指定了調用 send() 方法或使用 partitionsFor() 方法獲取元數據時生產者的阻塞時間。當 生產者發送緩衝區滿 或 沒有可用元數據時,方法會阻塞。阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。
max.request.size :用於控制生產者發送的請求大小。指 單個消息最大值 或 單個請求全部消息總大小。
注: broker 對可接收的消息最大值也有本身限制(message.max.bytes),兩邊最好匹配。避免生產者發送的消息被 broker 拒絕。
receive.buffer.bytes 和 send.buffer.bytes :分別指定 TCP socket 接收和發送數據包的緩衝區大小。值爲 -1,就使用操做系統的默認值。若是 生產者或消費者 與 broker 處於不一樣的數據中心,那麼能夠適當增大這些值,由於跨數據中心的網絡通常都有比較高的延遲和比較低的帶寬。
注: 順序保證。kafka 能夠保證同一個分區裏的消息是有序的。
例:若是把 retries 設爲 非零整數,把 max.in.flight.requests.per.connection 設爲比 1 大的數。
若是第一批次消息寫入失敗,第二批次成功,broker重寫第一批次。如此時第一批次寫入成功,那麼兩個批次的順序就反過來了。
若是 retries 爲非零整數,把 max.in.flight.requests.per.connection 設爲 1,這樣生產者發送第一批消息時,就不會有其餘消息發送給 broker。這樣會嚴重哦影響生產者的吞吐量,只有在對消息順序有嚴格要求狀況才這麼作。
序列化器
已有序列化器和反序列化器 , 不如 JSON、Avro、Thrift、Protobuf
Apache Avro 序列化: 一種與編程語言無關的序列化格式。
注:寫入數據和讀取數據的 schema 必須是相互兼容的。
分區
ProducerRecord 對象包含目標主題、鍵、值。Kafka 消息是一個個鍵值對,ProducerRecord 對象 的鍵能夠設置爲默認的 null。
鍵有兩個用途:能夠做爲消息的附加信息,也能夠決定消息該被寫到主題的那個分區。擁有相同鍵的消息將被寫入用一個分區。
第四章: 消費者
消費者和消費者羣組
kafka 消費者 從屬於消費者羣組。一個羣組裏的消費者訂閱的是同一個主題,每一個消費者接收主題一部分分區的消息。
例: kafka 有 4個分區,若是隻要 1 個消費者- 一個消費者收到4個分區的消息。
若是有兩個消費者,每一個消費者收到兩個分區的消息。
若是有4個消費者,每一個消費者收到1個分區的消息。
若是有 5 個消費者,有4個消費者能夠收到消息,多出來的消費者會閒置,不會收到消息。
多個消費者羣組訂閱相同的主題,羣組互不影響。
消費者羣組和分區再均衡
羣組的消費者共同讀取主題的分區。一個新的消費者 加入羣組時,他讀取的是本來由其餘消費者讀取的消息。當某一消費者被關閉或發生崩潰,它就離開羣組,本來由它讀取的分區將由羣組裏的其餘消費者讀取。
在主題發送變化時,好比管理員添加了新的分區,會發生分區重分配。
分區的全部權從一個消費者轉移到另外一個消費者,這樣的行爲被稱爲 再均衡
在 再均衡期間,消費者沒法讀取消息,形成整個羣組一小段時間的不可用。
當分區被從新分配給另外一個消費者時,消費者當前的讀取狀態會丟失,他還須要去刷新緩存,在它從新恢復狀態以前,會拖慢應用程序。
消費者經過向被指派爲 羣組協調器 的 broker(不一樣羣組能夠有不一樣的協調器) 發送心跳 來維持和羣組的從屬關係 和 對分區的全部權關係。消費者會在輪詢消息 或提交偏移量時發送心跳。
若是消費者中止發送心跳時間過長,會話過時,羣組協調器認爲它死亡,就會觸發一次再均衡。
若是一個消費者崩潰,中止讀取消息,羣組協調器會等待幾秒鐘,確認它死亡了纔會觸發再均衡。這幾秒內,死掉的消費者不會讀取分區裏的消息。在清理消費者時,消費者會通知協調器它將要離開羣組,協調器會當即出發一次再均衡,儘可能下降處理停頓。
建立 Kafka 消費者
3個必要屬性: bootstrap.servers 、key.deserializer、value.deserializer
bootstrap.servers : kafka 集羣的鏈接字符串。(與KfakaProducer 中用途同樣)
key.deserializer 和 value.deserializer 與生產者的 serializer 定義也相似
group.id 非必須。 指定了kafkaConsumer 屬於哪個消費者羣組。
訂閱主題
subscribe() 方法
consumer.subscribe(Collections.singletonList("customerCountries"));
也能夠在 subscribe() 方法時傳入一個正則表達式。正則表達式能夠匹配多個主題,若是有人建立了新主題,且主題名字與正則表達式匹配,會當即觸發一次再均衡,消費者就能夠讀取新添加的主題。
例:訂閱全部與 test 相關的主題,能夠 consumer.subscribe(" test.* ");
輪詢
消息輪詢是消費者 API 核心,經過一個簡單的輪詢服務向服務器請求數據。
一旦消費者訂閱了主題,輪詢會處理全部的細節,包括羣組協調、分區再均衡、發送心跳和獲取數據。
輪詢不止是獲取數據 。在第一次調用新消費者的 poll() 方法時,它會查找 GroupCoordinator,而後加入羣組,接收分配的分區。
線程安全 -- 同一羣組裏,沒法讓 一個線程運行多個消費者,也沒法讓多個線程安全的共享一個消費者。若是要在同一個消費者羣組裏運行多個消費者,須要讓每一個消費者運行在本身的線程裏,最好把消費者邏輯封裝在本身的對象裏,而後使用Java 的 ExecutorService 啓動多個線程,使每一個消費者運行在本身的線程上。(參考 : https://www.confluent.io/blog/ )
消費者的配置
幾個配置屬性: bootstrap.servers 、group.id 、key.deserializer 、value.deserializer 。
幾個重要屬性:
fetch.min.bytes :消費者從服務器獲取記錄的最小字節數
fetch.max.wait.ms : broker 的等待時間,默認 500ms。
max.partition.fetch.bytes : 指定了服務器從每一個分區裏返回給消費者的最大字節數。默認 1MB,
session.timeout.ms :指定了消費者在被認爲死亡以前能夠與服務器斷開鏈接的時間,默認 3s。
auto.offset.reset : 指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下(因消費者長時間失效,包含偏移量的記錄已通過時並被刪除)該作何處理。默認是 latest (在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據)。另外一個值時 earliest,意爲:在偏移量無效的狀況下,消費者將從起始位置讀取分區的記錄。
enable.auto.commit :指定了 消費者是否自動提交偏移量,默認爲 true
partition.assignment.strategy : 分區策略
client.id :能夠是任意字符,broker 用它標識從客戶端發送過來的消息,常被用在日誌、度量指標和配額裏。
max.poll.records :用於控制單次調用 call() 方法可以返回的記錄數量。
receive.buffer.bytes 和 send.buffer.bytes :socket 在讀寫數據時用到的TCP 緩衝區也能夠設置大小。若是值爲 -1,即爲操做系統的默認值。
提交和偏移量
每次調用 poll() 方法,它老是返回由生產者寫入kafka 但尚未被消費者讀取過的記錄。
咱們把更新分區當前位置的操做叫作提交。
消費者如何提交偏移量?
消費者往一個叫作 _consumer_offset 的特殊主題發送消息,消息裏包含每一個分區的偏移量。若是消費者一直處於運行狀態,那麼偏移量就沒用。若是消費者發生崩潰或新消費者加入羣組,觸發在均衡。再均衡後消費者分到新分區,爲了能繼續以前工做,消費者須要讀取每一個分區最後一次提交的偏移量,而後從偏移量指定的地方繼續處理。
若是提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被重複處理。
若是提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼助於兩個偏移量之間的消息會丟失。
自動提交
最簡單。enable.auto.commit = true,消費者自動把poll()方法接收到的最大偏移量提交上去。提交間隔由 auto.commit.interval.ms 控制,默認 5s。
這種方法沒法避免由於再均衡致使的 重複處理消息。
提交當前偏移量
auto.commit.offset = false,讓應用程序決定什麼時候提交偏移量。使用 commitSync() 提交偏移量。
注: commitSync() 將會提交由 poll() 返回的最新偏移量,在處理完全部記錄後要確保調用了 commitSync(),不然會有丟失消息的風險。若是發生再均衡,最近一批消息到發生再均衡之間的全部消息都將被重複處理。
異步提交
因爲手動提交的不足:【在broke 對提交請求做出迴應前,應用程序會一直阻塞,限制應用程序吞吐量。可經過下降提交頻率來提高吞吐量。若是發生再均衡,會增長重複消息的數量。】
使用異步提交,只管發送提交請求,無需等待broker 響應。
重試異步提交 : 使用一個單調遞增的序列號維護異步提交順序,每次提交偏移量後或在回調裏提交偏移量時遞增序列號。重試前對比回調序列號和即將提交的偏移量是否相等,相等 - 沒有新提交,可安全的進行重試,若是序列號比較大,說明有新提交已經發送,應該中止提交。
同步和異步組合提交
通常狀況下,偶爾提交失敗,不影響後續提交的成功。在關閉消費者或再均衡前的最後一次提交,須要確保可以提交成功。故:消費者關閉前會組合使用 commitAsync() 和 commitSync() 。
提交特定的偏移量
提交偏移量的頻率和處理消息批次的頻率是同樣的。
消費者 API 容許調用 commitSync() 和 commitAsync() 方法時傳入但願提交的分區和偏移量的 map。
再均衡監聽器
從特定偏移量處開始處理記錄
從分區起始位置開始讀取消息: seekToBeginning(Collection<TopicPartition> tp)
從分區末尾位置開始讀取消息: seekToEnd(Collection<TopicPartition> tp)
經過把偏移量和記錄保存到同一個外部系統來實現單次語義,結合使用 ConsumerRebalanceListener 和 seek () 方法來確保可以及時保存偏移量,並保證消費者老是可以從正確的位置開始讀取消息。
如何退出
肯定要退出循環,經過另外一個線程調用 consumer.wakeup() 方法。
若是循環運行在主線程,能夠在 ShutdownHook 中調用該方法。
反序列化器
生產者須要用 序列化器 把對象轉換成字節數組再發送給kafka。
消費者須要用 反序列化器 把從kafka接收到的字節數組轉換成 java對象。
生成消息使用的序列化器 與 讀取消息使用的反序列化器應該是一一對應的。
使用 Avro 和 schema 註冊表進行序列化和反序列化的優點: AvroSerializer 能夠保證寫入主題的數據與主題的 schema 是兼容的。
注: 不建議使用自定義序列化器和自定義反序列化器。它使生產者和消費者牢牢耦合在一塊兒。
獨立消費者
一個消費者從一個主題的全部分區或某個特定分區讀取數據。不須要消費者羣組和再均衡,不須要訂閱主題,取之 是 爲本身分配分區。
一個消費者能夠訂閱主題(並加入消費者羣組),或者爲本身分配分區,但不能同時作這兩件事。
第五章:深刻Kafka
話題 : Kafka 如何進行復制;
Kafka 如何處理來自生產者和消費者的請求;
Kafka 的存儲細節,好比文件格式 和 索引;
集羣成員關係
kafka 使用 zookeeper 來維護 集羣成員的信息。每一個broker 都有一個惟一的標識符,標識符可在配置文件中指定,也能夠自動生成。broker 啓動時,經過建立臨時節點把本身的id註冊到 Zookeeper。kafka 組件訂閱 Zookeeper 的 /broker/ids 路徑(broker 在 zookeeper 上的註冊路徑),當有 broker 加入集羣或退出集羣時,這些組件就能夠得到通知。
如要啓動另外一個具備相同 id 的 broker,會獲得錯誤。由於 zookeeper 裏已經有一個具備相同ID 的broker。
在 broker 停機、出現網絡分區或長時間垃圾回收停頓時,broker 會從 zookeeper 上斷開鏈接,broker 啓動時建立的臨時節點會自動從 zookeeper 上移除。監聽 broker 列表的 kafka 組件會被告知該 broker 已移除。
關閉 broker 時,對應的節點也會消失,在徹底關閉一個 broker以後,若是使用相同的 id 啓動另外一個全新的broker ,它會當即加入羣組,並擁有與舊broker 相同的分區和主題。
控制器
控制器就是一個broker ,具備普通broker 功能外,還負責分區首領的選舉。
集羣裏第一個啓動的broker 經過在 zookeeper 裏建立一個臨時節點 / controller 讓本身成爲控制器。其餘 broker 啓動時也嘗試建立這個節點,但會受到「節點已存在」 的異常。其餘控制器節點上建立 zookeeper watch 對象,這樣它們就能夠受到這個節點的變動通知。這種方式能夠確保集羣裏一次只有一個控制器存在。
若是控制器被關閉或者與zookeeper斷開鏈接,zookeeper上臨時節點消失。集羣上其餘 broker經過 watch對象獲得通知,會嘗試讓本身成爲新的控制器。第一個在 zookeeper裏成功建立控制器節點的broker 會成爲新的 控制器,其餘節點收到「節點已存在」的異常,而後再新控制器節點上再次建立 watch 對象。
總之: Kafka 使用 zookeeper 的臨時節點選舉控制器,在節點加入集羣或退出集羣時通知控制器。控制器負責在節點加入或離開集羣時進行分區首領選舉。控制器使用 epoch 來避免 「腦裂」。「腦裂」 指連個節點同時認爲本身是當前的控制器。
複製:
複製功能 是 kafka 架構核心。
kafka :一個分佈式的 、 可分區的 、可複製的提交日誌服務。
複製 是 關鍵 ,由於它能夠在個別節點失效時仍能保證kafka 的可用性和持久性。
kafka 使用主題來組織數據,每一個主題被分爲若干個分區,每一個分區有多個副本。副本被保存在 broker 上,每一個 broker 能夠保存成百上千個屬於不一樣主題和分區的副本。
副本類型:
首領副本 : 每一個分區都有一個首領副本。爲保證數據一致性,全部生產者請求和消費者請求都會通過這個副本。
跟隨者副本 :首領外副本都是跟隨者副本。不處理來自客戶端的請求,惟一任務是 從首領哪裏複製消息,保持與首領一直的狀態。若是首領崩潰,其中一個跟隨者會被提高爲新首領。
爲了與首領保持同步,跟隨者向首領發送獲取數據的請求,請求信息裏包含了跟隨者想要獲取消息的偏移量,偏移量老是有序的。
首領經過查看每一個跟隨者請求的最新偏移量,知道跟隨者複製的進度。若是跟隨者 10s 內沒有請求任何消息,或 在請求消息,但 10s 內沒有請求最新的數據,那麼它就會被認爲是不一樣步的。若是副本與首領不一樣步,首領失效時,它就不可能成爲新首領。
持續請求獲得的最新消息副本被稱爲同步的副本。
除當前首領外,每一個分區都有一個 首選首領 - 建立主題時選定的首領。默認 kafka的 auto.leader.rebalance.enable = true 。它會檢查首選首領 是否是當前首領,若是不是,而且該副本是同步的,那麼就會觸發首領選舉,讓首選首領稱爲當前首領。
處理請求
broker 大部分工做是處理客戶端、分區副本 和控制器發送給分區首領的請求。
全部的請求消息都包含一個標準消息頭:
Request type ( 即 API key)
Request version ( broker 能夠處理不一樣版本的客戶端請求,並根據客戶端版本做出不一樣的響應。 )
Correlation ID 一個具備惟一性的數字,用於標識請求消息,同時也出如今響應消息和錯誤日誌裏。
Client ID 用於標識發送請求的客戶端。
幾種常見的請求類型
生產請求 : 生產者發送的請求,包含客戶端要寫入 broker 的消息。
包含首領副本的broker 在收到生產請求時,會對請求作一些驗證。
一、發送數據的用戶是否有主題寫入權限
二、請求裏包含的 acks 值是否有效(只容許出現 0、一、all )
三、若是 acks = all ,是否有足夠多的同步副本保證消息已經安全被寫入。
獲取請求 : 在消費者和跟隨者副本須要從 broker 讀取消息時發送的請求。
首領收到請求時,先檢查請求是否有效。
例:指定偏移量在分區上是否存在?若是客戶端請求是已被刪除的數據,或者請求的偏移量不存在,那麼broker 將返回一個錯誤。如存在,broker 將按照客戶端指定的數量上限從分區裏讀取消息,再把消息返回給客戶端。
kafka 使用 零複製 技術向客戶端發送消息。 即 : kafka 直接把消息從文件裏發送到網絡通道,不須要通過任何中間緩衝區。避免了字節複製,也不用管理內存緩衝區,從而得到更好的性能。
生產請求和獲取請求都必須發送給分區的首領副本。
元數據請求 :用來獲取 客戶端該往哪裏發送請求。元數據請求包含了客戶端感興趣的主題列表。服務端的響應消息裏指明瞭這些主題所包含的分區、每一個分區都有哪些副本,以及哪一個副本是首領。元數據請求能夠發送給任意一個 broker,由於全部 broker 都緩存了這些信息。
其餘請求
物理存儲
kafka 基本存儲單元是分區。分區沒法在多個 broker 間進行再細分,也沒法在同一個broker的多個磁盤上再進行細分。
分區大小受到單個掛載點可用空間的限制。
分區分配 : 建立主題時,kafka 會決定如何在 broker 間分配分區。目標 -
在 broker 間平均地分佈分區副本。
確保每一個分區的每一個副本分佈在不一樣的broker 上。
若是爲 broker 指定了機架信息,那麼儘量把每一個分區的副本分配到不一樣機架的 broker 上。目的是 爲了保證一個機架的不可用不會致使總體的分區不可用。
注意:磁盤空間。在爲 broker 分配分區時並無考慮可用空間和工做負載問題,但在將分區分配到磁盤上時會考慮分區數量,不過不考慮分區大小。
文件管理:保留數據 是kafka的一個基本特性。把分區分紅若干個片斷。默認狀況下,每一個片斷包含 1GB 或一週的數據,以較小的那個爲準。在broker 往分區寫入數據時,若是達到片斷上限,就關閉當前文件,並打開一個新文件。
當前正在寫入的文件 叫 活躍片斷。該片斷永遠不會被刪除。
文件格式 : 咱們把 kafka 的消息和偏移量保存在文件裏。保存在磁盤上的數據格式與從生產者發送過來或者發送給消費者的消息格式是同樣的。
除了鍵、值、偏移量。消息裏還包含了消息大小、校驗、消息格式版本號、壓縮算法、時間戳。
若是生產者發送的是壓縮過的消息,那麼同一批次的消息會被壓縮在一塊兒,被當作 「包裝消息」 進行發送。
索引 : 消費者能夠從kafka 的任意可用偏移量位置開始讀取消息。爲了幫助 broker 更快地定位到指定的偏移量,kafka 爲每一個分區維護了一個索引。索引把偏移量映射到片斷文件和偏移量在文件裏的位置。
清理 :通常狀況下,kafka 會根據設置的時間保留數據,把超過期效的舊數據刪除掉。
清理的工做原理 :每一個日誌片斷能夠分爲兩個部分 -
乾淨的部分 - 消息以前被清理過,每一個鍵只有一個對應的值,這個值是上一次清理時保留下來的。
污濁的部分 - 消息是在上一次清理以後寫入的。
被刪除的事件 : 爲了完全把一個鍵從系統裏刪除,應用程序必須發送一個包含該鍵且值爲 null的消息。清理線程發現該消息,會先進性常規清理,只保留值爲null的消息。該消息(被稱爲 墓碑消息)會被保留一段時間,時間長短可配置。消費者往數據庫裏複製 kafka 的數據時,看到墓碑消息,就知道應該要把相關用戶信息從數據庫裏刪除。
什麼時候會清理出題 : 像 delete 策略不會刪除當前活躍的片斷同樣,compact 策略也不會對當前片斷進行清理。只有舊片斷裏的消息纔會被清理。
第十一章:流式處理
什麼是流式處理:
數據流 是無邊界數據集的抽象表示。無邊界意味着無限和持續增加。
例:信用卡交易、股票交易、包裹遞送 等等
除沒邊界外,事件流模型的其餘屬性:
事件流是有序的
不可變的數據記錄
事件流是可重播的
流式處理 指實時地處理一個或多個事件流。
對比:
請求 與 響應:延遲最小的一種範式,響應時間處於亞毫秒到毫秒之間,響應時間穩定。這種處理模式通常是阻塞的。 在數據庫領域,這種範式是線上交易處理(OLTP)。
批處理:高延遲和高吞吐量。處理系統按照設定時間啓動處理進程。在數據庫領域,是 數據倉庫(DWH) 或商業智能(BI) 系統
流處理:介於上二者之間的範式。
流 的定義不依賴任何一個特定的框架、API 或 特性。只要持續地從一個無邊界的數據集讀取數據,而後對它們進行處理並生成結果,那就是在進行流式處理。重點: 整個處理過程必須是持續的。
流式處理的一些概念
時間 : 最重要概念。通常包含的幾個時間概念:
事件時間: 指 所追蹤事件的發生時間和記錄的建立時間。
日誌追加時間: 指 時間保存到 broker 的時間
處理時間: 指 應用程序在收到事件以後要對其進行處理的時間。
注意: 時區問題。整個數據管道應該使用同一個時區。
狀態: 事件 與 事件 之間的信息被稱爲 「狀態」。包含的幾種類型的狀態:
本地狀態或內部狀態: 只能被單個應用程序實例訪問。優勢是 速度快,缺點是 受內存大小的限制。流式處理的不少設計模式都將數據拆分到多個子流,這樣就可使用有線的本地狀態來處理它們。
外部狀態: 使用外部的數據存儲來維護,通常使用 Nosql 系統。優點是 沒有大小的限制,缺點是 引入額外的系統會形成更大的延遲和複雜性。
流和表的二元性
流 是一系列事件,每一個事件就是一個變動。
表 是記錄的集合,包含了當前的狀態,是多個變動所產生的結果。
將 流 轉化爲 表,須要 」應用「 流裏所包含的全部變動,這也叫作流的 」物化「
時間窗口
大部分針對流的操做都是基於時間窗口的。
窗口的大小 。
窗口移動的頻率 :若是 ‘移動間隔’ 與窗口大小相等 爲 「滾動窗口」; 若是 窗口隨每條記錄移動, 爲 滑動窗口。
窗口的可更新時間多長。
流式處理的設計模式
單個事件處理:最基本模式。 可使用一個生產者 和 一個消費者來實現。
使用本地狀態:流式處理使用本地狀態須要解決 內存使用、持久化、再均衡。
使用外部查找 - 流和表的鏈接。
流 與 流的鏈接 。
亂序的事件 : 識別亂序的事件,規定一個時間段用於重排亂序的事件,具備在必定時間段內重排亂序事件的能力。具有更新結果的能力。
從新處理 。