kafka原理

1.kafka配置文件

解壓kafka的安裝包後,在conf目錄下server.propertieslinux

#broker 的全局惟一編號,在kafka集羣中不能重複,爲整型數字 
broker.id=0 
#開啓刪除topic功能
delete.topic.enable=true 
#處理網絡請求的線程數量
num.network.threads=3 
#用來處理磁盤 IO 的現成數量
num.io.threads=8 
#發送套接字的緩衝區大小 
socket.send.buffer.bytes=102400 
#接收套接字的緩衝區大小 
socket.receive.buffer.bytes=102400 
#請求套接字的緩衝區大小 
socket.request.max.bytes=104857600 
# kafka運行數據保存目錄 kafka data是以.log後綴的
log.dirs=/data/kafka/logs 
#topic 在當前 broker 上的分區數量,下面配置一個分區
num.partitions=1 
#用來恢復和清理 data 下數據的線程數量 num.recovery.threads.per.data.dir=1
#segment log.dirs中文件保留的最長時間,超時將被刪除,默認爲7天也就是168小時
log.retention.hours=168 
#kafka須要zookeeper進行管理,配置鏈接 Zookeeper 集羣地址
zookeeper.connect=192.168.152.163:2181,192.168.152.162:2181,192.168.152.161:2181

啓動kafka網絡

kafka-server-start.sh -daemon config/server.properties
//若是沒有加-daemon的話,將會阻塞方式運行

關閉kafka併發

kafka-server-stop.sh stop

2.kafka工做流程

kafka是基於topic的可分區的分佈式的消息中間件。消息生產者經過將消息發送至broker的某一個分區,消費者經過broker的分區獲取消息進行消費,一個broker可有多個分區,一個topic可有多個分區,topic下的每個分區存在副本機制,也就是存在一個leader和多個follower,生產者將消息寫入leader分區,follower同步leader中新增的消息,每一個分區都有index文件和log文件。消費者消費分區中的消息經過偏移值offset進行記錄。app

image.png

kafka存儲消息到文件中採用分片及索引機制,每一個topic在一個broker上的數據放在一個文件夾下,該文件夾名稱爲topic名稱加分區號,如minerprofit-1,(minerprofit爲topic名稱,1爲分區號),每個分區上的數據又分爲多個segment,每個segment有index文件和log文件,segment中index和log文件的命名是在該segment第一個offset值。socket

0000000000000000000000.index
0000000000000000000000.log
0000000000000000017866.index
0000000000000000017866.log
0000000000000000029866.index
0000000000000000029866.log

log文件記錄具體的消息數據,index記錄的是消息的偏移值與消息數據的索引值,依據offset值找到具體的消息的過程是:例如offset爲5,首先在index文件中,採用二分法查找offset爲5對應的索引值,而後獲取該索引值,在log文件中依據該索引值獲取消息。分佈式

kafka零拷貝技術: linux有用戶態和核心態,正常的訪問文件linux須要核心態和用戶態進行轉換,kafka內部機制可以作到不通過用戶態,具體機制能夠了解了解。ide

kafka順序寫磁盤。高併發

分區的優劣:ui

優點:this

高併發: 採起分區機制,將不一樣數據放置在不一樣的分區,提升併發能力,相似ConcurrentHashMap的分段鎖機制,當須要訪問不一樣分區的數據,可以將鎖粒度下降。

方便擴展: 當有多個broker時候,多個topic可方便擴展。

缺點:

分區的副本機制可能致使生產者發送的消息不可以正常同步到全部follower,當leader掛掉後,形成消息丟失。

3.生產者

生產者是經過封裝一個ProductRecord對象,須要指定具體的topic,分區,key,value,headers。

  • topic:指定具體要發送消息到哪一個topic
  • partition: 指定topic具體的分區
  • timestamp: 發送消息的時間若是爲null則爲系統當前時間
  • key: 消息的key值,在決定具體分區的時候有用
  • value: 消息體信息
  • headers: 消息頭信息
/**
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     * 
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
     *                  the timestamp using System.currentTimeMillis().
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers the headers that will be included in the record
     */
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);

消息發送到具體分區的選擇:

  • 若是partition指定了,就發送到具體分區
  • 若是partition沒有指定,key值存在,就將key的hashcode與該topic的可用分區進行取餘。
  • 若是partition和key都沒有指定,則會生成一個隨機數,將該隨機數的hashcode值與可用分區數進行取餘,下一次決定分區的時候將上述的隨機數進行加1,從新取餘。

