從零開始學習Kafka

簡介

kafka是一個分佈式消息隊列。具備高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列裏寫消息,消費者從隊列裏取消息進行業務邏輯。通常在架構設計中起到解耦、削峯、異步處理的做用。html

Kafka核心組件-intsmaze

  •   Topic:消息根據Topic進行歸類,能夠理解爲一個隊裏。
  •   Producer:消息生產者,就是向kafka broker發消息的客戶端。
  •   Consumer:消息消費者,向kafka broker取消息的客戶端。
  •   broker:每一個kafka實例(server),一臺kafka服務器就是一個broker,一個集羣由多個broker組成,一個broker能夠容納多個topic。
  •   Zookeeper:依賴集羣保存meta信息。
      
    你們先看kafka的介紹或者教程啊,上來都顯示一堆長篇大論,各自文字圖片,看着很懵逼,頭暈。搞程序的,要讓ta跑起來,再針對可運行的成果,慢慢了解ta。因此本文會由淺入深,先實踐後理論,結合實踐講理論。java

    Kafka安裝配置

    下載

wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz

解壓

tar -zxvf kafka_2.11-2.2.0.tgz

修改 kafka-server 的配置文件node

cd kafka_2.11-2.2.0
 
vim  config/server.properties

修改其中的:web

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs

啓動zk【默認端口2181】

bin/zookeeper-server-start.sh config/zookeeper.properties

image.png

啓動Kafka

使用 kafka-server-start.sh 啓動 kafka 服務:算法

bin/kafka-server-start.sh config/server.properties

image.png

測試使用

建立 topic

使用 kafka-topics.sh 建立單分區單副本的 topic demospring

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo

image.png

查看 topic 列表:apache

bin/kafka-topics.sh --list --zookeeper localhost:2181

image.png

發送消息【生產者】

使用 kafka-console-producer.sh 發送消息:bootstrap

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo

讀取消息【消費者】

使用 kafka-console-consumer.sh 接收消息並在終端打印:vim

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

image.png
注意不要使用
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning,高版本已經不支持安全

查看描述 topics 信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
[root@localhost kafka_2.11-2.2.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
Topic:demo      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: demo     Partition: 0    Leader: 1       Replicas: 1     Isr: 1

image.png

第一行給出了全部分區的摘要,每一個附加行給出了關於一個分區的信息。 因爲咱們只有一個分區,因此只有一行。

  • 「Leader」: 是負責給定分區的全部讀取和寫入的節點。 每一個節點將成爲分區隨機選擇部分的領導者。
  • 「Replicas」: 是複製此分區日誌的節點列表,不管它們是不是領導者,或者即便他們當前處於活動狀態。
  • 「Isr」: 是一組「同步」副本。這是複製品列表的子集,當前活着並被引導到領導者

擴展-集羣配置

Kafka 支持兩種模式的集羣搭建:能夠在單機上運行多個 broker 實例來實現集羣,也可在多臺機器上搭建集羣,下面介紹下如何實現單機多 broker 實例集羣,其實很簡單,只須要以下配置便可。

單機多broker 集羣配置

利用單節點部署多個 broker。 不一樣的 broker 設置不一樣的 id,監聽端口及日誌目錄。 例如:

cp config/server.properties config/server-2.properties
vi config/server-2.properties

修改內容:

broker.id=2

listeners = PLAINTEXT://127.0.0.1:9093

log.dirs=/data/kafka-logs2

一樣,配置第三個broker:

cp config/server-2.properties config/server-3.properties
vi config/server-3.properties

修改內容:

broker.id=2

listeners = PLAINTEXT://127.0.0.1:9093

log.dirs=/data/kafka-logs2

listeners 申明此kafka服務器須要監聽的端口號,默認會使用localhost的地址,若是是在遠程服務器上運行則必須配置,例如:         
listeners=PLAINTEXT:// 192.168.180.128:9092
並確保服務器的9092端口可以訪問

啓動2/3 borker

bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &

至此,單機多broker實例的集羣配置完畢。

擴展-多機多borker集羣

分別在多個節點按上述方式安裝 Kafka,配置啓動多個 Zookeeper 實例。

假設三臺機器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137

分別配置多個機器上的 Kafka 服務,設置不一樣的 broker id,zookeeper.connect 設置以下:

config/server.properties裏面的 zookeeper.connect

zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181

使用 Kafka Connect 來導入/導出數據

從控制檯寫入數據並將其寫回控制檯是一個方便的起點,但您可能想要使用其餘來源的數據或將數據從 Kafka 導出到其餘系統。對於許多系統,您可使用 Kafka Connect 來導入或導出數據,而沒必要編寫自定義集成代碼。

Kafka Connect 是 Kafka 包含的一個工具,能夠將數據導入和導出到 Kafka。它是一個可擴展的工具,運行 鏈接器,實現與外部系統交互的自定義邏輯。在這個快速入門中,咱們將看到如何使用簡單的鏈接器運行 Kafka Connect,這些鏈接器將數據從文件導入到 Kafka topic,並將數據從 Kafka topic 導出到文件。

參考:

  • http://www.54tianzhisheng.cn/2018/01/04/Kafka/
  • http://kafka.apache.org/10/documentation/streams/quickstart
  • http://kafka.apache.org/20/documentation.html#quickstart

代碼測試

準備測試kafka

cp config/server.properties config/server-idea.properties
vi config/server-idea.properties
 
broker.id=999

listeners = PLAINTEXT://192.168.1.177:9999

log.dirs=/data/kafka-logs-999

192.168.1.177爲kafka所在機器的ip地址,9999端口號是對外提供的端口,下文會使用到

Springboot 發送消息、接受消息源碼

很簡單的一個小demo,能夠直接拷貝使用。

KafkaApplication.java:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {

        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);

        KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class);

        for (int i = 0; i < 10; i++) {
            //調用消息發送類中的消息發送方法
            kafkaTemplate.send("mytopic", System.currentTimeMillis() + "發送" + i);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @KafkaListener(topics = {"mytopic"},groupId = "halburt-demo2")
    public void consumer1(String message) {
        System.out.println("consumer1收到消息:" + message);
    }

    @KafkaListener(topics = {"mytopic"} ,groupId = "halburt-demo")
    public void consumer2(ConsumerRecord<?, ?> record) {
        System.out.println("consumer2收到消息");
        System.out.println("    topic" + record.topic());
        System.out.println("    key:" + record.key());
        System.out.println("    value:"+record.value());
    }
}

