3.2.1 Download and install Kafka
Download kafka_2.12-2.3.0.tgz, then extract it:
tar -zxvf kafka_2.12-2.3.0.tgz
3.2.2 Configure the Kafka cluster
Edit the following parameters in config/server.properties:
[hadoop@hadoop01 kafka_2.12-2.3.0]$ cd config
[hadoop@hadoop01 config]$ gedit server.properties
Parameter 1: add host.name=hadoop01 (host.name is deprecated in favor of listeners, but Kafka 2.3.0 still accepts it)
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
host.name=hadoop01
Parameter 2: change log.dirs (this is where Kafka stores its message data, not its application logs)
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/hadoop/kafka_2.12-2.3.0/logs
Parameter 3: add the ZooKeeper node addresses and ports
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
3.2.3 Copy to each node
scp -r ~/kafka_2.12-2.3.0 hadoop02:~/
scp -r ~/kafka_2.12-2.3.0 hadoop03:~/
3.2.4 On each node, change broker.id (and host.name) in server.properties; leave the other parameters unchanged
On hadoop02:
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
host.name=hadoop02
On hadoop03:
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
host.name=hadoop03
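The per-node edit above can also be scripted. The following is a minimal sketch, not part of the original procedure: set_broker_props is a hypothetical helper name, and it assumes both keys already exist in the file at the start of a line, as they do after step 3.2.2.

```shell
#!/usr/bin/env bash
# Hypothetical helper: rewrite broker.id and host.name in a properties file.
# Assumes both keys are already present, one per line, with no leading spaces.
set_broker_props() {
  local file="$1" id="$2" host="$3" tmp
  tmp="$(mktemp)"
  # Replace the value of each key; every other line passes through unchanged.
  sed -e "s/^broker\.id=.*/broker.id=${id}/" \
      -e "s/^host\.name=.*/host.name=${host}/" "$file" > "$tmp"
  # Write back in place so the original file keeps its permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Demo on a throwaway file, so the sketch runs without a Kafka install.
demo="$(mktemp)"
printf 'broker.id=0\nhost.name=hadoop01\nlog.dirs=/home/hadoop/kafka_2.12-2.3.0/logs\n' > "$demo"
set_broker_props "$demo" 1 hadoop02
cat "$demo"
```

On hadoop02 the call would be set_broker_props ~/kafka_2.12-2.3.0/config/server.properties 1 hadoop02, and on hadoop03 the same with 2 and hadoop03.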
3.2.5 Start Kafka (ZooKeeper must be started first)
3.2.5.1 Start ZooKeeper before starting Kafka
[hadoop@hadoop01 ~]$ cd bin
[hadoop@hadoop01 bin]$ sh zookeeperstart.sh   # one-step ZooKeeper startup script; for details see https://www.cnblogs.com/CQ-LQJ/p/11605603.html
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
3.2.5.2.1 Once ZooKeeper is up, run the Kafka start command
[hadoop@hadoop01 bin]$ cd ~/kafka_2.12-2.3.0
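[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-server-start.sh config/server.properties   # this command is correct, but it runs in the foreground, so the terminal window must stay open
The first start failed with the error below (fix: upgrade the JDK to 8u131 or 8u151; either works):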
[2019-09-30 00:28:56,482] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-30 00:28:56,875] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.VerifyError: Uninitialized object exists on backward branch 209
Exception Details:
Location:
scala/collection/immutable/HashMap$HashTrieMap.split()Lscala/collection/immutable/Seq; @249: goto
Reason:
Error exists in the bytecode
Bytecode:
0000000: 2ab6 0064 04a0 001e b200 c1b2 00c6 04bd
0000010: 0002 5903 2a53 c000 c8b6 00cc b600 d0c0
0000020: 00d2 b02a b600 38b8 0042 3c1b 04a4 0156
0000030: 1b05 6c3d 2a1b 056c 2ab6 0038 b700 d43e
0000040: 2ab6 0038 021d 787e 3604 2ab6 0038 0210
0000050: 201d 647c 7e36 05bb 0019 59b2 00c6 2ab6
0000060: 003a c000 c8b6 00d8 b700 db1c b600 df3a
0000070: 0619 06c6 001a 1906 b600 e3c0 008b 3a07
0000080: 1906 b600 e6c0 008b 3a08 a700 0dbb 00e8
0000090: 5919 06b7 00eb bf19 073a 0919 083a 0abb
00000a0: 0002 5915 0419 09bb 0019 59b2 00c6 1909
00000b0: c000 c8b6 00d8 b700 db03 b800 f13a 0e3a
00000c0: 0d03 190d b900 f501 0019 0e3a 1136 1036
00000d0: 0f15 0f15 109f 0027 150f 0460 1510 190d
00000e0: 150f b900 f802 00c0 0005 3a17 1911 1917
00000f0: b800 fc3a 1136 1036 0fa7 ffd8 1911 b801
0000100: 00b7 0069 3a0b bb00 0259 1505 190a bb00
0000110: 1959 b200 c619 0ac0 00c8 b600 d8b7 00db
0000120: 03b8 00f1 3a13 3a12 0319 12b9 00f5 0100
0000130: 1913 3a16 3615 3614 1514 1515 9f00 2715
0000140: 1404 6015 1519 1215 14b9 00f8 0200 c000
0000150: 053a 1819 1619 18b8 0103 3a16 3615 3614
0000160: a7ff d819 16b8 0100 b700 693a 0cbb 0105
0000170: 5919 0bbb 0105 5919 0cb2 010a b701 0db7
0000180: 010d b02a b600 3a03 32b6 010f b0
Stackmap Table:
same_frame(@35)
full_frame(@141,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118]},{})
append_frame(@151,Object[#139],Object[#139])
full_frame(@209,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
full_frame(@252,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
full_frame(@312,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
full_frame(@355,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
full_frame(@387,{Object[#2],Integer},{})
at scala.collection.immutable.HashMap$.scala$collection$immutable$HashMap$$makeHashTrieMap(HashMap.scala:185)
at scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:220)
at scala.collection.immutable.HashMap.updated(HashMap.scala:62)
at scala.collection.immutable.Map$Map4.updated(Map.scala:227)
at scala.collection.immutable.Map$Map4.$plus(Map.scala:228)
at scala.collection.immutable.Map$Map4.$plus(Map.scala:200)
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
at scala.collection.TraversableOnce.$anonfun$toMap$1(TraversableOnce.scala:320)
at scala.collection.TraversableOnce$$Lambda$10/838411509.apply(Unknown Source)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableOnce.toMap(TraversableOnce.scala:319)
at scala.collection.TraversableOnce.toMap$(TraversableOnce.scala:317)
at scala.collection.AbstractTraversable.toMap(Traversable.scala:108)
at kafka.api.ApiVersion$.<init>(ApiVersion.scala:98)
at kafka.api.ApiVersion$.<clinit>(ApiVersion.scala)
at kafka.server.Defaults$.<init>(KafkaConfig.scala:146)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:854)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.metrics.KafkaMetricsConfig.<init>(KafkaMetricsConfig.scala:32)
at kafka.metrics.KafkaMetricsReporter$.startReporters(KafkaMetricsReporter.scala:62)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:27)
at kafka.Kafka$.main(Kafka.scala:68)
at kafka.Kafka.main(Kafka.scala)
JDK version in use when the error occurred:
[hadoop@hadoop01 kafka_2.12-2.3.0]$ java -version
java version "1.8.0_11"
Java(TM) SE Runtime Environment (build 1.8.0_11-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.11-b03, mixed mode)
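A quick way to spot this situation before starting Kafka is to compare the JDK 8 update number against the version used in the fix. This is a rough sketch: java8_update and needs_upgrade are hypothetical helper names, and the threshold of 131 is simply the lower of the two versions mentioned in the fix above, not a verified minimum.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the update number of a Java 8 version string,
# e.g. "1.8.0_11" -> 11. Prints nothing for any other version format.
java8_update() {
  local v="$1"
  case "$v" in
    1.8.0_*)
      v="${v#1.8.0_}"          # drop the "1.8.0_" prefix
      echo "${v%%[!0-9]*}"     # keep only the leading digits
      ;;
  esac
}

# Succeeds when the version is a Java 8 build older than update 131.
# ASSUMPTION: 131 comes from the fix described in this article.
needs_upgrade() {
  local u
  u="$(java8_update "$1")"
  [ -n "$u" ] && [ "$u" -lt 131 ]
}

# Demo with the version string reported above.
if needs_upgrade "1.8.0_11"; then
  echo "1.8.0_11: upgrade the JDK before starting Kafka"
fi
```

To check the installed JDK, the version string can be taken from the first quoted field of the java -version output, for example with java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}'.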
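If Kafka fails to start or hangs
Possible cause: the virtual machine does not have enough memory.
Fix: lower the JVM heap size in the start script from 1G to 200m.
1. Open the script: vim bin/kafka-server-start.sh
2. Find: export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
3. Change it to: export KAFKA_HEAP_OPTS="-Xmx200m -Xms200m"
4. Restart Kafka: bin/kafka-server-start.sh config/server.properties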
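An alternative that avoids editing the script: in the stock Kafka 2.3.0 start script, the export line found in step 2 sits inside a guard that only applies the 1G default when KAFKA_HEAP_OPTS is empty. Assuming that guard is unchanged in your copy, setting the variable in the environment has the same effect. The sketch below reproduces the guard in a hypothetical function, effective_heap_opts, so the behavior can be seen without starting a broker.

```shell
#!/usr/bin/env bash
# Mirrors the default-if-unset guard around KAFKA_HEAP_OPTS.
# ASSUMPTION: your bin/kafka-server-start.sh still contains this guard.
effective_heap_opts() {
  if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    echo "-Xmx1G -Xms1G"        # script default
  else
    echo "$KAFKA_HEAP_OPTS"     # value supplied by the caller wins
  fi
}

unset KAFKA_HEAP_OPTS
effective_heap_opts                                        # default heap settings
KAFKA_HEAP_OPTS="-Xmx200m -Xms200m" effective_heap_opts    # overridden heap settings
```

With the real script the equivalent call is KAFKA_HEAP_OPTS="-Xmx200m -Xms200m" bin/kafka-server-start.sh config/server.properties, which leaves the script itself untouched.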
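3.2.5.2.2 How to start Kafka:
Pass -daemon to run Kafka as a background daemon process; a trailing & is then unnecessary. (Without -daemon, bin/kafka-server-start.sh config/server.properties prints its output to the console and the terminal window cannot be closed.)
[hadoop@hadoop01 bin]$ cd ~/kafka_2.12-2.3.0
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-server-start.sh -daemon config/server.properties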
3.2.5.2.3 Use jps to check whether Kafka started successfully
[hadoop@hadoop01 kafka_2.12-2.3.0]$ jps
8736 NodeManager
8593 ResourceManager
8083 DataNode
7942 NameNode
8330 SecondaryNameNode
12700 Jps
11981 Kafka
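Note: order matters when starting and stopping Kafka and ZooKeeper. Start ZooKeeper before Kafka; stop Kafka before ZooKeeper.
3.2.5.2.4 Start Kafka on the other nodes in the same way
Note: in a Hadoop cluster, once any ZooKeeper ensemble is running, whether it is one bundled with a component or a standalone installation, it can serve every other component that needs ZooKeeper. There is no need to start each component's bundled ZooKeeper separately.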
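The start and stop order, together with the per-node start, can be written down as a small plan generator. This is a sketch under several assumptions: passwordless ssh between the nodes, the same install paths on every node as shown in this article, and the one-step ZooKeeper script living in ~/bin. The function names start_plan and stop_plan are hypothetical. The functions only print commands, so nothing is executed until you choose to run the output.

```shell
#!/usr/bin/env bash
# ASSUMPTIONS: node names and paths are the ones used in this article.
NODES="hadoop01 hadoop02 hadoop03"
KAFKA_DIR='~/kafka_2.12-2.3.0'            # quoted so ~ expands on the remote host
ZK_DIR='~/apache-zookeeper-3.5.5'

# Print the start sequence: ZooKeeper first, then one broker per node.
start_plan() {
  local n
  echo "sh ~/bin/zookeeperstart.sh"
  for n in $NODES; do
    echo "ssh $n 'cd $KAFKA_DIR && bin/kafka-server-start.sh -daemon config/server.properties'"
  done
}

# Print the stop sequence: every broker first, ZooKeeper last.
stop_plan() {
  local n
  for n in $NODES; do
    echo "ssh $n 'cd $KAFKA_DIR && bin/kafka-server-stop.sh'"
  done
  for n in $NODES; do
    echo "ssh $n '$ZK_DIR/bin/zkServer.sh stop'"
  done
}

start_plan
stop_plan
```

Printing the commands instead of running them keeps the sketch safe to try on any machine; after reviewing the output, start_plan | sh would execute it.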
3.2.6 Test Kafka
3.2.6.1 Create a topic (a message topic is comparable to a file-system directory; it holds the message content)
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --topic testr --replication-factor 3 --partitions 3
Created topic testr.
3.2.6.2 List topics and view topic details
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --list
testr
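The listing above shows only topic names. For per-partition details (leader, replicas, in-sync replicas), kafka-topics.sh also accepts --describe. The sketch below just assembles that command line for a given topic; describe_cmd is a hypothetical helper, and the ZooKeeper address list is the one used throughout this article.

```shell
#!/usr/bin/env bash
# ZooKeeper connection string used in this article.
ZK="hadoop01:2181,hadoop02:2181,hadoop03:2181"

# Hypothetical helper: print the describe command for one topic.
describe_cmd() {
  echo "bin/kafka-topics.sh --zookeeper $ZK --describe --topic $1"
}

describe_cmd testr
```

Run the printed line from the kafka_2.12-2.3.0 directory on any node.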
3.2.6.3 Start a Kafka producer (on any of hadoop01, hadoop02, or hadoop03) to simulate a producer sending messages, typing them into the Kafka topic by hand from the command line:
[hadoop@hadoop02 kafka_2.12-2.3.0]$ bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr
>this is message from terminal
>hello
>hahha
>
>
>11111
>
>11111111111111111111
>
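Note: the producer terminal started above stays in sending mode: it waits for user input and saves each line to the topic. To receive these messages, open another terminal and start a consumer.
3.2.6.4 Start a Kafka consumer (on any of hadoop01, hadoop02, or hadoop03)
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic testr --from-beginning (for Kafka versions before 0.9 only; the --zookeeper option was removed from this tool in Kafka 2.0, so it does not work on 2.3.0)
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr --from-beginning (use this form for Kafka 0.9 and later)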
hello
hahha
11111
this is message from terminal
11111111111111111111
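Note: the producer and the consumer are connected through Kafka as middleware, a pattern that is useful in many data processing systems. For example, in some real-time big data applications, Kafka can hold the data produced by the sources while each consumer reads and stores it at its own rate, so Kafka acts as a buffer.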
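The buffering idea can be illustrated without a broker. The toy model below is not Kafka and uses no Kafka API: a plain file stands in for one topic partition, and a counter stands in for the consumer's offset. The names produce, consume, LOG and OFFSET are all made up for this sketch.

```shell
#!/usr/bin/env bash
LOG="$(mktemp)"   # append-only file standing in for one topic partition
OFFSET=0          # the consumer's own position in the log

# Producer side: append one message to the end of the log.
produce() {
  printf '%s\n' "$1" >> "$LOG"
}

# Consumer side: read up to $1 messages from the current offset, then advance.
# Call it directly (not inside $(...)) so the OFFSET update is kept.
consume() {
  local max="$1" out
  out="$(tail -n +"$((OFFSET + 1))" "$LOG" | head -n "$max")"
  [ -z "$out" ] && return 0               # nothing new to read
  printf '%s\n' "$out"
  OFFSET=$((OFFSET + $(printf '%s\n' "$out" | wc -l)))
}

# The producer writes three messages; the consumer takes only two for now.
produce hello
produce hahha
produce 11111
consume 2
# More data arrives; the consumer catches up later, at its own pace.
produce 11111111111111111111
consume 10
```

The producer never waits for the consumer, and the consumer never loses a message it has not read yet; the log in between absorbs the difference in rates, which is the buffering role described above.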