Study this section together with the diagram.
The producer generates data and, governed by the ack setting, sends it to the corresponding partition on a broker in the Kafka cluster (each machine hosts a different broker node).

Writing data: a partition's data first lands in the page cache and is eventually persisted to disk.

Reading data: the consumer's request first reaches the kernel; the kernel gets the partition's metadata and then invokes sendfile(in, offset, out). sendfile tries the page cache first; on a miss it reads from disk (populating the page cache along the way) and sends the data on. This is the zero-copy path: the data is never copied up into the Kafka application.
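In Java, this zero-copy send is what FileChannel.transferTo does: on Linux it maps to sendfile(2). Below is a minimal sketch of the same pattern, not Kafka's own code; the file name, host, and port are placeholders.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class ZeroCopySend {
        public static void main(String[] args) throws IOException {
            // "demo.log" stands in for a segment file such as 00000000000000000000.log
            try (FileChannel file = FileChannel.open(Paths.get("demo.log"), StandardOpenOption.READ);
                 SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
                long offset = 0;
                long remaining = file.size();
                // transferTo hands bytes from the page cache straight to the socket;
                // they never enter user space, which is exactly the "no copy to the
                // Kafka application" behaviour described above.
                while (remaining > 0) {
                    long sent = file.transferTo(offset, remaining, socket);
                    offset += sent;
                    remaining -= sent;
                }
            }
        }
    }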
Data storage layout

Basics: an array has a fixed size and contiguous storage, so an element's location can be computed directly; a linked list has a variable size and non-contiguous storage, so traversal is expensive and an index is needed. Kafka's storage is linked-list-like (an append-only sequence of records), so it has to maintain its own indexes. There are two index keys: 1. offset 2. timestamp; a timestamp can in fact be converted into an offset.
When the producer writes to a Kafka partition, ack has three modes

ack=0: send regardless of the partition's state, because no acknowledgement is requested from the Kafka partition.
ack=1: send as long as the partition leader is alive, because only the leader has to return a confirmation.
ack=-1: if network jitter occurs or a replica or host dies during a send, there is a brief stall and then sending resumes normally, because ack=-1 requires every replica in the ISR to return OK; a replica that fails to respond is kicked out of the ISR.
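On the producer side the mode is selected with the acks setting; a minimal configuration sketch (the broker addresses are assumptions):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class AckModes {
        public static void main(String[] args) {
            Properties p = new Properties();
            p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke02:9092,ke03:9092");
            p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // "0": fire and forget; "1": only the leader acknowledges;
            // "-1" (same as "all"): wait for every replica in the ISR.
            p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
                // send records here
            }
        }
    }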
Some terminology

ISR: in-sync replicas, the replicas that are alive and in sync.
OSR: out-of-sync replicas, replicas that have gone without a heartbeat for longer than the threshold (10 seconds), i.e. dead replicas.
AR: assigned replicas, the full set of replicas assigned to a partition; AR = ISR + OSR.
LW, HW, LEO: see the diagram.
Create a topic and inspect its ISR

[root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --create --topic xiaoke-items --partitions 2 --replication-factor 3
Created topic xiaoke-items.
[root@ke03 ~]# kafka-topics.sh --zookeeper ke02:2181,ke03:2181/kafka --describe --topic xiaoke-items
Topic:xiaoke-items  PartitionCount:2  ReplicationFactor:3  Configs:
    Topic: xiaoke-items  Partition: 0  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
    Topic: xiaoke-items  Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2

Partition 0: leader on node 2, replicas on nodes 1, 2, 3 (three in total), ISR (live replicas): 2, 3, 1.
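The same create-and-describe can be done programmatically; a sketch with Kafka's AdminClient (the bootstrap address is an assumption, and AdminClient talks to the brokers rather than to ZooKeeper):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class TopicIsrDemo {
        public static void main(String[] args) throws Exception {
            Properties p = new Properties();
            p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ke02:9092,ke03:9092");
            try (AdminClient admin = AdminClient.create(p)) {
                // same as: kafka-topics.sh --create --topic xiaoke-items --partitions 2 --replication-factor 3
                admin.createTopics(Collections.singleton(new NewTopic("xiaoke-items", 2, (short) 3))).all().get();
                // same as: kafka-topics.sh --describe --topic xiaoke-items
                TopicDescription d = admin.describeTopics(Collections.singleton("xiaoke-items"))
                        .all().get().get("xiaoke-items");
                d.partitions().forEach(tp -> System.out.printf(
                        "partition %d leader %s replicas(AR) %s isr %s%n",
                        tp.partition(), tp.leader(), tp.replicas(), tp.isr()));
            }
        }
    }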
Tracing the process: the log files are handled through ordinary file IO
[root@ke03 xiaoke-items-0]# jps
11957 Kafka
[root@ke03 xiaoke-items-0]# lsof -Pnp 11957
COMMAND   PID USER  FD  TYPE DEVICE SIZE/OFF   NODE NAME
java    11957 root  cwd  DIR    8,3     4096 924176 /opt/bigdata/kafka/config
java    11957 root 143u  REG    8,3        0 262735 /var/kafka_data/xiaoke-items-1/00000000000000000000.log
java    11957 root 144u  REG    8,3        0 262740 /var/kafka_data/xiaoke-items-0/00000000000000000000.log

Q: why does the log use ordinary IO instead of mmap?
The log uses ordinary IO for the sake of generality. The reliability levels for getting data onto disk:
- At the application level, calling write() only gets the data as far as the kernel; this is fast, but data can be lost.
- Only NIO's FileChannel with write() plus force() truly reaches the disk, and that is extremely slow.
So there are two choices: 1. force() after every record; 2. only write(), relying on the kernel's dirty-page flush mechanism.
In Java, traditional IO's flush() is an empty implementation with no physical flush; it still depends on the kernel flushing dirty pages, so data can be lost.
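The gap between those two reliability levels is easy to see with FileChannel itself; a minimal sketch (demo.log is a placeholder file):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class FlushDemo {
        public static void main(String[] args) throws IOException {
            try (FileChannel ch = FileChannel.open(Paths.get("demo.log"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
                ByteBuffer buf = ByteBuffer.wrap("msg\n".getBytes(StandardCharsets.UTF_8));
                // write() only hands the bytes to the kernel (page cache):
                // fast, but lost if the machine dies before the dirty pages are flushed.
                ch.write(buf);
                // force(false) is an fsync: the data is really on disk afterwards,
                // but doing this per record is extremely slow.
                ch.force(false);
            }
        }
    }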
Produce data to topic xiaoke-items

key: item0 val: val0 partition: 1 offset: 0
key: item1 val: val0 partition: 0 offset: 0
key: item2 val: val0 partition: 1 offset: 1
key: item0 val: val1 partition: 1 offset: 2
key: item1 val: val1 partition: 0 offset: 1
key: item2 val: val1 partition: 1 offset: 3
key: item0 val: val2 partition: 1 offset: 4
key: item1 val: val2 partition: 0 offset: 2
key: item2 val: val2 partition: 1 offset: 5
key: item0 val: val0 partition: 1 offset: 6
key: item1 val: val0 partition: 0 offset: 3
key: item2 val: val0 partition: 1 offset: 7
key: item0 val: val1 partition: 1 offset: 8
key: item1 val: val1 partition: 0 offset: 4
key: item2 val: val1 partition: 1 offset: 9

Inspect the log directory:

[root@ke03 xiaoke-items-0]# ll -h
total 8.0K
-rw-r--r-- 1 root root 10M Jul 26 10:30 00000000000000000000.index
-rw-r--r-- 1 root root 385 Jul 26 11:25 00000000000000000000.log
-rw-r--r-- 1 root root 10M Jul 26 10:30 00000000000000000000.timeindex
-rw-r--r-- 1 root root   8 Jul 26 11:25 leader-epoch-checkpoint

Dump the Kafka log file:

[root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.log | more
Dumping 00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1627396648184 size: 77 magic: 2 compresscodec: NONE crc: 1546433855 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 77 CreateTime: 1627396651246 size: 77 magic: 2 compresscodec: NONE crc: 2422575540 isvalid: true
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 154 CreateTime: 1627396654287 size: 77 magic: 2 compresscodec: NONE crc: 674617845 isvalid: true
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 231 CreateTime: 1627396657309 size: 77 magic: 2 compresscodec: NONE crc: 1996918817 isvalid: true
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 308 CreateTime: 1627396660339 size: 77 magic: 2 compresscodec: NONE crc: 110021385 isvalid: true

Summary: partition 0 received records 0 through 4, so its log file ends at offset 4.

Dump the index file:

[root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 0 position: 0

To see the effect, produce more data until partition 0 reaches offset 122:

[root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 54 position: 4158
offset: 108 position: 8316

Notes:
1. position is a byte position: offset 54 starts at byte 4158 of the log file.
2. The partition is at offset 122 but the index only records up to 108: the offset index is sparse, skipping entries, which cuts down the number of IOs.

Dump the timeindex file:

[root@ke03 xiaoke-items-0]# kafka-dump-log.sh --files 00000000000000000000.timeindex
Dumping 00000000000000000000.timeindex
timestamp: 1627397016578 offset: 54
timestamp: 1627397033738 offset: 108

Note: the timeindex file maps timestamps to the offsets recorded in the index file.
Reading data:

1. Use the timeindex file to turn a timestamp into an offset, use the offset index to find the byte range between that offset's position and the next indexed position, and then scan for the record within that range.
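A sketch of this two-step lookup, with TreeMaps standing in for the .timeindex and .index entries from the dump above (the maps and the final scan are illustrative, not Kafka's real classes):

    import java.util.Map;
    import java.util.TreeMap;

    public class SparseIndexLookup {
        public static void main(String[] args) {
            // .timeindex entries: timestamp -> offset (sparse)
            TreeMap<Long, Long> timeIndex = new TreeMap<>();
            timeIndex.put(1627397016578L, 54L);
            timeIndex.put(1627397033738L, 108L);

            // .index entries: offset -> byte position in the .log file (sparse)
            TreeMap<Long, Long> offsetIndex = new TreeMap<>();
            offsetIndex.put(54L, 4158L);
            offsetIndex.put(108L, 8316L);

            // Step 1: timestamp -> greatest indexed offset at or before it.
            long target = timeIndex.floorEntry(1627397020000L).getValue(); // 54

            // Step 2: offset -> [position, nextPosition) byte range to scan.
            Map.Entry<Long, Long> lo = offsetIndex.floorEntry(target);
            Map.Entry<Long, Long> hi = offsetIndex.higherEntry(target);
            long from = lo.getValue();
            long to = (hi == null) ? Long.MAX_VALUE : hi.getValue();
            System.out.println("scan .log bytes [" + from + ", " + to + ")"); // [4158, 8316)

            // Step 3: read that slice of the .log and walk the records until the
            // target is reached; sparse entries are what keep the IO count low.
        }
    }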
Testing ACK

Code change:

    p.setProperty(ProducerConfig.ACKS_CONFIG, "0");

ack=0: 1. produce data 2. kill one Kafka broker 3. the ISR loses one replica; production to Kafka continues normally.
ack=1: 1. produce data 2. kill one Kafka broker 3. the ISR loses one replica; production continues normally because the leader is still alive.
ack=-1: 1. produce data 2. kill one Kafka broker 3. the ISR loses one replica; after a stall of about 10 seconds, production resumes normally.
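A sketch of a driver for these three runs; the topic, keys, and pacing are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class AckTest {
        public static void main(String[] args) throws InterruptedException {
            Properties p = new Properties();
            p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke02:9092,ke03:9092");
            p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // rerun with "1" and "-1"
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
                for (int i = 0; i < 300; i++) {
                    ProducerRecord<String, String> rec =
                            new ProducerRecord<>("xiaoke-items", "item" + (i % 3), "val" + i);
                    // Kill one broker mid-run; the callback shows whether sends
                    // keep succeeding and how long any stall lasts.
                    producer.send(rec, (meta, e) -> {
                        if (e != null) System.out.println("send failed: " + e);
                        else System.out.println("partition: " + meta.partition() + " offset: " + meta.offset());
                    });
                    Thread.sleep(1000);
                }
            }
        }
    }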
Producing and consuming data

Sending data: producer.send(). Consuming data: consumer.poll() pulls records. Changing the consumption offset: consumer.seek(partition, offset); the offset can be obtained by converting from a timestamp.
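A sketch of the timestamp-to-offset conversion followed by a seek, using the consumer's offsetsForTimes (the group id, partition, and timestamp are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SeekByTimestamp {
        public static void main(String[] args) {
            Properties p = new Properties();
            p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke02:9092,ke03:9092");
            p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "xiaoke-demo");
            p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
                TopicPartition tp = new TopicPartition("xiaoke-items", 0);
                consumer.assign(Collections.singleton(tp));
                // Ask the broker for the first offset whose timestamp is >= the given
                // one; this is the timestamp -> offset conversion backed by .timeindex.
                Map<TopicPartition, OffsetAndTimestamp> found =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, 1627397016578L));
                OffsetAndTimestamp oat = found.get(tp);
                if (oat != null) {
                    consumer.seek(tp, oat.offset());
                    consumer.poll(Duration.ofSeconds(1))
                            .forEach(r -> System.out.println(r.offset() + " " + r.value()));
                }
            }
        }
    }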