application.yml:

server:
  port: 8090
spring:
  kafka:
    consumer:
      auto-commit-interval: 100
      bootstrap-servers: 192.168.1.177:9999
      enable-auto-commit: true
      group-id: halburt-demo
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 5
    producer:
      bootstrap-servers: 192.168.1.177:9999
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

192.168.1.177:9999即爲kafka的配置文件中配置

pom.xml依賴:

依賴版本:

spring-boot.version:2.1.3.RELEASE
spring-kafka.version:2.2.0.RELEASE

【此處有坑】此處依賴版本能夠不用這2個版本,可是必定要注意springboot和kafka的版本對應

<dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
    </dependencies>

啓動kafka並run Application.java

bin/kafka-server-start.sh config/server-idea.properties &

image.png
已經啓動了zk,此處不用再啓動,若是未啓動,須要啓動zk。
image.png

cd /home/hd/kafka_2.11-2.2.0/
bin/zookeeper-server-start.sh config/zookeeper.properties&

kafka啓動成功以後,run Application,會看到日誌以下:
image.png
已經接收到消息了。

若是你是跟着本文從頭開始的,必定注意此處有個坑

若是你是從頭開始跟這個本文學習的,那麼你直接啓動的話,會發現消息發出去了,可是沒有接收到。
我也是查了很久,看了不少教程,別人都行我就不行。
若是你的zk有其餘的topic節點的話,會收不到消息,直接上解決方案:刪除全部的zk節點。怎麼刪除?

上碼:

/**
 * zookeeper znode遞歸刪除節點
 * @author Halburt
 *
 */
public class DeleteZkNode {
    //zookeeper的地址 
    private static final String connectString = "192.168.1.177:2181";

    private static final int sessionTimeout = 2000;

    private static ZooKeeper zookeeper = null;

    /**
     * main函數
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        //調用rmr,刪除全部目錄
        rmr("/");
    }

    /**
     * 遞歸刪除 由於zookeeper只容許刪除葉子節點,若是要刪除非葉子節點,只能使用遞歸
     * @param path
     * @throws IOException
     */
    public static void rmr(String path) throws Exception {
        ZooKeeper zk = getZookeeper();
        //獲取路徑下的節點
        List<String> children = zk.getChildren(path, false);
        for (String pathCd : children) {
            //獲取父節點下面的子節點路徑
            String newPath = "";
            //遞歸調用,判斷是不是根節點
            if (path.equals("/")) {
                newPath = "/" + pathCd;
            } else {
                newPath = path + "/" + pathCd;
            }
            rmr(newPath);
        }
        //刪除節點,並過濾zookeeper節點和 /節點
        if (path != null && !path.trim().startsWith("/zookeeper") && !path.trim().equals("/")) {
            zk.delete(path, -1);
            //打印刪除的節點路徑
            System.out.println("被刪除的節點爲:" + path);
        }
    }

