Kafka入門寶典(詳細截圖版)

file

一、瞭解 Apache Kafka

1.一、簡介

file

官網:http://kafka.apache.org/java

  • Apache Kafka 是一個開源消息系統,由Scala 寫成。是由Apache 軟件基金會開發的一個開源消息系統項目。
  • Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該項目的目標是爲處理實時數據提供一個統1、高通量、低等待(低延時)的平臺。
  • Kafka 是一個分佈式消息系統:具備生產者、消費者的功能。它提供了相似於JMS 的特性,可是在設計實現上徹底不一樣,此外它並非JMS 規範的實現。【重點】

1.二、kafka的基本結構

file

  • Producer:消息的發送者
  • Consumer:消息的接收者
  • kafka cluster:kafka的集羣。
  • Topic:就是消息類別名,一個topic中一般放置一類消息。每一個topic都有一個或者多個訂閱者(消費者)。

消息的生產者將消息推送到kafka集羣,消息的消費者從kafka集羣中拉取消息。node

1.三、kafka的完整架構

file

說明:git

  • broker:集羣中的每個kafka實例,稱之爲broker;
  • ZooKeeper:Kafka 利用ZooKeeper 保存相應元數據信息, Kafka 元數據信息包括如代理節點信息、Kafka集羣信息、舊版消費者信息及其消費偏移量信息、主題信息、分區狀態信息、分區副本分配方案信息、動態配置信息等。
  • ConsumerGroup:在Kafka 中每個消費者都屬於一個特定消費組( ConsumerGroup ),咱們能夠爲每一個消費者指定一個消費組,以groupld 表明消費組名稱,經過group.id 配置設置。若是不指定消費組,則該消費者屬於默認消費組test-consumer-group 。

1.四、kafka的特性

  • 消息持久化
    • Kafka 基於文件系統來存儲和緩存消息。
  • 高吞吐量
    • Kafka 將數據寫到磁盤,充分利用磁盤的順序讀寫。同時, Kafka 在數據寫入及數據同步採用了*零拷貝*( zero-copy )技術,採用sendFile()函數調用,sendFile()函數是在兩個文件描述符之間直接傳遞數據,徹底在內核中操做,從而避免了內核緩衝區與用戶緩衝區之間數據的拷貝,操做效率極高。
    • Kafka 還支持數據壓縮及批量發送,同時Kafka 將每一個主題劃分爲多個分區,這一系列的優化及實現方法使得Kafka 具備很高的吞吐量。經大多數公司對Kafka 應用的驗證, Kafka 支持每秒數百萬級別的消息
  • 高擴展性
    • Kafka 依賴ZooKeeper來對集羣進行協調管理,這樣使得Kafka 更加容易進行水平擴展,生產者、消費者和代理都爲分佈式,可配置多個。
    • 同時在機器擴展時無需將整個集羣停機,集羣可以自動感知,從新進行負責均衡及數據複製。
  • 多客戶端支持
    • Kafka 核心模塊用Scala 語言開發,Kafka 提供了多種開發語言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等。
  • 安全機制
    • Kafka 支持如下幾種安全措施:
      • 經過SSL 和SASL(Kerberos), SASL/PLA時驗證機制支持生產者、消費者與broker鏈接時的身份認證;
      • 支持代理與ZooKeeper 鏈接身份驗證;
      • 通訊時數據加密;
      • 客戶端讀、寫權限認證;
      • Kafka 支持與外部其餘認證受權服務的集成;
  • 數據備份
    • Kafka 能夠爲每一個topic指定副本數,對數據進行持久化備份,這能夠必定程度上防止數據丟失,提升可用性。
  • 輕量級
    • Kafka 的實例是無狀態的,即broker不記錄消息是否被消費,消費偏移量的管理交由消費者本身或組協調器來維護。
    • 同時集羣自己幾乎不須要生產者和消費者的狀態信息,這就使得Kafka很是輕量級,同時生產者和消費者客戶端實現也很是輕量級。
  • 消息壓縮
    • Kafka 支持Gzip, Snappy 、LZ4 這3 種壓縮方式,一般把多條消息放在一塊兒組成MessageSet,而後再把Message Set 放到一條消息裏面去,從而提升壓縮比率進而提升吞吐量。

