插曲:Kafka的生產者原理及重要參數說明

前言

原本插曲系列是應你們要求去更新的,可是好像第一篇的kafka效果還能夠因此更插曲就勤快些了(畢竟誰不想看着本身被多多點贊呢hhh🤣),上一篇說了一個案例是爲了說明如何去考量一個kafka集羣的部署,算是一個參考吧,畢竟你們在不一樣的公司工做確定也會有本身的一套實施方案。apache

此次咱們再回到原理性的問題,此次會延續第一篇的風格,帶領你們把圖一步一步畫出來。輕鬆愉快bootstrap

1、Kafka的Producer原理

首先咱們得先有個集羣吧,而後集羣中有若干臺服務器,每一個服務器咱們管它叫Broker,其實就是一個個Kafka進程數組

若是你們還記得第一篇的內容,就不難猜出來,接下來確定會有一個controller和多個follower,還有個zookeeper集羣,一開始咱們的Broker都會註冊到咱們的zookeeper集羣上面。服務器

而後controller也會監聽zookeeper集羣的變化,在集羣產生變化時更改本身的元數據信息。而且follower也會去它們的老大controller那裏去同步元數據信息,因此一個Kafka集羣中全部服務器上的元數據信息都是一致的。網絡

上述準備完成後,咱們正式開始咱們生產者的內容負載均衡

① 名詞1 --- ProducerRecord

生產者須要往集羣發送消息前,要先把每一條消息封裝成ProducerRecord對象,這是生產者內部完成的。以後會經歷一個序列化的過程。以前好幾篇專欄也是有提到過了,須要通過網絡傳輸的數據都是二進制的一些字節數據,須要進行序列化才能傳輸。異步

此時就會有一個問題,咱們須要把消息發送到一個Topic下的一個leader partition中,但是生產者是怎樣get到這個topic下哪一個分區纔是leader partition呢?socket

可能有些小夥伴忘了,提醒一下,controller能夠視做爲broker的領導,負責管理集羣的元數據,而leader partition是作負載均衡用的,它們會分佈式地存儲在不一樣的服務器上面。集羣中生產數據也好,消費數據也好,都是針對leader partition而操做的。分佈式

② 名詞2 --- partitioner

怎麼知道哪一個纔是leader partition,只須要獲取到元數據不就行了嘛。ide

說來要怎麼獲取元數據也不難,只要隨便找到集羣下某一臺服務器就能夠了(由於集羣中的每一臺服務器元數據都是同樣的)

③ 名詞3 --- 緩衝區

此時生產者不着急把消息發送出去,而是先放到一個緩衝區

④ 名詞4 --- Sender

把消息放進緩衝區以後,與此同時會有一個獨立線程Sender去把消息分批次包裝成一個個Batch,不難想到若是Kafka真的是一條消息一條消息地傳輸,一條消息就是一個網絡鏈接,那性能就會被拉得不好。爲了提高吞吐量,因此採起了分批次的作法

整好一個個batch以後,就開始發送給對應的主機上面。此時通過第一篇所提到的Kakfa的網絡設計中的模型,而後再寫到os cache,再寫到磁盤上面。

下圖是當時咱們已經說明過的Kafka網絡設計模型

⑤ 生產者代碼

1.設置參數部分

// 建立配置文件對象
Properties props = new Properties();

// 這個參數目的是爲了獲取kafka集羣的元數據
// 寫一臺主機也行,多個更加保險
// 這裏使用的是主機名,要根據server.properties來決定
// 使用主機名的狀況須要配置電腦的hosts文件(重點)
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");  

// 這個就是負責把發送的key從字符串序列化爲字節數組
// 咱們能夠給每一個消息設置key,做用以後再闡述
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 這個就是負責把你發送的實際的message從字符串序列化爲字節數組
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 如下屬於調優,以後再解釋
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
複製代碼

2.建立生產者實例

