Kafka如何保證百萬級寫入速度以及保證不丟失不重複消費

 正文前先來一波福利推薦:

福利一:面試

百萬年薪架構師視頻,該視頻能夠學到不少東西,是本人花錢買的VIP課程,學習消化了一年,爲了支持一下女友公衆號也方便你們學習,共享給你們。apache

福利二:bootstrap

畢業答辯以及工做上各類答辯,平時積累了很多精品PPT,如今共享給你們,大大小小加起來有幾千套,總有適合你的一款,不少是網上是下載不到。緩存

獲取方式:bash

微信關注 精品3分鐘 ,id爲 jingpin3mins,關注後回覆   百萬年薪架構師 ,精品收藏PPT  獲取雲盤連接,謝謝你們支持!微信

------------------------正文開始---------------------------網絡

 

1、如何保證百萬級寫入速度:

目錄架構

一、頁緩存技術 + 磁盤順序寫併發

二、零拷貝技術ide

三、最後的總結

「這篇文章來聊一下Kafka的一些架構設計原理,這也是互聯網公司面試時很是高頻的技術考點。

Kafka是高吞吐低延遲的高併發、高性能的消息中間件,在大數據領域有極爲普遍的運用。配置良好的Kafka集羣甚至能夠作到每秒幾十萬、上百萬的超高併發寫入。

那麼Kafka究竟是如何作到這麼高的吞吐量和性能的呢?這篇文章咱們來一點一點說一下。

一、頁緩存技術 + 磁盤順序寫

 

首先Kafka每次接收到數據都會往磁盤上去寫,以下圖所示。

那麼在這裏咱們不由有一個疑問了,若是把數據基於磁盤來存儲,頻繁的往磁盤文件裏寫數據,這個性能會不會不好?你們確定都以爲磁盤寫性能是極差的。

沒錯,要是真的跟上面那個圖那麼簡單的話,那確實這個性能是比較差的。

可是實際上Kafka在這裏有極爲優秀和出色的設計,就是爲了保證數據寫入性能,首先Kafka是基於操做系統的頁緩存來實現文件寫入的。

操做系統自己有一層緩存,叫作page cache,是在內存裏的緩存,咱們也能夠稱之爲os cache,意思就是操做系統本身管理的緩存。

你在寫入磁盤文件的時候,能夠直接寫入這個os cache裏,也就是僅僅寫入內存中,接下來由操做系統本身決定何時把os cache裏的數據真的刷入磁盤文件中。

僅僅這一個步驟,就能夠將磁盤文件寫性能提高不少了,由於其實這裏至關因而在寫內存,不是在寫磁盤,你們看下圖。

接着另一個就是kafka寫數據的時候,很是關鍵的一點,他是以磁盤順序寫的方式來寫的。也就是說,僅僅將數據追加到文件的末尾,不是在文件的隨機位置來修改數據。

普通的機械磁盤若是你要是隨機寫的話,確實性能極差,也就是隨便找到文件的某個位置來寫數據。

可是若是你是追加文件末尾按照順序的方式來寫數據的話,那麼這種磁盤順序寫的性能基本上能夠跟寫內存的性能自己也是差很少的。

因此你們就知道了,上面那個圖裏,Kafka在寫數據的時候,一方面基於了os層面的page cache來寫數據,因此性能很高,本質就是在寫內存罷了。

另一個,他是採用磁盤順序寫的方式,因此即便數據刷入磁盤的時候,性能也是極高的,也跟寫內存是差很少的。

基於上面兩點,kafka就實現了寫入數據的超高性能。

那麼你們想一想,假如說kafka寫入一條數據要耗費1毫秒的時間,那麼是否是每秒就是能夠寫入1000條數據?

可是假如kafka的性能極高,寫入一條數據僅僅耗費0.01毫秒呢?那麼每秒是否是就能夠寫入10萬條數?

因此要保證每秒寫入幾萬甚至幾十萬條數據的核心點,就是盡最大可能提高每條數據寫入的性能,這樣就能夠在單位時間內寫入更多的數據量,提高吞吐量。

