kafka官網: http://kafka.apache.org/javascript
kafka是一種高吞吐量的分佈式發佈訂閱消息系統,用它能夠在不一樣系統中間傳遞分發消息html
zookeeper是快速、高可用、容錯、分佈式的協調服務,kafka使用zookeeper用於管理和協調代理,每一個kafka代理經過zookeeper協調其餘kafka代理java
下載地址:apache
http://kafka.apache.org/downloads.htmlbootstrap
點擊Scala 2.11 - kafka_2.11-2.1.1.tgz (asc, sha512) (帶src是源代碼)windows
而後點擊http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz緩存
下載完成後解壓,放到一個合適的目錄下,我放在E:\Kafka\kafka_2.11-2.1.1服務器
1.找到config目錄下的server.properties,網絡
設置log.dirs地址,我設置成log.dirs=E:\Kafka\kafka_2.11-2.1.1\kafka-logs異步
zookeeper.connect=localhost:2181是默認的, 端口和你zookeeper設置的端口保持一致就行
http://zookeeper.apache.org/releases.html#download
點擊Active releases may be downloaded from Apache mirrors:Download
進入https://www.apache.org/dyn/closer.cgi/zookeeper/後點擊http://mirror.bit.edu.cn/apache/zookeeper/
下載完成後解壓,我放在E:\Kafka\zookeeper-3.4.13
1.通常配置文件都在conf目錄下,找到zookeeper的conf目錄下的zoo_sample.cfg
將其從新命名爲zoo.cfg,
修改dataDir地址爲本身合適的dataDir=E:\\Kafka\\zookeeper-3.4.13\\tmp
2.相似配置java環境變量同樣,設置ZOOKEEPER_HOME, 而後在path中配置%ZOOKEEPER_HOME%\bin
這樣在任意目錄下經過cmd命令:zkServer, windows系統就能找到對應目錄下bin目錄下的zkServer.cmd命令並執行
在kafka的解壓目錄下,例如E:\Kafka\kafka_2.11-2.1.1
按住shift在當前目錄下進入cmd命令窗口
啓動kafka, 命令窗口別關
.\bin\windows\kafka-server-start.bat .\config\server.properties 另開一個命令窗口, 建立一個名爲test的topic .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 查看主題 .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 建立生產者 .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test 執行完後可隨便輸入一個字符串 建立消費者 .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning 執行完命令後會發現 命令行出現了剛纔生產的字符串 查看topic詳情 .\bin\windows\kafka-topics.bat --describe --zookeeper 127.0.0.1:2181 --topic test 在生產者消費者模式下, 按ctrl+c 退出
啓動kafka時出現各類問題和解決, 第一個出現的是錯誤: 找不到或沒法加載主類
這是因爲個人java環境由jre換成了jdk,找到kafka_2.12-1.0.0\bin\windows\kafka-run-class.bat
文件,將set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %
%CLASSPATH%先後加上雙引號
第二個報錯是java version52.0, 經過cmd , java -version命令發現安裝版本是1.8, 可是環境變量配置的路徑jdk是1.7的,版本不一致, 修改環境變量java_home路徑解決
備份機制是幹啥的: 備份機制保證了kafka集羣中的節點掛掉後而不影響整個集羣的工做
生產者向topic中發送數據,消費者消費該topic對應的數據,爲了提升吞吐量,生產者會將該topic對應的數據分別發送到多個partition,每一個partition都有必定數量的副本做爲備份,以提升kafka的高可用性
p0-leader副本 ------- p0-follower副本 | ----------------------------------------------P0-follower副本
生產者和消費者都只在leader副本上寫讀數據,三個leader副本平均分配在三個broker上,其餘follower副本都只作備份,以防leader宕機,follower副本升級成爲leader副本
三個broker之間是有必定的策略進行數據的讀寫的,follower副本會隔指定的時間去leader副本上讀取最新消息,包括元數據和日誌消息
因此kafka節點複製備份其實就是複製分區裏的leader副本,當生產者發佈消息到topic的某個分區時,消息首先被傳遞到leader副本,而後leader通知follower有新消息過來,follower去leader中拉取消息,一旦有足夠的副本收到消息,leader就會提交這個消息,消費者就能消費到這個消息了。
leader負責維護和跟蹤同步副本列表中全部follower滯後狀態,消息提交以後才被成功複製到全部的同步副本,消息複製延遲受最慢的follower限制,
5.1 follower副本發生故障
若是某個follower落後太多或宕機,leader會把他從isr中剔除出去。那麼該副本對應的分區也就稱之爲同步失效分區,即under-replicated分區,follower重啓後會去leader上恢復最新的HW並將日誌截斷到HW,並繼續從leader中獲取HW之後的消息,一旦徹底遇上leader,副本將被從新加入到ISR隊列中,系統將從新回到fully replicated(全量同步)模式。
5.2 leader副本發生故障
leader發生故障,其餘follower會爭相競爭作leader,最終只有一個follower競爭成功升級成爲leader,故障leader重啓後成爲follower去新leader同步消息 (使用Zookeeper實現leader選舉。若是leader失敗,controller會從ISR選出一個新的leader
)
注 :broker概念
已發佈的消息保存在一組服務器中,稱之爲Kafka集羣。集羣中的每個服務器都是一個代理(Broker). 消費者能夠訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發佈的消息。
ISR:in-sync replicas
kafka維護的一個副本維護隊列,ISR的副本保持和leader的同步,固然leader自己也在ISR中。初始狀態全部的副本都處於ISR中,當一個消息發送給leader的時候,leader會等待ISR中全部的副本告訴它已經接收了這個消息,若是一個副本失敗了,那麼它會被移除ISR。下一條消息來的時候,leader就會將消息發送給當前的ISR中節點了
HW: high watermark
是指ISR中全部節點都已經複製完的消息的offset。也是消費者所能獲取到的消息的最大offset
LEO:LogEndOffset,表示每一個分區log的最後一條消息的offset
消息是否會丟失從兩個角度來看
6.1消息發送
kafka消息的發送方式分同步(sync)、異步(async)兩種方式
生產者若是異步發送,會形成消息丟失,發送的過程當中kafka會先把消息緩存起來。而後批量發送。 若批量發送以前client宕機會形成消息丟失。生產者不丟失消息須要同步發送
kafka服務器默認異步刷盤,先刷到系統頁緩存,而後再刷新到日誌文件。頁緩存的數據可能會丟失。解決能夠同步的方式刷盤,可是這樣效率很低,比rabbitmq低。
配置ack=all , min.insync.replas > 1 是能夠保證頁緩存數據不丟失
關閉自動提交?
unclean.leader.election.enable 默認是false 可靠性優先, 不在ISR裏的follower不可以參與選舉,此時沒法進行新的選舉,此時整個分區處於不可用狀態
6.2消息消費
使用高級接口High-level API,可能存在一個問題就是當消息消費者從集羣中把消息取出來、並提交了新的消息offset值後,還沒來得及消費就掛掉了,那麼下次再消費時以前沒消費成功的消息就消失了
簡單來講,
producer.type屬性進行配置同步異步
request.required.acks屬性來確認消息的生產,-1---表示Leader和Follower都接收成功時確認;
同步模式下,確認機制設置爲-1,即讓消息寫入Leader和Follower以後再確認消息發送成功
Kafka將每一個Topic進行分區Patition,以提升消息的並行處理,同時爲保證高可用性,每一個分區都有必定數量的副本 Replica,這樣當部分服務器不可用時副本所在服務器就能夠接替上來,保證系統可用性。在Leader上負責讀寫,Follower負責數據的同步。當一個Leader發生故障如何從Follower中選擇新Leader呢?
Kafka在Zookeeper上針對每一個Topic都維護了一個ISR(in-sync replica---已同步的副本)的集合,集合的增減Kafka都會更新該記錄。若是某分區的Leader不可用,Kafka就從ISR集合中選擇一個副本做爲新的Leader。這樣就能夠容忍的失敗數比較高,假如某Topic有N+1個副本,則能夠容忍N個服務器不可用。
若是ISR中副本都不可用,有兩種處理方法:
(1)等待ISR集合中副本復活後選擇一個可用的副本;
(2)選擇集羣中其餘可用副本;
磁盤吞吐量 磁盤容量 內存 網絡 CPU
At most once—Messages may be lost but are never redelivered. 最多一次 --- 消息可能丟失,但毫不會重發。 At least once—Messages are never lost but may be redelivered. 至少一次 --- 消息毫不會丟失,但有可能從新發送。 Exactly once—this is what people actually want, each message is delivered once and only once. 正好一次 --- 這是人們真正想要的,每一個消息傳遞一次且僅一次。