// 建立一個Producer實例:線程資源,跟各個broker創建socket鏈接資源
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
複製代碼

3.建立消息

ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-value");
複製代碼

固然你也能夠指定一個key,做用以後會說明

ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-key", "test-value");
複製代碼

4.發送消息

帶有一個回調函數,若是沒有異常就返回消息發送成功

// 這是異步發送的模式
producer.send(record, new Callback() {
	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if(exception == null) {
			// 消息發送成功
			System.out.println("消息發送成功");  
		} else {
			// 消息發送失敗,須要從新發送
		}
	}
});
Thread.sleep(10 * 1000); 
		
// 這是同步發送的模式(是通常不會使用的,性能不好,測試可使用)
// 你要一直等待人家後續一系列的步驟都作完,發送消息以後
// 有了消息的迴應返回給你,你這個方法纔會退出來
producer.send(record).get(); 
複製代碼

5.關閉鏈接

producer.close();
複製代碼

2、乾貨時間:調優部分的代碼

區分是否是一個勤于思考的打字員的部分其實就是在1那裏尚未講到的那部分調優,一個個拿出來單獨解釋,就是下面這一大串

props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
複製代碼

① acks 消息驗證

props.put("acks", "-1");
複製代碼
acks 消息發送成功判斷
-1 leader & all follower接收
1 leader接收
0 消息發送便可

這個acks參數有3個值,分別是-1,0,1,設置這3個不一樣的值會成爲kafka判斷消息發送是否成功的依據。Kafka裏面的分區是有副本的,若是acks爲-1.則說明消息在寫入一個分區的leader partition後,這些消息還須要被另外全部這個分區的副本同步完成後,纔算發送成功(對應代碼就是輸出System.out.println("消息發送成功")),此時發送數據的性能下降

若是設置acks爲1,須要發送的消息只要寫入了leader partition,即算髮送成功,可是這個方式存在丟失數據的風險,好比在消息恰好發送成功給leader partition以後,這個leader partition馬上宕機了,此時剩餘的follower不管選舉誰成爲leader,都不存在剛剛發送的那一條消息。

若是設置acks爲0,消息只要是發送出去了,就默認發送成功了。啥都無論了。

② retries 重試次數(重要)

這個參數仍是很是重要的,在生產環境中是必須設置的參數,爲設置消息重發的次數

props.put("retries", 3);
複製代碼

在kafka中可能會遇到各類各樣的異常(能夠直接跳到下方的補充異常類型),可是不管是遇到哪一種異常,消息發送此時都出現了問題,特別是網絡忽然出現問題,可是集羣不可能每次出現異常都拋出,可能在下一秒網絡就恢復了呢,因此咱們要設置重試機制。

這裏補充一句:設置了retries以後,集羣中95%的異常都會本身乘風飛去,我真沒開玩笑🤣

代碼中我配置了3次,其實設置5~10次都是合理的,補充說明一個,若是咱們須要設置隔多久重試一次,也有參數,沒記錯的話是retry.backoff.ms,下面我設置了100毫秒重試一次,也就是0.1秒

props.put("retry.backoff.ms",100);
複製代碼

③ batch.size 批次大小

批次的大小默認是16K,這裏設置了32K,設置大一點能夠稍微提升一下吞吐量,設置這個批次的大小還和消息的大小有關,假設一條消息的大小爲16K,一個批次也是16K,這樣的話批次就失去意義了。因此咱們要事先估算一下集羣中消息的大小,正常來講都會設置幾倍的大小。

props.put("batch.size", 323840);
複製代碼

④ linger.ms 發送時間限制

好比我如今設置了批次大小爲32K,而一條消息是2K,此時已經有了3條消息發送過來,總大小爲6K,而生產者這邊就沒有消息過來了,那在沒夠32K的狀況下就不發送過去集羣了嗎?顯然不是,linger.ms就是設置了固定多長時間,就算沒塞滿Batch,也會發送,下面我設置了100毫秒,因此就算個人Batch遲遲沒有滿32K,100毫秒事後都會向集羣發送Batch。