生產者消息確認(ack)機制:

kafka分區存在副本機制,一個leader和多個follower,生產者將消息發送到leader,follower同leader同步數據,存在一種狀況,當生產者將消息發送至leader後,follower還未同步到數據,leader就宕機了,而後follower就選擇新的leader,新的leader進行數據同步後,就不清楚了生產者發送的消息時候成功了,因此在此過程當中就涉及到消息確認問題。

副本同步數據有半數確認機制和所有確認機制,kafka使用的是全數確認機制。

kafka設計了三種acks消息確認機制:

  • acks=0,當leader一收到消息,尚未寫入磁盤,就給生產者發送ack消息,此就可能致使發送了acks,還未寫入磁盤,leader就掛了。
  • acks=1,當leader收到消息,而且將消息記錄到磁盤中後,再發送ack確認,但該消息還未同步到全部follower,leader掛掉後,就會致使消息丟失
  • acks=all, 當leader收到消息後,而且寫入到磁盤後,全部follower也同步消息後,才進行發送ack確認。此也有缺點,若是follower比較多的話就會致使發送消息慢,kafka提供了isr機制。該機制也可能致使數據重複問題,當leader掛掉後,follower部分同步了數據,而後follower之間選出leader,同步了還未給生產者發送ack確認信息的數據,而後生產者會從新發送該消息,因此就會重複,該重複可在業務中作判斷,如消費者獲得消息後首先在庫中查找是否已經存在該消息,若是存在就拋出異常,即便不查庫,也可使用惟一索引。

kafka消息重複或者消息丟失均可能在此發生。

public static final String ACKS_CONFIG = "acks";
    private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
                                           + " durability of records that are sent. The following settings are allowed: "
                                           + " <ul>"
                                           + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
                                           + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
                                           + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
                                           + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
                                           + " always be set to -1."
                                           + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
                                           + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
                                           + " acknowledging the record but before the followers have replicated it then the record will be lost."
                                           + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
                                           + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
                                           + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";

ISR機制:

當acks使用all的時候,全部follower同步數據後再發送acks就會很慢,kafka設計維護一個isr列表,isr是全部follower中的一部分,當isr內的follower同步完了數據,leader就會馬上發送ack確認,isr經過肯定follower是否在必定時間內與其是否完成數據同步後而確認是否將該follower加入到isr列表中。該時間經過replica.lag.time.max.ms參數設定。

log文件中的HW,LEO

  • HW : high water,記錄消費者消費offset的最大值,在leader和follower之間,消費者可以看到的最大offset就是HW。
  • LEO: log end offset,記錄每個log文件中最大的offset值。

4.消費者

發佈訂閱模式中消費者消費消息有兩種方式,一種是消息中間推送消息到消費者,另外一種方式是消費者經過消息中間件拉去消息。推的方式會致使消費者處理消息的速度趕不上推送的速度,拉的方式就會致使消費者在沒有消息的時候會不斷的輪詢,致使空運行,kafka採用拉的方式,能夠設置時間來間隔性的訪問消息中間件。

消費者組:多個消費者可組成一個消費者組,一個消費者組訂閱一個topic,消費者組中的一個消費者消費了消息就不會再有另一個消費者消費消息,當有多個消費者後就涉及分區分消費者策略。

分區分消費者策略:

  • RoundRobin:RoundRobin方式是以消費者組爲主,若是消費者組CG1有三個消費者C1,C2和C3,C1,C2和C3分別訂閱了一個topic有三個分區p1,p2和p3,kafka會進行隨機組合,組合成c1p1,c1p2,c1p3,c2p1,c2p2,c2p3,c3p1,c3p2和c3p3,而後依據hashcode值將消息分別發送到不一樣的消費者中進行消費。
  • Range:range方式是面向topic的,若是一個topic被一個消費者組訂閱,該消費者組有3個消費者,而後會將topic三個分區的消息分別給三個消費者。

消費者offset值維護:

kafka老版本將offset維護放置zookeeper中,0.9版本後將維護放置在kafka __consumer_offset topic 中,

exclude.internal.topics=false #開啓內部topic

5.zookeeper在kafka中的做用

  • broker中的controller的選舉
  • broker的上下線
  • topic 分區副本分配及follower之間選舉leader
相關文章
相關標籤/搜索