官網 http://kafka.apache.org/java
介紹 http://kafka.apache.org/introshell
路徑: http://kafka.apache.org/downloadsapache
wget https://archive.apache.org/dist/kafka/0.11.0.3/kafka_2.11-0.11.0.3.tgz sudo tar -zxvf kafka_2.11-0.11.0.3.tgz -C /opt/software/ cd /opt/software/kafka_2.11-0.11.0.3/config/ sudo chown -C admin:admin /opt/software/kafka_2.11-0.11.0.3/ vim server.properties 修改配置信息 # The id of the broker. This must be set to a unique integer for each broker. broker.id=1 delete.topic.enable=true log.dirs=/opt/software/kafka_2.11-0.11.0.3/logs zookeeper.connect=localhost:2181 多個節點配置最好是 hadoop1 , hadoop2 , hadoop3 ,kafka依賴於zookeeper的,集羣配置以下 zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181 建立日誌文件 前提是你zk啓動好了 kafka啓動 就下面 (最好配置下環境變量) ,記住給其餘幾臺機器穿文件時注意 broker.id=1記得修改. kafka-server-start.sh -daemon /opt/software/kafka_2.11-0.11.0.3/config/server.properties 啓動成功就是 [admin@localhost kafka_2.11-0.11.0.3]$ jps 2880 Jps 2804 Kafka 2185 QuorumPeerMain 修改 bin 下的 kafka-server-stop.sh 文件 PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}') 修改後: PIDS=$(ps ax | grep -i 'Kafka' | grep java | grep -v grep | awk '{print $1}') 就可使用了 kafka-server-stop.sh關閉 羣啓動腳本
1. 增長 kafka-topics.sh --zookeeper localhost:2181 --create --topic myfirsttopic --partitions 1 --replication-factor 1 意思就是: 執行zk地址 建立topic 指定topic名字 建立1個分區 建立一個副本 此時 zkCli查看 [zk: localhost:2181(CONNECTED) 6] ls / [app, cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config] [zk: localhost:2181(CONNECTED) 7] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 8] ls /brokers/topics [myfirsttopic] (這裏多了一個topic節點) [zk: localhost:2181(CONNECTED) 9] 2.查看所有信息 kafka-topics.sh --zookeeper localhost:2181 --list 查看某個節點信息 kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic [admin@localhost kafka_2.11-0.11.0.3]$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic Topic:myfirsttopic PartitionCount:1 ReplicationFactor:1 Configs: Topic: myfirsttopic Partition(分區從0開始): 0 Leader(所位於的broker-id): 0 Replicas(副本所在的broker-id位置): 0 Isr(和leader保持同步的副本集合): 0 3. 刪除 kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic 4.改(alter) kafka-topics.sh --zookeeper localhost:2181 --alter --topic myfirsttopic --patication 2(修改後的分區只能增長不能減小) 5.啓動生產者消費者 kafka-console-producer.sh --topic myfirsttopic --broker-list localhost:9092(能夠指定多個) 第一種方式 kafka-console-consumer.sh --topic myfirsttopic --bootstrap-server localhost:9092 第二種方式 kafka-console-consumer.sh --topic myfirsttopic --bootstrap-server localhost:9092 --from-beginning(從頭開始消費,涉及到offset)
最開始的模型 P - T - C 模型 , 消費者將本身發佈的主題發給Broker,而後存入Broker中,等待消費者使用 , 此時會發現啥問題 ,單點問題, 一個Topic ,那麼寫入效率會很低, 並且容易宕機bootstrap
繼續 , 進步,完善上面寫入較慢的問題 : , 咱們發現,此時消費者這邊讀就麻煩了 , 我要三個都讀一遍 , 無語 ,繼續改進 ....vim
此時到了咱們第三種架構 , 你有幾個 partition ,我去消費的時候, 就派三我的去消費 ,此時就引入了下面的架構 :服務器
可是此時又有一個問題 , 玩意我哪一個Topic-Partition 掛掉了咋辦呢 ? ,此時就須要創建 副本(duplicate) ,所以引入下面架構 : ,每個分片都會有相同的副本 ,因此不怕掛掉一個 ,有副本能夠接管 .架構
因此以上架構模型 , 使得 Kafka的 讀寫 吞吐量 都是特別高的 , 具體細節實現就不講解了 .麼那本事 .app
1)Producer : 消息生產者,就是向kafka broker發消息的客戶端;oop
2)Consumer :消息消費者,向kafka broker取消息的客戶端;3d
3)Consumer Group :消費者組,由多個consumer組成。消費者組內每一個消費者負責消費不一樣分區的數據,一個分區只能由一個消費者消費;消費者組之間互不影響。全部的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。
4)Broker :一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。
5)Topic :能夠理解爲一個隊列,生產者和消費者面向的都是一個topic;
6)Partition:能夠理解過度片 , 爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列;
7)Replicate: 副本,爲保證集羣中的某個節點發生故障時,該節點上的partition數據不丟失,且kafka仍然可以繼續工做,kafka提供了副本機制,一個topic的每一個分區都有若干個副本,一個leader和若干個follower。
8)leader:每一個分區多個副本的「主」,生產者發送數據的對象,以及消費者消費數據的對象都是leader。
9)follower:每一個分區多個副本中的「從」,實時從leader中同步數據,保持和leader數據的同步。leader發生故障時,某個follower會成爲新的follower。
咱們主要關注一個點就是偏移量(offset)
,讀寫所有取決於offset
那麼問題來了 ,offset是啥 ,在哪 , 此時就引出了落盤操做 ,在一個log日誌文件裏, 他的每一條插入記錄都有一個offset , 消費者消費數據,只用拿着 offset(前提是他得自個拿個本本記着,我此次看到哪了)去查找就好了 .
可是咱們知道若是一個Topic的分片partition
所有讓一個日誌文件去寫的話,會形成,這個日誌文件很大 ,查起來也不方便 , 難受 ,此時就引入了 再分片segment
, 和 索引 index
.
根據上面那張圖 , 咱們差很少能夠看清一個topic的架構了 .
先說文件是怎麼存儲的把 , 由於任何一個抽象的概念都對應這一個物理概念 .,否則找個就空的 .
首先一個Topic名爲 TName
的 , 此時他的分區名字叫作 TName-0
,TName-1
,TName-2
的三個文件夾 , 舉個栗子 TName-0
文件夾裏 ,會對應着3個segment
,每個segment
會是一個 偏移量.index
和偏移量.log
的文件 ,因此一共6個文件 , 三個log ,三個index .
再看看 index
和 log
到底是啥吧 :
其中 , 咱們好比說拿着一個offset =3
的去找 ,會先找到 segment-1
,而後去找0.index
,去裏面找偏移量爲3的 ,而後找到一個物理偏移值 ,拿着這個值 咱們又去了 0.log
文件, 終於找到了咱們想要的數據 .
咱們將一個信息 發出去 ,這個信息主要包含