Apache Kafka 企業級消息隊列

一、大綱

  • 瞭解 Apache Kafka是什麼java

  • 掌握Apache Kafka的基本架構node

  • 搭建Kafka集羣git

  • 掌握操做集羣的兩種方式github

  • 瞭解Apache Kafka高級部分的內容算法

二、消息系統的做用是什麼?

消息系統最核心的功能有三個,分別是解耦、異步、並行spring

下面咱們經過用戶註冊的案例來講明消息系統的做用:shell

2.一、用戶註冊的通常流程

 

問題:隨着後端流程愈來愈多,每步流程都須要額外的耗費不少時間,從而會致使用戶更長的等待延遲。apache

2.二、改進成並行流程

 問題:系統並行的發起了4 個請求,4 個請求中,若是某一個環節執行1 分鐘,其餘環節再快,用戶也須要等待1 分鐘。若是其中一個環節異常以後,整個服務掛掉了。bootstrap

如何解決???vim

2.三、經過消息系統解決(異步架構)

 

可見,經過消息架構解決了並行架構的問題。

三、瞭解 Apache Kafka

3.一、簡介

官網:http://kafka.apache.org/

  • Apache Kafka 是一個開源消息系統,由Scala 寫成。是由Apache 軟件基金會開發的一個開源消息系統項目。

  • Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該項目的目標是爲處理實時數據提供一個統1、高通量、低等待(低延時)的平臺。

  • Kafka 是一個分佈式消息系統:具備生產者、消費者的功能。它提供了相似於JMS 的特性,可是在設計實現上徹底不一樣,此外它並非JMS 規範的實現。【重點】

3.二、kafka的基本結構

 

 

  • Producer:消息的發送者

  • Consumer:消息的接收者

  • kafka cluster:kafka的集羣。

  • Topic:就是消息類別名,一個topic中一般放置一類消息。每一個topic都有一個或者多個訂閱者(消費者)。

消息的生產者將消息推送到kafka集羣,消息的消費者從kafka集羣中拉取消息。

3.三、kafka的完整架構

 

說明:

  • broker:集羣中的每個kafka實例,稱之爲broker;

  • ZooKeeper:Kafka 利用ZooKeeper 保存相應元數據信息, Kafka 元數據信息包括如代理節點信息、Kafka集羣信息、舊版消費者信息及其消費偏移量信息、主題信息、分區狀態信息、分區副本分配方案信息、動態配置信息等。

  • ConsumerGroup:在Kafka 中每個消費者都屬於一個特定消費組( ConsumerGroup ),咱們能夠爲每一個消費者指定一個消費組,以groupld 表明消費組名稱,經過group.id 配置設置。若是不指定消費組,則該消費者屬於默認消費組test-consumer-group 。

3.四、kafka的特性

  • 消息持久化

    • Kafka 基於文件系統來存儲和緩存消息。

  • 高吞吐量

    • Kafka 將數據寫到磁盤,充分利用磁盤的順序讀寫。同時, Kafka 在數據寫入及數據同步採用了零拷貝( zero-copy )技術,採用sendFile()函數調用,sendFile()函數是在兩個文件描述符之間直接傳遞數據,徹底在內核中操做,從而避免了內核緩衝區與用戶緩衝區之間數據的拷貝,操做效率極高。

    • Kafka 還支持數據壓縮及批量發送,同時Kafka 將每一個主題劃分爲多個分區,這一系列的優化及實現方法使得Kafka 具備很高的吞吐量。經大多數公司對Kafka 應用的驗證, Kafka 支持每秒數百萬級別的消息

  • 高擴展性

    • Kafka 依賴ZooKeeper來對集羣進行協調管理,這樣使得Kafka 更加容易進行水平擴展,生產者、消費者和代理都爲分佈式,可配置多個。

    • 同時在機器擴展時無需將整個集羣停機,集羣可以自動感知,從新進行負責均衡及數據複製。

  • 多客戶端支持

    • Kafka 核心模塊用Scala 語言開發,Kafka 提供了多種開發語言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等。

  • 安全機制

    • 的Kafka 支持如下幾種安全措施:

      • 經過SSL 和SASL(Kerberos), SASL/PLA時驗證機制支持生產者、消費者與broker鏈接時的身份認證;

      • 支持代理與ZooKeeper 鏈接身份驗證;

      • 通訊時數據加密;

      • 客戶端讀、寫權限認證;

      • Kafka 支持與外部其餘認證受權服務的集成;

  • 數據備份

    • Kafka 能夠爲每一個topic指定副本數,對數據進行持久化備份,這能夠必定程度上防止數據丟失,提升可用性。

  • 輕量級

    • Kafka 的實例是無狀態的,即broker不記錄消息是否被消費,消費偏移量的管理交由消費者本身或組協調器來維護。

    • 同時集羣自己幾乎不須要生產者和消費者的狀態信息,這就使得Kafka很是輕量級,同時生產者和消費者客戶端實現也很是輕量級。

  • 消息壓縮

    • Kafka 支持Gzip, Snappy 、LZ4 這3 種壓縮方式,一般把多條消息放在一塊兒組成MessageSet,而後再把Message Set 放到一條消息裏面去,從而提升壓縮比率進而提升吞吐量。

