1.kafka基本原理簡介html
1.1名詞簡介:apache
1.producer: 消息生產者,發佈消息到 kafka 集羣的終端或服務。 2.broker: kafka 集羣中包含的服務器。 3.topic: 每條發佈到 kafka 集羣的消息屬於的類別,即 kafka 是面向 topic 的。 4.partition: partition 是物理上的概念,每一個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。 5.consumer: 從 kafka 集羣中消費消息的終端或服務。 6.Consumer group: high-level consumer API 中,每一個 consumer 都屬於一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但能夠被多個 consumer group 消費。 7.replica: partition 的副本,保障 partition 的高可用。 8.leader: replica 中的一個角色, producer 和 consumer 只跟 leader 交互。 9.follower: replica 中的一個角色,從 leader 中複製數據。 10.controller: kafka 集羣中的其中一個服務器,用來進行 leader election 以及 各類 failover。 12.zookeeper: kafka 經過 zookeeper 來存儲集羣的 meta 信息。
1.2 kafka原理:緩存
1.2.1拓撲圖服務器
1.2.3 寫入流程: 網絡
1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader 2. producer 將消息發送給該 leader 3. leader 將消息寫入本地 log 4. followers 從 leader pull 消息,寫入本地 log 後 leader 發送 ACK 5. leader 收到全部 ISR 中的 replica 的 ACK 後,增長 HW(high watermark,最後 commit 的 offset) 並向 producer 發送 ACK
1.2.4 kafka的存儲機制:
kafka經過topic來分主題存放數據 主題內又有分區 分區還能夠有多個副本 ,分區的內部還細分爲若干個segment。
所謂的分區 其實就是在 kafka對應存儲目錄下建立的文件夾,文件夾的名字是主題名加上分區編號 編號從0開始。
所謂的segment 其實就只在分區對應的文件夾下產生的文件。一個分區會被劃分紅大小相等的若干segment 這樣一方面保證了分區的數據被劃分到多個文件中保證不會產生體積過大的文件 另外一方面能夠基於這些segment文件進行歷史數據的刪除 提升效率。
一個segment又由 一個.log和一個.index文件組成,其中.log文件爲數據文件用來存放數據分段數據 .index爲索引文件保存對對應的.log文件的索引信息。這兩個文件的命名規則爲:partition全局的第一個segment從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值,數值大小爲64位,20位數字字符長度,沒有數字用0填充。
而在.index文件中 保存了對對應 .log文件的索引信息,經過查找.index文件能夠獲知每一個存儲在當前segment中的offset在.log文件中的開始位置 而每條日誌 有其固定格式 保存了 包括offset編號 日誌長度 key的長度 等相關信息 經過這個固定格式中的數據能夠肯定出當前offset的結束位置 從而對數據進行讀取
真正開始讀取指定分區中 某個offset對應的數據時 先根據offset和當前分區 的全部segment的名稱作比較 肯定出數據在哪一個segment中 查找該segment的索引文件 肯定當前offset在數據文件中的開始位置 從該位置開始讀取數據文件 在根據數據格式判斷結果 最終 獲取到完整數據。session
1.2.5 消息可靠性保證:併發
在Kafka中維護了 一個AR列表 包括全部的分區的副本
AR又分爲ISR和OSR
只有ISR內的副本都同步了leader中的數據,該數據才能被提交,才能被消費者訪問
OSR內的副本是否同步了leader的數據,不影響數據的提交,OSR內的follower 盡力的去同步leader,可能數據版本或落後
AR = ISR + OSR
最開始全部的副本都在ISR中 在kafka工做的過程當中 若是某個副本同步速度慢於replica.lag.time.max.ms指定的閾值,則被踢出ISR 存入OSR 若是後續速度恢復能夠回到ISR中
LEO - LogEndOffset - 分區的最新的數據的offset
HW - HighWatermark - 只有寫入的數據被 同步到 全部的ISR中的 副本後,數據才認爲已提交,HW更新到該位置,HW以前的數據才能夠被消費者訪問,保證 沒有 同步完成的數據不會被消費者 訪問到
在leader宕機後,只能從ISR列表中選取新的leader,不管ISR中哪一個副本被選爲 新的leader都知道HW以前的數據,能夠保證在切換了leader後,消費者能夠繼續看到 以前已經 提交的數據socket
AR ISR OSR LEO HW 這些信息都被保存在Zookeeper中 ide
1.2.6 生產者生產數據的可靠性性能
生產者向leader發送數據時,能夠選擇須要的可靠性級別
經過request.required.acks參數配置:
1 - 生產者發送數據給leader,leader收到數據後發送成功信息,生產者收到後認爲發送數據成功 ,若是一直收不到成功消息,則生產者認爲發送數據失敗會自動重發數據.
當leader宕機時,可能丟失數據
0 - 生產者不停向leader發送數據,而不須要leader反饋成功消息
這種模式效率最高,可靠性最低
可能在發送過程當中丟失數據
可能在leader宕機時丟失數據
-1 - 生產者發送數據給leader,leader收到數據後要等到ISR列表中的全部副本都同步數據完成後,才向生產者發送成功消息,若是一隻收不到成功消息,則認爲發送數據失敗會自動重發數據.
這種模式下可靠性很高,可是 當ISR列表中只剩下leader時,當leader宕機讓然有可能丟數據
此時能夠配置min.insync.replicas指定要求觀察ISR中至少要有指定數量的副本,默認該值爲1,須要改成大於等於2的值
這樣當生產者發送數據給leader可是發現ISR中只有leader本身時,會 收到異常代表數據寫入失敗
此時沒法寫入數據 保證了數據絕對不丟
雖然不丟可是可能會多數據,例如生產者發送數據給leader,leader同步數據給ISR中的follower,同步到一半leader宕機,此時選出新的leader,可能具備部分這次提交的數據,而生產者收到失敗消息重發數據,新的leader接受 數據則數據重複了
1.2.7 HW截斷機制
若是leader宕機 選出了新的leader 而新的leader並無徹底同步以前leader的全部數據,以後接受了後續新的數據,此時舊的leader恢復,則會發現新的leader中的數據和本身持有的數據不一致,此時舊的leader會將本身的數據階段到以前宕機以前的hw位置,以後同步新leader的數據
若是ISR中的follower同步了leader中的部分數據,以後leader宕機,follower也宕機,此時選出新的leader可能同步了部分以前 leader的數據,以後接受新的數據,此時follower恢復過來,發現 本身持有的 數據和新 的leader的數據不一致,此時階段數據到 以前的 hw將,而後和 新的leader同步 數據
1.2.8 leader選舉
當leader宕機時 會選擇ISR中的一個follower成爲新的leader
若是ISR中的全部副本都宕機 怎麼辦
unclean.leader.election.enable=false
策略1:必須等待ISR列表中的副本活過來才選擇其成爲leader繼續工做
unclean.leader.election.enable=true
策略2:選擇任何一個活過來的副本 - 可能不在ISR中 - 成爲leader繼續工做
策略1,可靠性有保證,可是可用性低,只有最後掛了leader活過來kafka才能恢復
策略2,可用性高,可靠性沒有保證,任何一個副本活過來就能夠繼續工做,可是有可能存在數據不一致的狀況
1.2.9 kafka可靠性的保證
At most once: 消息可能會丟,但毫不會重複傳輸
At least once:消息毫不會丟,但可能會重複傳輸
Exactly once:每條消息確定會被傳輸一次且僅傳輸一次
kafka最多保證At least once,能夠保證不丟 可是可能會重複,爲了解決重複須要引入惟一標識和去重機制,kafka提供了GUID實現了惟一標識,可是並無提供自帶的去重機制,須要開發人員基於業務規則本身去重.
2.集羣配置:
broker.id :
每一個broker均可以用一個惟一的非負整數id進行標識;這個id能夠做爲broker的「名字」,而且它的存在使得broker無須混淆consumers就能夠遷移到不一樣的host/port上。你能夠選擇任意你喜歡的數字做爲id,只要id是惟一的便可。
log.dirs:
kafka存放數據的路徑。這個路徑並非惟一的,能夠是多個,路徑之間只須要使用逗號分隔便可;每當建立新partition時,都會選擇在包含最少partitions的路徑下進行。
port:
server接受客戶端鏈接的端口。
zookeeper.connect:
ZooKeeper鏈接字符串的格式爲:hostname:port,此處hostname和port分別是ZooKeeper集羣中某個節點的host和port;爲了當某個host宕掉以後你能經過其餘ZooKeeper節點進行鏈接,你能夠按照一下方式制定多個hosts:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper 容許你增長一個「chroot」路徑,將集羣中全部kafka數據存放在特定的路徑下。當多個Kafka集羣或者其餘應用使用相同ZooKeeper集羣時,可使用這個方式設置數據存放路徑。這種方式的實現能夠經過這樣設置鏈接字符串格式,以下所示:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
這樣設置就將全部kafka集羣數據存放在/chroot/path路徑下。注意,在你啓動broker以前,你必須建立這個路徑,而且consumers必須使用相同的鏈接格式。
message.max.bytes:
server能夠接收的消息最大尺寸。重要的是,consumer和producer有關這個屬性的設置必須同步,不然producer發佈的消息對consumer來講太大,默認是1000000。
num.network.threads:
server用來處理網絡請求的網絡線程數目;通常你不須要更改這個屬性,默認爲3。
num.io.threads:
server用來處理請求的I/O線程的數目;這個線程數目至少要等於硬盤的個數,默認是8。
background.threads:
用於後臺處理的線程數目,例如文件刪除;你不須要更改這個屬性,默認是4。
queued.max.requests:
在網絡線程中止讀取新請求以前,能夠排隊等待I/O線程處理的最大請求個數,默認500。
host.name:
broker的hostname;若是hostname已經設置的話,broker將只會綁定到這個地址上;若是沒有設置,它將綁定到全部接口,併發布一份到ZK
advertised.host.name:
若是設置,則就做爲broker 的hostname發往producer、consumers以及其餘brokers
advertised.port:
此端口將給與producers、consumers、以及其餘brokers,它會在創建鏈接時用到; 它僅在實際端口和server須要綁定的端口不同時才須要設置。
socket.send.buffer.bytes:
SO_SNDBUFF 緩存大小,server進行socket 鏈接所用,默認100 * 1024
socket.receive.buffer.bytes:
SO_RCVBUFF緩存大小,server進行socket鏈接時所用,默認100 * 1024
socket.request.max.bytes:
erver容許的最大請求尺寸; 這將避免server溢出,它應該小於Java heap size,默認100 * 1024 * 1024
num.partitions:
若是建立topic時沒有給出劃分partitions個數,這個數字將是topic下partitions數目的默認數值1。
log.segment.bytes:
topic partition的日誌存放在某個目錄下諸多文件中,這些文件將partition的日誌切分紅一段一段的;這個屬性就是每一個文件的最大尺寸;當尺寸達到這個數值時,就會建立新文件。此設置能夠由每一個topic基礎設置時進行覆蓋。
查看 the per-topic configuration section
log.roll.hours:
即便文件沒有到達log.segment.bytes,只要文件建立時間到達此屬性,就會建立新文件。這個設置也能夠有topic層面的設置進行覆蓋;
查看the per-topic configuration section
log.retention.minutes和log.retention.hours:
每一個日誌文件刪除以前保存的時間。默認數據保存時間對全部topic都同樣。
log.retention.minutes 和 log.retention.bytes 都是用來設置刪除日誌文件的,不管哪一個屬性已經溢出。
這個屬性設置能夠在topic基本設置時進行覆蓋。
查看the per-topic configuration section
log.retention.bytes:
每一個topic下每一個partition保存數據的總量;注意,這是每一個partitions的上限,所以這個數值乘以partitions的個數就是每一個topic保存的數據總量。同時注意:若是log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會形成刪除一個段文件。
注意,這項設置能夠由每一個topic設置時進行覆蓋。
查看the per-topic configuration section
log.retention.check.interval.ms:
檢查日誌分段文件的間隔時間,以肯定是否文件屬性是否到達刪除要求,默認5 minutes。
log.cleaner.enable:
當這個屬性設置爲false時,一旦日誌的保存時間或者大小達到上限時,就會被刪除;若是設置爲true,則當保存屬性達到上限時,就會進行log compaction。
log.cleaner.threads:
進行日誌壓縮的線程數,默認1
log.cleaner.io.max.bytes.per.second:
進行log compaction時,log cleaner能夠擁有的最大I/O數目。這項設置限制了cleaner,以免干擾活動的請求服務
log.cleaner.io.buffer.size:
log cleaner清除過程當中針對日誌進行索引化以及精簡化所用到的緩存大小。最好設置大點,以提供充足的內存,默認500*1024*1024。
log.cleaner.io.buffer.load.factor
進行log cleaning時所須要的I/O chunk尺寸。你不須要更改這項設置。
log.cleaner.io.buffer.load.factor
log cleaning中所使用的hash表的負載因子;你不須要更改這個選項。
log.cleaner.backoff.ms
進行日誌是否清理檢查的時間間隔,默認:15000
log.cleaner.min.cleanable.ratio
這項配置控制log compactor試圖清理日誌的頻率(假定log compaction是打開的)。默認避免清理壓縮超過50%的日誌。這個比率綁定了備份日誌所消耗的最大空間(50%的日誌備份時壓縮率爲50%)。更高的比率則意味着浪費消耗更少,也就能夠更有效的清理更多的空間。這項設置在每一個topic設置中能夠覆蓋, 默認0.5。
查看the per-topic configuration section。
log.cleaner.delete.retention.ms:
保存時間;保存壓縮日誌的最長時間;也是客戶端消費消息的最長時間,榮log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮後的數據;會被topic建立時的指定時間覆蓋,默認1day。
log.index.size.max.bytes:
每一個log segment的最大尺寸。注意,若是log尺寸達到這個數值,即便尺寸沒有超過log.segment.bytes限制,也須要產生新的log segment,默認:10*1024*1024
log.index.interval.bytes:
當執行一次fetch後,須要必定的空間掃描最近的offset,設置的越大越好,通常使用默認值就能夠,默認4096
log.flush.interval.messages
log文件「sync」到磁盤以前累積的消息條數。由於磁盤IO操做是一個慢操做,但又是一個「數據可靠性」的必要手段,因此檢查是否須要固化到硬盤的時間間隔。須要在「數據可靠性」與「性能」之間作必要的權衡,若是此值過大,將會致使每次「發sync」的時間過長(IO阻塞),若是此值太小,將會致使「fsync」的時間較長(IO阻塞),若是此值太小,將會致使」發sync「的次數較多,這也就意味着總體的client請求有必定的延遲,物理server故障,將會致使沒有fsync的消息丟失。
log.flush.scheduler.interval.ms
檢查是否須要fsync的時間間隔
log.flush.interval.ms:
僅僅經過interval來控制消息的磁盤寫入時機,是不足的,這個數用來控制」fsync「的時間間隔,若是消息量始終沒有達到固化到磁盤的消息數,可是離上次磁盤同步的時間間隔達到閾值,也將觸發磁盤同步。
log.delete.delay.ms:
文件在索引中清除後的保留時間,通常不須要修改:
auto.create.topics.enable |
true |
是否容許自動建立topic。若是是真的,則produce或者fetch 不存在的topic時,會自動建立這個topic。不然須要使用命令行建立topic |
controller.socket.timeout.ms |
30000 |
partition管理控制器進行備份時,socket的超時時間。 |
controller.message.queue.size |
Int.MaxValue |
controller-to-broker-channles的buffer 尺寸 |
default.replication.factor |
1 |
默認備份份數,僅指自動建立的topics |
replica.lag.time.max.ms |
10000 |
若是一個follower在這個時間內沒有發送fetch請求,leader將從ISR重移除這個follower,並認爲這個follower已經掛了 |
replica.lag.max.messages |
4000 |
若是一個replica沒有備份的條數超過這個數值,則leader將移除這個follower,並認爲這個follower已經掛了 |
replica.socket.timeout.ms |
30*1000 |
leader 備份數據時的socket網絡請求的超時時間 |
replica.socket.receive.buffer.bytes |
64*1024 |
備份時向leader發送網絡請求時的socket receive buffer |
replica.fetch.max.bytes |
1024*1024 |
備份時每次fetch的最大值 |
replica.fetch.min.bytes |
500 |
leader發出備份請求時,數據到達leader的最長等待時間 |
replica.fetch.min.bytes |
1 |
備份時每次fetch以後迴應的最小尺寸 |
num.replica.fetchers |
1 |
從leader備份數據的線程數 |
replica.high.watermark.checkpoint.interval.ms |
5000 |
每一個replica檢查是否將最高水位進行固化的頻率 |
fetch.purgatory.purge.interval.requests |
1000 |
fetch 請求清除時的清除間隔 |
producer.purgatory.purge.interval.requests |
1000 |
producer請求清除時的清除間隔 |
zookeeper.session.timeout.ms |
6000 |
zookeeper會話超時時間。 |
zookeeper.connection.timeout.ms |
6000 |
客戶端等待和zookeeper創建鏈接的最大時間 |
zookeeper.sync.time.ms |
2000 |
zk follower落後於zk leader的最長時間 |
controlled.shutdown.enable |
true |
是否可以控制broker的關閉。若是可以,broker將能夠移動全部leaders到其餘的broker上,在關閉以前。這減小了不可用性在關機過程當中。 |
controlled.shutdown.max.retries |
3 |
在執行不完全的關機以前,能夠成功執行關機的命令數。 |
controlled.shutdown.retry.backoff.ms |
5000 |
在關機之間的backoff時間 |
auto.leader.rebalance.enable |
true |
若是這是true,控制者將會自動平衡brokers對於partitions的leadership |
leader.imbalance.per.broker.percentage |
10 |
每一個broker所容許的leader最大不平衡比率 |
leader.imbalance.check.interval.seconds |
300 |
檢查leader不平衡的頻率 |
offset.metadata.max.bytes |
4096 |
容許客戶端保存他們offsets的最大個數 |
max.connections.per.ip |
Int.MaxValue |
每一個ip地址上每一個broker能夠被鏈接的最大數目 |
max.connections.per.ip.overrides |
|
每一個ip或者hostname默認的鏈接的最大覆蓋 |
connections.max.idle.ms |
600000 |
空鏈接的超時限制 |
log.roll.jitter.{ms,hours} |
0 |
從logRollTimeMillis抽離的jitter最大數目 |
num.recovery.threads.per.data.dir |
1 |
每一個數據目錄用來日誌恢復的線程數目 |
unclean.leader.election.enable |
true |
指明瞭是否可以使不在ISR中replicas設置用來做爲leader |
delete.topic.enable |
false |
可以刪除topic |
offsets.topic.num.partitions |
50 |
The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes |
1440 |
存在時間超過這個時間限制的offsets都將被標記爲待刪除 |
offsets.retention.check.interval.ms |
600000 |
offset管理器檢查陳舊offsets的頻率 |
offsets.topic.replication.factor |
3 |
topic的offset的備份份數。建議設置更高的數字保證更高的可用性 |
offset.topic.segment.bytes |
104857600 |
offsets topic的segment尺寸。 |
offsets.load.buffer.size |
5242880 |
這項設置與批量尺寸相關,當從offsets segment中讀取時使用。 |
offsets.commit.required.acks |
-1 |
在offset commit能夠接受以前,須要設置確認的數目,通常不須要更改 |
3.生產環境調優: