producer.properties
consumer.properties
server.properties
(1).producer.properties:生產端的配置文件面試
#指定kafka節點列表,用於獲取metadata,沒必要所有指定 #須要kafka的服務器地址,來獲取每個topic的分片數等元數據信息。 metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092 #生產者生產的消息被髮送到哪一個block,須要一個分組策略。 #指定分區處理類。默認kafka.producer.DefaultPartitioner,表經過key哈希到對應分區 #partitioner.class=kafka.producer.DefaultPartitioner #生產者生產的消息能夠經過必定的壓縮策略(或者說壓縮算法)來壓縮。消息被壓縮後發送到broker集羣,
#而broker集羣是不會進行解壓縮的,broker集羣只會把消息發送到消費者集羣,而後由消費者來解壓縮。 #是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。 #壓縮後消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。 #文本數據會以1比10或者更高的壓縮比進行壓縮。 compression.codec=none #指定序列化處理類,消息在網絡上傳輸就須要序列化,它有String、數組等許多種實現。 serializer.class=kafka.serializer.DefaultEncoder #若是要壓縮消息,這裏指定哪些topic要壓縮消息,默認empty,表示不壓縮。 #若是上面啓用了壓縮,那麼這裏就須要設置 #compressed.topics= #這是消息的確認機制,默認值是0。在面試中常被問到。 #producer有個ack參數,有三個值,分別表明: #(1)不在意是否寫入成功; #(2)寫入leader成功; #(3)寫入leader和全部副本都成功; #要求很是可靠的話能夠犧牲性能設置成最後一種。 #爲了保證消息不丟失,至少要設置爲1,也就 #是說至少保證leader將消息保存成功。 #設置發送數據是否須要服務端的反饋,有三個值0,1,-1,分別表明3種狀態: #0: producer不會等待broker發送ack。生產者只要把消息發送給broker以後,就認爲發送成功了,這是第1種狀況; #1: 當leader接收到消息以後發送ack。生產者把消息發送到broker以後,而且消息被寫入到本地文件,才認爲發送成功,這是第二種狀況;#-1: 當全部的follower都同步消息成功後發送ack。不只是主的分區將消息保存成功了,
#並且其全部的分區的副本數也都同步好了,纔會被認爲發動成功,這是第3種狀況。 request.required.acks=0 #broker必須在該時間範圍以內給出反饋,不然失敗。 #在向producer發送ack以前,broker容許等待的最大時間 ,若是超時, #broker將會向producer發送一個error ACK.意味着上一次消息由於某種緣由 #未能成功(好比follower未能同步成功) request.timeout.ms=10000 #生產者將消息發送到broker,有兩種方式,一種是同步,表示生產者發送一條,broker就接收一條;
#還有一種是異步,表示生產者積累到一批的消息,裝到一個池子裏面緩存起來,再發送給broker,
#這個池子不會無限緩存消息,在下面,它分別有一個時間限制(時間閾值)和一個數量限制(數量閾值)的參數供咱們來設置。
#通常咱們會選擇異步。 #同步仍是異步發送消息,默認「sync」表同步,"async"表異步。異步能夠提升發送吞吐量,
#也意味着消息將會在本地buffer中,並適時批量發送,可是也可能致使丟失未發送過去的消息 producer.type=sync #在async模式下,當message被緩存的時間超過此值後,將會批量發送給broker, #默認爲5000ms #此值和batch.num.messages協同工做. queue.buffering.max.ms = 5000 #異步狀況下,緩存中容許存放消息數量的大小。 #在async模式下,producer端容許buffer的最大消息量 #不管如何,producer都沒法儘快的將消息發送給broker,從而致使消息在producer端大量沉積 #此時,若是消息的條數達到閥值,將會致使producer端阻塞或者消息被拋棄,默認爲10000條消息。 queue.buffering.max.messages=20000 #若是是異步,指定每次批量發送數據量,默認爲200 batch.num.messages=500 #在生產端的緩衝池中,消息發送出去以後,在沒有收到確認以前,該緩衝池中的消息是不能被刪除的,
#可是生產者一直在生產消息,這個時候緩衝池可能會被撐爆,因此這就須要有一個處理的策略。
#有兩種處理方式,一種是讓生產者先別生產那麼快,阻塞一下,等會再生產;另外一種是將緩衝池中的消息清空。 #當消息在producer端沉積的條數達到"queue.buffering.max.meesages"後阻塞必定時間後,
#隊列仍然沒有enqueue(producer仍然沒有發送出任何消息) #此時producer能夠繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間 #-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候消息就不會被拋棄 #0: 當即清空隊列,消息被拋棄 queue.enqueue.timeout.ms=-1 #當producer接收到error ACK,或者沒有接收到ACK時,容許消息重發的次數 #由於broker並無完整的機制來避免消息重複,因此當網絡異常時(好比ACK丟失) #有可能致使broker接收到重複的消息,默認值爲3. message.send.max.retries=3 #producer刷新topic metada的時間間隔,producer須要知道partition leader #的位置,以及當前topic的狀況 #所以producer須要一個機制來獲取最新的metadata,當producer遇到特定錯誤時, #將會當即刷新 #(好比topic失效,partition丟失,leader失效等),此外也能夠經過此參數來配置 #額外的刷新機制,默認值600000 topic.metadata.refresh.interval.ms=60000
(2).consumer.properties:消費端的配置文件算法
#消費者集羣經過鏈接Zookeeper來找到broker。 #zookeeper鏈接服務器地址 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 #zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉 zookeeper.session.timeout.ms=5000 #當消費者掛掉,其餘消費者要等該指定時間才能檢查到而且觸發從新負載均衡 zookeeper.connection.timeout.ms=10000 #這是一個時間閾值。 #指定多久消費者更新offset到zookeeper中。 #注意offset更新時基於time而不是每次得到的消息。 #一旦在更新zookeeper發生異常並重啓,將可能拿到已拿到過的消息 zookeeper.sync.time.ms=2000 #指定消費 group.id=xxxxx #這是一個數量閾值,經測試是500條。 #當consumer消費必定量的消息以後,將會自動向zookeeper提交offset信息#注意offset信息並非每消費一次消息就向zk提交 #一次,而是如今本地保存(內存),並按期提交,默認爲true auto.commit.enable=true # 自動更新時間。默認60 * 1000 auto.commit.interval.ms=1000 # 當前consumer的標識,能夠設定,也能夠有系統生成, #主要用來跟蹤消息消費狀況,便於觀察 conusmer.id=xxx # 消費者客戶端編號,用於區分不一樣客戶端,默認客戶端程序自動產生 client.id=xxxx # 最大取多少塊緩存到消費者(默認10) queued.max.message.chunks=50 # 當有新的consumer加入到group時,將會reblance,此後將會 #有partitions的消費端遷移到新 的consumer上,若是一個 #consumer得到了某個partition的消費權限,那麼它將會向zk #註冊 "Partition Owner registry"節點信息,可是有可能 #此時舊的consumer尚沒有釋放此節點, 此值用於控制, #註冊節點的重試次數. rebalance.max.retries=5 #每拉取一批消息的最大字節數 #獲取消息的最大尺寸,broker不會像consumer輸出大於 #此值的消息chunk 每次feth將獲得多條消息,此值爲總大小, #提高此值,將會消耗更多的consumer端內存 fetch.min.bytes=6553600 #當消息的尺寸不足時,server阻塞的時間,若是超時, #消息將當即發送給consumer #數據一批一批到達,若是每一批是10條消息,若是某一批還 #不到10條,可是超時了,也會當即發送給consumer。 fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 若是zookeeper沒有offset值或offset值超出範圍。 #那麼就給個初始的offset。有smallest、largest、 #anything可選,分別表示給當前最小的offset、 #當前最大的offset、拋異常。默認largest auto.offset.reset=smallest # 指定序列化處理類 derializer.class=kafka.serializer.DefaultDecoder
(3).server.properties:服務端的配置文件數組
#broker的全局惟一編號,不能重複 broker.id=0 #用來監聽連接的端口,producer或consumer將在此端口創建鏈接 port=9092 #處理網絡請求的線程數量,也就是接收消息的線程數。 #接收線程會將接收到的消息放到內存中,而後再從內存中寫入磁盤。 num.network.threads=3 #消息從內存中寫入磁盤是時候使用的線程數量。 #用來處理磁盤IO的線程數量 num.io.threads=8 #發送套接字的緩衝區大小 socket.send.buffer.bytes=102400 #接受套接字的緩衝區大小 socket.receive.buffer.bytes=102400 #請求套接字的緩衝區大小 socket.request.max.bytes=104857600 #kafka運行日誌存放的路徑 log.dirs=/export/servers/logs/kafka #topic在當前broker上的分片個數 num.partitions=2 #咱們知道segment文件默認會被保留7天的時間,超時的話就 #會被清理,那麼清理這件事情就須要有一些線程來作。這裏就是 #用來設置恢復和清理data下數據的線程數量 num.recovery.threads.per.data.dir=1 #segment文件保留的最長時間,默認保留7天(168小時), #超時將被刪除,也就是說7天以前的數據將被清理掉。 log.retention.hours=168 #滾動生成新的segment文件的最大時間 log.roll.hours=168 #日誌文件中每一個segment的大小,默認爲1G log.segment.bytes=1073741824 #上面的參數設置了每個segment文件的大小是1G,那麼 #就須要有一個東西去按期檢查segment文件有沒有達到1G, #多長時間去檢查一次,就須要設置一個週期性檢查文件大小 #的時間(單位是毫秒)。 log.retention.check.interval.ms=300000 #日誌清理是否打開 log.cleaner.enable=true #broker須要使用zookeeper保存meta數據 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 #zookeeper連接超時時間 zookeeper.connection.timeout.ms=6000 #上面咱們說過接收線程會將接收到的消息放到內存中,而後再從內存 #寫到磁盤上,那麼何時將消息從內存中寫入磁盤,就有一個 #時間限制(時間閾值)和一個數量限制(數量閾值),這裏設置的是 #數量閾值,下一個參數設置的則是時間閾值。 #partion buffer中,消息的條數達到閾值,將觸發flush到磁盤。 log.flush.interval.messages=10000 #消息buffer的時間,達到閾值,將觸發將消息從內存flush到磁盤, #單位是毫秒。 log.flush.interval.ms=3000 #刪除topic須要server.properties中設置delete.topic.enable=true不然只是標記刪除 delete.topic.enable=true #此處的host.name爲本機IP(重要),若是不改,則客戶端會拋出: #Producer connection to localhost:9092 unsuccessful 錯誤! host.name=kafka01 advertised.host.name=192.168.239.128
若是以爲本文對您有幫助,不妨掃描下方微信二維碼打賞點,您的鼓勵是我前進最大的動力:緩存