3.五、kafka的應用場景

  • 消息系統。

    • Kafka 做爲一款優秀的消息系統,具備高吞吐量、內置的分區、備份冗餘分佈式等特色,爲大規模消息處理提供了一種很好的解決方案。

  • 應用監控。

    • 利用Kafka 採集應用程序和服務器健康相關的指標,如CPU 佔用率、IO 、內存、鏈接數、TPS 、QPS 等,而後將指標信息進行處理,從而構建一個具備監控儀表盤、曲線圖等可視化監控系統。例如,不少公司採用Kafka 與ELK (Elastic Search 、Logstash 和Kibana)整合構建應用服務監控系統。

  • 網站用戶行爲追蹤。

    • 爲了更好地瞭解用戶行爲、操做習慣,改善用戶體驗,進而對產品升級改進,將用戶操做軌跡、內容等信息發送到Kafka 集羣上,經過Hadoop 、Spark 或Strom等進行數據分析處理,生成相應的統計報告,爲推薦系統推薦對象建模提供數據源,進而爲每一個用戶進行個性化推薦。

  • 流處理。

    • 須要將己收集的流數據提供給其餘流式計算框架進行處理,用Kafka 收集流數據是一個不錯的選擇。

  • 持久性日誌。

    • Kafka 能夠爲外部系統提供一種持久性日誌的分佈式系統。日誌能夠在多個節點間進行備份, Kafka 爲故障節點數據恢復提供了一種從新同步的機制。同時, Kafka很方便與HDFS 和Flume 進行整合,這樣就方便將Kafka 採集的數據持久化到其餘外部系統。

四、Kafka的安裝與配置

準備三臺虛擬機,分別是node01,node02,node03,而且修改hosts文件以下:

vim /etc/hosts
#注意: 前面的ip地址改爲本身的ip地址

192.168.40.133 node01
192.168.40.134 node02
192.168.40.135 node03

#3臺服務器的時間要一致
#時間更新:
yum install -y rdate
rdate -s time-b.nist.gov

4.一、基礎環境配置

4.1.一、JDK環境

因爲Kafka 是用Scala 語言開發的,運行在JVM上,所以在安裝Kafka 以前須要先安裝JDK 。

安裝過程略過,我這裏使用的是jdk1.8。

4.1.二、ZooKeeper環境

4.1.2.一、安裝ZooKeeper

Kafka 依賴ZooKeeper ,經過ZooKeeper 來對服務節點、消費者上下線管理、集羣、分區元數據管理等,所以ZooKeeper 也是Kafka 得以運行的基礎環境之一。


#上傳zookeeper-3.4.9.tar.gz到/export/software
cd /export/software
mkdir -p /export/servers/
tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
#建立ZooKeeper的data目錄
mkdir /export/data/zookeeper -p
cd /export/servers/zookeeper-3.4.9/conf/
#修改配置文件
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
#設置data目錄
dataDir=/export/data/zookeeper
#啓動ZooKeeper
./zkServer.sh start
#檢查是否啓動成功
jps
4.1.2.三、搭建ZooKeeper集羣

#在/export/data/zookeeper目錄中建立myid文件
vim /export/data/zookeeper/myid
#寫入對應的節點的id,如:1,2等,保存退出

#在conf下,修改zoo.cfg文件
vim zoo.cfg
#添加以下內容
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
4.1.2.三、配置環境變量

vim /etc/profile
export ZK_HOME=/export/servers/zookeeper-3.4.9
export PATH=${ZK_HOME}/bin:$PATH

#當即生效
source /etc/profile
4.1.2.四、分發到其它機器

scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/

cd /export/servers
scp -r zookeeper-3.4.9 node02:/export/servers/
scp -r zookeeper-3.4.9 node03:/export/servers/
4.1.2.五、一鍵啓動、中止腳本

mkdir -p /export/servers/onekey/zk
vim slave
#輸入以下內容
node01
node02
node03
#保存退出