二、零拷貝技術

 

說完了寫入這塊,再來談談消費這塊。

你們應該都知道,從Kafka裏咱們常常要消費數據,那麼消費的時候實際上就是要從kafka的磁盤文件裏讀取某條數據而後發送給下游的消費者,以下圖所示。

那麼這裏若是頻繁的從磁盤讀數據而後發給消費者,性能瓶頸在哪裏呢

 

假設要是kafka什麼優化都不作,就是很簡單的從磁盤讀數據發送給下游的消費者,那麼大概過程以下所示:

先看看要讀的數據在不在os cache裏,若是不在的話就從磁盤文件裏讀取數據後放入os cache。

接着從操做系統的os cache裏拷貝數據到應用程序進程的緩存裏,再從應用程序進程的緩存裏拷貝數據到操做系統層面的Socket緩存裏,最後從Socket緩存裏提取數據後發送到網卡,最後發送出去給下游消費。

整個過程,以下圖所示:

你們看上圖,很明顯能夠看到有兩次不必的拷貝吧!

一次是從操做系統的cache裏拷貝到應用進程的緩存裏,接着又從應用程序緩存裏拷貝回操做系統的Socket緩存裏。

並且爲了進行這兩次拷貝,中間還發生了好幾回上下文切換,一下子是應用程序在執行,一下子上下文切換到操做系統來執行。

因此這種方式來讀取數據是比較消耗性能的。

Kafka爲了解決這個問題,在讀數據的時候是引入零拷貝技術。

也就是說,直接讓操做系統的cache中的數據發送到網卡後傳輸給下游的消費者,中間跳過了兩次拷貝數據的步驟,Socket緩存中僅僅會拷貝一個描述符過去,不會拷貝數據到Socket緩存。

你們看下圖,體會一下這個精妙的過程:

經過零拷貝技術,就不須要把os cache裏的數據拷貝到應用緩存,再從應用緩存拷貝到Socket緩存了,兩次拷貝都省略了,因此叫作零拷貝。

對Socket緩存僅僅就是拷貝數據的描述符過去,而後數據就直接從os cache中發送到網卡上去了,這個過程大大的提高了數據消費時讀取文件數據的性能。

並且你們會注意到,在從磁盤讀數據的時候,會先看看os cache內存中是否有,若是有的話,其實讀數據都是直接讀內存的。

若是kafka集羣通過良好的調優,你們會發現大量的數據都是直接寫入os cache中,而後讀數據的時候也是從os cache中讀。

至關因而Kafka徹底基於內存提供數據的寫和讀了,因此這個總體性能會極其的高。

說個題外話,下回有機會給你們說一下Elasticsearch的架構原理,其實ES底層也是大量基於os cache實現了海量數據的高性能檢索的,跟Kafka原理相似。

三、最後的總結

 

經過這篇文章對kafka底層的頁緩存技術的使用,磁盤順序寫的思路,以及零拷貝技術的運用,你們應該就明白Kafka每臺機器在底層對數據進行寫和讀的時候採起的是什麼樣的思路,爲何他的性能能夠那麼高,作到每秒幾十萬的吞吐量。

這種設計思想對咱們平時本身設計中間件的架構。

 

2、Kafka如何作到不丟失不重複消費

有不少公司由於業務要求必須保證消息不丟失、不重複的到達,好比無人機實時監控系統,當無人機闖入機場區域,咱們必須馬上報警,不容許消息丟失。

而無人機離開禁飛區域後咱們須要將及時報警解除。若是消息重複了呢,咱們是否須要複雜的邏輯來本身處理消息重複的狀況呢,這種狀況恐怕至關複雜而難以處理。可是若是咱們能保證消息exactly once,那麼一切都容易得多。

下面咱們來簡單瞭解一下消息傳遞語義,以及kafka的消息傳遞機制。

首先咱們要了解的是message delivery semantic 也就是消息傳遞語義。

這是一個通用的概念,也就是消息傳遞過程當中消息傳遞的保證性。

分爲三種:

最多一次(at most once): 消息可能丟失也可能被處理,但最多隻會被處理一次。

