本文綜合了我以前寫的kafka相關文章,可做爲一個全面瞭解學習kafka的培訓學習資料。
轉載請註明出處 : 本文連接java
當今社會各類應用系統諸如商業、社交、搜索、瀏覽等像信息工廠同樣不斷的生產出各類信息,在大數據時代,咱們面臨以下幾個挑戰:linux
以上幾個挑戰造成了一個業務需求模型,即生產者生產(produce)各類信息,消費者消費(consume)(處理分析)這些信息,而在生產者與消費者之間,須要一個溝通二者的橋樑-消息系統。從一個微觀層面來講,這種需求也可理解爲不一樣的系統之間如何傳遞消息。web
Kafka由 linked-in 開源
kafka-便是解決上述這類問題的一個框架,它實現了生產者和消費者之間的無縫鏈接。
kafka-高產出的分佈式消息系統(A high-throughput distributed messaging system)算法
Apache kafka 是一個分佈式的基於push-subscribe的消息系統,它具有快速、可擴展、可持久化的特色。它如今是Apache旗下的一個開源系統,做爲hadoop生態系統的一部分,被各類商業公司普遍應用。它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎。apache
下面介紹先大致介紹一下Kafka的主要設計思想,可讓相關人員在短期內瞭解到kafka相關特性,若是想深刻研究,後面會對其中每個特性都作詳細介紹。api
Kafka中發佈訂閱的對象是topic。咱們能夠爲每類數據建立一個topic,把向topic發佈消息的客戶端稱做producer,從topic訂閱消息的客戶端稱做consumer。Producers和consumers能夠同時從多個topic讀寫數據。一個kafka集羣由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。數組
消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic由是由一些Partition Logs(分區日誌)組成,其組織結構以下圖所示:緩存
咱們能夠看到,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值。
Kafka集羣會保存全部的消息,無論消息有沒有被消費;咱們能夠設定消息的過時時間,只有過時的數據纔會被自動清除以釋放磁盤空間。好比咱們設置消息過時時間爲2天,那麼這2天內的全部消息都會被保存到集羣中,數據只有超過了兩天才會被清除。
Kafka須要維持的元數據只有一個–消費消息在Partition中的offset值,Consumer每消費一個消息,offset就會加1。其實消息的狀態徹底是由Consumer控制的,Consumer能夠跟蹤和重設這個offset值,這樣的話Consumer就能夠讀取任意位置的消息。
把消息日誌以Partition的形式存放有多重考慮,第一,方便在集羣中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據了;第二就是能夠提升併發,由於能夠以Partition爲單位讀寫了。服務器
經過上面介紹的咱們能夠知道,kafka中的數據是持久化的而且可以容錯的。Kafka容許用戶爲每一個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。若是你的副本數量設置爲3,那麼一份數據就會被存放在3臺不一樣的機器上,那麼就容許有2個機器失敗。通常推薦副本數量至少爲2,這樣就能夠保證增減、重啓機器時不會影響到數據消費。若是對數據持久化有更高的要求,能夠把副本數量設置爲3或者更多。
Kafka中的topic是以partition的形式存放的,每個topic均可以設置它的partition數量,Partition的數量決定了組成topic的log的數量。Producer在生產數據時,會按照必定規則(這個規則是能夠自定義的)把消息發佈到topic的各個partition中。上面將的副本都是以partition爲單位的,不過只有一個partition的副本會被選舉成leader做爲讀寫用。
關於如何設置partition值須要考慮的因素。一個partition只能被一個消費者消費(一個消費者能夠同時消費多個partition),所以,若是設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。因此,推薦partition的數量必定要大於同時運行的consumer的數量。另一方面,建議partition的數量大於集羣broker的數量,這樣leader partition就能夠均勻的分佈在各個broker中,最終使得集羣負載均衡。在Cloudera,每一個topic都有上百個partition。須要注意的是,kafka須要爲每一個partition分配一些內存來緩存消息數據,若是partition數量越大,就要爲kafka分配更大的heap space。網絡
Producers直接發送消息到broker上的leader partition,不須要通過任何中介一系列的路由轉發。爲了實現這個特性,kafka集羣中的每一個broker均可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是能夠直接被訪問的。
Producer客戶端本身控制着消息被推送到哪些partition。實現的方式能夠是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的分區,用戶能夠爲每一個消息指定一個partitionKey,經過這個key來實現一些hash分區算法。好比,把userid做爲partitionkey的話,相同userid的消息將會被推送到同一個分區。
以Batch的方式推送數據能夠極大的提升處理效率,kafka Producer 能夠將消息在內存中累計到必定數量後做爲一個batch發送請求。Batch的數量大小能夠經過Producer的參數控制,參數值能夠設置爲累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。經過增長batch的大小,能夠減小網絡請求和磁盤IO的次數,固然具體參數設置須要在效率和時效性方面作一個權衡。
Producers能夠異步的並行的向kafka發送消息,可是一般producer在發送完消息以後會獲得一個future響應,返回的是offset值或者發送過程當中遇到的錯誤。這其中有個很是重要的參數「acks」,這個參數決定了producer要求leader partition 收到確認的副本個數,若是acks設置數量爲0,表示producer不會等待broker的響應,因此,producer沒法知道消息是否發送成功,這樣有可能會致使數據丟失,但同時,acks值爲0會獲得最大的系統吞吐量。
若acks設置爲1,表示producer會在leader partition收到消息時獲得broker的一個確認,這樣會有更好的可靠性,由於客戶端會等待直到broker確認收到消息。若設置爲-1,producer會在全部備份的partition收到消息時獲得broker的確認,這個設置能夠獲得最高的可靠性保證。
Kafka 消息有一個定長的header和變長的字節數組組成。由於kafka消息支持字節數組,也就使得kafka能夠支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但咱們推薦消息大小不要超過1MB,一般通常消息大小都在1~10kB以前。
Kafka提供了兩套consumer api,分爲high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的鏈接,而且這個API是徹底無狀態的,每次請求都須要指定offset值,所以,這套API也是最靈活的。
在kafka中,當前讀到消息的offset值是由consumer來維護的,所以,consumer能夠本身決定如何讀取kafka中的數據。好比,consumer能夠經過重設offset值來從新消費已消費過的數據。無論有沒有被消費,kafka會保存數據一段時間,這個時間週期是可配置的,只有到了過時時間,kafka纔會刪除這些數據。
High-level API封裝了對集羣中一系列broker的訪問,能夠透明的消費一個topic。它本身維持了已消費消息的狀態,即每次消費的都是下一個消息。
High-level API還支持以組的形式消費topic,若是consumers有同一個組名,那麼kafka就至關於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不一樣的組名,那麼此時kafka就至關與一個廣播服務,會把topic中的全部消息廣播到每一個consumer。
咱們上面已經知道了Kafka支持以集合(batch)爲單位發送消息,在此基礎上,Kafka還支持對消息集合進行壓縮,Producer端能夠經過GZIP或Snappy格式對消息集合進行壓縮。Producer端進行壓縮以後,在Consumer端需進行解壓。壓縮的好處就是減小傳輸的數據量,減輕對網絡傳輸的壓力,在對大數據處理上,瓶頸每每體如今網絡上而不是CPU(壓縮和解壓會耗掉部分CPU資源)。
那麼如何區分消息是壓縮的仍是未壓縮的呢,Kafka在消息頭部添加了一個描述壓縮屬性字節,這個字節的後兩位表示消息的壓縮採用的編碼,若是後兩位爲0,則表示消息未被壓縮。
在消息系統中,保證消息在生產和消費過程當中的可靠性是十分重要的,在實際消息傳遞過程當中,可能會出現以下三中狀況:
有許多系統聲稱它們實現了exactly-once,可是它們其實忽略了生產者或消費者在生產和消費過程當中有可能失敗的狀況。好比雖然一個Producer成功發送一個消息,可是消息在發送途中丟失,或者成功發送到broker,也被consumer成功取走,可是這個consumer在處理取過來的消息時失敗了。
從Producer端看:Kafka是這麼處理的,當一個消息被髮送後,Producer會等待broker成功接收到消息的反饋(可經過參數控制等待時間),若是消息在途中丟失或是其中一個broker掛掉,Producer會從新發送(咱們知道Kafka有備份機制,能夠經過參數控制是否等待全部備份節點都收到消息)。
從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了消息,但卻在處理過程當中掛掉,此時Consumer能夠經過這個offset值從新找到上一個消息再進行處理。Consumer還有權限控制這個offset值,對持久化到broker端的消息作任意處理。
備份機制是Kafka0.8版本的新特性,備份機制的出現大大提升了Kafka集羣的可靠性、穩定性。有了備份機制後,Kafka容許集羣中的節點掛掉後而不影響整個集羣工做。一個備份數量爲n的集羣容許n-1個節點失敗。在全部備份節點中,有一個節點做爲lead節點,這個節點保存了其它備份節點列表,並維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:
Kafka高度依賴文件系統來存儲和緩存消息,通常的人認爲磁盤是緩慢的,這致使人們對持久化結構具備競爭性持懷疑態度。其實,磁盤遠比你想象的要快或者慢,這決定於咱們如何使用磁盤。
一個和磁盤性能有關的關鍵事實是:磁盤驅動器的吞吐量跟尋到延遲是相背離的,也就是所,線性寫的速度遠遠大於隨機寫。好比:在一個6 7200rpm SATA RAID-5 的磁盤陣列上線性寫的速度大概是600M/秒,可是隨機寫的速度只有100K/秒,二者相差將近6000倍。線性讀寫在大多數應用場景下是能夠預測的,所以,操做系統利用read-ahead和write-behind技術來從大的數據塊中預取數據,或者將多個邏輯上的寫操做組合成一個大寫物理寫操做中。更多的討論能夠在ACMQueueArtical中找到,他們發現,對磁盤的線性讀在有些狀況下能夠比內存的隨機訪問要快一些。
爲了補償這個性能上的分歧,現代操做系統都會把空閒的內存用做磁盤緩存,儘管在內存回收的時候會有一點性能上的代價。全部的磁盤讀寫操做會在這個統一的緩存上進行。
此外,若是咱們是在JVM的基礎上構建的,熟悉java內存應用管理的人應該清楚如下兩件事情:
基於這些事實,利用文件系統而且依靠頁緩存比維護一個內存緩存或者其餘結構要好——咱們至少要使得可用的緩存加倍,經過自動訪問可用內存,而且經過存儲更緊湊的字節結構而不是一個對象,這將有可能再次加倍。這麼作的結果就是在一臺32GB的機器上,若是不考慮GC懲罰,將最多有28-30GB的緩存。此外,這些緩存將會一直存在即便服務重啓,然而進程內緩存須要在內存中重構(10GB緩存須要花費10分鐘)或者它須要一個徹底冷緩存啓動(很是差的初始化性能)。它同時也簡化了代碼,由於如今全部的維護緩存和文件系統之間內聚的邏輯都在操做系統內部了,這使得這樣作比one-off in-process attempts更加高效與準確。若是你的磁盤應用更加傾向於順序讀取,那麼read-ahead在每次磁盤讀取中實際上獲取到這人緩存中的有用數據。
以上這些建議了一個簡單的設計:不一樣於維護儘量多的內存緩存而且在須要的時候刷新到文件系統中,咱們換一種思路。全部的數據不須要調用刷新程序,而是馬上將它寫到一個持久化的日誌中。事實上,這僅僅意味着,數據將被傳輸到內核頁緩存中並稍後被刷新。咱們能夠增長一個配置項以讓系統的用戶來控制數據在何時被刷新到物理硬盤上。
消息系統中持久化數據結構的設計一般是維護者一個和消費隊列有關的B樹或者其它可以隨機存取結構的元數據信息。B樹是一個很好的結構,能夠用在事務型與非事務型的語義中。可是它須要一個很高的花費,儘管B樹的操做須要O(logN)。一般狀況下,這被認爲與常數時間等價,但這對磁盤操做來講是不對的。磁盤尋道一次須要10ms,而且一次只能尋一個,所以並行化是受限的。
直覺上來說,一個持久化的隊列能夠構建在對一個文件的讀和追加上,就像通常狀況下的日誌解決方案。儘管和B樹相比,這種結構不能支持豐富的語義,可是它有一個優勢,全部的操做都是常數時間,而且讀寫之間不會相互阻塞。這種設計具備極大的性能優點:最終系統性能和數據大小徹底無關,服務器能夠充分利用廉價的硬盤來提供高效的消息服務。
事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味着咱們能夠提供通常消息系統沒法提供的特性。好比說,消息被消費後不是立馬被刪除,咱們能夠將這些消息保留一段相對比較長的時間(好比一個星期)。
咱們已經爲效率作了很是多的努力。可是有一種很是主要的應用場景是:處理Web活動數據,它的特色是數據量很是大,每一次的網頁瀏覽都會產生大量的寫操做。更進一步,咱們假設每個被髮布的消息都會被至少一個consumer消費,所以咱們更要怒路讓消費變得更廉價。
經過上面的介紹,咱們已經解決了磁盤方面的效率問題,除此以外,在此類系統中還有兩類比較低效的場景:
爲了減小大量小I/O操做的問題,kafka的協議是圍繞消息集合構建的。Producer一次網絡請求能夠發送一個消息集合,而不是每一次只發一條消息。在server端是以消息塊的形式追加消息到log中的,consumer在查詢的時候也是一次查詢大量的線性數據塊。消息集合即MessageSet,實現自己是一個很是簡單的API,它將一個字節數組或者文件進行打包。因此對消息的處理,這裏沒有分開的序列化和反序列化的上步驟,消息的字段能夠按需反序列化(若是沒有須要,能夠不用反序列化)。
另外一個影響效率的問題就是字節拷貝。爲了解決字節拷貝的問題,kafka設計了一種「標準字節消息」,Producer、Broker、Consumer共享這一種消息格式。Kakfa的message log在broker端就是一些目錄文件,這些日誌文件都是MessageSet按照這種「標準字節消息」格式寫入到磁盤的。
維持這種通用的格式對這些操做的優化尤其重要:持久化log 塊的網絡傳輸。流行的unix操做系統提供了一種很是高效的途徑來實現頁面緩存和socket之間的數據傳遞。在Linux操做系統中,這種方式被稱做:sendfile system call(Java提供了訪問這個系統調用的方法:FileChannel.transferTo api)。
爲了理解sendfile的影響,須要理解通常的將數據從文件傳到socket的路徑:
這種操做方式明顯是很是低效的,這裏有四次拷貝,兩次系統調用。若是使用sendfile,就能夠避免兩次拷貝:操做系統將數據直接從頁緩存發送到網絡上。因此在這個優化的路徑中,只有最後一步將數據拷貝到網卡緩存中是須要的。
咱們指望一個主題上有多個消費者是一種常見的應用場景。利用上述的zero-copy,數據只被拷貝到頁緩存一次,而後就能夠在每次消費時被重得利用,而不須要將數據存在內存中,而後在每次讀的時候拷貝到內核空間中。這使得消息消費速度能夠達到網絡鏈接的速度。這樣以來,經過頁面緩存和sendfile的結合使用,整個kafka集羣幾乎都已以緩存的方式提供服務,並且即便下游的consumer不少,也不會對整個集羣服務形成壓力。
關於sendfile和zero-copy,請參考:zero-copy
爲了提升性能,推薦採用專用的服務器來部署kafka集羣,儘可能與hadoop集羣分開,由於kafka依賴磁盤讀寫和大的頁面緩存,若是和hadoop共享節點的話會影響其使用頁面緩存的性能。
Kafka集羣的大小須要根據硬件的配置、生產者消費者的併發數量、數據的副本個數、數據的保存時長綜合肯定。
磁盤的吞吐量尤其重要,由於一般kafka的瓶頸就在磁盤上。
Kafka依賴於zookeeper,建議採用專用服務器來部署zookeeper集羣,zookeeper集羣的節點採用偶數個,通常建議用三、五、7個。注意zookeeper集羣越大其讀寫性能越慢,由於zookeeper須要在節點之間同步數據。一個3節點的zookeeper集羣容許一個節點失敗,一個5節點集羣容許2個幾點失敗。
有不少因素決定着kafka集羣須要具有存儲能力的大小,最準確的衡量辦法就是模擬負載來測算一下,Kafka自己也提供了負載測試的工具。
若是不想經過模擬實驗來評估集羣大小,最好的辦法就是根據硬盤的空間需求來推算。下面我就根據網絡和磁盤吞吐量需求來作一下估算。
咱們作以下假設:
通常的來講,kafka集羣瓶頸在於網絡和磁盤吞吐量,因此咱們先評估一下集羣的網絡和磁盤需求。
對於每條消息,每一個副本都要寫一遍,因此總體寫的速度是W*R。讀數據的部分主要是集羣內部各個副本從leader同步消息讀和集羣外部的consumer讀,因此集羣內部讀的速率是(R-1)*W,同時,外部consumer讀的速度是C*W,所以:
須要注意的是,咱們能夠在讀的時候緩存部分數據來減小IO操做,若是一個集羣有M MB內存,寫的速度是W MB/sec,則容許M/(W*R) 秒的寫能夠被緩存。若是集羣有32GB內存,寫的速度是50MB/s的話,則能夠至少緩存10分鐘的數據。
Kafka data structures in Zookeeper
屬性 | 默認值 | 描述 |
---|---|---|
broker.id | 必填參數,broker的惟一標識 | |
log.dirs | /tmp/kafka-logs | Kafka數據存放的目錄。能夠指定多個目錄,中間用逗號分隔,當新partition被建立的時會被存放到當前存放partition最少的目錄。 |
port | 9092 | BrokerServer接受客戶端鏈接的端口號 |
zookeeper.connect | null | Zookeeper的鏈接串,格式爲:hostname1:port1,hostname2:port2,hostname3:port3。能夠填一個或多個,爲了提升可靠性,建議都填上。注意,此配置容許咱們指定一個zookeeper路徑來存放此kafka集羣的全部數據,爲了與其餘應用集羣區分開,建議在此配置中指定本集羣存放目錄,格式爲:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。須要注意的是,消費者的參數要和此參數一致。 |
message.max.bytes | 1000000 | 服務器能夠接收到的最大的消息大小。注意此參數要和consumer的maximum.message.size大小一致,不然會由於生產者生產的消息太大致使消費者沒法消費。 |
num.io.threads | 8 | 服務器用來執行讀寫請求的IO線程數,此參數的數量至少要等於服務器上磁盤的數量。 |
queued.max.requests | 500 | I/O線程能夠處理請求的隊列大小,若實際請求數超過此大小,網絡線程將中止接收新的請求。 |
socket.send.buffer.bytes | 100 * 1024 | The SO_SNDBUFF buffer the server prefers for socket connections. |
socket.receive.buffer.bytes | 100 * 1024 | The SO_RCVBUFF buffer the server prefers for socket connections. |
socket.request.max.bytes | 100 * 1024 * 1024 | 服務器容許請求的最大值, 用來防止內存溢出,其值應該小於 Java heap size. |
num.partitions | 1 | 默認partition數量,若是topic在建立時沒有指定partition數量,默認使用此值,建議改成5 |
log.segment.bytes | 1024 * 1024 * 1024 | Segment文件的大小,超過此值將會自動新建一個segment,此值能夠被topic級別的參數覆蓋。 |
log.roll.{ms,hours} | 24 * 7 hours | 新建segment文件的時間,此值能夠被topic級別的參數覆蓋。 |
log.retention.{ms,minutes,hours} | 7 days | Kafka segment log的保存週期,保存週期超過此時間日誌就會被刪除。此參數能夠被topic級別參數覆蓋。數據量大時,建議減少此值。 |
log.retention.bytes | -1 | 每一個partition的最大容量,若數據量超過此值,partition數據將會被刪除。注意這個參數控制的是每一個partition而不是topic。此參數能夠被log級別參數覆蓋。 |
log.retention.check.interval.ms | 5 minutes | 刪除策略的檢查週期 |
auto.create.topics.enable | true | 自動建立topic參數,建議此值設置爲false,嚴格控制topic管理,防止生產者錯寫topic。 |
default.replication.factor | 1 | 默認副本數量,建議改成2。 |
replica.lag.time.max.ms | 10000 | 在此窗口時間內沒有收到follower的fetch請求,leader會將其從ISR(in-sync replicas)中移除。 |
replica.lag.max.messages | 4000 | 若是replica節點落後leader節點此值大小的消息數量,leader節點就會將其從ISR中移除。 |
replica.socket.timeout.ms | 30 * 1000 | replica向leader發送請求的超時時間。 |
replica.socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests to the leader for replicating data. |
replica.fetch.max.bytes | 1024 * 1024 | The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. |
replica.fetch.wait.max.ms | 500 | The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader. |
num.replica.fetchers | 1 | Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker. |
fetch.purgatory.purge.interval.requests | 1000 | The purge interval (in number of requests) of the fetch request purgatory. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session 超時時間。若是在此時間內server沒有向zookeeper發送心跳,zookeeper就會認爲此節點已掛掉。 此值過低致使節點容易被標記死亡;若過高,.會致使太遲發現節點死亡。 |
zookeeper.connection.timeout.ms | 6000 | 客戶端鏈接zookeeper的超時時間。 |
zookeeper.sync.time.ms | 2000 | H ZK follower落後 ZK leader的時間。 |
controlled.shutdown.enable | true | 容許broker shutdown。若是啓用,broker在關閉本身以前會把它上面的全部leaders轉移到其它brokers上,建議啓用,增長集羣穩定性。 |
auto.leader.rebalance.enable | true | If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the 「preferred」 replica for each partition if it is available. |
leader.imbalance.per.broker.percentage | 10 | The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker. |
leader.imbalance.check.interval.seconds | 300 | The frequency with which to check for leader imbalance. |
offset.metadata.max.bytes | 4096 | The maximum amount of metadata to allow clients to save with their offsets. |
connections.max.idle.ms | 600000 | Idle connections timeout: the server socket processor threads close the connections that idle more than this. |
num.recovery.threads.per.data.dir | 1 | The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. |
unclean.leader.election.enable | true | Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss. |
delete.topic.enable | false | 啓用deletetopic參數,建議設置爲true。 |
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes | 1440 | Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic. |
offsets.retention.check.interval.ms | 600000 | The frequency at which the offset manager checks for stale offsets. |
offsets.topic.replication.factor | 3 | The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas. |
offsets.topic.segment.bytes | 104857600 | Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads. |
offsets.load.buffer.size | 5242880 | An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache. |
offsets.commit.required.acks | -1 | The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden. |
offsets.commit.timeout.ms | 5000 | The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout. |
屬性 | 默認值 | 描述 |
---|---|---|
metadata.broker.list | 啓動時producer查詢brokers的列表,能夠是集羣中全部brokers的一個子集。注意,這個參數只是用來獲取topic的元信息用,producer會從元信息中挑選合適的broker並與之創建socket鏈接。格式是:host1:port1,host2:port2。 | |
request.required.acks | 0 | 參見3.2節介紹 |
request.timeout.ms | 10000 | Broker等待ack的超時時間,若等待時間超過此值,會返回客戶端錯誤信息。 |
producer.type | sync | 同步異步模式。async表示異步,sync表示同步。若是設置成異步模式,能夠容許生產者以batch的形式push數據,這樣會極大的提升broker性能,推薦設置爲異步。 |
serializer.class | kafka.serializer.DefaultEncoder | 序列號類,.默認序列化成 byte[] 。 |
key.serializer.class | Key的序列化類,默認同上。 | |
partitioner.class | kafka.producer.DefaultPartitioner | Partition類,默認對key進行hash。 |
compression.codec | none | 指定producer消息的壓縮格式,可選參數爲: 「none」, 「gzip」 and 「snappy」。關於壓縮參見4.1節 |
compressed.topics | null | 啓用壓縮的topic名稱。若上面參數選擇了一個壓縮格式,那麼壓縮僅對本參數指定的topic有效,若本參數爲空,則對全部topic有效。 |
message.send.max.retries | 3 | Producer發送失敗時重試次數。若網絡出現問題,可能會致使不斷重試。 |
retry.backoff.ms | 100 | Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. |
topic.metadata.refresh.interval.ms | 600 * 1000 | The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed |
queue.buffering.max.ms | 5000 | 啓用異步模式時,producer緩存消息的時間。好比咱們設置成1000時,它會緩存1秒的數據再一次發送出去,這樣能夠極大的增長broker吞吐量,但也會形成時效性的下降。 |
queue.buffering.max.messages | 10000 | 採用異步模式時producer buffer 隊列裏最大緩存的消息數量,若是超過這個數值,producer就會阻塞或者丟掉消息。 |
queue.enqueue.timeout.ms | -1 | 當達到上面參數值時producer阻塞等待的時間。若是值設置爲0,buffer隊列滿時producer不會阻塞,消息直接被丟掉。若值設置爲-1,producer會被阻塞,不會丟消息。 |
batch.num.messages | 200 | 採用異步模式時,一個batch緩存的消息數量。達到這個數量值時producer纔會發送消息。 |
send.buffer.bytes | 100 * 1024 | Socket write buffer size |
client.id | 「」 | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. |
屬性 | 默認值 | 描述 |
---|---|---|
group.id | Consumer的組ID,相同goup.id的consumer屬於同一個組。 | |
zookeeper.connect | Consumer的zookeeper鏈接串,要和broker的配置一致。 | |
consumer.id | null | 若是不設置會自動生成。 |
socket.timeout.ms | 30 * 1000 | 網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 肯定。 |
socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests. |
fetch.message.max.bytes | 1024 * 1024 | 查詢topic-partition時容許的最大消息大小。consumer會爲每一個partition緩存此大小的消息到內存,所以,這個參數能夠控制consumer的內存使用量。這個值應該至少比server容許的最大消息大小大,以避免producer發送的消息大於consumer容許的消息。 |
num.consumer.fetchers | 1 | The number fetcher threads used to fetch data. |
auto.commit.enable | true | 若是此值設置爲true,consumer會週期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啓以後將會使用此值做爲新開始消費的值。 |
auto.commit.interval.ms | 60 * 1000 | Consumer提交offset值到zookeeper的週期。 |
queued.max.message.chunks | 2 | 用來被consumer消費的message chunks 數量, 每一個chunk能夠緩存fetch.message.max.bytes大小的數據量。 |
rebalance.max.retries | 4 | When a new consumer joins a consumer group the set of consumers attempt to 「rebalance」 the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up. |
fetch.min.bytes | 1 | The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. |
fetch.wait.max.ms | 100 | The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes. |
rebalance.backoff.ms | 2000 | Backoff time between retries during rebalance. |
refresh.leader.backoff.ms | 200 | Backoff time to wait before trying to determine the leader of a partition that has just lost its leader. |
auto.offset.reset | largest | What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer |
consumer.timeout.ms | -1 | 若在指定時間內沒有消息消費,consumer將會拋出異常。 |
exclude.internal.topics | true | Whether messages from internal topics (such as offsets) should be exposed to the consumer. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur. |
zookeeper.connection.timeout.ms | 6000 | The max time that the client waits while establishing a connection to zookeeper. |
zookeeper.sync.time.ms | 2000 | How far a ZK follower can be behind a ZK leader |