vim startzk.sh
#輸入以下內容
cat /export/servers/onekey/zk/slave | while read line
do
{
echo "開始啓動 --> "$line
ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"
}&
wait
done
echo "★★★啓動完成★★★"
#保存退出

vim stopzk.sh
#輸入以下內容
cat /export/servers/onekey/zk/slave | while read line
do
{
echo "開始中止 --> "$line
ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"
}&
wait
done
echo "★★★中止完成★★★"
#保存退出

#設置可執行權限
chmod +x startzk.sh stopzk.sh

#添加到環境變量中
export ZK_ONEKEY=/export/servers/onekey
export PATH=${ZK_ONEKEY}/zk:$PATH
4.1.2.六、檢查啓動是否成功

發現三臺機器都有「QuorumPeerMain」進程,說明機器已經啓動成功了。

檢查集羣是否正常:

zkServer.sh status

發現,集羣運行一切正常。

4.二、安裝Kafka

4.2.一、單機版Kafka安裝

第一步:上傳Kafka安裝包而且解壓


rz 上傳kafka_2.11-1.1.0.tgz到 /export/software/
cd /export/software/
tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/
cd /export/servers
mv kafka_2.11-1.1.0/ kafka

第二步:配置環境變量


vim /etc/profile

#輸入以下內容
export KAFKA_HOME=/export/servers/kafka
export PATH=${KAFKA_HOME}/bin:$PATH

#保存退出
source /etc/profile

第三步:修改配置文件


cd /export/servers/kafka
cd config
vim server.properties

# The id of the broker. This must be set to a unique integer for each broker.
# 必需要只要一個brokerid,而且它必須是惟一的。
broker.id=0

# A comma separated list of directories under which to store log files
# 日誌數據文件存儲的路徑 (如不存在,須要手動建立該目錄, mkdir -p /export/data/kafka/)
log.dirs=/export/data/kafka

# ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服務便可
zookeeper.connect=node01:2181

# 保存退出

第四步:啓動kafka服務


# 以守護進程的方式啓動kafka
kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties

第五步:檢測kafka是否啓動

若是進程中有名爲kafka的進程,就說明kafka已經啓動了。

4.2.二、驗證kafka是否安裝成功

因爲kafka是將元數據保存在ZooKeeper中的,因此,能夠經過查看ZooKeeper中的信息進行驗證kafka是否安裝成功。

4.2.三、部署kafka-manager

Kafka Manager 由 yahoo 公司開發,該工具能夠方便查看集羣 主題分佈狀況,同時支持對 多個集羣的管理、分區平衡以及建立主題等操做。

源碼託管於github:https://github.com/yahoo/kafka-manager

第一步:上傳Kafka-manager安裝包而且解壓


rz上傳kafka-manager-1.3.3.17.tar.gz到 /export/software/
cd /export/software
tar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/
cd /export/servers/kafka-manager-1.3.3.17/conf

第二步:修改配置文件


#修改配置文件
vim application.conf
#新增項,http訪問服務的端口
http.port=19000
#修改爲本身的zk機器地址和端口
kafka-manager.zkhosts="node01:2181"
#保存退出

第三步:啓動服務


cd /export/servers/kafka-manager-1.3.3.17/bin
#啓動服務
./kafka-manager -Dconfig.file=../conf/application.conf

#製做啓動腳本
vim /etc/profile
export KAFKA_MANAGE_HOME=/export/servers/kafka-manager-1.3.3.17
export PATH=${KAFKA_MANAGE_HOME}/bin:$PATH

source /etc/profile

cd /export/servers/onekey/
mkdir kafka-manager
cd kafka-manager
vim start-kafka-manager.sh
nohup kafka-manager -Dconfig.file=${KAFKA_MANAGE_HOME}/conf/application.conf >/dev/null 2>&1 &
chmod +x start-kafka-manager.sh
vim /etc/profile
export PATH=${ZK_ONEKEY}/kafka-manager:$PATH
source /etc/profile

第四步:檢查是否啓動成功

打開瀏覽器,輸入地址:http://node01:19000/,便可看到kafka-manage管理界面。

 

4.2.四、kafka-manager的使用

進入管理界面,是沒有顯示Cluster信息的,須要添加後才能操做。

  • 添加 Cluster:

 

 

輸入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(這裏最高只能選擇1.0.0)。

點擊Save按鈕保存。

添加成功。

  • 查看kafka的信息

  • 查看Broker信息

  • 查看Topic列表

  • 查看單個topic信息以及操做

  • 優化副本選舉

  • 查看消費者信息

