kafka的生產者能夠選擇使用異步方式發送數據,所謂異步方式,就是咱們調用 send()
方法,並指定一個回調函數, 服務器在返回響應時調用該函數。程序員
kafka在客戶端裏暴露了兩個send
方法,咱們能夠本身選擇同步或者異步模式。咱們來看一個kafka的生產者發送示例,有個直觀的感覺。這個示例是一個同步的模式。緩存
ProducerRecord<String, String> record = new ProducerRecord<>(「Kafka」, 「Kafka_Products」, 「測試」);//Topic Key Value try{ Future future = producer.send(record); future.get();//獲取執行結果 } catch(Exception e) { e.printStackTrace(); }
咱們從源碼層面來繼續看下。服務器
首先kafka定義了一個接口,異步
而後KafkaProducer
實現了這兩個方法,咱們看下異步方法的實現邏輯。函數
能夠看到最終是調用doSend
方法,調用的時候傳入一個回調。這個回調就是監聽方法的執行結果的。學習
不少人會認爲,既然是異步模式,無論結果是成功仍是失敗,確定方法調用會立刻返回的。那我只能告訴你,很差意思,不必定是這樣。我本身就曾經踩過這個坑。測試
咱們當時有個業務流程須要在執行完成後發送kakfa消息給某個業務方,爲了儘可能減小影響我這個主流程的執行時間,採用了異步方式發送kafka消息。在使用中,由於配錯了kafka的TOPIC信息,發現流程阻塞發送消息這裏長達6秒(kafka默認的發送超時時間)。spa
究竟爲啥異步方式還會阻塞呢?咱們繼續看源碼。線程
無論是同步模式仍是異步模式,最終都會調用到doSend
方法,注意看上圖中的waitOnMetadata
方法,我上面說的阻塞的狀況就是阻塞在這個方法裏。那咱們繼續看這個方法。3d
經過代碼中的註釋咱們大概能瞭解這個方法的功能,不過我這裏仍是要解釋下。(防止有人看不懂英文,哈哈)
waitOnMetadata
獲取當前的集羣元數據信息,若是緩存有,而且分區沒有超過指定分區範圍則緩存返回,不然觸發更新,等待新的metadata。這個等待的操做在下面這行代碼:
metadata.awaitUpdate(version, remainingWaitMs);
而後就繼續跟嘍,
這個方法很好理解,就是一直在等一個條件,這個條件達到了就返回,不然一直等待超時退出。而這個條件就是當前的版本號要大於上個版本號。
那麼誰來更新版本號呢?就是咱們前面提到的sender
線程。當咱們的topic配置錯誤的時候致使metadata一直沒法更新,而後一直等到超時。
破案了!
kafka的異步模式可讓咱們在業務場景中發送消息時即刻返回,沒必要等待發送的結果。可是當metadata取不到時,發送的過程仍是須要等待一直超時的。
程序員是一個尤爲須要不斷學習的工種,平時養成閱讀源碼的習慣,不光能避免踩一些坑,還能在遇到問題是快遞定位到問題的根源。