Kafka---如何配置Kafka集羣和zookeeper集羣

Kafka的集羣配置通常有三種方法,即html

    (1)Single node – single broker集羣;node

    (2)Single node – multiple broker集羣;
    (3)Multiple node – multiple broker集羣。apache

    前兩種方法官網上有配置過程((1)(2)配置方法官網教程),下面會簡單介紹前兩種方法,主要介紹最後一種方法。bootstrap

準備工做:服務器

    1.Kafka的壓縮包,這裏選用的是kafka_2.10-0.8.2.2.tgz網絡

    2.三臺CentOS 6.4 64位虛擬機。分別是192.168.121.34(主機名爲master)、192.168.121.35(主機名爲datanode1)、192.168.121.36(主       機名爲datanode2)。app

1、Single node – single broker集羣配置(單節點單boker集羣配置)

  注:圖片來源自網絡dom

1.解壓Kafka的壓縮包

    [root@master kafkainstall]# tar -xzf kafka_2.10-0.8.2.0.tgzsocket

    [root@master kafkainstall]# cd kafka_2.10-0.8.2.2async

這裏我新建了一個kafkainstall文件夾來存放加壓後的文件,而後進入解壓後的kafka_2.10-0.8.2.2文件夾。

2.啓動zookeeper服務

    因爲Kafka的壓縮包裏已經有了zookeeper,並且提供了啓動kafka的腳本(在kafka_2.10-0.8.2.2/bin目錄下)和zookeeper的配置文件      (在kafka_2.10-0.8.2.2/config目錄下):

    [root@master kafka_2.10-0.8.2.2]# bin/zookeeper-server-start.sh config/zookeeper.properties &

    zookeeper的配置文件zookeeper.properties裏面的關鍵屬性:

    # the directory where the snapshot is stored.
      dataDir=/tmp/zookeeper
    # the port at which the clients will connect
      clientPort=2181

     默認狀況下,zookeeper的snapshot 文件會存儲在/tmp/zookeeper下,zookeeper服務器會監聽 2181端口。

3.啓動Kafka broker服務

    因爲kafka已經提供了啓動kafka的腳本(在kafka_2.10-0.8.2.2/bin目錄下),這裏直接啓動便可:

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server.properties &

    Kafka broker的配置文件的關鍵屬性:

    # The id of the broker. This must be set to a unique integer for each broker.
      broker.id=0

    # The port the socket server listens on
      port=9092

    # A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.

      zookeeper.connect=localhost:2181

4.建立只有一個Partition的topic

    [root@master kafka_2.10-0.8.2.2]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest-topic

    這裏建立了一個mytest-topic的topic。

5.啓動一個生產者進程來發送消息

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest-topic

    其中,(1)參數broker-list定義了生產者要推送消息的broker地址,以<IP地址:端口>形式 ,由上面的broker的配置文件可知                                      爲localhost:9092;

              (2)參數topic指定生產者發送給哪一個topic。   

    生產者配置文件關鍵屬性:

     # list of brokers used for bootstrapping knowledge about the rest of the cluster
     # format: host1:port1,host2:port2 ...

metadata.broker.list=localhost:9092

    # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

     # message encoder
serializer.class=kafka.serializer.DefaultEncoder

    接着你就能夠輸入你想要發送給消費者的消息了。(也能夠先啓動消費者進程,這樣生產者發送的消息能夠馬上顯示)

6.啓動一個消費者進程來消費消息

    須要另外打開一個終端:    

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytest-topic --from-beginning

   其中,(1)參數zookeeper指定了鏈接zookeeper的地址,以<IP地址:端口>形式;

             (2)topic參數指定了從哪一個topic來pull消息。

     當你執行這個命令以後,你即可以看到控制檯上打印出的生產者生產的消息:

     消費者配置文件consumer.properties關鍵屬性:

     # Zookeeper connection string
     # comma separated host:port pairs, each corresponding to a zk
     # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"

        zookeeper.connect=localhost:2181
    # timeout in ms for connecting to zookeeper
      zookeeper.connection.timeout.ms=60000
    #consumer group id
      group.id=test-consumer-group

2、Single node – multiple broker集羣(單節點多boker集羣配置)

   注:圖片來源自網絡

1.啓動zookeeper服務

    啓動方法跟上面同樣

2.啓動Kafka broker服務

    若是須要在單個節點(即一臺機子)上面啓動多個broker(這裏咱們啓動三個broker),須要準備多個server.properties文件便可,咱們須要複製kafka_2.10-0.8.2.2/config/server.properties文件。

    以下:

    [root@master config]# cp server.properties server-1.properties

    [root@master config]# cp server.properties server-2.properties

    而後修改server-1.properties和server-2.properties。

    server-1:

1. broker.id=1

        2.port=9093

 3.log.dirs=/tmp/kafka-logs-1

 

    server-2:

1. broker.id=2

        2.port=9094

 3.log.dirs=/tmp/kafka-logs-2

     而後咱們再用這兩個配置文件分別啓動一個broker:

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server-1.properties &

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server-2.properties &

     而後啓動:

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server.properties &