4.2.五、搭建kafka集羣

kafka集羣的搭建是很是簡單的,只須要將上面的單機版的kafka分發的其餘機器,而且將ZooKeeper信息修改爲集羣的配置以及設置不一樣的broker值便可。

第一步:將kafka分發到node0二、node03


cd /export/servers/
scp -r kafka node02:/export/servers/
scp -r kafka node03:/export/servers/
scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/
# 分別到node0二、node03機器上執行
source /etc/profile

第二步:修改node0一、node0二、node03上的kafka配置文件

  • node01:


    cd /export/servers/kafka/config
    vim server.properties
    zookeeper.connect=node01:2181,node02:2181,node03:2181
  • node02:


    cd /export/servers/kafka/config
    vim server.properties
    broker.id=1
    zookeeper.connect=node01:2181,node02:2181,node03:2181
  • node03:


    cd /export/servers/kafka/config
    vim server.properties
    broker.id=2
    zookeeper.connect=node01:2181,node02:2181,node03:2181

第三步:編寫一鍵啓動、中止腳本。注意:該腳本依賴於環境變量中的KAFKA_HOME。


mkdir -p /export/servers/onekey/kafka
vim slave
#輸入以下內容
node01
node02
node03
#保存退出

vim start-kafka.sh
#輸入以下內容
cat /export/servers/onekey/kafka/slave | while read line
do
{
echo "開始啓動 --> "$line
ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &"
}&
wait
done
echo "★★★啓動完成★★★"
#保存退出
chmod +x start-kafka.sh

vim stop-kafka.sh
#輸入以下內容
cat /export/servers/onekey/kafka/slave | while read line
do
{
echo "開始中止 --> "$line
ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"
}&
wait
done
echo "★★★中止完成★★★"
#保存退出
chmod +x stop-kafka.sh

#加入到環境變量中
export PATH=${ZK_ONEKEY}/kafka:$PATH
source /etc/profile

第四步:經過kafka-manager管理工具查看集羣信息。

因而可知,kafka集羣已經啓動完成。

 

五、Kafka快速入門

對kafka的操做有2種方式,一種是經過命令行方式,一種是經過API方式。

5.一、經過命令行Kafka

Kafka在bin目錄下提供了shell腳本文件,能夠對Kafka進行操做,分別是:

經過命令行的方式,咱們將體驗下kafka,以便咱們對kafka有進一步的認知。

5.1.一、topic的操做

5.1.1.一、建立topic

kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic

#執行結果:
Created topic "my-kafka-topic".

參數說明:

  • zookeeper:參數是必傳參數,用於配置 Kafka 集羣與 ZooKeeper 鏈接地址。至少寫一個。

  • partitions:參數用於設置主題分區數,該配置爲必傳參數。

  • replication-factor:參數用來設置主題副本數 ,該配置也是必傳參數。

  • topic:指定topic的名稱。

5.1.1.二、查看topic列表

kafka-topics.sh --list --zookeeper node01:2181

__consumer_offsets
my-kafka-topic

能夠查看列表。

若是須要查看topic的詳細信息,須要使用describe命令。


kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic
#若不指定topic,則查看全部topic的信息
kafka-topics.sh --describe --zookeeper node01:2181
5.1.1.三、刪除topic

經過kafka-topics.sh執行刪除動做,須要在server.properties文件中配置 delete.topic.enable=true,該配置默認爲 false。

不然執行該腳本並未真正刪除主題 ,將該topic標記爲刪除狀態 。


kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic

# 執行以下
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
Topic my-kafka-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# 若是將delete.topic.enable=true
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2
Topic my-kafka-topic2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# 說明:雖然設置後,刪除時依然提示沒有設置爲true,實際上已經刪除了。

5.1.二、生產者的操做


kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic

能夠看到,已經向topic發送了消息。

5.1.三、消費者的操做


kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic
# 經過以上命令,能夠看到消費者能夠接收生產者發送的消息

# 若是須要從頭開始接收數據,須要添加--from-beginning參數
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic

5.二、經過Java Api操做Kafka

除了經過命令行的方式操做kafka外,還能夠經過Java api的方式操做,這種方式將更加的經常使用。

5.2.一、建立工程

