從0.9.0.0開始,下面是消費者的配置。apache
名稱 | 描述 | 類型 | 默認值 |
---|---|---|---|
bootstrap.servers | 消費者初始鏈接kafka集羣時的地址列表。無論這邊配置的什麼地址,消費者會使用全部的kafka集羣服務器。消費者會經過這些地址列表,找到全部的kafka集羣機器。 | list | |
key.deserializer | 實現了Deserializer的key的反序列化類 | class | |
value.deserializer | 實現了Deserializer的value的反序列化類 | class | |
fetch.min.bytes | 每次請求,kafka返回的最小的數據量。若是數據量不夠,這個請求會等待,直到數據量到達最小指標時,纔會返回給消費者。若是設置大於1,會提升kafka的吞吐量,可是會有額外的等待期的代價。 | int | 1 |
group.id | 標識這臺消費者屬於那個消費組。若是消費者經過訂閱主題來實現組管理功能,或者使用基於kafka的偏移量管理策略,這個配置是必須的。 | string | "" |
heartbeat.interval.ms | 使用kafka集羣管理工具時,消費者協調器之間的預計心跳時間。心跳的做用是確保消費者的session是活躍的,同時當新的機器加入集羣或有機器掛掉的狀況下觸發再平衡操做。這個配置必須小於heartbeat.interval.ms,並且應該不大於這個值的1/3。爲了控制正常的負載均衡的預期時間,這個值能夠設置的更小。 | int | 3000 |
max.partition.fetch.bytes | kafka集羣每一個分區一次返回的最大數據量。一次請求的最大內存使用量應該等於#partitions * max.partition.fetch.bytes。這個值必須與kafka集羣容許的最大消息數據量差很少大小,不然可能生產者發送了一個消息,大於消費者配置的值。這種狀況下,消費者可能會在獲取那條消息時堵住。 | int | 1048576 |
session.timeout.ms | 使用kafka集羣管理工具時檢測失敗的超時時間。若是在session超時時間範圍內,沒有收到消費者的心跳,broker會把這個消費者置爲失效,並觸發消費者負載均衡。由於只有在調用poll方法時纔會發送心跳,更大的session超時時間容許消費者在poll循環週期內處理消息內容,儘管這會有花費更長時間檢測失效的代價。若是想控制消費者處理消息的時間,還能夠參考max.poll.records。注意這個值的大小應該在group.min.session.timeout.ms和group.max.session.timeout.ms範圍內。 | int | 30000 |
ssl.key.password | 私鑰存儲文件的私鑰密碼。可選配置。 | password | null |
ssl.keystore.location | 私鑰存儲文件的路徑。可選配置,而且可用來做爲客戶端的雙向認證。 | string | null |
ssl.keystore.password | 私鑰存儲文件的存儲密碼。可選配置,而且只有ssl.keystore.location配置的狀況下才須要配置。 | password | null |
ssl.truststore.location | 信任祕鑰文件路徑。 | string | null |
ssl.truststore.password | 信任祕鑰文件密碼。 | password | null |
auto.offset.reset | 當kafka的初始偏移量沒了,或者當前的偏移量不存在的狀況下,應該怎麼辦?下面有幾種策略:earliest(將偏移量自動重置爲最初的值)、latest(自動將偏移量置爲最新的值)、none(若是在消費者組中沒有發現前一個偏移量,就向消費者拋出一個異常)、anything else(向消費者拋出異常) | string | latest |
connections.max.idle.ms | 配置時間後,關閉空閒的鏈接 | long | 540000 |
enable.auto.commit | 若是設爲true,消費者的偏移量會按期在後臺提交。 | boolean | true |
exclude.internal.topics | 內部主題(好比偏移量)是否須要暴露給消費者。若是設爲true,獲取內部主題消息的途徑就是訂閱他們。 | boolean | true |
max.poll.records | 一次poll調用返回的最大消息數量。 | int | 2147483647 |
partition.assignment.strategy | 使用組管理時,客戶端使用的分區策略的類名,根據這個策略來進行消費分區。 | list | [org.apache.kafka.clients.consumer.RangeAssignor] |
receive.buffer.bytes | SO_RCVBUF讀取數據使用的內存大小。 | int | 65536 |
request.timeout.ms | 這個配置控制一次請求響應的最長等待時間。若是在超時時間內未獲得響應,kafka要麼重發這條消息,要麼超太重試次數的狀況下直接置爲失敗。 | int | 40000 |