3.建立只有1個Partition和3個備份的的topic

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

4.啓動一個Producer發送消息

     若是用一個Producer發送給多個broker(這裏是3個),惟一須要改變的就是在broker-list屬性中指定要鏈接的broker:

    [root@master kafka_2.10-0.8.2.2]#bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,

localhost:9094 --topic my-replicated-topic

5.啓動一個消費者來消費消息

    [root@master kafka_2.10-0.8.2.2]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicmy-replicated-topic --from-beginning

     若是要讓不一樣的Producer發送給不一樣的broker,咱們也僅僅須要爲每一個Producer配置響應的broker-list屬性便可。

3、Multiple node – multiple broker集羣(多節點多boker集羣配置)

 注:圖片來源自網絡

 注:上圖中每一個Node裏有兩個broker,我這裏爲了簡單寫,在每一個節點裏有一個broker(經過上面的單節點多broker的介紹,能夠很容易        擴展)

1.首先須要配置一個zookeeper集羣

    上面一和二中提到的都是在192.168.121.34(主機名爲master)上進行的,如今要擴展爲多節點多broker集羣,就要在另外2臺機子上也要安裝Kafka,方法同一中的步驟1。

2.zookeeper集羣配置

    zookeeper-0(即上面192.168.121.34(主機名爲master)中的zookeeper):

配置修改成:

     # the directory where the snapshot is stored.

dataDir=/tmp/zookeeper
     # the port at which the clients will connect
clientPort=2181
    #the blow five lines are added by @author.
initLimit=5
syncLimit=2
server.0=192.168.121.34:2888:3888
server.1=192.168.121.35:2889:3889
server.2=192.168.121.36:2890:3890

    而後在dataDir目錄/data/zookeeper/下寫一個myid文件,命令以下:

echo0 >myid

注意:這個id是zookeeper的主機標識,每一個主機id不一樣第二臺是1192.168.121.35(主機名爲datanode1),第三臺是2192.168.121.36(主機名爲datanode2)。也就是說3個zookeeper配置文件除了myid不一樣,其餘都同樣。

    最後依次啓動3臺機子上的zookeeper服務。

3.配置broker 集羣

     broker的配置配置文件(server.properties):按照單節點多實例配置方法在一個節點上啓動1個實例,不一樣的地方是zookeeper的鏈接串      須要把全部節點的zookeeper都鏈接起來。

     (1)192.168.121.34(主機名爲master)中的kafka_2.10-0.8.2.2/bin/目錄下的server.properties文件修改:

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.121.34

    # A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-0

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.

zookeeper.connect=192.168.121.34:2181,192.168.121.35:2181,192.168.121.36:2181

    # Timeout in ms for connecting to zookeeper
       zookeeper.connection.timeout.ms=60000

    注意:把host.name的註釋去掉,並更改成本機的IP地址。zookeeper.connection.timeout.ms的默認爲6000,可是最好改大點,否則容易超時,但也不能太大,太大影響效率。

    (2)192.168.121.35(主機名爲datanode1)中的kafka_2.10-0.8.2.2/bin/目錄下的server.properties文件修改:  

 

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
       host.name=192.168.121.35

   # A comma seperated list of directories under which to store log files
      log.dirs=/tmp/kafka-logs-1

 

    其它與上面(1)中相同。

    (3)192.168.121.36(主機名爲datanode2)中的kafka_2.10-0.8.2.2/bin/目錄下的server.properties文件修改:

 

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.121.36

    # A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

    其它與上面(1)中相同。

4.生產者配置文件修改

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format:
host1:port1,host2:port2 ...
metadata.broker.list=192.168.121.34:9092,192.168.121.35:9092,192.168.121.36:9092
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)

producer.type=async

5.消費者配置文件修改

     # Zookeeper connection string
     # comma separated host:port pairs, each corresponding to a zk
     # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"

  zookeeper.connect=191.168.121.34:2181,191.168.121.35:2181,191.168.121.36:2181
     # timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=60000

6.生產者發送消息

    (1)首先建立一個test-replicated-topic(在192.168.121.34(主機名爲master)中)

     [root@master kafka_2.10-0.8.2.2]#bin/kafka-topics.sh --create --zookeeper192.168.121.34:2181 --replication-factor 3 --partitions 1 --topictest-replicated-topic

    而後查看已有的topic:

          

   能夠看到test-replicated-topic已經建立成功,而後咱們再看每一個broker在作什麼:

        

    其中leader是負責對給定的partition執行全部的讀和寫的節點,此時的leader是0號節點(即0號broker)。更多解釋請看官網

    (2)生產者發送消息(192.168.121.34(主機名爲master)節點上)

         

7.消費者消費消息(分別在三臺機子上消費上面發送的消息)

   (1)master上:

       

    (2)datanode1上:

         

   (3)datanode2上:

       

    能夠看到,三個節點上的消費者都能正常的接收到其中一個節點上發送的消息。這說明kafka集羣基本上已經超過部署。

    PS:實際操做過程當中3個節點上的zookeeper的監聽端口我也沒有統一用2181,可是能夠用統一的端口,並無影響。

相關文章
相關標籤/搜索