問題導讀:
1. 什麼是多線程異步發送模型?
2. Metadata的線程安全性如何實現?
3. Metadata的數據結構是什麼?
4. producer如何讀取Metadata?
5. Sender的如何建立?
6. Sender poll()如何更新Metadata?
7. Metadata有哪2種更新機制?
8. 什麼是Metadata失效檢測?
9. Metadata有哪些其餘的更新策略?
解決方案:
多線程異步發送模型
下圖是通過源碼分析以後,整理出來的Producer端的架構圖:
在上一篇咱們講過,Producer有同步發送和異步發送2種策略。在之前的Kafka client api實現中,同步和異步是分開實現的。而在0.9中,同步發送實際上是經過異步發送間接實現,其接口以下:html
1node 2bootstrap 3api 4安全 5數據結構 6多線程 7架構 |
|
要實現同步發送,只要在拿到返回的Future對象以後,直接調用get()就能夠了。
基本思路
從上圖咱們能夠看出,異步發送的基本思路就是:send的時候,KafkaProducer把消息放到本地的消息隊列RecordAccumulator,而後一個後臺線程Sender不斷循環,把消息發給Kafka集羣。
要實現這個,還得有一個前提條件:就是KafkaProducer/Sender都須要獲取集羣的配置信息Metadata。所謂Metadata,也就是在上一篇所講的,Topic/Partion與broker的映射關係:每個Topic的每個Partion,得知道其對應的broker列表是什麼,其中leader是誰,follower是誰。
2個數據流
因此在上圖中,有2個數據流:
Metadata流(A1,A2,A3):Sender從集羣獲取信息,而後更新Metadata; KafkaProducer先讀取Metadata,而後把消息放入隊列。
消息流(B1, B2, B3):這個很好理解,再也不詳述。
本篇着重講述Metadata流,消息流,將在後續詳細講述。
Metadata的線程安全性
從上圖能夠看出,Metadata是多個producer線程讀,一個sender線程更新,所以它必須是線程安全的。
Kafka的官方文檔上也有說明,KafkaProducer是線程安全的,能夠在多線程中調用:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
從下面代碼也能夠看出,它的全部public方法都是synchronized:
01 02 03 04 05 06 07 08 09 10 11 12 13 |
|
Metadata的數據結構
下面代碼列舉了Metadata的主要數據結構:一個Cluster對象 + 1堆狀態變量。前者記錄了集羣的配置信息,後者用於控制Metadata的更新策略。
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
producer讀取Metadata
下面是send函數的源碼,能夠看到,在send以前,會先讀取metadata。若是metadata讀不到,會一直阻塞在那,直到超時,拋出TimeoutException
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
|
總結:從上面代碼能夠看出,producer wait metadata的時候,有2個條件:
(1) while (metadata.fetch().partitionsForTopic(topic) == null)
(2)while (this.version <= lastVersion)
有wait就會有notify,notify在Sender更新Metadata的時候發出。
Sender的建立
下面是KafkaProducer的構造函數,從代碼能夠看出,Sender就是KafkaProducer中建立的一個Thread.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
|
Sender poll()更新Metadata
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
|
從上面能夠看出,Metadata的更新,是在while循環,每次調用client.poll()的時候更新的。
更新機制又有如下2種:
Metadata的2種更新機制
(1)週期性的更新: 每隔一段時間更新一次,這個經過 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 這2個字段來實現
對應的ProducerConfig配置項爲:
metadata.max.age.ms //缺省300000,即10分鐘1次
(2) 失效檢測,強制更新:檢查到metadata失效之後,調用metadata.requestUpdate()強制更新。 requestUpdate()函數裏面其實什麼都沒作,就是把needUpdate置成了false
每次poll的時候,都檢查這2種更新機制,達到了,就觸發更新。
那如何斷定Metadata失效了呢?這個在代碼中很分散,有不少地方,會斷定Metadata失效。
Metadata失效檢測
條件1:initConnect的時候
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 |
|
條件2:poll裏面IO的時候,鏈接斷掉了
1 2 3 4 5 6 7 8 |
|
條件3:有請求超時
01 02 03 04 05 06 07 08 09 10 11 |
|
條件4:發消息的時候,有partition的leader沒找到
1 2 3 4 5 6 |
|
條件5:返回的response和請求對不上的時候
1 2 3 4 5 6 7 8 |
|
總之1句話:發生各式各樣的異常,數據不一樣步,都認爲metadata可能出問題了,要求更新。
Metadata其餘的更新策略
除了上面所述,Metadata的更新,還有如下幾個特色:
1.更新請求MetadataRequest是nio異步發送的,在poll的返回中,處理MetadataResponse的時候,才真正更新Metadata。
這裏有個關鍵點:Metadata的cluster對象,每次是整個覆蓋的,而不是局部更新。因此cluster內部不用加鎖。
2.更新的時候,是從metadata保存的全部Node,或者說Broker中,選負載最小的那個,也就是當前接收請求最少的那個。向其發送MetadataRequest請求,獲取新的Cluster對象。
文章轉自About雲(http://www.aboutyun.com/thread-19917-1-1.html),原文位於csdn,做者:travi