導入依賴:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
       <artifactId>bigdata</artifactId>
       <groupId>cn.bigdata</groupId>
       <version>1.0.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>bigdata-kafka</artifactId>

   <dependencies>

       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka_2.11</artifactId>
           <version>1.1.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-clients</artifactId>
           <version>1.1.0</version>
       </dependency>
       <dependency>
           <groupId>junit</groupId>
           <artifactId>junit</artifactId>
           <version>4.12</version>
       </dependency>

   </dependencies>
   
   <build>
       <plugins>
           <!-- java編譯插件 -->
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-compiler-plugin</artifactId>
               <version>3.2</version>
               <configuration>
                   <source>1.8</source>
                   <target>1.8</target>
                   <encoding>UTF-8</encoding>
               </configuration>
           </plugin>
       </plugins>
   </build>


</project>

5.2.二、topic的操做

因爲主題的元數據信息是註冊在 ZooKeeper 相 應節點之中,因此對主題的操做實質是對 ZooKeeper 中記錄主題元數據信息相關路徑的操做。 Kafka將對 ZooKeeper 的相關操做封裝成一 個 ZkUtils 類 , 井封裝了一個AdrninUtils 類調用 ZkClient 類的相關方法以實現對 Kafka 元數據 的操做,包括對主題、代理、消費者等相關元數據的操做。對主題操做的相關 API調用較簡單, 相應操做都是經過調用 AdminUtils類的相應方法來完成的。


package cn. .kafka;

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;

import java.util.Properties;

public class TestKafkaTopic {

   @Test
   public void testCreateTopic() {
       ZkUtils zkUtils = null;
       try {
           //參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制
           zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());

           String topicName = "my-kafka-topic-test1";
           if (!AdminUtils.topicExists(zkUtils, topicName)) {
               //參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
               AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
               System.out.println(topicName + " 建立成功!");
          } else {
               System.out.println(topicName + " 已存在!");
          }
      } finally {
           if (null != zkUtils) {
               zkUtils.close();
          }
      }

  }
}

測試結果:

5.2.2.一、刪除topic

   @Test
   public void testDeleteTopic() {
       ZkUtils zkUtils = null;
       try {
           //參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制
           zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
           String topicName = "my-kafka-topic-test1";
           if (AdminUtils.topicExists(zkUtils, topicName)) {
               //參數:zkUtils,topic名稱
               AdminUtils.deleteTopic(zkUtils, topicName);
               System.out.println(topicName + " 刪除成功!");
          } else {
               System.out.println(topicName + " 不已存在!");
          }
      } finally {
           if (null != zkUtils) {
               zkUtils.close();
          }
      }

  }

測試結果:

5.2.三、生產者的操做


package cn. .kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import java.util.Properties;

public class TestProducer {

   @Test
   public void testProducer() throws InterruptedException {
       Properties config = new Properties();

       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置序列化消息 Key 的類
       config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       // 設置序列化消息 value 的類
       config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

       // 初始化
       KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
       for (int i = 0; i < 100 ; i++) {
           ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
           // 發送消息
           kafkaProducer.send(record);
           System.out.println("發送消息 --> " + i);

           Thread.sleep(100);
      }

       kafkaProducer.close();

  }

}

5.2.四、消費者的操做


package cn. .kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;

public class TestConsumer {

   @Test
   public void testConsumer() {
       Properties config = new Properties();
       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置消費者分組id
       config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
       // 設置序反列化消息 Key 的類
       config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       // 設置序反列化消息 value 的類
       config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


       KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
       // 訂閱topic
       kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));

       while (true) { // 使用死循環不斷的拉取數據
           ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
           for (ConsumerRecord<String, String> record : records) {
               String value = record.value();
               long offset = record.offset();
               System.out.println("value = " + value + ", offset = " + offset);
          }
      }

  }
}

六、Kafka高級

6.一、生產者的同步和異步模式

 

  • 異步方式:兩個 send 方法都返回一個 Future<RecordMetadata>對象,即只負責將消息 發送到消息緩衝區,並不等待 Sender線程處理結果,若但願瞭解異步方式消息發送成功與否 ,能夠在回調函數中進行相應處理, 當消息被 Sender線程處理後會回調 Callback。

  • 同步方式:經過調用 send方法返回的 Future對象的 get()方法以阻塞式獲取執行結果, 即等待 Sender線程處理的最終結果。

  • 默認採用的異步方式。

6.1.一、異步實現


package cn. .kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import java.util.Properties;

public class TestAsyncProducer {

   @Test
   public void testProducer() throws InterruptedException {
       Properties config = new Properties();

       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置序列化消息 Key 的類
       config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       // 設置序列化消息 value 的類
       config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       //緩衝區數量
       config.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1000");
       //等待時間
       config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10000");

       // 初始化
       KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
       for (int i = 0; i < 10 ; i++) {
           ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
           // 發送消息
           kafkaProducer.send(record, new Callback() {
               @Override
               public void onCompletion(RecordMetadata metadata, Exception exception) {
                   System.out.println("消息的callbakc --> " + metadata);
              }
          });
           System.out.println("發送消息 --> " + i);

           Thread.sleep(100);
      }

       kafkaProducer.close();

  }

}