1.五、kafka的應用場景

  • 消息系統。
    • Kafka 做爲一款優秀的消息系統,具備高吞吐量、內置的分區、備份冗餘分佈式等特色,爲大規模消息處理提供了一種很好的解決方案。
  • 應用監控。
    • 利用Kafka 採集應用程序和服務器健康相關的指標,如CPU 佔用率、IO 、內存、鏈接數、TPS 、QPS 等,而後將指標信息進行處理,從而構建一個具備監控儀表盤、曲線圖等可視化監控系統。例如,不少公司採用Kafka 與ELK (Elastic Search 、Logstash 和Kibana)整合構建應用服務監控系統。
  • 網站用戶行爲追蹤。
    • 爲了更好地瞭解用戶行爲、操做習慣,改善用戶體驗,進而對產品升級改進,將用戶操做軌跡、內容等信息發送到Kafka 集羣上,經過Hadoop 、Spark 或Strom等進行數據分析處理,生成相應的統計報告,爲推薦系統推薦對象建模提供數據源,進而爲每一個用戶進行個性化推薦。
  • 流處理。
    • 須要將己收集的流數據提供給其餘流式計算框架進行處理,用Kafka 收集流數據是一個不錯的選擇。
  • 持久性日誌。
    • Kafka 能夠爲外部系統提供一種持久性日誌的分佈式系統。日誌能夠在多個節點間進行備份, Kafka 爲故障節點數據恢復提供了一種從新同步的機制。同時, Kafka很方便與HDFS 和Flume 進行整合,這樣就方便將Kafka 採集的數據持久化到其餘外部系統。

二、Kafka的安裝與配置

準備三臺虛擬機,分別是node01,node02,node03,而且修改hosts文件以下:github

~~~shellvim /etc/hostsshell

注意: 前面的ip地址改爲本身的ip地址

192.168.40.133 node01192.168.40.134 node02192.168.40.135 node03apache

3臺服務器的時間要一致

時間更新:

yum install -y rdaterdate -s time-b.nist.gov~~~bootstrap

2.一、基礎環境配置

2.1.一、JDK環境

因爲Kafka 是用Scala 語言開發的,運行在JVM上,所以在安裝Kafka 以前須要先安裝JDK 。vim

安裝過程略過,我這裏使用的是jdk1.8。api

file

2.1.二、ZooKeeper環境

2.1.2.一、安裝ZooKeeper

Kafka 依賴ZooKeeper ,經過ZooKeeper 來對服務節點、消費者上下線管理、集羣、分區元數據管理等,所以ZooKeeper 也是Kafka 得以運行的基礎環境之一。瀏覽器

#上傳zookeeper-3.4.9.tar.gz到/export/software
cd /export/software
mkdir -p /export/servers/
tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
#建立ZooKeeper的data目錄
mkdir /export/data/zookeeper -p
cd /export/servers/zookeeper-3.4.9/conf/
#修改配置文件
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
#設置data目錄
dataDir=/export/data/zookeeper
#啓動ZooKeeper
./zkServer.sh start
#檢查是否啓動成功
jps複製代碼

2.1.2.三、搭建ZooKeeper集羣

~~~shell

在/export/data/zookeeper目錄中建立myid文件

vim /export/data/zookeeper/myid

寫入對應的節點的id,如:1,2等,保存退出

在conf下,修改zoo.cfg文件

vim zoo.cfg

添加以下內容

server.1=node01:2888:3888server.2=node02:2888:3888server.3=node03:2888:3888~~~

2.1.2.三、配置環境變量

~~~shellvim /etc/profileexport ZK_HOME=/export/servers/zookeeper-3.4.9export PATH=${ZK_HOME}/bin:$PATH

當即生效

source /etc/profile~~~

2.1.2.四、分發到其它機器

~~~shellscp /etc/profile node02:/etc/scp /etc/profile node03:/etc/

cd /export/serversscp -r zookeeper-3.4.9 node02:/export/servers/scp -r zookeeper-3.4.9 node03:/export/servers/~~~

2.1.2.五、一鍵啓動、中止腳本

~~~shellmkdir -p /export/servers/onekey/zkvim slave

輸入以下內容

node01node02node03

保存退出

vim startzk.sh

輸入以下內容

