大數據安裝之Kafka(用於實時處理的消息隊列)

1、安裝部署kafka

一、集羣規劃

hadoop102                                 hadoop103                          hadoop104html

zk                                               zk                                         zkapache

kafka                                          kafka                                    kafkabootstrap

二、jar包下載

http://kafka.apache.org/downloads.html服務器

 

三、集羣部署

1)解壓安裝包

[test@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz  -C /opt/module/網絡

2)修改解壓後更名

[test@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafkassh

3)在/opt/module/kafka目錄下建立logs文件夾

[test@hadoop102 kafka]$ mkdir logssocket

4)修改配置文件

[test@hadoop102 kafka]$ cd config/oop

[test@hadoop102 config]$ vi server.propertiesspa

輸入如下內容:.net

   

broker的全局惟一編號,不能重複

broker.id=0

#刪除topic功能使能

delete.topic.enable=true

#處理網絡請求的線程數量

num.network.threads=3

#用來處理磁盤IO的線程數量

num.io.threads=8

#發送套接字的緩衝區大小

socket.send.buffer.bytes=102400

#接收套接字的緩衝區大小

socket.receive.buffer.bytes=102400

#請求套接字的緩衝區大小

socket.request.max.bytes=104857600

#kafka運行日誌存放的路徑

log.dirs=/opt/module/kafka/logs

#topic在當前broker上的分區個數

num.partitions=1

#用來恢復和清理data下數據的線程數量

num.recovery.threads.per.data.dir=1

#segment文件保留的最長時間,超時將被刪除

log.retention.hours=168

#配置鏈接Zookeeper集羣地址

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

5)配置環境變量

[test@hadoop102 module]$ sudo /etc/profile.d/env.sh

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME

[test@hadoop102 module]$ source /etc/profile.d/env.sh

6)分發安裝包

[test@hadoop102 module]$ xsync kafka/

  注意:分發以後記得配置其餘機器的環境變量

7)分別在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的

  broker.id=一、broker.id=2

       注:broker.id不得重複

8)啓動集羣

依次在hadoop10二、hadoop10三、hadoop104節點上啓動kafka

[test@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[test@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon  config/server.properties

[test@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon  config/server.properties

9)關閉集羣

[test@hadoop102 kafka]$ bin/kafka-server-stop.sh stop

[test@hadoop103 kafka]$ bin/kafka-server-stop.sh stop

[test@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

10)kafka羣起腳本

for i in hadoop102 hadoop103 hadoop104

do

echo "========== $i =========="

ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'

 

2、 Kafka命令行操做

1)查看當前服務器中的全部topic

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2)建立topic

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first

選項說明:

--topic 定義topic名

--replication-factor  定義副本數

--partitions  定義分區數

3)刪除topic

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

須要server.properties中設置delete.topic.enable=true不然只是標記刪除。

4)發送消息

[test@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

>hello world

>test  test

5)消費消息

[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--zookeeper hadoop102:2181 --topic first

 

[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --topic first

 

[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --from-beginning --topic first

--from-beginning:會把主題中以往全部的數據都讀取出來。

6)查看某個Topic的詳情

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

7)修改分區數

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6

相關文章
相關標籤/搜索