可能丟失 不會重複

至少一次(at least once): 消息不會丟失,但可能被處理屢次。

可能重複 不會丟失

精確傳遞一次(exactly once): 消息被處理且只會被處理一次。

不丟失 不重複 就一次

而kafka其實有兩次消息傳遞,一次生產者發送消息給kafka,一次消費者去kafka消費消息。

兩次傳遞都會影響最終結果,

兩次都是精確一次,最終結果纔是精確一次。

兩次中有一次會丟失消息,或者有一次會重複,那麼最終的結果就是可能丟失或者重複的。

1、Produce端消息傳遞

這是producer端的代碼:

Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); for (int i = 1; i <= 600; i++) { kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i)); System.out.println("testkafka"+i); } kafkaProducer.close();
 

其中指定了一個參數acks 能夠有三個值選擇:

 0: producer徹底無論broker的處理結果 回調也就沒有用了 並不能保證消息成功發送 可是這種吞吐量最高

​-1或者all: leader broker會等消息寫入 而且ISR都寫入後 纔會響應,這種只要ISR有副本存活就確定不會丟失,但吞吐量最低。

​ 1: 默認的值 leader broker本身寫入後就響應,不會等待ISR其餘的副本寫入,只要leader broker存活就不會丟失,即保證了不丟失,也保證了吞吐量。

因此設置爲0時,實現了at most once,並且從這邊看只要保證集羣穩定的狀況下,不設置爲0,消息不會丟失。

可是還有一種狀況就是消息成功寫入,而這個時候因爲網絡問題producer沒有收到寫入成功的響應,producer就會開啓重試的操做,直到網絡恢復,消息就發送了屢次。這就是at least once了。

kafka producer 的參數acks 的默認值爲1,因此默認的producer級別是at least once。並不能exactly once。

 

 

2、Consumer端消息傳遞

consumer是靠offset保證消息傳遞的。

consumer消費的代碼以下:

Properties props = new Properties(); props.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset","earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); try{ while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }finally{ consumer.close(); }

其中有一個參數是 enable.auto.commit

若設置爲true consumer在消費以前提交位移 就實現了at most once

如果消費後提交 就實現了 at least once 默認的配置就是這個。

kafka consumer的參數enable.auto.commit的默認值爲true ,因此默認的consumer級別是at least once。也並不能exactly once。

3、精確一次

經過了解producer端與consumer端的設置,咱們發現kafka在兩端的默認配置都是at least once,可能重複,經過配置也不能作到exactly once,好像kafka的消息必定會丟失或者重複的,

是否是沒有辦法作到exactly once了呢?

確實在kafka 0.11.0.0版本以前producer端確實是不可能的,

可是在kafka 0.11.0.0版本以後,kafka正式推出了idempotent producer。

也就是冪等的producer還有對事務的支持。

冪等的producer

kafka 0.11.0.0版本引入了idempotent producer機制,在這個機制中同一消息可能被producer發送屢次,可是在broker端只會寫入一次,他爲每一條消息編號去重,並且對kafka開銷影響不大。

如何設置開啓呢? 須要設置producer端的新參數 enable.idempotent 爲true。

而多分區的狀況,咱們須要保證原子性的寫入多個分區,即寫入到多個分區的消息要麼所有成功,要麼所有回滾。

這時候就須要使用事務,在producer端設置 transcational.id爲一個指定字符串。

這樣冪等producer只能保證單分區上無重複消息;事務能夠保證多分區寫入消息的完整性。

這樣producer端實現了exactly once,那麼consumer端呢?

consumer端因爲可能沒法消費事務中全部消息,而且消息可能被刪除,因此事務並不能解決consumer端exactly once的問題,咱們可能仍是須要本身處理這方面的邏輯。好比本身管理offset的提交,不要自動提交,也是能夠實現exactly once的。

還有一個選擇就是使用kafka本身的流處理引擎,也就是Kafka Streams,

設置processing.guarantee=exactly_once,就能夠輕鬆實現exactly once了。

相關文章
相關標籤/搜索