cat /export/servers/onekey/zk/slave | while read linedo{echo "開始啓動 --> "$linessh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"}&waitdoneecho "★★★啓動完成★★★"

保存退出

vim stopzk.sh

輸入以下內容

cat /export/servers/onekey/zk/slave | while read linedo{echo "開始中止 --> "$linessh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"}&waitdoneecho "★★★中止完成★★★"

保存退出

設置可執行權限

chmod +x startzk.sh stopzk.sh

添加到環境變量中

export ZK_ONEKEY=/export/servers/onekeyexport PATH=${ZK_ONEKEY}/zk:$PATH~~~

2.1.2.六、檢查啓動是否成功

file

發現三臺機器都有「QuorumPeerMain」進程,說明機器已經啓動成功了。

檢查集羣是否正常:

zkServer.sh status

file

file

file

發現,集羣運行一切正常。

2.二、安裝Kafka

2.2.一、單機版Kafka安裝

第一步:上傳Kafka安裝包而且解壓

~~~shellrz 上傳kafka_2.11-1.1.0.tgz到 /export/software/cd /export/software/tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/cd /export/serversmv kafka_2.11-1.1.0/ kafka~~~

第二步:配置環境變量

~~~shellvim /etc/profile

輸入以下內容

export KAFKA_HOME=/export/servers/kafkaexport PATH=${KAFKA_HOME}/bin:$PATH

保存退出

source /etc/profile~~~

第三步:修改配置文件

~~~shellcd /export/servers/kafkacd configvim server.properties

The id of the broker. This must be set to a unique integer for each broker.

必需要只要一個brokerid,而且它必須是惟一的。

broker.id=0

A comma separated list of directories under which to store log files

日誌數據文件存儲的路徑 (如不存在,須要手動建立該目錄, mkdir -p /export/data/kafka/)

log.dirs=/export/data/kafka

ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服務便可

zookeeper.connect=node01:2181

保存退出

~~~

第四步:啓動kafka服務

~~~shell

以守護進程的方式啓動kafka

kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties~~~

第五步:檢測kafka是否啓動

file

若是進程中有名爲kafka的進程,就說明kafka已經啓動了。

2.2.二、驗證kafka是否安裝成功

因爲kafka是將元數據保存在ZooKeeper中的,因此,能夠經過查看ZooKeeper中的信息進行驗證kafka是否安裝成功。

file

file

file

2.2.三、部署kafka-manager

Kafka Manager 由 yahoo 公司開發,該工具能夠方便查看集羣 主題分佈狀況,同時支持對 多個集羣的管理、分區平衡以及建立主題等操做。

源碼託管於github:https://github.com/yahoo/kafka-manager

第一步:上傳Kafka-manager安裝包而且解壓

~~~shellrz上傳kafka-manager-1.3.3.17.tar.gz到 /export/software/cd /export/softwaretar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/cd /export/servers/kafka-manager-1.3.3.17/conf~~~

第二步:修改配置文件

~~~shell

修改配置文件

vim application.conf

新增項,http訪問服務的端口

http.port=19000

修改爲本身的zk機器地址和端口

kafka-manager.zkhosts="node01:2181"

保存退出

~~~

第三步:啓動服務

~~~shellcd /export/servers/kafka-manager-1.3.3.17/bin

啓動服務

./kafka-manager -Dconfig.file=../conf/application.conf

製做啓動腳本

vim /etc/profileexport KAFKAMANAGEHOME=/export/servers/kafka-manager-1.3.3.17export PATH=${KAFKAMANAGEHOME}/bin:$PATH

source /etc/profile

cd /export/servers/onekey/mkdir kafka-managercd kafka-managervim start-kafka-manager.shnohup kafka-manager -Dconfig.file=${KAFKAMANAGEHOME}/conf/application.conf >/dev/null 2>&1 &chmod +x start-kafka-manager.shvim /etc/profileexport PATH=${ZK_ONEKEY}/kafka-manager:$PATHsource /etc/profile

~~~

第四步:檢查是否啓動成功

打開瀏覽器,輸入地址:http://node01:19000/,便可看到kafka-manage管理界面。

file

2.2.四、kafka-manager的使用

進入管理界面,是沒有顯示Cluster信息的,須要添加後才能操做。

  • 添加 Cluster:

file

輸入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(這裏最高只能選擇1.0.0)。

file

點擊Save按鈕保存。

file

添加成功。

  • 查看kafka的信息
    file
  • 查看Broker信息
    file
  • 查看Topic列表
    file
  • 查看單個topic信息以及操做
    file
  • 優化副本選舉
    file
  • 查看消費者信息
    file

2.2.五、搭建kafka集羣

kafka集羣的搭建是很是簡單的,只須要將上面的單機版的kafka分發的其餘機器,而且將ZooKeeper信息修改爲集羣的配置以及設置不一樣的broker值便可。

第一步:將kafka分發到node0二、node03

~~~cd /export/servers/scp -r kafka node02:/export/servers/scp -r kafka node03:/export/servers/scp /etc/profile node02:/etc/scp /etc/profile node03:/etc/

分別到node0二、node03機器上執行

source /etc/profile~~~

第二步:修改node0一、node0二、node03上的kafka配置文件

  • node01:

~~~shellcd /export/servers/kafka/configvim server.propertieszookeeper.connect=node01:2181,node02:2181,node03:2181~~~

  • node02:

~~~shellcd /export/servers/kafka/configvim server.propertiesbroker.id=1zookeeper.connect=node01:2181,node02:2181,node03:2181~~~

  • node03:

~~~shellcd /export/servers/kafka/configvim server.propertiesbroker.id=2zookeeper.connect=node01:2181,node02:2181,node03:2181~~~

第三步:編寫一鍵啓動、中止腳本。注意:該腳本依賴於環境變量中的KAFKA_HOME。

~~~shellmkdir -p /export/servers/onekey/kafkavim slave

輸入以下內容

node01node02node03

保存退出

vim start-kafka.sh

輸入以下內容

cat /export/servers/onekey/kafka/slave | while read linedo{echo "開始啓動 --> "$linessh $line "source /etc/profile;nohup sh ${KAFKAHOME}/bin/kafka-server-start.sh -daemon ${KAFKAHOME}/config/server.properties >/dev/null 2>&1 &"}&waitdoneecho "★★★啓動完成★★★"

保存退出

chmod +x start-kafka.sh

vim stop-kafka.sh

輸入以下內容

cat /export/servers/onekey/kafka/slave | while read linedo{echo "開始中止 --> "$linessh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"}&waitdoneecho "★★★中止完成★★★"

保存退出

chmod +x stop-kafka.sh

加入到環境變量中

export PATH=${ZK_ONEKEY}/kafka:$PATHsource /etc/profile~~~

第四步:經過kafka-manager管理工具查看集羣信息。file

因而可知,kafka集羣已經啓動完成。

三、Kafka快速入門

對kafka的操做有2種方式,一種是經過命令行方式,一種是經過API方式。

3.一、經過命令行Kafka

Kafka在bin目錄下提供了shell腳本文件,能夠對Kafka進行操做,分別是:file經過命令行的方式,咱們將體驗下kafka,以便咱們對kafka有進一步的認知。

3.1.一、topic的操做

3.1.1.一、建立topic

~~~shellkafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic

執行結果:

Created topic "my-kafka-topic".~~~

參數說明:

  • zookeeper:參數是必傳參數,用於配置 Kafka 集羣與 ZooKeeper 鏈接地址。至少寫一個。
  • partitions:參數用於設置主題分區數,該配置爲必傳參數。
  • replication-factor:參數用來設置主題副本數 ,該配置也是必傳參數。
  • topic:指定topic的名稱。
3.1.1.二、查看topic列表

~~~shellkafka-topics.sh --list --zookeeper node01:2181

__consumer_offsetsmy-kafka-topic~~~

能夠查看列表。

若是須要查看topic的詳細信息,須要使用describe命令。

~~~shellkafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic

若不指定topic,則查看全部topic的信息

kafka-topics.sh --describe --zookeeper node01:2181~~~

3.1.1.三、刪除topic

經過kafka-topics.sh執行刪除動做,須要在server.properties文件中配置 delete.topic.enable=true,該配置默認爲 false。

不然執行該腳本並未真正刪除主題 ,將該topic標記爲刪除狀態 。

~~~shellkafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic

執行以下

[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topicTopic my-kafka-topic is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.

若是將delete.topic.enable=true

[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2Topic my-kafka-topic2 is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.

說明:雖然設置後,刪除時依然提示沒有設置爲true,實際上已經刪除了。

~~~

3.1.二、生產者的操做

~~~shellkafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic~~~

能夠看到,已經向topic發送了消息。

3.1.三、消費者的操做

~~~shellkafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic

經過以上命令,能夠看到消費者能夠接收生產者發送的消息

若是須要從頭開始接收數據,須要添加--from-beginning參數

kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic~~~

file

3.二、經過Java Api操做Kafka

除了經過命令行的方式操做kafka外,還能夠經過Java api的方式操做,這種方式將更加的經常使用。

3.2.一、建立工程

file

導入依賴:

~~~xml itcast-bigdata cn.itcast.bigdata 1.0.0-SNAPSHOT 4.0.0

<artifactId>itcast-bigdata-kafka</artifactId>複製代碼
<dependencies>複製代碼
<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>複製代碼
</dependencies>複製代碼
<build>
        <plugins>
            <!-- java編譯插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
複製代碼

~~~

3.2.二、topic的操做

因爲主題的元數據信息是註冊在 ZooKeeper 相 應節點之中,因此對主題的操做實質是對 ZooKeeper 中記錄主題元數據信息相關路徑的操做。 Kafka將對 ZooKeeper 的相關操做封裝成一 個 ZkUtils 類 , 井封裝了一個AdrninUtils 類調用 ZkClient 類的相關方法以實現對 Kafka 元數據 的操做,包括對主題、代理、消費者等相關元數據的操做。對主題操做的相關 API調用較簡單, 相應操做都是經過調用 AdminUtils類的相應方法來完成的。

~~~javapackage cn.itcast.kafka;

import kafka.admin.AdminUtils;import kafka.utils.ZkUtils;import org.apache.kafka.common.security.JaasUtils;import org.junit.Test;

import java.util.Properties;

public class TestKafkaTopic {

@Test
    public void testCreateTopic() {
        ZkUtils zkUtils = null;
        try {
            //參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制
            zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());複製代碼
String topicName = "my-kafka-topic-test1";
            if (!AdminUtils.topicExists(zkUtils, topicName)) {
                //參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
                AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
                System.out.println(topicName + " 建立成功!");
            } else {
                System.out.println(topicName + " 已存在!");
            }
        } finally {
            if (null != zkUtils) {
                zkUtils.close();
            }
        }複製代碼
}
}複製代碼

~~~

測試結果:

file

3.2.2.一、刪除topic

~~~java@Testpublic void testDeleteTopic() {ZkUtils zkUtils = null;try {//參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());String topicName = "my-kafka-topic-test1";if (AdminUtils.topicExists(zkUtils, topicName)) {//參數:zkUtils,topic名稱AdminUtils.deleteTopic(zkUtils, topicName);System.out.println(topicName + " 刪除成功!");} else {System.out.println(topicName + " 不已存在!");}} finally {if (null != zkUtils) {zkUtils.close();}}

}
~~~複製代碼

測試結果:

file

3.2.三、生產者的操做

~~~javapackage cn.itcast.kafka;

import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.Test;

import java.util.Properties;

public class TestProducer {

@Test
    public void testProducer() throws InterruptedException {
        Properties config = new Properties();複製代碼
// 設置kafka服務列表,多個用逗號分隔
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 設置序列化消息 Key 的類
        config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 設置序列化消息 value 的類
        config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());複製代碼
// 初始化
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
        for (int i = 0; i < 100 ; i++) {
            ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
            // 發送消息
            kafkaProducer.send(record);
            System.out.println("發送消息 --> " + i);複製代碼
Thread.sleep(100);
        }複製代碼
kafkaProducer.close();複製代碼
}複製代碼

}

~~~

3.2.四、消費者的操做

~~~javapackage cn.itcast.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.Test;

import javax.sound.midi.Soundbank;import java.util.Arrays;import java.util.Properties;

public class TestConsumer {

@Test
    public void testConsumer() {
        Properties config = new Properties();
        // 設置kafka服務列表,多個用逗號分隔
        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 設置消費者分組id
        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // 設置序反列化消息 Key 的類
        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 設置序反列化消息 value 的類
        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
複製代碼
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
        // 訂閱topic
        kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));複製代碼
while (true) { // 使用死循環不斷的拉取數據
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                long offset = record.offset();
                System.out.println("value = " + value + ", offset = " + offset);
            }
        }複製代碼
}
}
~~~

複製代碼

什麼是Kafka?Kafka監控工具彙總Kafka快速入門Kafka核心之ConsumerKafka核心之Producer

替代Flume——Kafka Connect簡介最簡單流處理引擎——Kafka Streams簡介

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算

file

相關文章
相關標籤/搜索