kafka docker

基本概念介紹

Broker 能夠簡單理解爲一個 Kafka 節點, 多個 Broker 節點構成整個 Kafka 集羣;
Topic 某種類型的消息的合集;
Partition 它是 Topic 在物理上的分組, 多個 Partition 會被分散地存儲在不一樣的 Kafka 節點上; 單個 Partition 的消息是保證有序的, 但整個 Topic 的消息就不必定是有序的;
Segment 包含消息內容的指定大小的文件, 由 index 文件和 log 文件組成; 一個 Partition 由多個 Segment 文件組成
Offset Segment 文件中消息的索引值, 從 0 開始計數
Replica (N) 消息的冗餘備份, 表現爲每一個 Partition 都會有 N 個徹底相同的冗餘備份, 這些備份會被儘可能分散存儲在不一樣的機器上;
Producer 經過 Broker 發佈新的消息到某個 Topic 中;
Consumer 經過 Broker 從某個 Topic 中獲取消息;
如何使用 Kafka
首先介紹下如何搭建 Kafka 集羣. 咱們基於 docker-compose 來搭建一個 2 個節點的集羣, 這裏 是詳細的介紹文檔.git

搭建 Kafka 集羣

首先編寫一個 docker-compose.yml 文件:算法

version: '2.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181" 
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.2.136
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

2181(宿主機器):2181(docker內)
其中 KAFKA_ADVERTISED_HOST_NAME 須要被替換成你本機的 IP 地址, 不能是 localhost 0.0.0.0 之類的地址. KAFKA_CREATE_TOPICS 是爲了演示能夠在 Kafka 集羣啓動的時候建立一些默認的 Topic; test:1:1 的含義是默認建立一個名字爲 test, Partition 和 Replica 數量都爲 1 的 Topic.docker

在 docker-compose.yml 文件所在的目錄執行 docker-compose up -d --scale kafka=2 就會在本機啓動一個有兩個節點的 Kafka 集羣:bootstrap

➜  Kafka git:(master) docker-compose up -d --scale kafka=2
Creating network "kafka_default" with the default driver
Creating kafka_kafka_1     ... done
Creating kafka_kafka_2     ... done
Creating kafka_zookeeper_1 ... done
➜  Kafka git:(master) docker ps
CONTAINER ID        IMAGE                    COMMAND                  CREATED                  STATUS              PORTS                                                NAMES
d5927ffbd582        wurstmeister/kafka       "start-kafka.sh"         Less than a second ago   Up 6 seconds        0.0.0.0:32774->9092/tcp                              kafka_kafka_2
17916afee832        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   Less than a second ago   Up 7 seconds        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafka_zookeeper_1
578c02c01fd9        wurstmeister/kafka       "start-kafka.sh"         Less than a second ago   Up 6 seconds        0.0.0.0:32773->9092/tcp                              kafka_kafka_1

兩個節點的 Kafka 集羣已經成功啓動, 節點對應的 container 名分別爲 kafka_kafka_1 和 kafka_kafka_2
經過 Cli 工具演示生產和消費消息
Kafka 官方自帶了一些 cli 工具, 能夠進入到 container 內部去訪問這些命令:安全

➜  Kafka git:(master) docker exec -it kafka_kafka_1 bash
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1:2181
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

上面的命令列出了當前 Kafka 集羣的全部 Topic.bash

我本身更喜歡直接在宿主機訪問 Kafka 集羣, 這就須要先安裝上 kafka , 在 macOS 中能夠經過 brew install kafka 來安裝.併發

安裝完成後的使用方法和上面相似, 如列出全部 topic :hexo

➜  Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

接下來咱們來演示如何生產與消費消息.tcp

建立一個新的 Topic:工具

➜  Kafka git:(master) kafka-topics --create --topic chat --partitions 3 --zookeeper localhost:2181 --replication-factor 2
Created topic "chat".

新建立的 Topic 名字爲 chat, partition 數爲 3, replica 數爲 2. 能夠經過下面的命令驗證 Topic 是否成功建立:

