最近在StackOverflow碰到的一個問題,即在consumer.poll以後assignment()返回爲空的問題,以下面這段代碼所示:分佈式
consumer.subscribe(Arrays.asList("test")); consumer.poll(Duration.ofMillis(0)); // consumer.poll(0);
Set<TopicPartition> assignment = consumer.assignment(); // empty!
有意思的是,若是是consumer.poll(0);則assignment不爲空。以前我覺得poll(long)被標記爲「Deprecated」以後使用poll(Duration)是相同的效果,如今看來二者仍是要有差異的。爲何poll(0)就能獲取到consumer分配方案,而使用poll(Duration)就不能呢?fetch
調研了一番以後發現緣由以下:在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元數據信息,以後它纔會發起fetch請求去獲取數據。雖然poll能夠指定超時時間,但這個超時時間只適用於後面的消息獲取,前面更新元數據信息不計入這個超時時間。poll(Duration)這個版本修改了這樣的設計,會把元數據獲取也計入整個超時時間。因爲本例中使用的是0,即瞬時超時,所以consumer根本沒法在這麼短的時間內鏈接上coordinator,因此只能趕在超時前返回一個空集合。這就是爲何使用不一樣版本的poll命令assignment不一樣的緣由。spa
仔細想一想爲何社區要作這樣的變動?poll(0)這種設計的一個問題在於若是遠端的broker不可用了, 那麼consumer程序會被無限阻塞下去。用戶指定了超時時間但卻被無限阻塞,顯然這樣的設計時有欠缺的。特別是對於Kafka Streams而言,這個設計可能致使的問題在於Stream Thread沒法正常關閉。目前源代碼中依然有一些無限阻塞的場景,好比以前處理的initTransaction,commitTransaction和abortTransaction也是無限等待。看來後面社區仍是須要慢慢地將它們都替換掉,畢竟在分佈式系統中沒有什麼場景是須要絕對地等待的。設計