    /**
     * 獲取Zookeeper實例
     * @return
     * @throws IOException
     */
    public static ZooKeeper getZookeeper() throws IOException {
        zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
        return zookeeper;
    }

}

終端命令查看消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.177:9999 --topic mytopic  --from-beginning

image.png

安利一下可視化工具Kafka Tool 2

下載地址

Kafka Tool 2是一款Kafka的可視化客戶端工具,能夠很是方便的查看Topic的隊列信息以及消費者信息以及kafka節點信息。下載地址:http://www.kafkatool.com/download.html

使用

先建立鏈接

下載安裝以後會彈出一個配置鏈接的窗口,咱們能夠看到這個窗口左上角爲Add Cluster(添加集羣),但不要緊,對應單節點的Kafka實例來講也是能夠的,由於這個軟件監控的是Zookeeper而不是Kafka,Kafka的集羣搭建也是依賴Zookeeper來實現的,因此默認狀況下咱們都是直接經過Zookeeper去完成大部分操做。
image.png

建立完成以後,鏈接

咱們能夠看到已經建立好的Topic。這個軟件默認顯示數據的類型爲Byte,能夠在設置裏面找到對應的修改選項
image.png
接下來就本身探索吧
image.,接下來就本身探索吧png

理論學習

kafka單節點的結構以下:

image.png
單節點broker包含多個topic主題,而每一個topic則包含多個partition副本,每一個partition會有序的存儲消息。

kafka的整體數據流

kafka對外使用topic的概念,生產者往topic裏寫消息,消費者從topic讀消息。爲了作到水平擴展,一個topic實際是由多個partition組成的,遇到瓶頸時,能夠經過增長partition的數量來進行橫向擴容。單個parition內是保證消息有序。每新寫一條消息,kafka就是在對應的文件append寫,因此性能很是高。kafka的整體數據流是這樣的:
2835676-f378607bc841309a.png

Producers往Brokers裏面的指定Topic中寫消息,Consumers從Brokers裏面拉去指定Topic的消息,而後進行業務處理。

名詞解析

Producer

消費者: Producer將消息發佈到指定的Topic中,同時Producer也能決定將此消息歸屬於哪一個partition;好比基於"round-robin"方式或者經過其餘的一些算法等.

Consumer

每一個consumer屬於一個consumer group;反過來講,每一個group中能夠有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每一個group中的一個consumer消費(對於一條消息來講,同一組的消費者只會有一個消費者去消費).

 若是全部的consumer都具備相同的group,這種狀況和queue模式很像;消息將會在consumers之間負載均衡.
 若是全部的consumer都具備不一樣的group,那這就是"發佈-訂閱";消息將會廣播給全部的消費者.

在kafka中,一個partition中的消息只會被group中的一個consumer消費;每一個group中consumer消息消費互相獨立;咱們能夠認爲一個group是一個"訂閱"者,一個Topic中的每一個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer能夠消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的。事實上,從Topic角度來講,消息仍不是有序的。

Topics

一個Topic能夠認爲是一類消息,每一個topic將被分紅多個partition(區),每一個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱爲offset(偏移量),offset爲一個long型數字,它是惟一標記一條消息。它惟一的標記一條消息。kafka並無提供其餘額外的索引機制來存儲offset,由於在kafka中幾乎不容許對消息進行「隨機讀寫」。

Partition

topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列

如下是單個生產者和消費者從兩個分區主題讀取和寫入的簡單示例。
image.png

此圖顯示了一個producer向2個partition分區寫入日誌,以及消費者從相同日誌中讀取的內容。日誌中的每條記錄都有一個相關的條目號,稱之爲偏移量offset。消費者使用此偏移來記錄其在partitiond讀取日誌的位置。

固然若是存在多個消費者的話,根據groupId分組,同一組的消費者不會重複讀取日誌。

換句話說:
訂閱topic是以一個消費組來訂閱的,一個消費組裏面能夠有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來講,就是一個partition,只能被消費組裏的一個消費者消費,可是能夠同時被多個消費組消費。所以,若是消費組內的消費者若是比partition多的話,那麼就會有個別消費者一直空閒。

