項目實戰從0到1之hive(36)大數據項目之電商數倉(用戶行爲數據採集)(四)

4.5 Kafka安裝

img

 

4.5.1 Kafka集羣安裝

集羣規劃:java

img

4.5.2 Kafka集羣啓動中止腳本

1)在/home/kgg/bin目錄下建立腳本kf.shnode

[kgg@hadoop101 bin]$ vim kf.sh

在腳本中填寫以下內容apache

#! /bin/bash

case $1 in
"start"){
       for i in hadoop101 hadoop102 hadoop103
       do
               echo " --------啓動 $i Kafka-------"
               # 用於KafkaManager監控
               ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
       done
};;
"stop"){
       for i in hadoop101 hadoop102 hadoop103
       do
               echo " --------中止 $i Kafka-------"
               ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
       done
};;
esac

說明:啓動Kafka時要先開啓JMX端口,是用於後續KafkaManager監控。 2)增長腳本執行權限bootstrap

[kgg@hadoop101 bin]$ chmod 777 kf.sh

3)kf集羣啓動腳本vim

[kgg@hadoop101 module]$ kf.sh start

4)kf集羣中止腳本安全

[kgg@hadoop101 module]$ kf.sh stop

4.5.3 查看Kafka Topic列表

[kgg@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list

4.5.4 建立Kafka Topic

進入到/opt/module/kafka/目錄下分別建立:啓動日誌主題、事件日誌主題。 1)建立啓動日誌主題bash

[kgg@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 1 --partitions 1 --topic topic_start

2)建立事件日誌主題服務器

[kgg@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 1 --partitions 1 --topic topic_event

4.5.5 刪除Kafka Topic網絡

1)刪除啓動日誌主題ssh

[kgg@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_start

2)刪除事件日誌主題

[kgg@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_event

4.5.6 Kafka生產消息

[kgg@hadoop101 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop101:9092 --topic topic_start
>hello world
>kgg kgg

4.5.7 Kafka消費消息

[kgg@hadoop101 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092 --from-beginning --topic topic_start
--from-beginning:會把主題中以往全部的數據都讀取出來。根據業務場景選擇是否增長該配置。

4.5.8 查看Kafka Topic詳情

[kgg@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--describe --topic topic_start

4.5.9 項目經驗之Kafka壓力測試

1)Kafka壓測 用Kafka官方自帶的腳本,對Kafka進行壓測。Kafka壓測時,能夠查看到哪一個地方出現了瓶頸(CPU,內存,網絡IO)。通常都是網絡IO達到瓶頸。

kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh

2)Kafka Producer壓力測試 (1)在/opt/module/kafka/bin目錄下面有這兩個文件。咱們來測試一下

[kgg@hadoop101 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput 1000 --producer-props bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092

說明:record-size是一條信息有多大,單位是字節。num-records是總共發送多少條信息。throughput 是每秒多少條信息。

(2)Kafka會打印下面的信息

5000 records sent, 999.4 records/sec (0.10 MB/sec), 1.9 ms avg latency, 254.0 max latency.
5002 records sent, 1000.4 records/sec (0.10 MB/sec), 0.7 ms avg latency, 12.0 max latency.
5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.8 ms avg latency, 4.0 max latency.
5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 3.0 max latency.
5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.8 ms avg latency, 5.0 max latency.

參數解析:本例中一共寫入10w條消息,每秒向Kafka寫入了0.10MB的數據,平均是1000條消息/秒,每次寫入的平均延遲爲0.8毫秒,最大的延遲爲254毫秒。

3)Kafka Consumer壓力測試 Consumer的測試,若是這四個指標(IO,CPU,內存,網絡)都不能改變,考慮增長分區數來提高性能。

[kgg@hadoop101 kafka]$ 
bin/kafka-consumer-perf-test.sh --zookeeper hadoop101:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1

參數說明: --zookeeper 指定zookeeper的連接信息 --topic 指定topic的名稱 --fetch-size 指定每次fetch的數據的大小 --messages 總共要消費的消息個數

測試結果說明: start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec 2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153 開始測試時間,測試結束數據,最大吞吐率9.5368MB/s,平均每秒消費2.0714MB/s,最大每秒消費100010條,平均每秒消費21722.4153條。

4.5.10 項目經驗之Kafka機器數量計算

Kafka機器數量(經驗公式)=2(峯值生產速度副本數/100)+1 先拿到峯值生產速度,再根據設定的副本數,就能預估出須要部署Kafka的數量。 好比咱們的峯值生產速度是50M/s。副本數爲2。 Kafka機器數量=2(502/100)+ 1=3臺

4.6 消費Kafka數據Flume

img

集羣規劃

img

4.6.1 日誌消費Flume配置

1)Flume配置分析

img

2)Flume的具體配置以下: (1)在hadoop103的/opt/module/flume/conf目錄下建立kafka-flume-hdfs.conf文件