測試的結果:

發送消息 --> 0
發送消息 --> 1
發送消息 --> 2
發送消息 --> 3
發送消息 --> 4
發送消息 --> 5
發送消息 --> 6
發送消息 --> 7
發送消息 --> 8
發送消息 --> 9
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1
消息的callbakc --> my-kafka-topic-0@-1

Process finished with exit code 0



6.1.二、同步實現


package cn. .kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class TestSyncProducer {

   @Test
   public void testProducer() throws InterruptedException, ExecutionException {
       Properties config = new Properties();

       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置序列化消息 Key 的類
       config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       // 設置序列化消息 value 的類
       config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

       // 初始化
       KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
       for (int i = 0; i < 10 ; i++) {
           ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
           // 發送消息
           Future future = kafkaProducer.send(record, new Callback() {
               @Override
               public void onCompletion(RecordMetadata metadata, Exception exception) {
                   System.out.println("消息的callbakc --> " + metadata);
              }
          });
           future.get(); //同步模式進行阻塞
           System.out.println("發送消息 --> " + i);

           Thread.sleep(100);
      }

       kafkaProducer.close();

  }

}

測試:

消息的callbakc --> my-kafka-topic-0@3128
發送消息 --> 0
消息的callbakc --> my-kafka-topic-0@3129
發送消息 --> 1
消息的callbakc --> my-kafka-topic-0@3130
發送消息 --> 2
消息的callbakc --> my-kafka-topic-0@3131
發送消息 --> 3
消息的callbakc --> my-kafka-topic-0@3132
發送消息 --> 4
消息的callbakc --> my-kafka-topic-0@3133
發送消息 --> 5
消息的callbakc --> my-kafka-topic-0@3134
發送消息 --> 6
消息的callbakc --> my-kafka-topic-0@3135
發送消息 --> 7
消息的callbakc --> my-kafka-topic-0@3136
發送消息 --> 8
消息的callbakc --> my-kafka-topic-0@3137
發送消息 --> 9

Process finished with exit code 0



6.二、消費者組

在 Kafka 中每個消費者都屬於一個特定消費組( ConsumerGroup),咱們能夠爲每一個消費者指定一個消費組,以 groupld 表明消費組名稱,經過 group.id 配置設置 。 若是不指定消費組,則 該消費者屬於默 認消費組 test-consumer-group。

須要重點說明的是:

  • 同一個主題的一條消息只能同一個消費者組下的某一個消費者消費。

  • 不一樣消費組的消費者可同時消費該消息。

6.2.一、測試:同一個消息只能爲同組的一個消費者消費

消費者1: my-client-1


package cn. .kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.springframework.kafka.listener.MessageListener;

import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;

public class TestConsumer {

   @Test
   public void testConsumer() {
       Properties config = new Properties();
       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置消費者分組id
       config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-test-2");
       // 設置序反列化消息 Key 的類
       config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       // 設置序反列化消息 value 的類
       config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


       KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
       // 訂閱topic
       kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));

       while (true) { // 使用死循環不斷的拉取數據
           ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
           for (ConsumerRecord<String, String> record : records) {
               String value = record.value();
               long offset = record.offset();
               System.out.println("value = " + value + ", offset = " + offset);
          }
      }
  }
}

消費者2:my-client-2


package cn. .kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;

import java.util.Arrays;
import java.util.Properties;

public class TestConsumer2 {

   @Test
   public void testConsumer() {
       Properties config = new Properties();
       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置消費者分組id
       config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-test-2");
       // 設置序反列化消息 Key 的類
       config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       // 設置序反列化消息 value 的類
       config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


       KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
       // 訂閱topic
       kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));

       while (true) { // 使用死循環不斷的拉取數據
           ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
           for (ConsumerRecord<String, String> record : records) {
               String value = record.value();
               long offset = record.offset();
               System.out.println("value = " + value + ", offset = " + offset);
          }
      }

  }
}

測試:

分別啓動消費者1和消費者2,而後在控制檯輸入消息,觀察消費者1和消費者2的控制檯打印狀況。

6.2.二、測試:不一樣組的消費能夠獲取相同的數據

消費者1:my-client-11在my-group-test-3組中


package cn. .kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.springframework.kafka.listener.MessageListener;

import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;

public class TestConsumer {

