kafka極簡入門(四)--經常使用配置

回顧:kafka極簡入門(三)--建立topicapache

前言

kafka針對broker, topic, producer, consumer的配置很是多,這裏講一下經常使用的配置bootstrap

1.broker相關配置
  • broker.id
    broker在kafka集羣中的惟一標識,必須是一個大於等於0的整數,若是不寫的話默認從1001開始。
    建議:把它設置成與機器名具備相關性的整數。
  • port
    設置kafka的端口號,默認狀況下是9092,不建議修改爲1024如下的端口,由於須要使用root權限啓動。
  • zookeeper.connect
    設置zookeeper集羣,該配置參數是一組用逗號隔開的host:port/path列表segmentfault

    • host是zookeeper服務器的機器名或ip地址
    • port是zookeeper服務器的端口
    • /path是可選的zookeeper路徑,做爲kafka集羣的chroot環境,默認是根路徑,若是指定的chroot路徑不存在,kafka會在啓動的時候建立它。使用chroot使得zookeeper集羣能夠共享給其餘應用程序時而不會產生衝突。
  • log.dirs
    配置kafka日誌片斷的目錄位置,多個路徑以逗號隔開。若是配置了多個路徑,kafka會根據最少使用原則,把統一分區的日誌保存到同一路徑下,注意的是,kafka會往擁有最少數量分區的路徑新增分區,而不是往擁有最小磁盤空間的路徑新增分區。
  • auto.create.topics.enable
    配置是否開啓自動建立topic,若是設置爲true, kafka會在如下幾種場景自動建立topic:安全

    • 當一個producer開始往topic寫入消息時。
    • 當一個consumer開始從topic消費消息時。
    • 當一個client向topic發送元數據請求時。
  • num.partitions
    配置建立主題時包含多少個分區,默認值爲1,由於咱們能增長主題的分區數,可是不能減小分區的個數,因此,若是要讓一個主題的分區個數少於num.partitions須要手動建立該主題而不是經過自動建立主題。
  • log.retention.hours
    配置kafka保留數據的時間,默認爲168小時也就是7天,效果等同log.retention.minutes和log.retention.ms,只是單位不同,分別是小時,分鐘,和毫秒,推薦使用log.retention.ms,粒度更加細,若是三個參數都配置了則去數值最小的配置。
  • log.retention.bytes
    配置一個分區能保存最大的字節數,若是超出的部分就會被刪除,同時配置了log.retention.hours/log.retention.minutes/log.retention.ms的話,任一個知足條件都會觸發數據刪除。
  • message.max.bytes
    配置消息的大小限制,默認爲100000,也就是1M,這裏的大小是指在kafka壓縮後的大小,也就是說實際消息能夠大於1M,若是消息超過這個限制,則會被kafka拒收。

2.producer相關配置
  • bootstrap.servers
    配置broker的地址,格式爲host:port,若是多個則以逗號隔開,不須要配置全部的broker,producer會從給定的broker查詢其餘broker的信息,不過建議至少填寫兩個,以防在一個宕機的狀況還能從另一個去獲取broker的信息。
  • acks
    acks指定了必需要有多少個分區副本收到消息,producer纔會認爲消息寫入是成功的。服務器

    • acks=0 producer發送消息不等待任何來自服務器的響應,因此會出現消息丟失而producer不感知的狀況,該模式下可獲取最大的吞吐量。
    • acks=1 只要集羣的leader節點收到消息,producer就會收到一個服務器的成功響應,若是消息沒法達到leader節點,那麼producer就會獲取到一個錯誤響應,這時候爲了不消息的丟失,producer能夠選擇重發,不過若是一個沒有收到消息的節點成爲新的leader,那麼消息仍是會丟失。
    • acks=all 只有當leader節點和follower節點都收到消息時,producer纔會收到成功的響應,這是一個避免消息丟失最安全的作法,不過這種模式吞吐量最低
  • client.id
    能夠是任意字符串,標識消息的來源
  • max.in.flight.requests.per.connection
    配置producer在收到服務器響應前能夠發送的消息個數,值越高,吞吐量就會越高,不過相應的佔用的內存也會越多,設置爲1能夠保證消息能夠按照發送的順序寫入服務,即使發生了重試。
  • max.request.size
    配置producer單次發送的全部消息的總的大小限制,例如設置爲1M,單個消息大小爲1K,那麼單次能夠發的上限是1000個,最好跟message.max.bytes配置匹配,避免發送到broker的消息被拒絕。
  • retries
    該參數決定了producer能夠重發消息的次數,producer從broker收到的錯誤多是臨時性的,例如分區找不到首領,這種狀況下,producer在進行retries次重試後就會放棄重試而且返回錯誤,默認狀況下,重試的時間間隔爲100ms,能夠經過retry.backoff.ms參數配置,建議在設置重試間隔以前最好測試一下恢復一個崩潰的節點要多長時間,重試的間隔最比如恢復時間要長。
  • batch.size
    當多個消息往同一個分區發送時,producer會把這些消息放到同一個分區,該參數指定了一個批次可使用的內存大小,按字節數計算,當批次填滿時消息就會被髮送出去,不過producer不必定等批次被填滿纔會發送,甚至只有一個消息也會被髮送,因此就算把該值設置得很大也不會形成延遲,只不過會佔用內存,可是若是設置過小的話,producer會很頻繁的發送,增長一些額外的開銷。
  • linger.ms
    指定producer發送批次以前要等待更多消息加入批次的時間,producer會在批次填滿或者longer.ms到達上限時把批次發送出去,默認狀況下,只要有可用的線程,就算批次只有一個消息,producer也會把消息發送出去。把linger.ms設置成比0大的數,讓producer在發送批次以前多等待一會,可使得更多的消息能夠加入到批次,雖然增長了延遲,可是同時也增長了吞吐量。

