ps:這篇文章自我感受說的很大白話了!但願大家看過了以後能有收穫。java
不瞭解 Kafka 的朋友建議先看一看個人下面這幾篇文章,第一篇必定要看,其餘的能夠按需學習。git
生產者(Producer) 調用send
方法發送消息以後,消息可能由於網絡問題並無發送過去。github
因此,咱們不能默認在調用send
方法發送消息以後消息消息發送成功了。爲了肯定消息是發送成功,咱們要判斷消息發送的結果。可是要注意的是 Kafka 生產者(Producer) 使用 send
方法發送消息其實是異步的操做,咱們能夠經過 get()
方法獲取調用結果,可是這樣也讓它變爲了同步操做,示例代碼以下:面試
詳細代碼見個人這篇文章:Kafka系列第三篇!10 分鐘學會如何在 Spring Boot 程序中使用 Kafka 做爲消息隊列?apache
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生產者成功發送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}
複製代碼
可是通常不推薦這麼作!能夠採用爲其添加回調函數的形式,示例代碼以下:安全
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生產者成功發送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生產者發送消失敗,緣由:{}", ex.getMessage()));
複製代碼
若是消息發送失敗的話,咱們檢查失敗的緣由以後從新發送便可!網絡
另外這裏推薦爲 Producer 的retries
(重試次數)設置一個比較合理的值,通常是 3 ,可是爲了保證消息不丟失的話通常會設置比較大一點。設置完成以後,當出現網絡問題以後可以自動重試消息發送,避免消息丟失。另外,建議還要設置重試間隔,由於間隔過小的話重試的效果就不明顯了,網絡波動一次你3次一會兒就重試完了異步
咱們知道消息在被追加到 Partition(分區)的時候都會分配一個特定的偏移量(offset)。偏移量(offset)表示 Consumer 當前消費到的 Partition(分區)的所在的位置。Kafka 經過偏移量(offset)能夠保證消息在分區內的順序性。ide
當消費者拉取到了分區的某個消息以後,消費者會自動提交了 offset。自動提交的話會有一個問題,試想一下,當消費者剛拿到這個消息準備進行真正消費的時候,忽然掛掉了,消息實際上並無被消費,可是 offset 卻被自動提交了。函數
解決辦法也比較粗暴,咱們手動關閉閉自動提交 offset,每次在真正消費完消息以後以後再本身手動提交 offset 。 可是,細心的朋友必定會發現,這樣會帶來消息被從新消費的問題。好比你剛剛消費完消息以後,還沒提交 offset,結果本身掛掉了,那麼這個消息理論上就會被消費兩次。
咱們知道 Kafka 爲分區(Partition)引入了多副本(Replica)機制。分區(Partition)中的多個副本之間會有一個叫作 leader 的傢伙,其餘副本稱爲 follower。咱們發送的消息會被髮送到 leader 副本,而後 follower 副本才能從 leader 副本中拉取消息進行同步。生產者和消費者只與 leader 副本交互。你能夠理解爲其餘副本只是 leader 副本的拷貝,它們的存在只是爲了保證消息存儲的安全性。
試想一種狀況:假如 leader 副本所在的 broker 忽然掛掉,那麼就要從 follower 副本從新選出一個 leader ,可是 leader 的數據還有一些沒有被 follower 副本的同步的話,就會形成消息丟失。
解決辦法就是咱們設置 acks = all。acks 是 Kafka 生產者(Producer) 很重要的一個參數。
acks 的默認值即爲1,表明咱們的消息被leader副本接收以後就算被成功發送。當咱們配置 acks = all 表明則全部副本都要接收到該消息以後該消息纔算真正成功被髮送。
爲了保證 leader 副本能有 follower 副本能同步消息,咱們通常會爲 topic 設置 replication.factor >= 3。這樣就能夠保證每一個 分區(partition) 至少有 3 個副本。雖然形成了數據冗餘,可是帶來了數據的安全性。
通常狀況下咱們還須要設置 min.insync.replicas> 1 ,這樣配置表明消息至少要被寫入到 2 個副本纔算是被成功發送。min.insync.replicas 的默認值爲 1 ,在實際生產中應儘可能避免默認值 1。
可是,爲了保證整個 Kafka 服務的高可用性,你須要確保 replication.factor > min.insync.replicas 。爲何呢?設想一下加入二者相等的話,只要是有一個副本掛掉,整個分區就沒法正常工做了。這明顯違反高可用性!通常推薦設置成 replication.factor = min.insync.replicas + 1。
Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數的默認值由原來的true 改成false
咱們最開始也說了咱們發送的消息會被髮送到 leader 副本,而後 follower 副本才能從 leader 副本中拉取消息進行同步。多個 follower 副本之間的消息同步狀況不同,當咱們配置了 unclean.leader.election.enable = false 的話,當 leader 副本發生故障時就不會從 follower 副本中和 leader 同步程度達不到要求的副本中選擇出 leader ,這樣下降了消息丟失的可能性。
《70k Star Java開源項目出PDF閱讀版本啦!》 。
做者介紹: Github 70k Star 項目 JavaGuide(公衆號同名) 做者。每週都會在公衆號更新一些本身原創乾貨。公衆hao後臺回覆「1」領取Java工程師必備學習資料+面試突擊pdf。