集羣四部曲(四):完美的Kafka集羣搭建

    以前寫過一篇關於Kafka消息的發佈-訂閱,只不過是基於一臺服務器,不夠全面,下面我要說下Kafka集羣環境的搭建和消息的發佈-訂閱,但願你們喜歡。下面的集羣搭建是基於單機部署的環境,因此你們先參照單機部署完成基本的環境配置吧CentOs7 Kafka單機消息的發佈-訂閱java

     1、環境:虛擬機CentOs7系統,完整的環境,請確認已安裝JDK,Zookeeper(這裏可參考以前的Zookeeper集羣搭建:http://www.javashuo.com/article/p-climrngx-cm.html),可經過克隆已配置好的虛擬機環境,這次仍然沿用以前使用過的服務器,因此,使用的節點爲3個。node

    2、環境配置(Kafka的解壓不說了)bootstrap

    (1)進入config目錄,修改server.properties文件內容以下:服務器

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=slave01

//中間省略,默認配置便可
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=slave01:2181,slave02:2181,slave03:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

    網上的版本不盡相同,但異曲同工,主要修改的點只有幾個:broker.id,zookeeper.connect,注意:broker.id和Zookeeper集羣配置時myid文件中的值保持一致。同時,還要修改一處地方(這裏可能被大多數人忽略了,致使以後啓動報錯),手動修改meta.properties文件,在配置的log.dir目錄下,若不修改此處會報下面的錯誤:app

FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn't match stored brokerId 1 in meta.properties
            at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630)
            at kafka.server.KafkaServer.startup(KafkaServer.scala:175)
            at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:99)
            at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)

    總結一下,修改broker.id須要注意 :socket

    server.prorperties文件;ide

    meta.properties文件;oop

    (2)拷貝修改配置測試

    能夠經過scp拷貝的方式將kafka安裝目錄放置到另外兩個節點上,注意須要修改對應的值:broker.id和host.name,其餘一致。url

    (3)啓動Kafka集羣

    啓動以前請確保關閉防火牆(詳見以前的集羣配置)。

    因爲咱們使用的是外部的Zookeeper,而不是自身攜帶的Zookeeper,因此,先啓動Zookeeper集羣,成功後在三個節點上,均執行下面的命令:

./bin/kafka-server-start.sh config/server.properties

    如無心外,都可啓動成功。

    (4)建立topic 

    在slave01節點上,執行如下命令建立一個topic:

bin/kafka-topics.sh --create --topic kafkatopictest --replication-factor 3 --partitions 2 --zookeeper slave01:2181

    建立成功顯示:

Created topic "kafkatopictest"

    (5)發送消息至kafka 

    在slave02節點上執行如下命令向kafka發送消息:

[hadoop@slave02 bin]$ ./kafka-console-producer.sh --broker-list slave02:9092 --sync --topic kafkatopictest
>hello world, 下班了,everyone!

    (6)接收kafka發送來的消息

    在slave03節點上執行如下命令接收kafka發送消息:

[hadoop@slave03 bin]$ ./kafka-console-consumer.sh --zookeeper slave01:2181 --topic kafkatopictest --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
hello world下班了,everyone!

    (7)查看topic列表

    如在slave01節點上想要查看已經建立的topic,可經過如下命令:

[hadoop@slave01 bin]$ ./kafka-topics.sh --zookeeper slave01:2181 --list
kafkatopictest
test

    好了,到此Kafka集羣搭建和測試完成了。

相關文章
相關標籤/搜索