基本配置以下:
-broker.id
-log.dirs
-zookeeper.connect
Topic-level配置以及其默認值將在下面討論。
html
Property算法 |
Defaultapache |
Descriptionbootstrap |
broker.id緩存 |
每一個broker均可以用一個惟一的非負整數id進行標識;這個id能夠做爲broker的「名字」,而且它的存在使得broker無須混淆consumers就能夠遷移到不一樣的host/port上。你能夠選擇任意你喜歡的數字做爲id,只要id是惟一的便可。安全 |
|
log.dirs網絡 |
/tmp/kafka-logssession |
kafka存放數據的路徑。這個路徑並非惟一的,能夠是多個,路徑之間只須要使用逗號分隔便可;每當建立新partition時,都會選擇在包含最少partitions的路徑下進行。併發 |
portapp |
6667 |
server接受客戶端鏈接的端口 |
zookeeper.connect |
null |
ZooKeeper鏈接字符串的格式爲:hostname:port,此處hostname和port分別是ZooKeeper集羣中某個節點的host和port;爲了當某個host宕掉以後你能經過其餘ZooKeeper節點進行鏈接,你能夠按照一下方式制定多個hosts: |
message.max.bytes |
1000000 |
server能夠接收的消息最大尺寸。重要的是,consumer和producer有關這個屬性的設置必須同步,不然producer發佈的消息對consumer來講太大。 |
num.network.threads |
3 |
server用來處理網絡請求的網絡線程數目;通常你不須要更改這個屬性。 |
num.io.threads |
8 |
server用來處理請求的I/O線程的數目;這個線程數目至少要等於硬盤的個數。 |
background.threads |
4 |
用於後臺處理的線程數目,例如文件刪除;你不須要更改這個屬性。 |
queued.max.requests |
500 |
在網絡線程中止讀取新請求以前,能夠排隊等待I/O線程處理的最大請求個數。 |
host.name |
null |
broker的hostname;若是hostname已經設置的話,broker將只會綁定到這個地址上;若是沒有設置,它將綁定到全部接口,併發布一份到ZK |
advertised.host.name |
null |
若是設置,則就做爲broker 的hostname發往producer、consumers以及其餘brokers |
advertised.port |
null |
此端口將給與producers、consumers、以及其餘brokers,它會在創建鏈接時用到; 它僅在實際端口和server須要綁定的端口不同時才須要設置。 |
socket.send.buffer.bytes |
100 * 1024 |
SO_SNDBUFF 緩存大小,server進行socket 鏈接所用 |
socket.receive.buffer.bytes |
100 * 1024 |
SO_RCVBUFF緩存大小,server進行socket鏈接時所用 |
socket.request.max.bytes |
100 * 1024 * 1024 |
server容許的最大請求尺寸; 這將避免server溢出,它應該小於Java heap size |
num.partitions |
1 |
若是建立topic時沒有給出劃分partitions個數,這個數字將是topic下partitions數目的默認數值。 |
log.segment.bytes |
1014*1024*1024 |
topic partition的日誌存放在某個目錄下諸多文件中,這些文件將partition的日誌切分紅一段一段的;這個屬性就是每一個文件的最大尺寸;當尺寸達到這個數值時,就會建立新文件。此設置能夠由每一個topic基礎設置時進行覆蓋。 |
log.roll.hours |
24 * 7 |
即便文件沒有到達log.segment.bytes,只要文件建立時間到達此屬性,就會建立新文件。這個設置也能夠有topic層面的設置進行覆蓋; |
log.cleanup.policy |
delete |
|
log.retention.minutes和log.retention.hours |
7 days |
每一個日誌文件刪除以前保存的時間。默認數據保存時間對全部topic都同樣。 |
log.retention.bytes |
-1 |
每一個topic下每一個partition保存數據的總量;注意,這是每一個partitions的上限,所以這個數值乘以partitions的個數就是每一個topic保存的數據總量。同時注意:若是log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會形成刪除一個段文件。 |
log.retention.check.interval.ms |
5 minutes |
檢查日誌分段文件的間隔時間,以肯定是否文件屬性是否到達刪除要求。 |
log.cleaner.enable |
false |
當這個屬性設置爲false時,一旦日誌的保存時間或者大小達到上限時,就會被刪除;若是設置爲true,則當保存屬性達到上限時,就會進行log compaction。 |
log.cleaner.threads |
1 |
進行日誌壓縮的線程數 |
log.cleaner.io.max.bytes.per.second |
None |
進行log compaction時,log cleaner能夠擁有的最大I/O數目。這項設置限制了cleaner,以免干擾活動的請求服務。 |
log.cleaner.io.buffer.size |
500*1024*1024 |
log cleaner清除過程當中針對日誌進行索引化以及精簡化所用到的緩存大小。最好設置大點,以提供充足的內存。 |
log.cleaner.io.buffer.load.factor |
512*1024 |
進行log cleaning時所須要的I/O chunk尺寸。你不須要更改這項設置。 |
log.cleaner.io.buffer.load.factor |
0.9 |
log cleaning中所使用的hash表的負載因子;你不須要更改這個選項。 |
log.cleaner.backoff.ms |
15000 |
進行日誌是否清理檢查的時間間隔 |
log.cleaner.min.cleanable.ratio |
0.5 |
這項配置控制log compactor試圖清理日誌的頻率(假定log compaction是打開的)。默認避免清理壓縮超過50%的日誌。這個比率綁定了備份日誌所消耗的最大空間(50%的日誌備份時壓縮率爲50%)。更高的比率則意味着浪費消耗更少,也就能夠更有效的清理更多的空間。這項設置在每一個topic設置中能夠覆蓋。 |
log.cleaner.delete.retention.ms |
1day |
保存時間;保存壓縮日誌的最長時間;也是客戶端消費消息的最長時間,榮log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮後的數據;會被topic建立時的指定時間覆蓋。 |
log.index.size.max.bytes |
10*1024*1024 |
每一個log segment的最大尺寸。注意,若是log尺寸達到這個數值,即便尺寸沒有超過log.segment.bytes限制,也須要產生新的log segment。 |
log.index.interval.bytes |
4096 |
當執行一次fetch後,須要必定的空間掃描最近的offset,設置的越大越好,通常使用默認值就能夠 |
log.flush.interval.messages |
Long.MaxValue |
log文件「sync」到磁盤以前累積的消息條數。由於磁盤IO操做是一個慢操做,但又是一個「數據可靠性」的必要手段,因此檢查是否須要固化到硬盤的時間間隔。須要在「數據可靠性」與「性能」之間作必要的權衡,若是此值過大,將會致使每次「發sync」的時間過長(IO阻塞),若是此值太小,將會致使「fsync」的時間較長(IO阻塞),若是此值太小,將會致使」發sync「的次數較多,這也就意味着總體的client請求有必定的延遲,物理server故障,將會致使沒有fsync的消息丟失。 |
log.flush.scheduler.interval.ms |
Long.MaxValue |
檢查是否須要fsync的時間間隔 |
log.flush.interval.ms |
Long.MaxValue |
僅僅經過interval來控制消息的磁盤寫入時機,是不足的,這個數用來控制」fsync「的時間間隔,若是消息量始終沒有達到固化到磁盤的消息數,可是離上次磁盤同步的時間間隔達到閾值,也將觸發磁盤同步。 |
log.delete.delay.ms |
60000 |
文件在索引中清除後的保留時間,通常不須要修改 |
auto.create.topics.enable |
true |
是否容許自動建立topic。若是是真的,則produce或者fetch 不存在的topic時,會自動建立這個topic。不然須要使用命令行建立topic |
controller.socket.timeout.ms |
30000 |
partition管理控制器進行備份時,socket的超時時間。 |
controller.message.queue.size |
Int.MaxValue |
controller-to-broker-channles的buffer 尺寸 |
default.replication.factor |
1 |
默認備份份數,僅指自動建立的topics |
replica.lag.time.max.ms |
10000 |
若是一個follower在這個時間內沒有發送fetch請求,leader將從ISR重移除這個follower,並認爲這個follower已經掛了 |
replica.lag.max.messages |
4000 |
若是一個replica沒有備份的條數超過這個數值,則leader將移除這個follower,並認爲這個follower已經掛了 |
replica.socket.timeout.ms |
30*1000 |
leader 備份數據時的socket網絡請求的超時時間 |
replica.socket.receive.buffer.bytes |
64*1024 |
備份時向leader發送網絡請求時的socket receive buffer |
replica.fetch.max.bytes |
1024*1024 |
備份時每次fetch的最大值 |
replica.fetch.min.bytes |
500 |
leader發出備份請求時,數據到達leader的最長等待時間 |
replica.fetch.min.bytes |
1 |
備份時每次fetch以後迴應的最小尺寸 |
num.replica.fetchers |
1 |
從leader備份數據的線程數 |
replica.high.watermark.checkpoint.interval.ms |
5000 |
每一個replica檢查是否將最高水位進行固化的頻率 |
fetch.purgatory.purge.interval.requests |
1000 |
fetch 請求清除時的清除間隔 |
producer.purgatory.purge.interval.requests |
1000 |
producer請求清除時的清除間隔 |
zookeeper.session.timeout.ms |
6000 |
zookeeper會話超時時間。 |
zookeeper.connection.timeout.ms |
6000 |
客戶端等待和zookeeper創建鏈接的最大時間 |
zookeeper.sync.time.ms |
2000 |
zk follower落後於zk leader的最長時間 |
controlled.shutdown.enable |
true |
是否可以控制broker的關閉。若是可以,broker將能夠移動全部leaders到其餘的broker上,在關閉以前。這減小了不可用性在關機過程當中。 |
controlled.shutdown.max.retries |
3 |
在執行不完全的關機以前,能夠成功執行關機的命令數。 |
controlled.shutdown.retry.backoff.ms |
5000 |
在關機之間的backoff時間 |
auto.leader.rebalance.enable |
true |
若是這是true,控制者將會自動平衡brokers對於partitions的leadership |
leader.imbalance.per.broker.percentage |
10 |
每一個broker所容許的leader最大不平衡比率 |
leader.imbalance.check.interval.seconds |
300 |
檢查leader不平衡的頻率 |
offset.metadata.max.bytes |
4096 |
容許客戶端保存他們offsets的最大個數 |
max.connections.per.ip |
Int.MaxValue |
每一個ip地址上每一個broker能夠被鏈接的最大數目 |
max.connections.per.ip.overrides |
每一個ip或者hostname默認的鏈接的最大覆蓋 |
|
connections.max.idle.ms |
600000 |
空鏈接的超時限制 |
log.roll.jitter.{ms,hours} |
0 |
從logRollTimeMillis抽離的jitter最大數目 |
num.recovery.threads.per.data.dir |
1 |
每一個數據目錄用來日誌恢復的線程數目 |
unclean.leader.election.enable |
true |
指明瞭是否可以使不在ISR中replicas設置用來做爲leader |
delete.topic.enable |
false |
可以刪除topic |
offsets.topic.num.partitions |
50 |
The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes |
1440 |
存在時間超過這個時間限制的offsets都將被標記爲待刪除 |
offsets.retention.check.interval.ms |
600000 |
offset管理器檢查陳舊offsets的頻率 |
offsets.topic.replication.factor |
3 |
topic的offset的備份份數。建議設置更高的數字保證更高的可用性 |
offset.topic.segment.bytes |
104857600 |
offsets topic的segment尺寸。 |
offsets.load.buffer.size |
5242880 |
這項設置與批量尺寸相關,當從offsets segment中讀取時使用。 |
offsets.commit.required.acks |
-1 |
在offset commit能夠接受以前,須要設置確認的數目,通常不須要更改 |
offsets.commit.timeout.ms 5000 offset commit的延遲時間,這和producer request的超時時間類似。
更多細節能夠在scala 類 kafka.server.KafkaConfig中找到。
topic-level的配置
有關topics的配置既有全局的又有每一個topic獨有的配置。若是沒有給定特定topic設置,則應用默認的全局設置。這些覆蓋會在每次建立topic發生。下面的例子:建立一個topic,命名爲my-topic,自定義最大消息尺寸以及刷新比率爲:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
須要刪除重寫時,能夠按照如下來作:
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
--deleteConfig max.message.bytes
如下是topic-level的配置選項。server的默認配置在Server Default Property列下給出了,設定這些默認值不會改變原有的設置
Property |
Default |
Server Default Property |
Description |
cleanup.policy |
delete |
log.cleanup.policy |
要麼是」delete「要麼是」compact「; 這個字符串指明瞭針對舊日誌部分的利用方式;默認方式("delete")將會丟棄舊的部分當他們的回收時間或者尺寸限制到達時。」compact「將會進行日誌壓縮 |
delete.retention.ms |
86400000 (24 hours) |
log.cleaner.delete.retention.ms |
對於壓縮日誌保留的最長時間,也是客戶端消費消息的最長時間,通log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮後的數據。此項配置能夠在topic建立時的置頂參數覆蓋 |
flush.messages |
None |
log.flush.interval.messages |
此項配置指定時間間隔:強制進行fsync日誌。例如,若是這個選項設置爲1,那麼每條消息以後都須要進行fsync,若是設置爲5,則每5條消息就須要進行一次fsync。通常來講,建議你不要設置這個值。此參數的設置,須要在"數據可靠性"與"性能"之間作必要的權衡.若是此值過大,將會致使每次"fsync"的時間較長(IO阻塞),若是此值太小,將會致使"fsync"的次數較多,這也意味着總體的client請求有必定的延遲.物理server故障,將會致使沒有fsync的消息丟失. |
flush.ms |
None |
log.flush.interval.ms |
此項配置用來置頂強制進行fsync日誌到磁盤的時間間隔;例如,若是設置爲1000,那麼每1000ms就須要進行一次fsync。通常不建議使用這個選項 |
index.interval.bytes |
4096 |
log.index.interval.bytes |
默認設置保證了咱們每4096個字節就對消息添加一個索引,更多的索引使得閱讀的消息更加靠近,可是索引規模卻會由此增大;通常不須要改變這個選項 |
max.message.bytes |
1000000 |
max.message.bytes |
kafka追加消息的最大尺寸。注意若是你增大這個尺寸,你也必須增大你consumer的fetch 尺寸,這樣consumer才能fetch到這些最大尺寸的消息。 |
min.cleanable.dirty.ratio |
0.5 |
min.cleanable.dirty.ratio |
此項配置控制log壓縮器試圖進行清除日誌的頻率。默認狀況下,將避免清除壓縮率超過50%的日誌。這個比率避免了最大的空間浪費 |
min.insync.replicas |
1 |
min.insync.replicas |
當producer設置request.required.acks爲-1時,min.insync.replicas指定replicas的最小數目(必須確認每個repica的寫數據都是成功的),若是這個數目沒有達到,producer會產生異常。 |
retention.bytes |
None |
log.retention.bytes |
若是使用「delete」的retention 策略,這項配置就是指在刪除日誌以前,日誌所能達到的最大尺寸。默認狀況下,沒有尺寸限制而只有時間限制 |
retention.ms |
7 days |
log.retention.minutes |
若是使用「delete」的retention策略,這項配置就是指刪除日誌前日誌保存的時間。 |
segment.bytes |
1GB |
log.segment.bytes |
kafka中log日誌是分紅一塊塊存儲的,此配置是指log日誌劃分紅塊的大小 |
segment.index.bytes |
10MB |
log.index.size.max.bytes |
此配置是有關offsets和文件位置之間映射的索引文件的大小;通常不須要修改這個配置 |
segment.ms |
7 days |
log.roll.hours |
即便log的分塊文件沒有達到須要刪除、壓縮的大小,一旦log 的時間達到這個上限,就會強制新建一個log分塊文件 |
segment.jitter.ms |
0 |
log.roll.jitter.{ms,hours} |
The maximum jitter to subtract from logRollTimeMillis. |
3.2 Consumer Configs
consumer基本配置以下:
group.id
zookeeper.connect
Property |
Default |
Description |
group.id |
用來惟一標識consumer進程所在組的字符串,若是設置一樣的group id,表示這些processes都是屬於同一個consumer group |
|
zookeeper.connect |
指定zookeeper的鏈接的字符串,格式是hostname:port,此處host和port都是zookeeper server的host和port,爲避免某個zookeeper 機器宕機以後失聯,你能夠指定多個hostname:port,使用逗號做爲分隔: |
|
consumer.id |
null |
不須要設置,通常自動產生 |
socket.timeout.ms |
30*100 |
網絡請求的超時限制。真實的超時限制是 max.fetch.wait+socket.timeout.ms |
socket.receive.buffer.bytes |
64*1024 |
socket用於接收網絡請求的緩存大小 |
fetch.message.max.bytes |
1024*1024 |
每次fetch請求中,針對每次fetch消息的最大字節數。這些字節將會督導用於每一個partition的內存中,所以,此設置將會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server容許的最大消息尺寸相等,不然,producer可能發送的消息尺寸大於consumer所能消耗的尺寸。 |
num.consumer.fetchers |
1 |
用於fetch數據的fetcher線程數 |
auto.commit.enable |
true |
若是爲真,consumer所fetch的消息的offset將會自動的同步到zookeeper。這項提交的offset將在進程掛掉時,由新的consumer使用 |
auto.commit.interval.ms |
60*1000 |
consumer向zookeeper提交offset的頻率,單位是秒 |
queued.max.message.chunks |
2 |
用於緩存消息的最大數目,以供consumption。每一個chunk必須和fetch.message.max.bytes相同 |
rebalance.max.retries |
4 |
當新的consumer加入到consumer group時,consumers集合試圖從新平衡分配到每一個consumer的partitions數目。若是consumers集合改變了,當分配正在執行時,這個從新平衡會失敗並重入 |
fetch.min.bytes |
1 |
每次fetch請求時,server應該返回的最小字節數。若是沒有足夠的數據返回,請求會等待,直到足夠的數據纔會返回。 |
fetch.wait.max.ms |
100 |
若是沒有足夠的數據可以知足fetch.min.bytes,則此項配置是指在應答fetch請求以前,server會阻塞的最大時間。 |
rebalance.backoff.ms |
2000 |
在重試reblance以前backoff時間 |
refresh.leader.backoff.ms |
200 |
在試圖肯定某個partition的leader是否失去他的leader地位以前,須要等待的backoff時間 |
auto.offset.reset |
largest |
zookeeper中沒有初始化的offset時,若是offset是如下值的迴應: |
consumer.timeout.ms |
-1 |
若是沒有消息可用,即便等待特定的時間以後也沒有,則拋出超時異常 |
exclude.internal.topics |
true |
是否將內部topics的消息暴露給consumer |
paritition.assignment.strategy |
range |
選擇向consumer 流分配partitions的策略,可選值:range,roundrobin |
client.id |
group id value |
是用戶特定的字符串,用來在每次請求中幫助跟蹤調用。它應該能夠邏輯上確認產生這個請求的應用 |
zookeeper.session.timeout.ms |
6000 |
zookeeper 會話的超時限制。若是consumer在這段時間內沒有向zookeeper發送心跳信息,則它會被認爲掛掉了,而且reblance將會產生 |
zookeeper.connection.timeout.ms |
6000 |
客戶端在創建通zookeeper鏈接中的最大等待時間 |
zookeeper.sync.time.ms |
2000 |
ZK follower能夠落後ZK leader的最大時間 |
offsets.storage |
zookeeper |
用於存放offsets的地點: zookeeper或者kafka |
offset.channel.backoff.ms |
1000 |
從新鏈接offsets channel或者是重試失敗的offset的fetch/commit請求的backoff時間 |
offsets.channel.socket.timeout.ms |
10000 |
當讀取offset的fetch/commit請求迴應的socket 超時限制。此超時限制是被consumerMetadata請求用來請求offset管理 |
offsets.commit.max.retries |
5 |
重試offset commit的次數。這個重試只應用於offset commits在shut-down之間。他 |
dual.commit.enabled |
true |
若是使用「kafka」做爲offsets.storage,你能夠二次提交offset到zookeeper(還有一次是提交到kafka)。在zookeeper-based的offset storage到kafka-based的offset storage遷移時,這是必須的。對任意給定的consumer group來講,比較安全的建議是當完成遷移以後就關閉這個選項 |
partition.assignment.strategy |
range |
在「range」和「roundrobin」策略之間選擇一種做爲分配partitions給consumer 數據流的策略; 循環的partition分配器分配全部可用的partitions以及全部可用consumer 線程。它會將partition循環的分配到consumer線程上。若是全部consumer實例的訂閱都是肯定的,則partitions的劃分是肯定的分佈。循環分配策略只有在如下條件知足時才能夠:(1)每一個topic在每一個consumer實力上都有一樣數量的數據流。(2)訂閱的topic的集合對於consumer group中每一個consumer實例來講都是肯定的。 |
更多細節能夠查看 scala類: kafka.consumer.ConsumerConfig
3.3 Producer Configs
producer基本的配置屬性包含:
(1) metadata.broker.list
(2)request.required.acks
(3)producer.type
(4)serializer.class
Property |
Default |
Description |
metadata.broker.list |
服務於bootstrapping。producer僅用來獲取metadata(topics,partitions,replicas)。發送實際數據的socket鏈接將基於返回的metadata數據信息而創建。格式是: |
|
request.required.acks |
0 |
此配置是代表當一次produce請求被認爲完成時的確認值。特別是,多少個其餘brokers必須已經提交了數據到他們的log而且向他們的leader確認了這些信息。典型的值包括: |
request.timeout.ms |
10000 |
broker盡力實現request.required.acks需求時的等待時間,不然會發送錯誤到客戶端 |
producer.type |
sync |
此選項置頂了消息是否在後臺線程中異步發送。正確的值: |
serializer.class |
kafka.serializer.DefaultEncoder |
消息的序列化類別。默認編碼器輸入一個字節byte[],而後返回相同的字節byte[] |
key.serializer.class |
關鍵字的序列化類。若是沒給與這項,默認狀況是和消息一致 |
|
partitioner.class |
kafka.producer.DefaultPartitioner |
partitioner 類,用於在subtopics之間劃分消息。默認partitioner基於key的hash表 |
compression.codec |
none |
此項參數能夠設置壓縮數據的codec,可選codec爲:「none」, 「gzip」, 「snappy」 |
compressed.topics |
null |
此項參數能夠設置某些特定的topics是否進行壓縮。若是壓縮codec是NoCompressCodec以外的codec,則對指定的topics數據應用這些codec。若是壓縮topics列表是空,則將特定的壓縮codec應用於全部topics。若是壓縮的codec是NoCompressionCodec,壓縮對全部topics軍不可用。 |
message.send.max.retries |
3 |
此項參數將使producer自動重試失敗的發送請求。此項參數將置頂重試的次數。注意:設定非0值將致使重複某些網絡錯誤:引發一條發送並引發確認丟失 |
retry.backoff.ms |
100 |
在每次重試以前,producer會更新相關topic的metadata,以此進行查看新的leader是否分配好了。由於leader的選擇須要一點時間,此選項指定更新metadata以前producer須要等待的時間。 |
topic.metadata.refresh.interval.ms |
600*1000 |
producer通常會在某些失敗的狀況下(partition missing,leader不可用等)更新topic的metadata。他將會規律的循環。若是你設置爲負值,metadata只有在失敗的狀況下才更新。若是設置爲0,metadata會在每次消息發送後就會更新(不建議這種選擇,系統消耗太大)。重要提示: 更新是有在消息發送後纔會發生,所以,若是producer歷來不發送消息,則metadata歷來也不會更新。 |
queue.buffering.max.ms |
5000 |
當應用async模式時,用戶緩存數據的最大時間間隔。例如,設置爲100時,將會批量處理100ms以內消息。這將改善吞吐率,可是會增長因爲緩存產生的延遲。 |
queue.buffering.max.messages |
10000 |
當使用async模式時,在在producer必須被阻塞或者數據必須丟失以前,能夠緩存到隊列中的未發送的最大消息條數 |
batch.num.messages |
200 |
使用async模式時,能夠批量處理消息的最大條數。或者消息數目已到達這個上線或者是queue.buffer.max.ms到達,producer纔會處理 |
send.buffer.bytes |
100*1024 |
socket 寫緩存尺寸 |
client.id |
「」 |
這個client id是用戶特定的字符串,在每次請求中包含用來追蹤調用,他應該邏輯上能夠確認是那個應用發出了這個請求。 |
更多細節須要查看 scala類
kafka.producer.ProducerConfig
三、4 New Producer Configs
咱們正在努力替換現有的producer。代碼在trunk中是可用的,能夠認爲beta版本。下面是新producer的配置
Name |
Type |
Default |
Importance |
Description |
boostrap.servers |
list |
high |
用於創建與kafka集羣鏈接的host/port組。數據將會在全部servers上均衡加載,無論哪些server是指定用於bootstrapping。這個列表僅僅影響初始化的hosts(用於發現所有的servers)。這個列表格式: |
|
acks |
string |
1 |
high |
producer須要server接收到數據以後發出的確認接收的信號,此項配置就是指procuder須要多少個這樣的確認信號。此配置實際上表明瞭數據備份的可用性。如下設置爲經常使用選項: |
buffer.memory |
long |
33554432 |
high |
producer能夠用來緩存數據的內存大小。若是數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以「block.on.buffer.full」來代表。 |
compression.type |
string |
none |
high |
producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。 |
retries |
int |
0 |
high |
設置大於0的值將使客戶端從新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什麼不一樣。容許重試將潛在的改變數據的順序,若是這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 |
batch.size |
int |
16384 |
medium |
producer將試圖批處理消息記錄,以減小請求次數。這將改善client與server之間的性能。這項配置控制默認的批量處理消息字節數。 |
client.id |
string |
medium |
當向server發出請求時,這個字符串會發送給server。目的是可以追蹤請求源頭,以此來容許ip/port許可列表以外的一些應用能夠發送信息。這項應用能夠設置任意字符串,由於沒有任何功能性的目的,除了記錄和跟蹤 |
|
linger.ms |
long |
0 |
medium |
producer組將會彙總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。一般來講,這隻有在記錄產生速度大於發送速度的時候才能發生。然而,在某些條件下,客戶端將但願下降請求的數量,甚至下降到中等負載一下。這項設置將經過增長小的延遲來完成--即,不是當即發送一條記錄,producer將會等待給定的延遲時間以容許其餘消息記錄發送,這些消息記錄能夠批量處理。這能夠認爲是TCP種Nagle的算法相似。這項設置設定了批量處理的更高的延遲邊界:一旦咱們得到某個partition的batch.size,他將會當即發送而不顧這項設置,然而若是咱們得到消息字節數比這項設置要小的多,咱們須要「linger」特定的時間以獲取更多的消息。 這個設置默認爲0,即沒有延遲。設定linger.ms=5,例如,將會減小請求數目,可是同時會增長5ms的延遲。 |
max.request.size |
int |
1028576 |
medium |
請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具備本身對消息記錄尺寸的覆蓋,這些尺寸和這個設置不一樣。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。 |
receive.buffer.bytes |
int |
32768 |
medium |
TCP receive緩存大小,當閱讀數據時使用 |
send.buffer.bytes |
int |
131072 |
medium |
TCP send緩存大小,當發送數據時使用 |
timeout.ms |
int |
30000 |
medium |
此配置選項控制server等待來自followers的確認的最大時間。若是確認的請求數目在此時間內沒有實現,則會返回一個錯誤。這個超時限制是以server端度量的,沒有包含請求的網絡延遲 |
block.on.buffer.full |
boolean |
true |
low |
當咱們內存緩存用盡時,必須中止接收新消息記錄或者拋出錯誤。默認狀況下,這個設置爲真,然而某些阻塞可能不值得期待,所以當即拋出錯誤更好。設置爲false則會這樣:producer會拋出一個異常錯誤:BufferExhaustedException, 若是記錄已經發送同時緩存已滿 |
metadata.fetch.timeout.ms |
long |
60000 |
low |
是指咱們所獲取的一些元素據的第一個時間數據。元素據包含:topic,host,partitions。此項配置是指當等待元素據fetch成功完成所須要的時間,不然會跑出異常給客戶端。 |
metadata.max.age.ms |
long |
300000 |
low |
以微秒爲單位的時間,是在咱們強制更新metadata的時間間隔。即便咱們沒有看到任何partition leadership改變。 |
metric.reporters |
list |
[] |
low |
類的列表,用於衡量指標。實現MetricReporter接口,將容許增長一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於註冊JMX統計 |
metrics.num.samples |
int |
2 |
low |
用於維護metrics的樣本數 |
metrics.sample.window.ms |
long |
30000 |
low |
metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了窗口大小,例如。咱們可能在30s的期間維護兩個樣本。當一個窗口推出後,咱們會擦除並重寫最老的窗口 |
recoonect.backoff.ms |
long |
10 |
low |
鏈接失敗時,當咱們從新鏈接時的等待時間。這避免了客戶端反覆重連 |
retry.backoff.ms |
long |
100 |
low |
在試圖重試失敗的produce請求以前的等待時間。避免陷入發送-失敗的死循環中。 |