[kafka] Installing and deploying a Kafka cluster (Kafka version: kafka_2.12-2.3.0)

3.2.1 Download and install Kafka
kafka_2.12-2.3.0.tgz

tar -zxvf kafka_2.12-2.3.0.tgz


3.2.2 Configure the Kafka cluster

Modify the following parameters in config/server.properties:
[hadoop@hadoop01 kafka_2.12-2.3.0]$ cd config


[hadoop@hadoop01 config]$ gedit server.properties
Parameter 1: add host.name=hadoop01
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
host.name=hadoop01
Parameter 2: modify log.dirs
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/home/hadoop/kafka_2.12-2.3.0/logs
Parameter 3: add the ZooKeeper node addresses and ports
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
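The three edits above can also be applied non-interactively instead of through gedit. A minimal sketch with sed, run here against a stand-in copy of the stock file (on a real node, point CONF at the actual config/server.properties):

```shell
# Stand-in for config/server.properties; on hadoop01 you would set
# CONF=/home/hadoop/kafka_2.12-2.3.0/config/server.properties instead.
CONF=server.properties
cat > "$CONF" <<'EOF'
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
EOF

# Parameter 2: point log.dirs at the install directory's logs folder.
sed -i 's|^log.dirs=.*|log.dirs=/home/hadoop/kafka_2.12-2.3.0/logs|' "$CONF"
# Parameter 3: list all three ZooKeeper nodes.
sed -i 's|^zookeeper.connect=.*|zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181|' "$CONF"
# Parameter 1: host.name is not in the stock file, so append it if absent.
grep -q '^host.name=' "$CONF" || echo 'host.name=hadoop01' >> "$CONF"

cat "$CONF"
```

The `|` delimiter in sed avoids escaping the slashes in the paths.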


3.2.3 Copy to the other nodes
scp -r ~/kafka_2.12-2.3.0 hadoop02:~/
scp -r ~/kafka_2.12-2.3.0 hadoop03:~/

3.2.4 On each node, modify the broker.id parameter in server.properties, leaving the other parameters unchanged

hadoop02 node:
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
host.name=hadoop02
hadoop03 node:
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
host.name=hadoop03
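The per-node edit can be scripted too. The sketch below simulates the two nodes' config files as local stand-ins (node-hadoop02.properties, node-hadoop03.properties); on the real cluster you would run the same sed over ssh on each host:

```shell
# Simulated per-node edit; on a real cluster, wrap the sed in
#   ssh "$node" "sed -i ... ~/kafka_2.12-2.3.0/config/server.properties"
i=1
for node in hadoop02 hadoop03; do
  # Stand-in for the file copied over by scp in 3.2.3.
  printf 'broker.id=0\nhost.name=hadoop01\n' > "node-$node.properties"
  sed -i -e "s/^broker.id=.*/broker.id=$i/" \
         -e "s/^host.name=.*/host.name=$node/" "node-$node.properties"
  i=$((i+1))
done
cat node-hadoop02.properties node-hadoop03.properties
```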


3.2.5 Start Kafka (ZooKeeper must be started first)

3.2.5.1 ZooKeeper must be started before Kafka

[hadoop@hadoop01 ~]$ cd bin
[hadoop@hadoop01 bin]$ sh zookeeperstart.sh --this is a one-click ZooKeeper startup script; for details see https://www.cnblogs.com/CQ-LQJ/p/11605603.html
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower



3.2.5.2.1 Once ZooKeeper is up, run the Kafka startup command

[hadoop@hadoop01 bin]$ cd ~/kafka_2.12-2.3.0
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-server-start.sh  config/server.properties --this command works, but it blocks in the foreground and the window cannot be closed


Error on the first startup (fix: upgrade to JDK 8 update 131 or 151; either works):
[2019-09-30 00:28:56,482] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-30 00:28:56,875] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.VerifyError: Uninitialized object exists on backward branch 209
Exception Details:
  Location:
    scala/collection/immutable/HashMap$HashTrieMap.split()Lscala/collection/immutable/Seq; @249: goto
  Reason:
    Error exists in the bytecode
  Bytecode:
    0000000: 2ab6 0064 04a0 001e b200 c1b2 00c6 04bd
    0000010: 0002 5903 2a53 c000 c8b6 00cc b600 d0c0
    0000020: 00d2 b02a b600 38b8 0042 3c1b 04a4 0156
    0000030: 1b05 6c3d 2a1b 056c 2ab6 0038 b700 d43e
    0000040: 2ab6 0038 021d 787e 3604 2ab6 0038 0210
    0000050: 201d 647c 7e36 05bb 0019 59b2 00c6 2ab6
    0000060: 003a c000 c8b6 00d8 b700 db1c b600 df3a
    0000070: 0619 06c6 001a 1906 b600 e3c0 008b 3a07
    0000080: 1906 b600 e6c0 008b 3a08 a700 0dbb 00e8
    0000090: 5919 06b7 00eb bf19 073a 0919 083a 0abb
    00000a0: 0002 5915 0419 09bb 0019 59b2 00c6 1909
    00000b0: c000 c8b6 00d8 b700 db03 b800 f13a 0e3a
    00000c0: 0d03 190d b900 f501 0019 0e3a 1136 1036
    00000d0: 0f15 0f15 109f 0027 150f 0460 1510 190d
    00000e0: 150f b900 f802 00c0 0005 3a17 1911 1917
    00000f0: b800 fc3a 1136 1036 0fa7 ffd8 1911 b801
    0000100: 00b7 0069 3a0b bb00 0259 1505 190a bb00
    0000110: 1959 b200 c619 0ac0 00c8 b600 d8b7 00db
    0000120: 03b8 00f1 3a13 3a12 0319 12b9 00f5 0100
    0000130: 1913 3a16 3615 3614 1514 1515 9f00 2715
    0000140: 1404 6015 1519 1215 14b9 00f8 0200 c000
    0000150: 053a 1819 1619 18b8 0103 3a16 3615 3614
    0000160: a7ff d819 16b8 0100 b700 693a 0cbb 0105
    0000170: 5919 0bbb 0105 5919 0cb2 010a b701 0db7
    0000180: 010d b02a b600 3a03 32b6 010f b0       
  Stackmap Table:
    same_frame(@35)
    full_frame(@141,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118]},{})
    append_frame(@151,Object[#139],Object[#139])
    full_frame(@209,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
    full_frame(@252,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
    full_frame(@312,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
    full_frame(@355,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
    full_frame(@387,{Object[#2],Integer},{})

    at scala.collection.immutable.HashMap$.scala$collection$immutable$HashMap$$makeHashTrieMap(HashMap.scala:185)
    at scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:220)
    at scala.collection.immutable.HashMap.updated(HashMap.scala:62)
    at scala.collection.immutable.Map$Map4.updated(Map.scala:227)
    at scala.collection.immutable.Map$Map4.$plus(Map.scala:228)
    at scala.collection.immutable.Map$Map4.$plus(Map.scala:200)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
    at scala.collection.TraversableOnce.$anonfun$toMap$1(TraversableOnce.scala:320)
    at scala.collection.TraversableOnce$$Lambda$10/838411509.apply(Unknown Source)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableOnce.toMap(TraversableOnce.scala:319)
    at scala.collection.TraversableOnce.toMap$(TraversableOnce.scala:317)
    at scala.collection.AbstractTraversable.toMap(Traversable.scala:108)
    at kafka.api.ApiVersion$.<init>(ApiVersion.scala:98)
    at kafka.api.ApiVersion$.<clinit>(ApiVersion.scala)
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:146)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:854)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.metrics.KafkaMetricsConfig.<init>(KafkaMetricsConfig.scala:32)
    at kafka.metrics.KafkaMetricsReporter$.startReporters(KafkaMetricsReporter.scala:62)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:27)
    at kafka.Kafka$.main(Kafka.scala:68)
    at kafka.Kafka.main(Kafka.scala)
Current JDK version:
[hadoop@hadoop01 kafka_2.12-2.3.0]$ java -version
java version "1.8.0_11"
Java(TM) SE Runtime Environment (build 1.8.0_11-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.11-b03, mixed mode)


Kafka fails to start or hangs
Possible cause: the virtual machine is short of memory

Fix: reduce the startup script's heap from 1G to 200m

1. Open the script: vim bin/kafka-server-start.sh

2. Find: export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

3. Change it to: export KAFKA_HEAP_OPTS="-Xmx200m -Xms200m"

4. Restart Kafka: bin/kafka-server-start.sh config/server.properties
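Editing the script may not be strictly necessary: in the stock kafka-server-start.sh, KAFKA_HEAP_OPTS is only set when the variable is not already defined, so the heap can also be overridden from the environment (a sketch; the start command itself is shown as a comment since it needs a live cluster):

```shell
# Override the broker heap without touching bin/kafka-server-start.sh.
export KAFKA_HEAP_OPTS="-Xmx200m -Xms200m"
echo "$KAFKA_HEAP_OPTS"
# then start as usual:
# bin/kafka-server-start.sh -daemon config/server.properties
```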

3.2.5.2.2 Kafka startup mode:
Add -daemon and & to start Kafka as a daemon process. (The plain command bin/kafka-server-start.sh config/server.properties prints directly to the console and the window cannot be closed.)

[hadoop@hadoop01 bin]$ cd ~/kafka_2.12-2.3.0
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-server-start.sh -daemon config/server.properties & 



3.2.5.2.3 Use jps to check whether Kafka started successfully

[hadoop@hadoop01 kafka_2.12-2.3.0]$ jps
8736 NodeManager
8593 ResourceManager
8083 DataNode
7942 NameNode
8330 SecondaryNameNode
12700 Jps
11981 Kafka

Note: the start/stop order for Kafka and ZooKeeper matters: start ZooKeeper before Kafka, and stop Kafka before ZooKeeper.
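This ordering rule can be captured in a small wrapper. This is only a sketch: zookeeperstart.sh is the author's one-click script from the link above, zookeeperstop.sh is a hypothetical stop counterpart, and DRY_RUN=1 makes the sketch print the commands instead of running them:

```shell
# Print commands instead of executing them when DRY_RUN=1.
run() { if [ "${DRY_RUN:-0}" = "1" ]; then echo "would run: $*"; else "$@"; fi; }

cluster() {
  case "$1" in
    start)  # ZooKeeper first, then Kafka
      run sh zookeeperstart.sh
      run ~/kafka_2.12-2.3.0/bin/kafka-server-start.sh -daemon \
          ~/kafka_2.12-2.3.0/config/server.properties
      ;;
    stop)   # Kafka first, then ZooKeeper
      run ~/kafka_2.12-2.3.0/bin/kafka-server-stop.sh
      run sh zookeeperstop.sh   # hypothetical counterpart to zookeeperstart.sh
      ;;
  esac
}

DRY_RUN=1
cluster start
cluster stop
```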

3.2.5.2.4 Start Kafka on the other nodes in the same way

Note: in a Hadoop cluster deployment, starting any one ZooKeeper ensemble, whether bundled with a component or standalone, can serve every other component that needs ZooKeeper; there is no need to start each component's bundled ZooKeeper separately.

3.2.6 Test Kafka

3.2.6.1 Create a topic (a message topic is like a directory in a file system; it is where message content is stored)

[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --topic testr --replication-factor 3 --partitions 3
Created topic testr.



3.2.6.2 List topics and view topic details

[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --list
testr


3.2.6.3 Start a Kafka producer (works on any of hadoop01, hadoop02, hadoop03) to simulate a producer sending messages; manually send messages to the Kafka topic from the command line:

[hadoop@hadoop02 kafka_2.12-2.3.0]$ bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr
>THI^H^H\^H^H^H   this is message from terminal
>hello
>my^H^H  hahha
>
>
>11111
>
>11111111111111111111
> 

Note: as a producer, the terminal started above stays in message-sending mode: it waits for user input and saves it to the topic. To receive these messages, we need to open another terminal and create a consumer.

3.2.6.4 Start a Kafka consumer (works on any of hadoop01, hadoop02, hadoop03)

[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic testr --from-beginning (use this command for Kafka versions before 0.9)
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr --from-beginning (use this command for Kafka 0.9 and later)
hello

  hahha
11111                                                                                                                                                                      
this is message from terminal


11111111111111111111


Note: with --from-beginning, the messages may not come back in the order they were sent: the topic was created with three partitions, and Kafka guarantees ordering only within a single partition. More broadly, the producer and consumer are linked through Kafka as middleware, an application pattern that is useful in many data-processing systems. For example, in real-time big-data applications Kafka can hold the data produced by a source while the consumer stores it at its own rate, so Kafka acts as a buffer.
