解壓kafka的安裝包後,在conf目錄下server.propertieslinux
#broker 的全局惟一編號,在kafka集羣中不能重複,爲整型數字 broker.id=0 #開啓刪除topic功能 delete.topic.enable=true #處理網絡請求的線程數量 num.network.threads=3 #用來處理磁盤 IO 的現成數量 num.io.threads=8 #發送套接字的緩衝區大小 socket.send.buffer.bytes=102400 #接收套接字的緩衝區大小 socket.receive.buffer.bytes=102400 #請求套接字的緩衝區大小 socket.request.max.bytes=104857600 # kafka運行數據保存目錄 kafka data是以.log後綴的 log.dirs=/data/kafka/logs #topic 在當前 broker 上的分區數量,下面配置一個分區 num.partitions=1 #用來恢復和清理 data 下數據的線程數量 num.recovery.threads.per.data.dir=1 #segment log.dirs中文件保留的最長時間,超時將被刪除,默認爲7天也就是168小時 log.retention.hours=168 #kafka須要zookeeper進行管理,配置鏈接 Zookeeper 集羣地址 zookeeper.connect=192.168.152.163:2181,192.168.152.162:2181,192.168.152.161:2181
啓動kafka網絡
kafka-server-start.sh -daemon config/server.properties //若是沒有加-daemon的話,將會阻塞方式運行
關閉kafka併發
kafka-server-stop.sh stop
kafka是基於topic的可分區的分佈式的消息中間件。消息生產者經過將消息發送至broker的某一個分區,消費者經過broker的分區獲取消息進行消費,一個broker可有多個分區,一個topic可有多個分區,topic下的每個分區存在副本機制,也就是存在一個leader和多個follower,生產者將消息寫入leader分區,follower同步leader中新增的消息,每一個分區都有index文件和log文件。消費者消費分區中的消息經過偏移值offset進行記錄。app
kafka存儲消息到文件中採用分片及索引機制,每一個topic在一個broker上的數據放在一個文件夾下,該文件夾名稱爲topic名稱加分區號,如minerprofit-1,(minerprofit爲topic名稱,1爲分區號),每個分區上的數據又分爲多個segment,每個segment有index文件和log文件,segment中index和log文件的命名是在該segment第一個offset值。socket
0000000000000000000000.index 0000000000000000000000.log 0000000000000000017866.index 0000000000000000017866.log 0000000000000000029866.index 0000000000000000029866.log
log文件記錄具體的消息數據,index記錄的是消息的偏移值與消息數據的索引值,依據offset值找到具體的消息的過程是:例如offset爲5,首先在index文件中,採用二分法查找offset爲5對應的索引值,而後獲取該索引值,在log文件中依據該索引值獲取消息。分佈式
kafka零拷貝技術: linux有用戶態和核心態,正常的訪問文件linux須要核心態和用戶態進行轉換,kafka內部機制可以作到不通過用戶態,具體機制能夠了解了解。ide
kafka順序寫磁盤。高併發
分區的優劣:ui
優點:this
高併發: 採起分區機制,將不一樣數據放置在不一樣的分區,提升併發能力,相似ConcurrentHashMap的分段鎖機制,當須要訪問不一樣分區的數據,可以將鎖粒度下降。 方便擴展: 當有多個broker時候,多個topic可方便擴展。
缺點:
分區的副本機制可能致使生產者發送的消息不可以正常同步到全部follower,當leader掛掉後,形成消息丟失。
生產者是經過封裝一個ProductRecord對象,須要指定具體的topic,分區,key,value,headers。
/** * Creates a record with a specified timestamp to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign * the timestamp using System.currentTimeMillis(). * @param key The key that will be included in the record * @param value The record contents * @param headers the headers that will be included in the record */ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
消息發送到具體分區的選擇:
生產者消息確認(ack)機制:
kafka分區存在副本機制,一個leader和多個follower,生產者將消息發送到leader,follower同leader同步數據,存在一種狀況,當生產者將消息發送至leader後,follower還未同步到數據,leader就宕機了,而後follower就選擇新的leader,新的leader進行數據同步後,就不清楚了生產者發送的消息時候成功了,因此在此過程當中就涉及到消息確認問題。
副本同步數據有半數確認機制和所有確認機制,kafka使用的是全數確認機制。
kafka設計了三種acks消息確認機制:
kafka消息重複或者消息丟失均可能在此發生。
public static final String ACKS_CONFIG = "acks"; private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are allowed: " + " <ul>" + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the" + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be" + " made that the server has received the record in this case, and the <code>retries</code> configuration will not" + " take effect (as the client won't generally know of any failures). The offset given back for each record will" + " always be set to -1." + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond" + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after" + " acknowledging the record but before the followers have replicated it then the record will be lost." + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to" + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";
ISR機制:
當acks使用all的時候,全部follower同步數據後再發送acks就會很慢,kafka設計維護一個isr列表,isr是全部follower中的一部分,當isr內的follower同步完了數據,leader就會馬上發送ack確認,isr經過肯定follower是否在必定時間內與其是否完成數據同步後而確認是否將該follower加入到isr列表中。該時間經過replica.lag.time.max.ms參數設定。
log文件中的HW,LEO
發佈訂閱模式中消費者消費消息有兩種方式,一種是消息中間推送消息到消費者,另外一種方式是消費者經過消息中間件拉去消息。推的方式會致使消費者處理消息的速度趕不上推送的速度,拉的方式就會致使消費者在沒有消息的時候會不斷的輪詢,致使空運行,kafka採用拉的方式,能夠設置時間來間隔性的訪問消息中間件。
消費者組:多個消費者可組成一個消費者組,一個消費者組訂閱一個topic,消費者組中的一個消費者消費了消息就不會再有另一個消費者消費消息,當有多個消費者後就涉及分區分消費者策略。
分區分消費者策略:
消費者offset值維護:
kafka老版本將offset維護放置zookeeper中,0.9版本後將維護放置在kafka __consumer_offset topic 中,
exclude.internal.topics=false #開啓內部topic