其實consumer可使用任意順序消費日誌消息,它只須要將offset重置爲任意值.(offset將會保存在zookeeper中,kafka集羣幾乎不須要維護任何consumer和producer狀態信息,這些信息有zookeeper保存)

 partition有多個.最根本緣由是kafka基於文件存儲.經過分區,能夠將日誌內容分散到多個partition上,來避免文件大小達到單機磁盤的上限,每一個partiton都會被當前server(kafka實例)保存;能夠將一個topic切分多任意多個partitions,來消息保存/消費的效率.此外越多的partitions意味着能夠容納更多的consumer,有效提高併發消費的能力.

使用場景

消息系統、消息隊列

  對於一些常規的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可使kafka具備良好的擴展性和性能優點.不過到目前爲止,咱們應該很清楚認識到,kafka並無提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用做爲"常規"的消息系統,在必定程度上,還沒有確保消息的發送與接收絕對可靠(好比,消息重發,消息發送丟失等)

日誌聚合

 kafka的特性決定它很是適合做爲"日誌收集中心";application能夠將操做日誌"批量""異步"的發送到kafka集羣中,而不是保存在本地或者DB中;kafka能夠批量提交消息/壓縮消息等,這對producer端而言,幾乎感受不到性能的開支.此時consumer端可使hadoop等其餘系統化的存儲和分析系統.

網站活動追蹤、調用鏈系統、事件採集

能夠將網頁/用戶操做等信息發送到kafka中.並實時監控,或者離線統計分析等

等等其餘場景

server.properties配置文件解讀

############################# Server Basics #############################
# 節點的ID,必須與其它節點不一樣
broker.id=0
# 選擇啓用刪除主題功能,默認false
#delete.topic.enable=true
############################# Socket Server Settings #############################

# 套接字服務器堅挺的地址。若是沒有配置,就使用java.net.InetAddress.getCanonicalHostName()的返回值
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# 節點的主機名會通知給生產者和消費者。若是沒有設置,若是配置了"listeners"就使用"listeners"的值。
# 不然就使用java.net.InetAddress.getCanonicalHostName()的返回值
#advertised.listeners=PLAINTEXT://your.host.name:9092

# 將偵聽器的名稱映射到安全協議,默認狀況下它們是相同的。有關詳細信息,請參閱配置文檔
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 服務器用來接受請求或者發送響應的線程數
num.network.threads=3

# 服務器用來處理請求的線程數,可能包括磁盤IO
num.io.threads=8

# 套接字服務器使用的發送緩衝區大小
socket.send.buffer.bytes=102400

# 套接字服務器使用的接收緩衝區大小
socket.receive.buffer.bytes=102400

# 單個請求最大能接收的數據量
socket.request.max.bytes=104857600


############################# Log Basics #############################

# 一個逗號分隔的目錄列表,用來存儲日誌文件
log.dirs=/tmp/kafka-logs

# 每一個主題的日誌分區的默認數量。更多的分區容許更大的並行操做,可是它會致使節點產生更多的文件
num.partitions=1

# 每一個數據目錄中的線程數,用於在啓動時日誌恢復,並在關閉時刷新。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# 內部主題設置
# 對於除了開發測試以外的其餘任何東西,group元數據內部主題的複製因子「__consumer_offsets」和「__transaction_state」,建議值大於1,以確保可用性(如3)。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################



# 在強制刷新數據到磁盤以前容許接收消息的數量
#log.flush.interval.messages=10000

# 在強制刷新以前,消息能夠在日誌中停留的最長時間
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# 如下的配置控制了日誌段的處理。策略能夠配置爲每隔一段時間刪除片斷或者到達必定大小以後。
# 當知足這些條件時,將會刪除一個片斷。刪除老是發生在日誌的末尾。

# 一個日誌的最小存活時間,能夠被刪除
log.retention.hours=168

# 一個基於大小的日誌保留策略。段將被從日誌中刪除只要剩下的部分段不低於log.retention.bytes。
#log.retention.bytes=1073741824

# 每個日誌段大小的最大值。當到達這個大小時,會生成一個新的片斷。
log.segment.bytes=1073741824

# 檢查日誌段的時間間隔,看是否能夠根據保留策略刪除它們
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

zookeeper.connect=localhost:2181

# 鏈接到Zookeeper的超時時間
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

group.initial.rebalance.delay.ms=0

參考文章

https://www.cnblogs.com/likehua/p/3999538.html

https://www.jianshu.com/p/d3e963ff8b70

若有表述不當之處,敬請指正。

相關文章
相關標籤/搜索