Apache Kafka 是分佈式發佈-訂閱消息系統(消息中間件)。它最初由 LinkedIn 公司開發,之後成爲 Apache 項目的一部分。Kafka 是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。html
簡單說明什麼是Kafka:java
舉個例子,生產者消費者,生產者生產雞蛋,消費者消費雞蛋,生產者生產一個雞蛋, 消費者就消費一個雞蛋,假設消費者消費雞蛋的時候噎住了(系統宕機了),生產者還在生 產雞蛋,那新生產的雞蛋就丟失了。再好比生產者很強勁(大交易量的狀況),生產者1秒鐘生產100 個雞蛋,消費者1 秒鐘只能吃50 個雞蛋,那要不了一會,消費者就吃不消了
傳統消息中間件服務 RabbitMQ、Apache ActiveMQ 等。node
Apache Kafka 與傳統消息系統相比,有如下不一樣:算法
1.它是分佈式系統,易於向外擴展;apache
2.它同時爲發佈和訂閱提供高吞吐量;bootstrap
3.它支持多訂閱者,當失敗時能自動平衡消費者;服務器
4.它將消息持久化到磁盤,所以可用於批量消費,例如 ETL,以及實時應用程序。app
術語tcp |
解釋分佈式 |
||
Broker |
Kafka 集羣包含一個或多個服務器,這種服務器被稱爲 broker |
||
Topic |
每條發佈到 Kafka 集羣的消息都有一個類別,這個類別被稱爲 Topic。(物 理上不一樣 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存於一個或多個 broker 上但用戶只需指定消息的 Topic 便可生產或消費數據而沒必要關心數據存於何處) |
||
Partition |
Partition 是物理上的概念,每一個 Topic 包含一個或多個 Partition. |
||
Producer |
負責發佈消息到 Kafka broker |
||
Consumer |
消息消費者,向 Kafka broker 讀取消息的客戶端 |
||
Consumer Group |
每一個 Consumer 屬於一個特定的 Consumer Group(可爲每一個 Consumer 指定 group name,若不指定 group name 則屬於默認的 group) |
||
replica |
partition 的副本,保障 partition 的高可用 |
||
leader |
replica 中的一個角色, producer 和 consumer 只跟 leader 交互 |
||
follower |
replica 中的一個角色,從 leader 中複製數據 |
||
controller |
Kafka 集羣中的其中一個服務器,用來進行 leader election 以及各類 failover |
小白理解:
producer:生產者,就是它來生產「雞蛋」的。
consumer:消費者,生出的「雞蛋」它來消費。
topic:把它理解爲標籤,生產者每生產出來一個雞蛋就貼上一個標籤(topic),消費者可不是誰生產的「雞蛋」都吃的,這樣不一樣的生產者生產出來的「雞蛋」,消費者就能夠選擇性的「吃」了。
broker:就是籃子了。
若是從技術角度,topic標籤實際就是隊列,生產者把全部「雞蛋(消息)」都放到對應的隊列裏了,消費者到指定的隊列裏取。
Apache kafka 官方: http://kafka.apache.org/downloads.html
Scala 2.11 - kafka_2.11-0.10.2.0.tgz (asc, md5)
參照 Zookeeper 官網搭建一個 ZK 集羣, 並啓動 ZK 集羣。
vi server.properties broker.id=0 //爲依次增加的:0、一、二、三、4,集羣中惟一id log.dirs=/kafkaData/logs // Kafka 的消息數據存儲路徑zookeeper.connect=master:2181,slave1:2181,slave2:2181 //zookeeperServers 列表,各節點以逗號分開 Vi zookeeper.properties dataDir=/root/zkdata #指向你安裝的zk 的數據存儲目錄 # 將 Kafka server.properties zookeeper.properties 文件拷貝到其餘節點機器 KAFKA_HOME/config>scp server.properties zookeeper.properties xx:$PWD
在每臺節點上啓動:
bin/kafka-server-start.sh -daemon config/server.properties &
bin/kafka-topics.sh --create --zookeeper hadoop:2181,hadoop001:2181,hadoop002:2181 --replication-factor 3 --partitions 3 --topic testTopic
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
第一行是對全部分區的一個描述,而後每一個分區對應一行,由於只有一個分區因此下面一行。
leader:負責處理消息的讀和寫,leader 是從全部節點中隨機選擇的.
replicas:列出了全部的副本節點,無論節點是否在服務中.
isr:是正在服務中的節點.
在例子中,節點 1 是做爲 leader 運行。
bin/kafka-console-producer.sh --broker-list hadoop:9092,hadoop001:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server hadoop:9092 --from-beginning --topic hellotopic
Kill -9 pid[leader 節點]
另一個節點被選作了 leader,node 1 再也不出如今 in-sync 副本列表中: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
雖然最初負責續寫消息的 leader down 掉了,但以前的消息仍是能夠消費的:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.2.1</version> </dependency>
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency>
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * kafka 生產端Api開發 */ public class ProducerApi { public static void main(String[] args) throws Exception{ Properties props = new Properties(); props.setProperty("bootstrap.servers","hadoop:9092,hadoop001:9092,hadoop002:9092"); props.setProperty("key.serializer",StringSerializer.class.getName()); props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); /** * 發送數據的時候是否須要應答 * 取值範圍: * [all,-1,0,1] * 0:leader不作任何應答 * 1:leader會給producer作出應答 * all、-1:fllower->leader->producer * 默認值: 1 */ //props.setProperty("acks","1") /** * 自定義分區 * 默認值:org.apache.kafaka.clients.producer.internals.DefaultPartitoner */ //props.setProperty("partitioner.class","org.apache.kafaka.clients.producer.internals.DefaultPartitoner"); //建立一個生產者的客戶端實例 KafkaProducer<Object, Object> kafkaproducer = new KafkaProducer<>(props); int count=0; while (count<1000){ int partitionNum=count%3; //封裝一條消息 ProducerRecord record = new ProducerRecord("testTopic", partitionNum, "", count+""); //發送一條消息 kafkaproducer.send(record); count++; Thread.sleep(1*1000); } //釋放 kafkaproducer.close(); } }
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Properties; /** * 消費端Api開發 */ public class ConsumerApi { public static void main(String[] args) { Properties config = new Properties(); HashMap<String, Object> props = new HashMap<>(); config.put("bootstrap.servers","hadoop:9092,hadoop001:9092,hadoop002:9092"); config.put("key.deserializer",StringDeserializer.class.getName()); config.put("value.deserializer",StringDeserializer.class.getName()); config.put("group.id","day12_001"); /** * 從哪一個位置開始獲取數據 * 取值範圍: * [latest,earliest,none] * 默認值: * latest */ config.put("auto.offset.reset","earliest"); /** * 是否要自動遞交偏移量(offset)這條數據在某個分區所在位置的編號 */ config.put("enable.auto.commit",true); /** * 設置500毫秒遞交一次offset值 */ config.put("auto.commit.interval.ms",500); //建立一個客戶端實例 KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(config); //訂閱主題 kafkaConsumer.subscribe(Arrays.asList("testTopic")); while (true){ //拉取數據,會從kafka因此分區下拉取數據 ConsumerRecords<Object, Object> records = kafkaConsumer.poll(2000); Iterator<ConsumerRecord<Object, Object>> iterator = records.iterator(); while (iterator.hasNext()){ ConsumerRecord<Object, Object> record = iterator.next(); System.out.println("record"+record); } } } }
如上圖所示,一個典型的 Kafka 集羣中包含若干 Producer,若干 broker(Kafka 支持水平擴展, 通常 broker 數量越多,集羣吞吐率越高),若干 Consumer Group,以及一個 Zookeeper 集羣。Kafka 經過 Zookeeper 管理集羣配置,選舉 leader。Producer 使用 push 模式將消息發佈到 broker,Consumer 使用 pull 模式從 broker 訂閱並消費消息。
中,屬於順序寫磁盤。
主題是發佈記錄的類別或訂閱源名稱。Kafka的主題老是多用戶; 也就是說,一個主題能夠有零個,一個或多個消費者訂閱寫入它的數據。
對於每一個主題,Kafka羣集都維護一個以下所示的分區日誌:
1.指定了 partition,則直接使用;
2.未指定 partition 但指定 key,經過對 key 的 value 進行 hash 選出一個 partition
3.partition 和 key 都未指定,使用輪詢選出一個 partition 。
物理上把 topic 分紅一個或多個 partition(對應 server.properties 中的 num.partitions=3 配置),每一個 partition 物理上對應一個文件夾(該文件夾存儲該 partition 的全部消息和索引文件),以下:
不管消息是否被消費,kafka 都會保留全部消息。有兩種策略能夠刪除舊數據:
log.retention.hours=168 #基於時間
log.retention.bytes=1073741824 #基於大小
Partition 中的每條 Message 由 offset 來表示它在這個 partition 中的偏移量,這個 offset 不是該 Message 在 partition 數據文件中的實際存儲位置,而是邏輯上一個值,它惟一肯定了
partition 中的一條 Message。所以,能夠認爲 offset 是 partition 中 Message 的 id。partition
中的每條 Message 包含了如下三個屬性:
offset
MessageSize
data
其中 offset 爲 long 型,MessageSize 爲 int32,表示 data 有多大,data 爲 message 的具體內容。
咱們來思考一下,若是一個partition 只有一個數據文件會怎麼樣?
1) 新數據是添加在文件末尾,不論文件數據文件有多大,這個操做永遠都是高效的。
2) 查找某個offset 的Message是順序查找的。所以,若是數據文件很大的話,查找的效率就低。
那 Kafka 是如何解決查找效率的的問題呢?有兩大法寶:1) 分段 2) 索引。
Kafka 解決查詢效率的手段之一是將數據文件分段,好比有 100 條 Message,它們的 offset 是從 0 到 99。假設將數據文件分紅 5 段,第一段爲 0-19,第二段爲 20-39,以此類推,每段放在一個單獨的數據文件裏面,數據文件以該段中最小的 offset 命名。這樣在查找指定 offset 的 Message 的時候,用二分查找就能夠定位到該 Message 在哪一個段中。
數據文件分段使得能夠在一個較小的數據文件中查找對應 offset 的 Message 了,可是這依然須要順序掃描才能找到對應 offset 的 Message。爲了進一步提升查找的效率,Kafka 爲每一個分段後的數據文件創建了索引文件,文件名與數據文件的名字是同樣的,只是文件擴展名爲.index。
索引文件中包含若干個索引條目,每一個條目表示數據文件中一條 Message 的索引。索引包含兩個部分,分別爲相對 offset 和 position。
1.相對 offset:由於數據文件分段之後,每一個數據文件的起始 offset 不爲 0,相對 offset 表示這條 Message 相對於其所屬數據文件中最小的 offset 的大小。舉例,分段後的一個數據文件的 offset 是從 20 開始,那麼 offset 爲 25 的 Message 在 index 文件中的相對 offset 就是 25-20 = 5。存儲相對 offset 能夠減少索引文件佔用的空間。
2.position,表示該條 Message 在數據文件中的絕對位置。只要打開文件並移動文件指針到這個 position 就能夠讀取對應的 Message 了。
index 文件中並無爲數據文件中的每條 Message 創建索引,而是採用了稀疏存儲的方式, 每隔必定字節的數據創建一條索引。這樣避免了索引文件佔用過多的空間,從而能夠將索引 文件保留在內存中。但缺點是沒有創建索引的 Message 也不能一次定位到其在數據文件的位置,從而須要作一次順序掃描,可是此次順序掃描的範圍就很小了。
咱們以幾張圖來總結一下 Message 是如何在 Kafka 中存儲的,以及如何查找指定 offset 的Message 的。
Message 是按照 topic 來組織,每一個 topic 能夠分紅多個的 partition,好比:有 5 個 partition的名爲爲 page_visits 的 topic 的目錄結構爲:
partition 是分段的,每一個段叫 Segment,包括了一個數據文件和一個索引文件,下圖是某個partition 目錄下的文件:
能夠看到,這個 partition 有 4 個 Segment。圖示 Kafka 是如何查找 Message 的。
好比:要查找絕對 offset 爲 7 的 Message:
首先是用二分查找肯定它是在哪一個 LogSegment 中,天然是在第一個 Segment 中。
打開這個 Segment 的 index 文件,也是用二分查找找到 offset 小於或者等於指定 offset 的索引條目中最大的那個 offset。天然 offset 爲 6 的那個索引是咱們要找的,經過索引文件咱們知道 offset 爲 6 的 Message 在數據文件中的位置爲 9807。
打開數據文件,從位置爲9807 的那個地方開始順序掃描直到找到offset 爲7 的那條Message。這套機制是創建在 offset 是有序的。索引文件被映射到內存中,因此查找的速度仍是很快的。
一句話,Kafka 的 Message 存儲採用了分區(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性。