前提:本例適合那些沒有順序要求的消息主題。
kafka經過一系列優化,寫入和讀取速度可以達到數萬條/秒。經過增長分區數量,可以經過部署多個消費者增長並行消費能力。但仍是有不少狀況下,某些業務的執行速度實在是太慢,這個時候咱們就要用到多線程去消費,提升應用機器的利用率,而不是一味的給kafka增長壓力。
使用Spring建立一個kafka消費者是很是簡單的。咱們選擇的方式是繼承kafka的ShutdownableThread
,而後實現它的doWork
方法便可。java
參考:https://github.com/apache/kaf...git
即然是使用多線程,咱們就須要新建一個線程池。
咱們建立了一個最大容量爲20的線程池,其中有兩個參數須要注意一下。(參考《JAVA多線程使用場景和注意事項簡版》)。程序員
咱們使用了了零容量的SynchronousQueue
,一進一出,避免隊列裏緩衝數據,這樣在系統異常關閉時,就能排除由於阻塞隊列丟消息的可能。
而後使用了CallerRunsPolicy
飽和策略,使得多線程處理不過來的時候,可以阻塞在kafka的消費線程上。github
而後,咱們將真正處理業務的邏輯放在任務中多線程執行,每次執行完畢,咱們都手工的commit一次ack
,代表這條消息我已經處理了。因爲是線程池認領了這些任務,順序性是沒法保證的,可能有些任務沒有執行完畢,後面的任務就已經把它的offset給提交了。o.O
redis
不過這暫時不重要,首先讓它並行化運行就好。
惋惜的是,當咱們運行程序,直接拋出了異常,沒法進行下去。
程序直接說了:apache
KafkaConsumer is not safe for multi-threaded access
顯然,kafka的消費端不是線程安全的,它拒絕你這麼調用它的api。kafka的初衷是好的,想要避免一些併發環境的問題,但我確實須要使用多線程處理。api
kafka消費者經過比較調用者的線程id來判斷是不是由外部線程發起請求。安全
long threadId = Thread.currentThread().getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet();
}session
得,只能將commitSync
函數放在線程外面了,先提交ack、再執行任務。多線程
咱們獲取的消息,可能在真正被執行以前,會進行一些過濾,好比一些空值或者特定條件的判斷。雖然能夠直接放在消費者線程裏運行,但顯的特別的亂,能夠加入一個生產者消費者模型(你能夠認爲這是多此一舉)。這裏採用的是阻塞隊列依然是SynchronousQueue
,它充當了管道的功能。
咱們把任務放入管道後,立馬commit。若是線程池已經滿了,將一直阻塞在消費者線程裏,直到有空缺。而後,咱們單獨啓動了一個線程,用來接收這些數據,而後提交到這部分的代碼看起來大概這樣。
應用可以啓動了,消費速度賊快。
kafka的參數很是的多,咱們比較關心的有如下幾個參數。
調用一次poll,返回的最大條數。這個值設置的大,那麼處理的就慢,很容易超出max.poll.interval.ms
的值(默認5分鐘),形成消費者的離線。在耗時很是大的消費中,是須要特別注意的。
是否開啓自動提交(offset)若是開啓,consumer已經消費的offset信息將會間歇性的提交到kafka中(持久保存)
當開啓offset自動提交時,提交請求的時間頻率由參數`
auto.commit.interval.ms`控制。
若是broker端反饋的數據量不足時(fetch.min.bytes),fetch請求等待的最長時間。若是數據量知足須要,則當即返回。
consumer會話超時時長,若是在此時間內,server還沒有接收到consumer任何請求(包括心跳檢測),那麼server將會斷定此consumer離線。此值越大,server等待consumer失效、rebalance時間就越長。
consumer協調器與kafka集羣之間,心跳檢測的時間間隔。kafka集羣經過心跳判斷consumer會話的活性,以判斷consumer是否在線,若是離線則會把此consumer註冊的partition分配(assign)給相同group的其餘consumer。此值必須小於「session.timeout.ms」,即會話過時時間應該比心跳檢測間隔要大,一般爲session.timeout.ms的三分之一,不然心跳檢測就失去意義。
在本例中,咱們的參數簡單的設置以下,主要調整了每次獲取的條數和檢測時間。其餘的都是默認。
仔細的同窗可能會看到,咱們的代碼依然不是徹底安全的。這是因爲咱們提早提交了ack致使的。程序正常運行下,這無傷大雅。但在應用異常關閉的時候,那些正在執行中的消息,極可能會丟失,對於一致性要求很是高的應用,咱們要從兩個手段上進行保證。
第一種就是考慮kill -15的狀況。這種方式比較簡單,只要覆蓋ShutdownableThread的shutdown方法便可,應用將有機會執行線程池中的任務,確保消費完畢再關閉應用。
@Override public void shutdown() { super.shutdown(); executor.shutdown(); }
應用oom,或者直接kill -9了,事情就變得麻煩起來。
維護一個單獨的日誌文件(或者本地db),在commit以前寫入一條日誌,而後在真正執行完畢以後寫入一條對應的日誌。當系統啓動時,讀取這些日誌文件,獲取沒有執行成功的任務,從新執行。
想要效率,還想要可靠,是得下點苦力氣的。
這種方式與日誌方式相似,但因爲redis的效率很高(可達數萬),並且方便,是優於日誌方式的。
可使用Hash結構,提交任務的同時寫入Redis,任務執行完畢刪掉這個值,那麼剩下的就是出現問題的消息。
在系統啓動時,首先檢測一下redis中是否有異常數據。若是有,首先處理這些數據,而後正常消費。
多線程是爲了增長效率,redis等是爲了增長可靠性。業務代碼是很是好編寫的,搞懂了邏輯就搞定了大部分;業務代碼有時候又是困難的,你要編寫大量輔助功能增長它的效率、照顧它的邊界。
以程序員的角度來講,最有競爭力的代碼都是爲了照顧小几率發生的邊界異常。
kafka在吞吐量和可靠性方面,有各類的權衡,不少都是魚和熊掌的關係。沒必要糾結於它自己,咱們能夠藉助外部的工具,獲取更大的收益。在這種狀況下,redis當機與應用同時當機的機率仍是比較小的。5個9的消息保證是能夠作到的,剩下的那點不完美問題消息,你爲何不從日誌裏找呢?
擴展閱讀:
三、360度測試:KAFKA會丟數據麼?其高可用是否知足需求?