原本插曲系列是應你們要求去更新的,可是好像第一篇的kafka效果還能夠因此更插曲就勤快些了(畢竟誰不想看着本身被多多點贊呢hhh🤣),上一篇說了一個案例是爲了說明如何去考量一個kafka集羣的部署,算是一個參考吧,畢竟你們在不一樣的公司工做確定也會有本身的一套實施方案。apache
此次咱們再回到原理性的問題,此次會延續第一篇的風格,帶領你們把圖一步一步畫出來。輕鬆愉快bootstrap
首先咱們得先有個集羣吧,而後集羣中有若干臺服務器,每一個服務器咱們管它叫Broker,其實就是一個個Kafka進程數組
若是你們還記得第一篇的內容,就不難猜出來,接下來確定會有一個controller和多個follower,還有個zookeeper集羣,一開始咱們的Broker都會註冊到咱們的zookeeper集羣上面。服務器
而後controller也會監聽zookeeper集羣的變化,在集羣產生變化時更改本身的元數據信息。而且follower也會去它們的老大controller那裏去同步元數據信息,因此一個Kafka集羣中全部服務器上的元數據信息都是一致的。網絡
上述準備完成後,咱們正式開始咱們生產者的內容負載均衡
生產者須要往集羣發送消息前,要先把每一條消息封裝成ProducerRecord對象,這是生產者內部完成的。以後會經歷一個序列化的過程。以前好幾篇專欄也是有提到過了,須要通過網絡傳輸的數據都是二進制的一些字節數據,須要進行序列化才能傳輸。異步
此時就會有一個問題,咱們須要把消息發送到一個Topic下的一個leader partition中,但是生產者是怎樣get到這個topic下哪一個分區纔是leader partition呢?socket
可能有些小夥伴忘了,提醒一下,controller能夠視做爲broker的領導,負責管理集羣的元數據,而leader partition是作負載均衡用的,它們會分佈式地存儲在不一樣的服務器上面。集羣中生產數據也好,消費數據也好,都是針對leader partition而操做的。分佈式
怎麼知道哪一個纔是leader partition,只須要獲取到元數據不就行了嘛。ide
說來要怎麼獲取元數據也不難,只要隨便找到集羣下某一臺服務器就能夠了(由於集羣中的每一臺服務器元數據都是同樣的)
此時生產者不着急把消息發送出去,而是先放到一個緩衝區
把消息放進緩衝區以後,與此同時會有一個獨立線程Sender去把消息分批次包裝成一個個Batch,不難想到若是Kafka真的是一條消息一條消息地傳輸,一條消息就是一個網絡鏈接,那性能就會被拉得不好。爲了提高吞吐量,因此採起了分批次的作法
整好一個個batch以後,就開始發送給對應的主機上面。此時通過第一篇所提到的Kakfa的網絡設計中的模型,而後再寫到os cache,再寫到磁盤上面。
下圖是當時咱們已經說明過的Kafka網絡設計模型
// 建立配置文件對象
Properties props = new Properties();
// 這個參數目的是爲了獲取kafka集羣的元數據
// 寫一臺主機也行,多個更加保險
// 這裏使用的是主機名,要根據server.properties來決定
// 使用主機名的狀況須要配置電腦的hosts文件(重點)
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
// 這個就是負責把發送的key從字符串序列化爲字節數組
// 咱們能夠給每一個消息設置key,做用以後再闡述
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 這個就是負責把你發送的實際的message從字符串序列化爲字節數組
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 如下屬於調優,以後再解釋
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
複製代碼
// 建立一個Producer實例:線程資源,跟各個broker創建socket鏈接資源
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
複製代碼
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", "test-value");
複製代碼
固然你也能夠指定一個key,做用以後會說明
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", "test-key", "test-value");
複製代碼
帶有一個回調函數,若是沒有異常就返回消息發送成功
// 這是異步發送的模式
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
// 消息發送成功
System.out.println("消息發送成功");
} else {
// 消息發送失敗,須要從新發送
}
}
});
Thread.sleep(10 * 1000);
// 這是同步發送的模式(是通常不會使用的,性能不好,測試可使用)
// 你要一直等待人家後續一系列的步驟都作完,發送消息以後
// 有了消息的迴應返回給你,你這個方法纔會退出來
producer.send(record).get();
複製代碼
producer.close();
複製代碼
區分是否是一個勤于思考的打字員的部分其實就是在1那裏尚未講到的那部分調優,一個個拿出來單獨解釋,就是下面這一大串
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
複製代碼
props.put("acks", "-1");
複製代碼
acks | 消息發送成功判斷 |
---|---|
-1 | leader & all follower接收 |
1 | leader接收 |
0 | 消息發送便可 |
這個acks參數有3個值,分別是-1,0,1,設置這3個不一樣的值會成爲kafka判斷消息發送是否成功的依據。Kafka裏面的分區是有副本的,若是acks爲-1.則說明消息在寫入一個分區的leader partition後,這些消息還須要被另外全部這個分區的副本同步完成後,纔算發送成功(對應代碼就是輸出System.out.println("消息發送成功")),此時發送數據的性能下降
若是設置acks爲1,須要發送的消息只要寫入了leader partition,即算髮送成功,可是這個方式存在丟失數據的風險,好比在消息恰好發送成功給leader partition以後,這個leader partition馬上宕機了,此時剩餘的follower不管選舉誰成爲leader,都不存在剛剛發送的那一條消息。
若是設置acks爲0,消息只要是發送出去了,就默認發送成功了。啥都無論了。
這個參數仍是很是重要的,在生產環境中是必須設置的參數,爲設置消息重發的次數
props.put("retries", 3);
複製代碼
在kafka中可能會遇到各類各樣的異常(能夠直接跳到下方的補充異常類型),可是不管是遇到哪一種異常,消息發送此時都出現了問題,特別是網絡忽然出現問題,可是集羣不可能每次出現異常都拋出,可能在下一秒網絡就恢復了呢,因此咱們要設置重試機制。
這裏補充一句:設置了retries以後,集羣中95%的異常都會本身乘風飛去,我真沒開玩笑🤣
代碼中我配置了3次,其實設置5~10次都是合理的,補充說明一個,若是咱們須要設置隔多久重試一次,也有參數,沒記錯的話是retry.backoff.ms,下面我設置了100毫秒重試一次,也就是0.1秒
props.put("retry.backoff.ms",100);
複製代碼
批次的大小默認是16K,這裏設置了32K,設置大一點能夠稍微提升一下吞吐量,設置這個批次的大小還和消息的大小有關,假設一條消息的大小爲16K,一個批次也是16K,這樣的話批次就失去意義了。因此咱們要事先估算一下集羣中消息的大小,正常來講都會設置幾倍的大小。
props.put("batch.size", 323840);
複製代碼
好比我如今設置了批次大小爲32K,而一條消息是2K,此時已經有了3條消息發送過來,總大小爲6K,而生產者這邊就沒有消息過來了,那在沒夠32K的狀況下就不發送過去集羣了嗎?顯然不是,linger.ms就是設置了固定多長時間,就算沒塞滿Batch,也會發送,下面我設置了100毫秒,因此就算個人Batch遲遲沒有滿32K,100毫秒事後都會向集羣發送Batch。
props.put("linger.ms", 100);
複製代碼
當咱們的Sender線程處理很是緩慢,而生產數據的速度很快時,咱們中間的緩衝區若是容量不夠,生產者就沒法再繼續生產數據了,因此咱們有必要把緩衝區的內存調大一點,緩衝區默認大小爲32M,其實基本也是合理的。
props.put("buffer.memory", 33554432);
複製代碼
那應該如何去驗證咱們這時候應該調整緩衝區的大小了呢,咱們能夠用通常Java計算結束時間減去開始時間的方式測試,當結束時間減去開始時間大於100ms,咱們認爲此時Sender線程處理速度慢,須要調大緩衝區大小。
固然通常狀況下咱們是不須要去設置這個參數的,32M在廣泛狀況下已經足以應付了。
Long startTime=System.currentTime();
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
// 消息發送成功
System.out.println("消息發送成功");
} else {
// 消息發送失敗,須要從新發送
}
}
});
Long endTime=System.currentTime();
If(endTime - startTime > 100){//說明內存被壓滿了
說明有問題
複製代碼
}
compression.type,默認是none,不壓縮,可是也可使用lz4壓縮,效率仍是不錯的,壓縮以後能夠減少數據量,提高吞吐量,可是會加大producer端的cpu開銷
props.put("compression.type", lz4);
複製代碼
留到源碼時候說明,是設置某幾個方法的阻塞時間
props.put("max.block.ms", 3000);
複製代碼
max.request.size:這個參數用來控制發送出去的消息的大小,默認是1048576字節,也就1M,這個通常過小了,不少消息可能都會超過1mb的大小,因此須要本身優化調整,把它設置更大一些(企業通常設置成10M),否則程序跑的好好的忽然來了一條2M的消息,系統就報錯了,那就得不償失
props.put("max.request.size", 1048576);
複製代碼
request.timeout.ms:這個就是說發送一個請求出去以後,他有一個超時的時間限制,默認是30秒,若是30秒都收不到響應(也就是上面的回調函數沒有返回),那麼就會認爲異常,會拋出一個TimeoutException來讓咱們進行處理。若是公司網絡很差,要適當調整此參數
props.put("request.timeout.ms", 30000);
複製代碼
無論是異步仍是同步,均可能讓你處理異常,常見的異常以下:
1)LeaderNotAvailableException:這個就是若是某臺機器掛了,此時leader副本不可用,會致使你寫入失敗,要等待其餘follower副本切換爲leader副本以後,才能繼續寫入,此時能夠重試發送便可。若是說你平時重啓kafka的broker進程,確定會致使leader切換,必定會致使你寫入報錯,是LeaderNotAvailableException
2)NotControllerException:這個也是同理,若是說Controller所在Broker掛了,那麼此時會有問題,須要等待Controller從新選舉,此時也是同樣就是重試便可
3)NetworkException:網絡異常,重試便可 咱們以前配置了一個參數,retries,他會自動重試的,可是若是重試幾回以後仍是不行,就會提供Exception給咱們來處理了。 參數:retries 默認值是3 參數:retry.backoff.ms 兩次重試之間的時間間隔
上面從生產者生產消息到發送這一個流程分析下來,從而引出下面的各類各樣關於整個過程的參數的設置,若是真的能清晰地理解好這些基礎知識,相信對你一定是有所幫助。以後會再帶一個生產者的案例和消費者進來。感興趣的朋友能夠關注一下,謝謝。