[kgg@hadoop103 conf]$ vim kafka-flume-hdfs.conf

在文件配置以下內容

## 組件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## 不要產生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制輸出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

img

4.6.2 項目經驗之Flume內存優化

1)問題描述:若是啓動消費Flume拋出以下異常

ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解決方案步驟: (1)在hadoop101服務器的/opt/module/flume/conf/flume-env.sh文件中增長以下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop10二、hadoop103服務器

[kgg@hadoop101 conf]$ xsync flume-env.sh

3)Flume內存參數設置及優化 JVM heap通常設置爲4G或更高,部署在單獨的服務器上(4核8線程16G內存) -Xmx與-Xms最好設置一致,減小內存抖動帶來的性能影響,若是設置不一致容易致使頻繁fullgc。

4.6.3 項目經驗之Flume組件

1)FileChannel和MemoryChannel區別 MemoryChannel傳輸數據速度更快,但由於數據保存在JVM的堆內存中,Agent進程掛掉會致使數據丟失,適用於對數據質量要求不高的需求。 FileChannel傳輸速度相對於Memory慢,但數據安全保障高,Agent進程掛掉也能夠從失敗中恢復數據。

2)FileChannel優化 經過配置dataDirs指向多個路徑,每一個路徑對應不一樣的硬盤,增大Flume吞吐量。 官方說明以下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也儘可能配置在不一樣硬盤對應的目錄中,保證checkpoint壞掉後,能夠快速使用backupCheckpointDir恢復數據

3)Sink:HDFS Sink (1)HDFS存入大量小文件,有什麼影響? 元數據層面:每一個小文件都有一份元數據,其中包括文件路徑,文件名,全部者,所屬組,權限,建立時間等,這些信息都保存在Namenode內存中。因此小文件過多,會佔用Namenode服務器大量內存,影響Namenode性能和使用壽命 計算層面:默認狀況下MR會對每一個小文件啓用一個Map任務計算,很是影響計算性能。同時也影響磁盤尋址時間。 (2)HDFS小文件處理 官方默認的這三個參數配置寫入HDFS後會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount 基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合做用,效果以下: (1)文件在達到128M時會滾動生成新文件 (2)文件建立超3600秒時會滾動生成新文件

4.6.4 日誌消費Flume啓動中止腳本

1)在/home/kgg/bin目錄下建立腳本f2.sh

[kgg@hadoop101 bin]$ vim f2.sh

在腳本中填寫以下內容

#! /bin/bash

case $1 in
"start"){
       for i in hadoop103
       do
               echo " --------啓動 $i 消費flume-------"
               ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
       done
};;
"stop"){
       for i in hadoop103
       do
               echo " --------中止 $i 消費flume-------"
               ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
       done

};;
esac

2)增長腳本執行權限

[kgg@hadoop101 bin]$ chmod 777 f2.sh

3)f2集羣啓動腳本

[kgg@hadoop101 module]$ f2.sh start

4)f2集羣中止腳本

[kgg@hadoop101 module]$ f2.sh stop

4.7 採集通道啓動/中止腳本

1)在/home/kgg/bin目錄下建立腳本cluster.sh

[kgg@hadoop101 bin]$ vim cluster.sh

在腳本中填寫以下內容

#! /bin/bash

case $1 in
"start"){
   echo " -------- 啓動 集羣 -------"

   echo " -------- 啓動 hadoop集羣 -------"
  /opt/module/hadoop-2.7.2/sbin/start-dfs.sh
   ssh hadoop102 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"

   #啓動 Zookeeper集羣
  zk.sh start

sleep 4s;

   #啓動 Flume採集集羣
  f1.sh start

   #啓動 Kafka採集集羣
  kf.sh start

sleep 6s;

   #啓動 Flume消費集羣
  f2.sh start

  };;
"stop"){
   echo " -------- 中止 集羣 -------"

   #中止 Flume消費集羣
  f2.sh stop

   #中止 Kafka採集集羣
  kf.sh stop

   sleep 6s;

   #中止 Flume採集集羣
  f1.sh stop

   #中止 Zookeeper集羣
  zk.sh stop

   echo " -------- 中止 hadoop集羣 -------"
   ssh hadoop102 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
  /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac

2)增長腳本執行權限

[kgg@hadoop101 bin]$ chmod 777 cluster.sh

3)cluster集羣啓動腳本

[kgg@hadoop101 module]$ cluster.sh start

4)cluster集羣中止腳本

[kgg@hadoop101 module]$ cluster.sh stop
相關文章
相關標籤/搜索