props.put("linger.ms", 100);
複製代碼

⑤ buffer.memory 緩衝區大小

當咱們的Sender線程處理很是緩慢,而生產數據的速度很快時,咱們中間的緩衝區若是容量不夠,生產者就沒法再繼續生產數據了,因此咱們有必要把緩衝區的內存調大一點,緩衝區默認大小爲32M,其實基本也是合理的。

props.put("buffer.memory", 33554432);
複製代碼

那應該如何去驗證咱們這時候應該調整緩衝區的大小了呢,咱們能夠用通常Java計算結束時間減去開始時間的方式測試,當結束時間減去開始時間大於100ms,咱們認爲此時Sender線程處理速度慢,須要調大緩衝區大小。

固然通常狀況下咱們是不須要去設置這個參數的,32M在廣泛狀況下已經足以應付了。

Long startTime=System.currentTime();
producer.send(record, new Callback() {
	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if(exception == null) {
			// 消息發送成功
			System.out.println("消息發送成功");  
		} else {
			// 消息發送失敗,須要從新發送
		}
	}
});
Long endTime=System.currentTime();
If(endTime - startTime > 100){//說明內存被壓滿了
 說明有問題
複製代碼

}

⑦ compression.type 壓縮方式

compression.type,默認是none,不壓縮,可是也可使用lz4壓縮,效率仍是不錯的,壓縮以後能夠減少數據量,提高吞吐量,可是會加大producer端的cpu開銷

props.put("compression.type", lz4);
複製代碼

⑧ max.block.ms

留到源碼時候說明,是設置某幾個方法的阻塞時間

props.put("max.block.ms", 3000);
複製代碼

⑨ max.request.size 最大消息大小

max.request.size:這個參數用來控制發送出去的消息的大小,默認是1048576字節,也就1M,這個通常過小了,不少消息可能都會超過1mb的大小,因此須要本身優化調整,把它設置更大一些(企業通常設置成10M),否則程序跑的好好的忽然來了一條2M的消息,系統就報錯了,那就得不償失

props.put("max.request.size", 1048576);    
複製代碼

⑩ request.timeout.ms 請求超時

request.timeout.ms:這個就是說發送一個請求出去以後,他有一個超時的時間限制,默認是30秒,若是30秒都收不到響應(也就是上面的回調函數沒有返回),那麼就會認爲異常,會拋出一個TimeoutException來讓咱們進行處理。若是公司網絡很差,要適當調整此參數

props.put("request.timeout.ms", 30000); 
複製代碼

補充:kafka中的異常

無論是異步仍是同步,均可能讓你處理異常,常見的異常以下:

1)LeaderNotAvailableException:這個就是若是某臺機器掛了,此時leader副本不可用,會致使你寫入失敗,要等待其餘follower副本切換爲leader副本以後,才能繼續寫入,此時能夠重試發送便可。若是說你平時重啓kafka的broker進程,確定會致使leader切換,必定會致使你寫入報錯,是LeaderNotAvailableException

2)NotControllerException:這個也是同理,若是說Controller所在Broker掛了,那麼此時會有問題,須要等待Controller從新選舉,此時也是同樣就是重試便可

3)NetworkException:網絡異常,重試便可 咱們以前配置了一個參數,retries,他會自動重試的,可是若是重試幾回以後仍是不行,就會提供Exception給咱們來處理了。 參數:retries 默認值是3 參數:retry.backoff.ms 兩次重試之間的時間間隔

finally

上面從生產者生產消息到發送這一個流程分析下來,從而引出下面的各類各樣關於整個過程的參數的設置,若是真的能清晰地理解好這些基礎知識,相信對你一定是有所幫助。以後會再帶一個生產者的案例和消費者進來。感興趣的朋友能夠關注一下,謝謝。

相關文章
相關標籤/搜索