Kakfa 是一個分佈式的基於發佈/訂閱模式的消息隊列(message queue),主要應用於大數據的實時處理領域。html
上面是傳統的消息隊列,好比一個用戶要註冊信息,當用戶信息寫入數據庫後,後面還有一些其餘流程,好比發送短信,則須要等這些流程處理完成後,再返回給用戶。而新式隊列,好比一個用戶註冊信息,數據直接丟進數據庫,就直接返回給用戶成功。數據庫
消息生產者發送消息到消息隊列中,而後消息消費者從隊列中取出而且消費消息,消息被消費後,隊列中不在存儲。因此消息消費者不可能消費到已經被消費的消息;隊列支持存在多個消費者,可是對於一個消息而言,只會 有一個消費者能夠消費;若是想發給多個消費者,則須要屢次發送該條消息。微信
消息生產者將消息發佈到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點的方式不一樣,發佈到 topic 的消息會被全部的訂閱者消費;可是數據保留是期限的,默認是 7 天,由於它不是存儲系統。Kafka 就是這種模式的。有兩種方式,一種是是消費者去主動去消費(拉取)消息,而不是生產者推送消息給消費者;另一種就是生產者主動推送消息給消費者,相似公衆號。網絡
Kafka 的基礎架構主要有 broker、生產者、消費者組構成,當前還包括 ZooKeeper。架構
生產者負責發送消息,broker 負責緩衝消息,broker 中能夠建立 topic,每一個 topic 又有 partition 和 replication 的概念。併發
消費者組負責處理消息,同一個消費者組的中消費者不能消費同一個 partition 中的數據。消費者組主要是提升消費能力,好比以前是一個消費者消費 100 條數據,如今是 2 個消費者消費 100 條數據,能夠提升消費能力。因此消費者組的消費者的個數要小於 partition 的個數,否則就會有消費者沒有 partition 能夠消費,形成資源的浪費。異步
注意:不一樣消費者組的消費者是能夠消費相同的 partition 數據。分佈式
Kakfa 若是要組件集羣,則只須要註冊到一個 ZooKeeper 中就能夠了,ZooKeeper 中還保留消息消費的進度或者說偏移量或者消費位置。性能
爲何要改?測試
主要是考慮到頻繁更改偏移量,對 ZooKeeper 的壓力較大,並且 Kafka 自己本身的處理也較複雜。
1)Kafka 的安裝只須要解壓安裝包就能夠完成安裝。
tar -zxvf kafka_2.11-2.1.1.tgz -C /usr/local/
2)查看配置文件。
[root@es1 config]# pwd /usr/local/kafka/config [root@es1 config]# ll total 84 -rw-r--r--. 1 root root 906 Feb 8 2019 connect-console-sink.properties -rw-r--r--. 1 root root 909 Feb 8 2019 connect-console-source.properties -rw-r--r--. 1 root root 5321 Feb 8 2019 connect-distributed.properties -rw-r--r--. 1 root root 883 Feb 8 2019 connect-file-sink.properties -rw-r--r--. 1 root root 881 Feb 8 2019 connect-file-source.properties -rw-r--r--. 1 root root 1111 Feb 8 2019 connect-log4j.properties -rw-r--r--. 1 root root 2262 Feb 8 2019 connect-standalone.properties -rw-r--r--. 1 root root 1221 Feb 8 2019 consumer.properties -rw-r--r--. 1 root root 4727 Feb 8 2019 log4j.properties -rw-r--r--. 1 root root 1925 Feb 8 2019 producer.properties -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-1.properties -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-2.properties -rw-r--r--. 1 root root 6873 Jan 16 03:57 server.properties -rw-r--r--. 1 root root 1032 Feb 8 2019 tools-log4j.properties -rw-r--r--. 1 root root 1169 Feb 8 2019 trogdor.conf -rw-r--r--. 1 root root 1023 Feb 8 2019 zookeeper.properties
3)修改配置文件 server.properties。
設置 broker.id 這個是 Kafka 集羣區分每一個節點的惟一標誌符。
4)設置 Kafka 的數據存儲路徑。
注意:這個目錄下不能有其餘非 Kafka 目錄,否則會致使 Kafka 集羣沒法啓動。
5)設置是否能夠刪除 topic,默認 Kafka 的 topic 是不容許刪除的。
6)Kafka 的數據保留的時間,默認是 7 天。
7)Log 文件最大的大小,若是 log 文件超過 1 G 會建立一個新的文件。
8)Kafka 鏈接的 ZooKeeper 的地址和鏈接 Kafka 的超時時間。
9)默認的 partition 的個數。
Kafka 只能單節點啓動,因此每一個 Kakfa 節點都須要手動啓動,下面的方式阻塞的方式啓動。
1)查看當前 Kafka 集羣已有的 topic。
注意:這裏鏈接的 ZooKeeper,而不是鏈接的 Kafka。
2)建立 topic,指定分片和副本個數。
說明:replication-factor 副本數,replication-factor 分區數,topic 主題名。
若是當前 Kafka 集羣只有 3 個 broker 節點,則 replication-factor 最大就是 3 了,下面的例子建立副本爲 4,則會報錯。
3)刪除 topic。
4) 查看 topic 信息。
1.7啓動生產者生產消息,Kafka 自帶一個生產者和消費者的客戶端
這裏咱們啓動 2 個消費者來測試一下。
說明:若是不指定的消費者組的配置文件的話,默認每一個消費者都屬於不一樣的消費者組。
Kafka 不能保證消息的全局有序,只能保證消息在 partition 內有序,由於消費者消費消息是在不一樣的 partition 中隨機的。
Kafka 中的消息是以 topic 進行分類的,生產者生成消息、消費者消費消息都面向 topic。
Topic 是一個邏輯上的概念,而 partition 是物理上的概念。每一個 partition 又有副本的概念。每一個 partition 對應於一個 log 文件,該 log 文件中存儲的就是生產者生成的數據,生產者生成的數據會不斷的追加到該 log 的文件末端,且每條數據都有本身的 offset,消費者都會實時記錄本身消費到了那個 offset,以便出錯的時候從上次的位置繼續消費,這個 offset 就保存在 index 文件中。Kafka 的 offset 是分區內有序的,可是在不一樣分區中是無順序的,Kafka 不保證數據的全局有序。
因爲生產者生產的消息會不斷追加到 log 文件的末尾,爲防止 log 文件過大致使數據定位效率低下,Kafka 採用分片和索引的機制,將每一個 partition 分爲多個 segment,每一個 segment 對應2個文件 — index 文件和 log 文件,這 2 個文件位於一個相同的文件夾下,文件夾的命名規則爲:topic 名稱 + 分區序號。
Index 和 log 的文件的文件名是當前這個索引是最小的數據的 offset。Kafka 如何快速的消費數據呢?
Index 文件中存儲的數據的索引信息,第一列是 offset,第二列這這個數據所對應的 log 文件中的偏移量,就像咱們去讀文件,使用 seek() 設置當前鼠標的位置同樣,能夠更快的找到數據。
若是要去消費 offset 爲 3 的數據,首先經過二分法找到數據在哪一個 index 文件中,而後在經過 index 中 offset 找到數據在 log 文件中的 offset;這樣就能夠快速的定位到數據,並消費。
因此,Kakfa 雖然把數據存儲在磁盤中,可是他的讀取速度仍是很是快的。
Kafka 的 partition 分區的做用
Kafka 分區的緣由主要就是提供併發提升性能,由於讀寫是 partition 爲單位讀寫的;那生產者發送消息是發送到哪一個 partition 中呢?
爲保證生產者發送的數據,能可靠的發送到指定的 topic,topic 的每一個 partition 收到生產者發送的數據後,都須要向生產者發送 ack(確認收到),若是生產者收到 ack,就會進行下一輪的發送,不然從新發送數據。
確保 follower 和 leader 同步完成,leader 在發送 ack 給生產者,這樣才能確保 leader 掛掉以後,能在 follower 中選舉出新的 leader 後,數據不會丟失。
採用第二種方案後,設想如下場景:leader 收到數據,全部的 follower 都開始同步數據,可是有一個 follower 由於某種故障,一直沒法完成同步,那 leader 就要一直等下,直到他同步完成,才能發送 ack。這樣就很是影響效率,這個問題怎麼解決?
Leader 維護了一個動態的 ISR 列表(同步副本的做用),只須要這個列表的中的 follower 和 leader 同步;當 ISR 中的 follower 完成數據的同步以後,leader 就會給生產者發送 ack,若是 follower 長時間未向 leader 同步數據,則該 follower 將被剔除ISR,這個時間閾值也是自定義的;一樣 leader 故障後,就會從 ISR 中選舉新的 leader。
首先通訊的時間要快,要和 leader 要能夠很快的完成通訊,這個時間默認是 10s
而後就看 leader 數據差距,消息條數默認是 10000 條(後面版本被移除)
爲何移除:由於 Kafka 發送消息是批量發送的,因此會一瞬間 leader 接受完成,可是 follower 尚未拉取,因此會頻繁踢出和加入ISR,這個數據會保存到 ZooKeeper 和內存中,因此會頻繁更新 ZooKeeper 和內存。
可是對於某些不過重要的數據,對數據的可靠性要求不是很高,可以容忍數據的少許丟失,因此不必等 ISR 中的 follower 所有接受成功。因此 Kafka 爲用戶提供了三種可靠性級別,用戶能夠根據可靠性和延遲進行權衡,這個設置在kafka的生成中設置:ack 參數設置。
生產者不等 ack,只管往 topic 丟數據就能夠了,這個丟數據的機率很是高。
leader 落盤後就會返回 ack,會有數據丟失的現象,若是 leader 在同步完成後出現故障,則會出現數據丟失。
leader 和 follower(ISR)落盤纔會返回 ack,會有數據重複現象,若是在 leader 已經寫完成,且 follower 同步完成,可是在返回ack的出現故障,則會出現數據重複現象;極限狀況下,這個也會有數據丟失的狀況,好比 follower 和 leader 通訊都很慢,因此 ISR 中只有一個 leader 節點,這個時候,leader 完成落盤,就會返回 ack,若是此時 leader 故障後,就會致使丟失數據。
LEO:指每一個 follower 的最大的 offset;
HW(高水位):指消費者能見到的最大的 offset,LSR 隊列中最小的 LEO,也就是說消費者只能看到1~6的數據,後面的數據看不到,也消費不了。
避免 leader 掛掉後,好比當前消費者消費8這條數據後,leader 掛了,此時好比 f2 成爲 leader,f2 根本就沒有9這條數據,那麼消費者就會報錯,因此設計了 HW 這個參數,只暴露最少的數據給消費者,避免上面的問題。
follower 發生故障後會被臨時提出 LSR,待該 follower 恢復後,follower 會讀取本地的磁盤記錄的上次的 HW,並將該 log 文件高於 HW 的部分截取掉,從 HW 開始向 leader 進行同步,等該 follower 的 LEO 大於等於該 partition 的 hw,即 follower 追上leader後,就能夠從新加入 LSR。
leader 發生故障後,會從 ISR 中選出一個新的 leader,以後,爲了保證多個副本之間的數據一致性,其他的 follower 會先將各自的 log 文件高於 hw 的部分截掉(新 leader 本身不會截掉),而後重新的 leader 同步數據。
注意:這個是爲了保證多個副本間的數據存儲的一致性,並不能保證數據不丟失或者不重複。
ack 設置爲 -1,則能夠保證數據不丟失,可是會出現數據重複(at least once)
ack 設置爲 0,則能夠保證數據不重複,可是不能保證數據不丟失(at most once)
可是若是魚和熊掌兼得,該怎麼辦?這個時候就就引入了 Exact once(精準一次)。
在 0.11 版本後,引入冪等性解決 Kakfa 集羣內部的數據重複,在 0.11 版本以前,在消費者處本身作處理。若是啓用了冪等性,則 ack 默認就是-1,Kafka 就會爲每一個生產者分配一個 pid,並未每條消息分配 seqnumber,若是 pid、partition、seqnumber 三者同樣,則 Kafka 認爲是重複數據,就不會落盤保存;可是若是生產者掛掉後,也會出現有數據重複的現象;因此冪等性解決在單次會話的單個分區的數據重複,可是在分區間或者跨會話的是數據重複的是沒法解決的。
消息隊列有兩種消費消息的方式,push(微信公衆號)、pull(kafka)。push 模式很難適應消費速率不一樣的消費者,由於消費發送速率是由 broker 決定的,他的目標是儘量以最快的的速度傳遞消息,可是這樣很容易形成消費者來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 的方式能夠消費者的消費能力以適當的速率消費消息。
pull 模式的不足之處是若是 Kafka 沒有數據,消費者可能會陷入死循環,一直返回空數據,針對這一點,Kafka 消費者在消費數據時候回傳遞一個 timeout 參數,若是當時沒有數據可供消費,消費者會等待一段時間在返回。
一個消費者組有多個消費者,一個 topic 有多個 partition。因此必然會涉及到 partition 的分配問題,即肯定哪一個 partition 由哪一個消費者來消費。Kafka 提供兩種方式,一種是輪詢(RountRobin)對於 topic 組生效,一種是(Range)對於單個topic生效
輪詢:前置條件是須要一個消費者裏的消費者訂閱的是相同的 topic。否則就會出現問題;非默認的的方式。
同一個消費者組裏的消費者不能同時消費同一個分區,好比三個消費者消費一個 topic 的 9 個分區。
若是一個消費者組裏有2個消費者,這個消費者組裏同時消費 2 個 topic,每一個 topic 又有三個 partition。首先會把 2 個 topic 當作一個主題,而後根據 topic 和 partition 作 hash,而後在按照 hash 排序。而後輪詢分配給一個消費者組中的 2 個消費者。
若是是下面這樣的方式訂閱的呢?
好比有 3 個 topic,每一個 topic 有 3 個 partition,一個消費者組中有 2 個消費者。消費者1訂閱 topic1 和 topic2,消費者2訂閱 topic2 和 topic3。那麼這樣的場景,使用輪訓的方式訂閱 topic 就會有問題。
若是是下面這種方式訂閱呢?
好比有2個 topic,每一個 topic 有3個 partition,一個消費者組 有2個消費者,消費者1訂閱 topic1,消費者2訂閱 topic2,這樣使用輪訓的方式訂閱 topic 也會有問題。
因此咱們一直強調,使用輪訓的方式訂閱 topic 的前提是一個消費者組中的全部消費者訂閱的主題是同樣的;因此輪詢的方式不是 Kafka 默認的方式;Range 是按照單個 topic 來劃分的,默認的分配方式。
Range 的問題會出現消費者數據不均衡的問題。好比下面的例子,一個消費者組訂閱了 2 個 topic,就會出現消費者1消費 4 個 partition,而另一個消費者只消費 2 個 partition。
分區策略何時會觸發呢?當消費者組裏的消費者個數變化的時候,會觸發分區策略調整,好比消費者裏增長消費者,或者減小消費者。
因爲消費者在消費過程當中可能會出現斷電宕機等故障,消費者恢復後,須要從故障前的位置繼續消費,因此消費者須要實施記錄本身消費哪一個 offset,以便故障恢復後繼續消費。
Offset保存的位置有2個,一個 ZooKeeper,一個是 Kafka。首先看下 offset 保存到 ZooKeeper,由消費者組、topic、partition 三個元素肯定惟一的 offset。
因此消費者組中的某個消費者掛掉以後,或者的消費者仍是能夠拿到這個 offset。
Controller 這個節點和 ZooKeeper 通訊,同步數據,這個節點就是誰先起來,誰就先註冊 controller,誰就是 controller。其餘節點和 controller 信息保持同步。
修改消費者組 id
啓動一個消費者發送 3 條數據。
指定消費者組啓動消費者,啓動三個消費者,能夠看到每一個消費者消費了一條數據。
在演示下不一樣組能夠消費同一個 topic 的,咱們看到 2 個消費者的消費者都消費到同一條數據。再次啓動一個消費者,這個消費者屬於另一個消費者組。
多節點並行操做。
Kafka 的 producer 生產數據,要寫入到 log 文件中,寫的過程當中一直追加到文件末尾,爲順序寫,官網有數據代表。一樣的磁盤,順序寫能到 600M/S,而隨機寫只有 100K/S。這與磁盤的機械結構有關,順序寫之因此快,是由於其省去了大量磁頭尋址的時間。
正常狀況下,先把數據讀到內核空間,在從內核空間把數據讀到用戶空間,而後在調操做系統的 IO 接口寫到內核空間,最終在寫到硬盤中。
Kafka 是這樣作的,直接在內核空間流轉 IO 流,因此 Kafka 的性能很是高。
Kafka 集羣中有一個 broker 會被選舉爲 controller,負責管理集羣 broker 的上下線,全部的 topic 的分區副本分配和 leader 選舉等工做。
做者:bainianminguo
原文:https://www.cnblogs.com/baini...