➜  Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:chat      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: chat     Partition: 0    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
        Topic: chat     Partition: 1    Leader: 1002    Replicas: 1002,1001     Isr: 1002,1001
        Topic: chat     Partition: 2    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

建立生產者和消費者進程

消息的生產和消費都須要知道對應的 Broker 地址, 若是在 docker 宿主機上訪問的話就須要知道對應的映射端口. 咱們能夠經過下面的命令獲取:

clipboard.png

而後經過下面的命令分別去建立消息生產者和消費者:

kafka-console-producer --broker-list localhost:32773 --topic chat
kafka-console-consumer --bootstrap-server localhost:32773 --topic chat --from-beginning

在生產者中輸入消息, 就能夠在消費者中看到對應的消息輸出了, 效果以下圖所示:

clipboard.png

能夠經過 <Ctrl-c> 來退出這兩個進程.

文件存儲原理介紹
咱們先回顧下前面關於 Topic chat 的一些信息:

Topic:chat PartitionCount:3 ReplicationFactor:2 Configs:

Topic: chat     Partition: 0    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
    Topic: chat     Partition: 1    Leader: 1002    Replicas: 1002,1001     Isr: 1002,1001
    Topic: chat     Partition: 2    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002

從上面能夠看出 ID 爲 1001 的節點 (kafka_kafka_1) 存儲了 Partition 0 和 Partitiont 2 的 Leader 部分, 同時也存儲了 Partition 1 的一個備份.

Partition 是按照下面的算法分佈到多個 Kafka 節點:

將全部 N 個 Broker 和待分配的 M 個Partition排序;
將第 i 個 Partition 分配到第 (i mod N) 個Broker上;
將第 i 個 Partition 的第 j 個副本分配到第 ((i + j) mod N) 個Broker上.
接下來咱們看一看 Partition 具體是怎麼存儲的

咱們能夠登陸到節點 1001 內部看下對應的文件存儲:

➜ blog git:(hexo) ✗ docker exec -it kafka_kafka_1 bash
bash-4.4# cd /kafka/kafka-logs-578c02c01fd9/
bash-4.4# ls -d chat*
chat-0 chat-1 chat-2
能夠看到每個 Partition 都是和一個目錄對應的, 同時每個目錄裏都包含了一個 index 文件和 log 文件:

bash-4.4# ls -lh chat-0
total 16
-rw-r--r-- 1 root root 10.0M May 8 20:52 00000000000000000000.index
-rw-r--r-- 1 root root 77 May 8 20:35 00000000000000000000.log
-rw-r--r-- 1 root root 10.0M May 8 20:52 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10 May 8 20:52 00000000000000000001.snapshot
-rw-r--r-- 1 root root 8 May 8 20:35 leader-epoch-checkpoint
其中 log 文件存儲實際的消息內容, 而和它同名的 index 文件存儲消息的索引數據. log 的文件名存放的是上一個 log 文件中最後一個消息的 offset 值.

能夠按照下面的方法找到指定 offset 對應的消息

首先定位到對應的 segment ; 這個直接根據文件名進行二分查找就能夠找到對應的 segement 了;
再在 segment 的 index 文件中順序查找到 offset 在 log 文件中的位置; index 文件會被映射到內存中.
總結
Kafka 經過給 Topic 指定多個 Partition, 而各個 Partition 分佈在不一樣的節點上, 這樣便能提供比較好的併發能力. 同時, 對於 Partition 還能夠指定對應的 Replica 數, 這也極大地提升了數據存儲的安全性, 防止出現數據丟失.

基於文件名去輔助定位消息的設計仍是很巧妙的!

最開始計劃寫本文時是想經過設計一個聊天的場景來說解的, 發送者是消息生產者, 接受者是消息的消費者, 對於每一個用戶都去生成一個對應的 Topic. 後來以爲工做量有些略大, 就放棄了. 或許想學習 Go 的 Kafaka SDK sarama 的時候就會去實現這個示例.

監控

docker run -itd --name=kafka-manager -p 9000:9000 -e ZK_HOSTS="127.0.0.1:2181" sheepkiller/kafka-manager

相關文章
相關標籤/搜索