   @Test
   public void testConsumer() {
       Properties config = new Properties();
       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置消費者分組id
       config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-test-3");
       // 設置序反列化消息 Key 的類
       config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       // 設置序反列化消息 value 的類
       config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


       KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
       // 訂閱topic
       kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));

       while (true) { // 使用死循環不斷的拉取數據
           ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
           for (ConsumerRecord<String, String> record : records) {
               String value = record.value();
               long offset = record.offset();
               System.out.println("value = " + value + ", offset = " + offset);
          }
      }

  }
}

消費者2:my-client-22在my-group-test-4組中




import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;

import java.util.Arrays;
import java.util.Properties;

public class TestConsumer2 {

   @Test
   public void testConsumer() {
       Properties config = new Properties();
       // 設置kafka服務列表,多個用逗號分隔
       config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
       // 設置消費者分組id
       config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-test-4");
       // 設置序反列化消息 Key 的類
       config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       // 設置序反列化消息 value 的類
       config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


       KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
       // 訂閱topic
       kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));

       while (true) { // 使用死循環不斷的拉取數據
           ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
           for (ConsumerRecord<String, String> record : records) {
               String value = record.value();
               long offset = record.offset();
               System.out.println("value = " + value + ", offset = " + offset);
          }
      }

  }
}

測試:

因而可知,2個消費者獲取到的消息是同樣的。

6.2.三、總結

消費組是 Kafka 用來實現對一個主題消息進行廣播和單播的手段,實現消息廣播只需指定各消費者均屬於不一樣的消費組,消息單播則只 需讓各消費者屬於同 一個消費組 。

6.三、分區和副本

6.3.一、分區

6.3.1.一、什麼是分區?

  • 建立topic時,若是不指定分區,那麼,topic的數據只是在一個broker中存儲。

    • 問題:若是一臺機器存儲不下怎麼辦?

  • 建立topic時,能夠指定分區,將一個topic的數據分散存儲到多個broker,實現分區存儲,從而解決一個機器存儲不下的問題,也就實現了存儲的橫向擴展。

6.3.1.二、如何設置分區?

第一種:經過腳本建立topic時指定


kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic my-kafka-topic-3

第二種:在經過java api建立topic時指定


//參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
               AdminUtils.createTopic(zkUtils, topicName, 3, 1, new Properties(), AdminUtils.createTopic$default$6());
6.3.1.三、分區和消費者的關係(負載均衡機制)

說明:

  • 同一個partition的數據能夠被不一樣的消費組獲取。

  • 同一個組內的消費者不能同時消費同一個partition。

  • 同一個組內的消費者能夠消費不一樣的partition,也就是說,同一個組內的消費者數 小於等於 partition數,不能大於。

    • 若是同組內的消費者數大於partition數,那麼必定會有消費者是空閒的。

6.3.二、副本

6.3.2.一、什麼是副本?

  • 建立topic時,若是不指定副本,那麼一個partition只會在一個Broker中存儲

    • 問題:若是該機器宕機,那麼topic的數據將丟失。

  • 建立topic時,若是設置副本,那麼kafka將在其餘分區中保存該分區的數據,已確保數據的可靠性。

  • 在多個副本中,kafka會選取一個做爲leader提供服務。其它的做爲Follower節點存在。

6.3.2.二、如何設置副本?

第一種:經過腳本建立topic時指定


kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic my-kafka-topic-4

第二種:在經過java api建立topic時指定


//參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
               AdminUtils.createTopic(zkUtils, topicName, 3, 3, new Properties(), AdminUtils.createTopic$default$6());
6.3.2.三、副本之間數據同步是否會丟失數據?

消息的生產者將消息發送到leader,Follower會進行同步消息,若是消息還未同步完成,leader宕機,消息會丟失嗎?

其實, Kafka爲生產者提供3種消息確認機制(acks),分別是,0,-1,1,默認爲:1。

( 1) 當 acks=0時,生產者不用等待代理返回確認信息,而連續發送消息。顯然這種 方式加快了消息投遞的速度,然而沒法保證消息是否己被代理接受 ,有可能存在丟失數據 的風險。

(2)當 acsk=1時,生產者須要等待 Leader 己成功將消息寫入日誌文件中。這種方式 在必定程度上下降了數據丟失的可能性,但仍沒法保證數據必定不會丟失。若是在 Leader副本 成功存儲數據後, Follower 副本尚未來得及進行同步,而此時 Leader 着機了,那麼此時雖 然數據己進行了存儲,因爲原來的 Leader 己不可用而會從集羣中下線,同時存活的代理又不再會有從原來的 Leader副本存儲的數據,此時數據就會丟失。

