一、如何獲取 topic 主題的列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
二、生產者和消費者的命令行是什麼?
生產者在主題上發佈消息:
bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topicHello-Kafka
注意這裏的 IP 是 server.properties 中的 listeners 的配置。接下來每一個新行就是輸入一條新消息
消費者接受消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicHello-Kafka --from-beginning
三、consumer 是推仍是拉?
Kafka 最初考慮的問題是,customer 應該從 brokes 拉取消息仍是 brokers 將消息推送到 consumer,也就是 pull 還 push。在這方面,Kafka 遵循了一種大部分消息系統共同的傳統的設計:producer 將消息推送到 broker,consumer 從broker 拉取消息。
一些消息系統好比 Scribe 和 Apache Flume 採用了 push 模式,將消息推送到下游的 consumer。這樣作有好處也有壞處:由 broker 決定消息推送的速率,對於不一樣消費速率的 consumer 就不太好處理了。消息系統都致力於讓 consumer 以最大的速率最快速的消費消息,但不幸的是,push 模式下,當 broker 推送的速率遠大於 consumer 消費的速率時,consumer 恐怕就要崩潰了。最終 Kafka 仍是選取了傳統的 pull 模式。
Pull 模式的另一個好處是 consumer 能夠自主決定是否批量的從 broker 拉取數據。Push 模式必須在不知道下游 consumer 消費能力和消費策略的狀況下決定是當即推送每條消息仍是緩存以後批量推送。若是爲了不 consumer 崩潰而採用較低的推送速率,將可能致使一次只推送較少的消息而形成浪費。Pull 模式下,consumer 就能夠根據本身的消費能力去決定這些策略。
Pull 有個缺點是,若是 broker 沒有可供消費的消息,將致使 consumer 不斷在循環中輪詢,直到新消息到 t 達。爲了不這點,Kafka 有個參數可讓 consumer阻塞知道新消息到達(固然也能夠阻塞知道消息的數量達到某個特定的量這樣就能夠批量發送)。
四、講講 kafka 維護消費狀態跟蹤的方法
大部分消息系統在 broker 端的維護消息被消費的記錄:一個消息被分發到consumer 後 broker 就立刻進行標記或者等待 customer 的通知後進行標記。這樣也能夠在消息在消費後立馬就刪除以減小空間佔用。
可是這樣會不會有什麼問題呢?若是一條消息發送出去以後就當即被標記爲消費過的,一旦 consumer 處理消息時失敗了(好比程序崩潰)消息就丟失了。爲了解決這個問題,不少消息系統提供了另一個個功能:當消息被髮送出去以後僅僅被標記爲已發送狀態,當接到 consumer 已經消費成功的通知後才標記爲已被消費的狀態。這雖然解決了消息丟失的問題,但產生了新問題,首先若是 consumer處理消息成功了可是向 broker 發送響應時失敗了,這條消息將被消費兩次。第二個問題時,broker 必須維護每條消息的狀態,而且每次都要先鎖住消息而後更改狀態而後釋放鎖。這樣麻煩又來了,且不說要維護大量的狀態數據,好比若是消息發送出去但沒有收到消費成功的通知,這條消息將一直處於被鎖定的狀態,Kafka 採用了不一樣的策略。Topic 被分紅了若干分區,每一個分區在同一時間只被一個 consumer 消費。這意味着每一個分區被消費的消息在日誌中的位置僅僅是一個簡單的整數:offset。這樣就很容易標記每一個分區消費狀態就很容易了,僅僅須要一個整數而已。這樣消費狀態的跟蹤就很簡單了。
這帶來了另一個好處:consumer 能夠把 offset 調成一個較老的值,去從新消費老的消息。這對傳統的消息系統來講看起來有些難以想象,但確實是很是有用的,誰規定了一條消息只能被消費一次呢?
五、講一下主從同步
Kafka容許topic的分區擁有若干副本,這個數量是能夠配置的,你能夠爲每一個topci配置副本的數量。Kafka會自動在每一個個副本上備份數據,因此當一個節點down掉時數據依然是可用的。
mysql
Kafka的副本功能不是必須的,你能夠配置只有一個副本,這樣其實就至關於只有一份數據。
程序員
建立副本的單位是topic的分區,每一個分區都有一個leader和零或多個followers.全部的讀寫操做都由leader處理,通常分區的數量都比broker的數量多的多,各分區的leader均勻的分佈在brokers中。全部的followers都複製leader的日誌,日誌中的消息和順序都和leader中的一致。flowers向普通的consumer那樣從leader那裏拉取消息並保存在本身的日誌文件中。 許多分佈式的消息系統自動的處理失敗的請求,它們對一個節點是否 着(alive)」有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:
面試
- 節點必須能夠維護和ZooKeeper的鏈接,Zookeeper經過心跳機制檢查每一個節點的鏈接。
- 若是節點是個follower,他必須能及時的同步leader的寫操做,延時不能過久。
符合以上條件的節點準確的說應該是「同步中的(in sync)」,而不是模糊的說是「活着的」或是「失敗的」。Leader會追蹤全部「同步中」的節點,一旦一個down掉了,或是卡住了,或是延時過久,leader就會把它移除。至於延時多久算是「過久」,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
spring
只有當消息被全部的副本加入到日誌中時,纔算是「committed」,只有committed的消息纔會發送給consumer,這樣就不用擔憂一旦leader down掉了消息會丟失。Producer也能夠選擇是否等待消息被提交的通知,這個是由參數request.required.acks決定的。sql
Kafka保證只要有一個「同步中」的節點,「committed」的消息就不會丟失。
數據庫
Leader的選擇
Kafka的核心是日誌文件,日誌文件在集羣中的同步是分佈式數據系統最基礎的要素。
緩存
若是leaders永遠不會down的話咱們就不須要followers了!一旦leader down掉了,須要在followers中選擇一個新的leader.可是followers自己有可能延時過久或者crash,因此必須選擇高質量的follower做爲leader.必須保證,一旦一個消息被提交了,可是leader down掉了,新選出的leader必須能夠提供這條消息。大部分的分佈式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據全部副本節點的情況動態的選擇最適合的做爲leader.Kafka並非使用這種方法。
安全
Kafaka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每一個節點讀取並追加到日誌中了,纔回通知外部這個消息已經被提交了。所以這個集合中的任何一個節點隨時均可以被選爲leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就能夠容許在f個節點down掉的狀況下不會丟失消息並正常提供服。ISR的成員是動態的,若是一個節點被淘汰了,當它從新達到「同步中」的狀態時,他能夠從新加入ISR.這種leader的選擇方式是很是快速的,適合kafka的應用場景。
bash
一個邪惡的想法:若是全部節點都down掉了怎麼辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦全部節點都down了,這個就不能保證了。
服務器
實際應用中,當全部的副本都down掉時,必須及時做出反應。能夠有如下兩種選擇:
- 等待ISR中的任何一個節點恢復並擔任leader。
- 選擇全部節點中(不僅是ISR)第一個恢復的節點做爲leader.
這是一個在可用性和連續性之間的權衡。若是等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集羣就永遠恢復不了了。若是等待ISR意外的節點恢復,這個節點的數據就會被做爲線上數據,有可能和真實的數據有所出入,由於有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在將來的版本中將使這個策略的選擇可配置,能夠根據場景靈活的選擇。
這種窘境不僅Kafka會遇到,幾乎全部的分佈式數據系統都會遇到。
副本管理
以上僅僅以一個topic一個分區爲例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka儘可能的使全部分區均勻的分佈到集羣全部的節點上而不是集中在某些節點上,另外主從關係也儘可能均衡這樣每一個幾點都會擔任必定比例的分區的leader.
優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點做爲「controller」,當發現有節點down掉的時候它負責在游泳分區的全部節點中選擇新的leader,這使得Kafka能夠批量的高效的管理全部分區節點的主從關係。若是controller down掉了,活着的節點中的一個會備切換爲新的controller.
六、爲何須要消息系統,mysql 不能知足需求嗎?
1.解耦:
容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。
2.冗餘:
消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3.擴展性:
由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。
4.靈活性 & 峯值處理能力:
在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
5.可恢復性:
系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
6.順序保證:
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性)
7.緩衝:
有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。
8.異步通訊:
不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
七、Zookeeper 對於 Kafka 的做用是什麼?
Zookeeper 是一個開放源碼的、高性能的協調服務,它用於 Kafka 的分佈式應用。
Zookeeper 主要用於在集羣中不一樣節點之間進行通訊
在 Kafka 中,它被用於提交偏移量,所以若是節點在任何狀況下都失敗了,它都
能夠從以前提交的偏移量中獲取
除此以外,它還執行其餘活動,如: leader 檢測、分佈式同步、配置管理、識別新
節點什麼時候離開或鏈接、集羣、節點實時狀態等等。
八、數據傳輸的事務定義有哪三種?
和 MQTT 的事務定義同樣都是 3 種。
(1)最多一次: 消息不會被重複發送,最多被傳輸一次,但也有可能一次不傳輸
(2)最少一次: 消息不會被漏發送,最少被傳輸一次,但也有可能被重複傳輸.
(3)精確的一次(Exactly once): 不會漏傳輸也不會重複傳輸,每一個消息都傳輸被一次並且僅僅被傳輸一次,這是你們所指望的
九、Kafka 判斷一個節點是否還活着有那兩個條件?
(1)節點必須能夠維護和 ZooKeeper 的鏈接,Zookeeper 經過心跳機制檢查每一個節點的鏈接
(2)若是節點是個 follower,他必須能及時的同步 leader 的寫操做,延時不能過久
十、Kafka 與傳統 MQ 消息系統之間有三個關鍵區別
(1).Kafka 持久化日誌,這些日誌能夠被重複讀取和無限期保留
(2).Kafka 是一個分佈式系統:它以集羣的方式運行,能夠靈活伸縮,在內部經過複製數據提高容錯能力和高可用性
(3).Kafka 支持實時的流式處理
十一、講一講 kafka 的 ack 的三種機制
request.required.acks 有三個值 0 1 -1(all)
0:生產者不會等待 broker 的 ack,這個延遲最低可是存儲的保證最弱當 server 掛掉的時候就會丟數據。
1:服務端會等待 ack 值 leader 副本確認接收到消息後發送 ack 可是若是 leader掛掉後他不確保是否複製完成新 leader 也會致使數據丟失。
-1(all):服務端會等全部的 follower 的副本受到數據後纔會受到 leader 發出的ack,這樣數據不會丟失
十二、消費者如何不自動提交偏移量,由應用提交?
將 auto.commit.offset 設爲 false,而後在處理一批消息後 commitSync() 或者異步提交 commitAsync()
即:
ConsumerRecords<> records = consumer.poll();
for (ConsumerRecord<> record : records){
。。。
tyr{consumer.commitSync()
}
。。。
}複製代碼
1三、消費者故障,出現活鎖問題如何解決?
出現「活鎖」的狀況,是它持續的發送心跳,可是沒有處理。爲了預防消費者在這種狀況下一直持有分區,咱們使用 max.poll.interval.ms 活躍檢測機制。 在此基礎上,若是你調用的 poll 的頻率大於最大間隔,則客戶端將主動地離開組,以便其餘消費者接管該分區。 發生這種狀況時,你會看到 offset 提交失敗(調用commitSync()引起的 CommitFailedException)。這是一種安全機制,保障只有活動成員可以提交 offset。因此要留在組中,你必須持續調用 poll。
消費者提供兩個配置設置來控制 poll 循環:
max.poll.interval.ms:增大 poll 的間隔,能夠爲消費者提供更多的時間去處理返回的消息(調用 poll(long)返回的消息,一般返回的消息都是一批)。缺點是此值越大將會延遲組從新平衡。
max.poll.records:此設置限制每次調用 poll 返回的消息數,這樣能夠更容易的預測每次 poll 間隔要處理的最大值。經過調整此值,能夠減小 poll 間隔,減小從新平衡分組的
對於消息處理時間不可預測地的狀況,這些選項是不夠的。 處理這種狀況的推薦方法是將消息處理移到另外一個線程中,讓消費者繼續調用 poll。 可是必須注意確保已提交的 offset 不超過實際位置。另外,你必須禁用自動提交,並只有在線程完成處理後才爲記錄手動提交偏移量(取決於你)。 還要注意,你須要 pause 暫停分區,不會從 poll 接收到新消息,讓線程處理完以前返回的消息(若是你的處理能力比拉取消息的慢,那建立新線程將致使你機器內存溢出)。
1四、如何控制消費的位置
kafka 使用 seek(TopicPartition, long)指定新的消費位置。用於查找服務器保留的最先和最新的 offset 的特殊的方法也可用(seekToBeginning(Collection) 和seekToEnd(Collection))
1五、kafka 分佈式(不是單機)的狀況下,如何保證消息的順序消費?
Kafka 分佈式的單位是 partition,同一個 partition 用一個 write ahead log 組織,因此能夠保證 FIFO 的順序。不一樣 partition 之間不能保證順序。可是絕大多數用戶均可以經過 message key 來定義,由於同一個 key 的 message 能夠保證只發送到同一個 partition。
Kafka 中發送 1 條消息的時候,能夠指定(topic, partition, key) 3 個參數。
partiton 和 key 是可選的。若是你指定了 partition,那就是全部消息發往同 1個 partition,就是有序的。而且在消費端,Kafka 保證,1 個 partition 只能被1 個 consumer 消費。或者你指定 key(好比 order id),具備同 1 個 key 的全部消息,會發往同 1 個 partition。
1六、kafka 的高可用機制是什麼?
這個問題比較系統,回答出 kafka 的系統特色,leader 和 follower 的關係,消息讀寫的順序便可。
1七、kafka 如何減小數據丟失
Kafka到底會不會丟數據(data loss)? 一般不會,但有些狀況下的確有可能會發生。下面的參數配置及Best practice列表能夠較好地保證數據的持久性(固然是trade-off,犧牲了吞吐量)。筆者會在該列表以後對列表中的每一項進行討論,有興趣的同窗能夠看下後面的分析。
- block.on.buffer.full = true
- acks = all
- retries = MAX_VALUE
- max.in.flight.requests.per.connection = 1
- 使用KafkaProducer.send(record, callback)
- callback邏輯中顯式關閉producer:close(0)
- unclean.leader.election.enable=false
- replication.factor = 3
- min.insync.replicas = 2
- replication.factor > min.insync.replicas
- enable.auto.commit=false
- 消息處理完成以後再提交位移
給出列表以後,咱們從兩個方面來探討一下數據爲何會丟失:
1. Producer端
目前比較新版本的Kafka正式替換了Scala版本的old producer,使用了由Java重寫的producer。新版本的producer採用異步發送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質上使用了隊列來緩存記錄),同時後臺的IO線程會不斷掃描該緩存區,將知足條件的消息封裝到某個batch中而後發送出去。顯然,這個過程當中就有一個數據丟失的窗口:若IO線程發送以前client端掛掉了,累積在accumulator中的數據的確有可能會丟失。
Producer的另外一個問題是消息的亂序問題。假設客戶端代碼依次執行下面的語句將兩條消息發到相同的分區
producer.send(record1);
producer.send(record2);複製代碼
若是此時因爲某些緣由(好比瞬時的網絡抖動)致使record1沒有成功發送,同時Kafka又配置了重試機制和max.in.flight.requests.per.connection大於1(默認值是5,原本就是大於1的),那麼重試record1成功後,record1在分區中就在record2以後,從而形成消息的亂序。不少某些要求強順序保證的場景是不容許出現這種狀況的。
鑑於producer的這兩個問題,咱們應該如何規避呢??對於消息丟失的問題,很容易想到的一個方案就是:既然異步發送有可能丟失數據, 我改爲同步發送總能夠吧?好比這樣:
producer.send(record).get();複製代碼
這樣固然是能夠的,可是性能會不好,不建議這樣使用。所以特地總結了一份配置列表。我的認爲該配置清單應該可以比較好地規避producer端數據丟失狀況的發生:(特此說明一下,軟件配置的不少決策都是trade-off,下面的配置也不例外:應用了這些配置,你可能會發現你的producer/consumer 吞吐量會降低,這是正常的,由於你換取了更高的數據安全性)
- block.on.buffer.full = true 儘管該參數在0.9.0.0已經被標記爲「deprecated」,但鑑於它的含義很是直觀,因此這裏仍是顯式設置它爲true,使得producer將一直等待緩衝區直至其變爲可用。不然若是producer生產速度過快耗盡了緩衝區,producer將拋出異常
- acks=all 很好理解,全部follower都響應了才認爲消息提交成功,即"committed"
- retries = MAX 無限重試,直到你意識到出現了問題:)
- max.in.flight.requests.per.connection = 1 限制客戶端在單個鏈接上可以發送的未響應請求的個數。設置此值是1表示kafka broker在響應請求以前client不能再向同一個broker發送請求。注意:設置此參數是爲了不消息亂序
- 使用KafkaProducer.send(record, callback)而不是send(record)方法 自定義回調邏輯處理消息發送失敗
- callback邏輯中最好顯式關閉producer:close(0) 注意:設置此參數是爲了不消息亂序
- unclean.leader.election.enable=false 關閉unclean leader選舉,即不容許非ISR中的副本被選舉爲leader,以免數據丟失
- replication.factor >= 3 這個徹底是我的建議了,參考了Hadoop及業界通用的三備份原則
- min.insync.replicas > 1 消息至少要被寫入到這麼多副本纔算成功,也是提高數據持久性的一個參數。與acks配合使用
- 保證replication.factor > min.insync.replicas 若是二者相等,當一個副本掛掉了分區也就無法正常工做了。一般設置replication.factor = min.insync.replicas + 1便可
2. Consumer端
consumer端丟失消息的情形比較簡單:若是在消息處理完成前就提交了offset,那麼就有可能形成數據的丟失。因爲Kafka consumer默認是自動提交位移的,因此在後臺提交位移前必定要保證消息被正常處理了,所以不建議採用很重的處理邏輯,若是處理耗時很長,則建議把邏輯放到另外一個線程中去作。爲了不數據丟失,現給出兩點建議:
- enable.auto.commit=false 關閉自動提交位移
- 在消息被完整處理以後再手動提交位移
1八、kafka 如何不消費重複數據?好比扣款,咱們不能重複的扣。
其實仍是得結合業務來思考,我這裏給幾個思路:
好比你拿個數據要寫庫,你先根據主鍵查一下,若是這數據都有了,你就別插入了,update 一下好吧。
好比你是寫 Redis,那沒問題了,反正每次都是 set,自然冪等性。
好比你不是上面兩個場景,那作的稍微複雜一點,你須要讓生產者發送每條數據的時候,裏面加一個全局惟一的 id,相似訂單 id 之類的東西,而後你這裏消費到了以後,先根據這個 id 去好比 Redis 裏查一下,以前消費過嗎?若是沒有消費過,你就處理,而後這個 id 寫 Redis。若是消費過了,那你就別處理了,保證別重複處理相同的消息便可。
好比基於數據庫的惟一鍵來保證重複數據不會重複插入多條。由於有惟一鍵約束了,重複數據插入只會報錯,不會致使數據庫中出現髒數據。