3.Consumer相關配置
  • fetch.min.bytes
    配置Consumer從broker獲取記錄的最小字節數,broker在收到Consumer的數據請求時,若是可用的數據量小於該配置,那麼broker會等到有足夠的可用數據時才把它返回給Consumer。
  • fetch.max.wait.ms
    配置broker的等待時間,默認爲500ms,若是沒有足夠的數據流入,致使不知足fetch.mis.bytes,最終會致使500ms的延遲。若是fetch.mis.bytes配置爲1M,fetch.max.wait.ms配置爲500ms,那麼最終broker要麼返回1M的數據,要麼等待500ms後返回全部可用的數據,取決於哪一個條件先獲得知足。
  • max.partition.fetch.bytes
    配置broker從每一個分區返回給Consumer的最大字節數,默認爲1MB,也就是說,KafkaConsumer.poll()方法從每一個分區裏面返回的記錄最多不超過該值,加入有10個分區5個消費者,則每一個消費者須要2MB的內存來接收消息,須要注意的是,若是該值設置過大,致使消費者處理的業務的時間過長,會有回話超時的風險。
  • session.timeout.ms
    配置了Consumer被認定爲死亡前能夠與服務器斷開鏈接的時間,默認3秒,若是服務器在超過該值時間沒有收到Consumer的心跳,就會認定Consumer死亡,會觸發再均衡,把死亡Consumer的分區分配給其餘Consumer,這個配置要跟heartbeat.interval.ms配合使用,heartbeat.interval.ms設置了poll()方法向協調器發送心跳的頻率。建議heartbeat.interval.ms的值爲session.timeout.ms的三分之一。
  • enable.auto.commit
    指定了Consumer是否開啓自動提交偏移量,默認爲true。能夠把它設置爲false,由程序本身控制什麼時候提交偏移量來避免出現重複數據和數據丟失的狀況。
  • auto.offset.reset
    指定在Consumer讀取一個沒有偏移量的分區或者偏移量無效的狀況下的策略(由於消費者長時間失效,包含偏移量的記錄已通過時並刪除),默認爲latest,表示會從最新的記錄開始讀取(在消費者啓動以後生成的記錄,會出現漏消費歷史記錄的狀況),另一個配置是earliest,表示從最先的消息開始消費(會出現重複消費的狀況)
  • partition.assignment.strategy
    分區分配給Consumer的策略,有兩種:session

    • Range
      把topic的若干個連續的分區分配給Consumer,假設有Consumer1和Consumer2,分別訂閱了topic1和topic2,每一個topic都有3個分區,那麼Consumer1可能分配到topic1和topic2的分區0和分區1,Consumer2分配到topic1和topic2的分區2,這種策略會致使當分區數量(針對單個topic,上面例子是3個分區)沒法被消費者數量(上面例子是2個消費者)整除時,就會出現分區分佈不均勻的狀況。
      Range.png
    • RoundRobin
      該策略會把全部的分區逐個分配給Consumer,仍是上面的例子,若是按這種策略分配那麼Consumer1最終分到的是topic1的分區0,分區2和topic2的分區1,Consumer2最終分到的是topic1的分區1和topic2的分區0和分區2。
      RoundRobin.png

默認使用org.apache.kafka.clients.consumer.RangeAssignor,這個類實現了Range策略,RoundRabin的實現類爲org.apache.kafka.clients.consumer.RoundRobinAssignor,咱們還能夠自定義策略。測試

  • max.poll.records
    指定單次調用poll()方法返回的記錄數量。fetch

若是對你有用麻煩點個贊哦 ^_^spa

相關文章
相關標籤/搜索