(3)當 acks=-1 時, Leader副本和全部 ISR列表中的副本都完成數據存儲時纔會向生產者 發送確認信息,這種策略保證只要 Leader 副本和 Follower 副本中至少有一個節點存活,數據就 不會丟失。爲了保證數據不丟失,須要保證同步的副本至少大於1,經過參數 min.insync.replicas 設置,當同步副本數不足此配置值時,生產者會拋出異常,但這種方式同時也影響了生產者發 送消息的速度以及吞吐量。

6.四、消費者如何保證消息不丟失?

在kafka0.8.2版本以後,消費者將消費記錄(offset)值,保存在名爲__consumer_offsets的topic中。

因此,只要可以正確記錄offset值,就能保證消息不丟失,可是有可能消息會重複消費。

6.五、Kafka的文件存儲機制

6.5.一、Kafka 文件存儲基本結構

  • 在Kafka 文件存儲中,同一個topic 下有多個不一樣partition,每一個partition 爲一個目錄,partiton命名規則爲topic 名稱+有序序號,第一個partiton 序號從0 開始,序號最大值爲partitions數量減1。

  • 每一個partion(目錄)至關於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每一個段segment file 消息數量不必定相等,這種特性方便old segment file 快速被刪除。默認保留7 天的數據。

6.5.二、Segment文件

  • Segment file 組成:由3大部分組成,分別爲index file 、data file、timeindex file,此3個文件一一對應,分別表示爲segment 索引文件、數據文件、時間日誌。

  • 當log文件等於1G時,新的會寫入到下一個segment中。

  • Segment 文件命名規則:partion 全局的第一個segment 從0 開始,後續每一個segment文件名爲上一個segment 文件最後一條消息的offset 值。數值最大爲64 位long 大小,19 位數字字符長度,沒有數字用0 填充。

  • 索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message 的物理偏移地址。

上述圖中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message 的物理偏移地址。其中以索引文件中元數據3,497 爲例,依次在數據文件中表示第3 個message(在全局partiton表示第368772 個message)、以及該消息的物理偏移地址爲497。

6.5.三、Kafka 查找message

若是須要讀取offset=368776 的message,如何查找?

6.5.3.一、查找segment file

00000000000000000000.index 表示最開始的文件,起始偏移量(offset)爲000000000000000368769.index 的消息量起始偏移量爲368770 = 368769 + 100000000000000737337.index 的起始偏移量爲737338=737337 + 1其餘後續文件依次類推。以起始偏移量命名並排序這些文件,只要根據offset 二分查找文件列表,就能夠快速定位到具體文件。當offset=368776 時定位到00000000000000368769.index 和對應log 文件。

6.5.3.二、經過segment file 查找message

當offset=368776 時, 依次定位到00000000000000368769.index 的元數據物理位置和00000000000000368769.log 的物理偏移地址,而後再經過00000000000000368769.log 順序查找直到offset=368776 爲止。

6.5.四、思考?

kafka爲何要將數據進行分段存儲,好處是什麼?

  • 讀寫數據速度快

    • 小文件必定比大文件快

  • 對於舊數據,清除起來方便

6.六、生產者數據分發策略

kafka在數據生產的時候,有一個數據分發策略。默認的狀況使用DefaultPartitioner.class類。

分發策略有三種:

  1. 若是是用戶指定了partition,生產就不會調用DefaultPartitioner.partition()方法

  2. 當用戶指定key,使用hash算法。Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

    1. 若是key一直不變,同一個key算出來的hash值是個固定值。

    2. 若是是固定值,這種hash取模就沒有意義。

  3. 當用既沒有指定partition也沒有key,就採用輪詢算法。

源碼:


public class KafkaProducer<K, V> implements Producer<K, V> {
  。。。。。。。
     
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
       Integer partition = record.partition();
       return partition != null ?
               partition :
               partitioner.partition(
                       record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
  }
   

 

DefaultPartitioner.java文件:


   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
       int numPartitions = partitions.size();
       if (keyBytes == null) {
           // 輪詢算法
           int nextValue = nextValue(topic);
           List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
           if (availablePartitions.size() > 0) {
               int part = Utils.toPositive(nextValue) % availablePartitions.size();
               return availablePartitions.get(part).partition();
          } else {
               // no partitions are available, give a non-available partition
               return Utils.toPositive(nextValue) % numPartitions;
          }
      } else {
           // hash the keyBytes to choose a partition hash算法
           return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
      }
  }

 

 



相關文章
相關標籤/搜索