數倉1.1 | 概述| 集羣環境搭建

 

宏觀上; 模塊調用關係圖
複雜問題簡單化; 清晰數據結構; 1天數據1層html

1. 數據倉庫DW

數據來源:
  爬蟲 日誌採集系統 業務數據 財務系統java

數據去向:
  報表系統、用戶畫像、推薦系統、機器學習、風控系統

node

項目需求分析

  ① 數據採集平臺搭建
  ② 實現用戶行爲數據倉庫的分層搭建
  ③ 實現業務數據倉庫的分層搭建
  ④ 針對數據倉庫中的數據進行,留存、轉化率、GMV(天天交易額)、復購率、活躍等報表行爲;mysql

項目框架

技術選型linux

 

採集:
方式一: log日誌--->flume--->kafka(API)--->hdfs; 方式二: Logstash(讀取日誌)-->ELK(存儲查詢)全文檢索引擎-sqoop
DataX導數據;  mysql->sqoopweb

存儲:mysql(存儲業務--分析結果) ;ES(存、查都很快)<---->HBase(存快,分析慢); S3sql

計算:Tez(分析hive中指標)&hive; Flink--Sparkapache

查詢:Presto,Impala,Kylinjson


系統架構圖:bootstrap

日誌文件| mysql數據表--->分別由flume| sqoop處理--> 分別交給-->kafka| HDFS
由Yarn統一調度
Hive| Presto負責數據查詢;
Azkaban任務調度器
最後可視化展現;

系統數據流程:

Web/App埋點行爲數據--->log日誌服務器(友盟-第三方日誌服務器)--->logFile格式->Flume生產-->kafka(kafka(至關於路由池)能夠接實時數據、es等)--flume消費-->HDFS
業務交互-->mysql(業務服務器-->Nginx實現負載均衡)->sqoop-->>hdfs--->hive數倉-->把結果存儲到mysql

框架版本選型 產品          版本 Hadoop         2.7.2 Flume          1.7.0 Kafka        0.11.0.2 Kafka Manager   1.3.3.22 Hive        1.2.1 Sqoop        1.4.6 MySQL         5.6.24 Azkaban      2.5.0 Java         1.8 Zookeeper      3.4.10 Presto        0.189 集羣資源規劃設計          服務器hadoop101 服務器hadoop102 服務器hadoop103 HDFS         NameNode      DataNode    DataNode     
            DataNode                SecondaryNameNode Yarn        NodeManager Resourcemanager   NodeManager
                        NodeManager Zookeeper      Zookeeper Zookeeper      Zookeeper Flume(採集日誌)    Flume      Flume Kafka         Kafka     Kafka         Kafka Flume(消費Kafka)                    Flume Hive          Hive MySQL         MySQL Presto         Presto

2. 數據生成模塊

埋點數據--想記錄的數據(web端、app端):
產品字段ap(產品字段能夠有多個app)
①公共字段 全部的事件都須要記錄的字段,公共的; <<-cm-->>AppBase 
  cm(公共字段基本全部安卓手機都包含的字段); cm公共字段;json對象
  et事件; et事件字段:json數組

②業務字段(埋點上報的字段,有具體的業務類型, 有(用戶)具體的行爲;)

日誌格式:
  時間戳|json字符串
  cm:公共字段
  et:事件(日誌)字段(用戶行爲--針對每個事件)

 

 

 事件日誌的設計:

 ①商品列表頁(loading)

action            動做:開始加載=1,加載成功=2,加載失敗=3
loading_time    加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)
loading_way        加載類型:1-讀取緩存,2-從接口拉新數;(加載成功才上報加載類型)
extend1            擴展字段 Extend1
extend2            擴展字段 Extend2
type            加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載)
type1            加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗)
View Code

②商品點擊(display)

action    動做:曝光商品=1,點擊商品=2,
goodsid    商品ID(服務端下發的ID)
place    順序(第幾條商品,第一條爲0,第二條爲1,如此類推)
extend1    曝光類型:1 - 首次曝光 2-重複曝光(沒有使用)
category    分類ID(服務端定義的分類ID)
View Code

③商品詳情頁(newsdetail)詳情頁從哪來

entry            頁面入口來源:應用首頁=1、push=2、詳情頁相關推薦=3
action            動做:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4
goodsid            商品ID(服務端下發的ID)
show_style        商品樣式:0、無圖、1、一張大圖、2、兩張圖、3、三張小圖、4、一張小圖、5、一張大圖兩張小圖
news_staytime    頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途劃出的時間超過10分鐘,則本次計時做廢,不上報本次數據。如未加載成功退出,則報空。
loading_time    加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間)
type1            加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗)
category        分類ID(服務端定義的分類ID)
View Code

④廣告(ad)

entry            入口:商品列表頁=1  應用首頁=2 商品詳情頁=3
action            動做:請求廣告=1 取緩存廣告=2  廣告位展現=3 廣告展現=4 廣告點擊=5 
content            狀態:成功=1  失敗=2  
detail            失敗碼(沒有則上報空)
source            廣告來源:admob=1 facebook=2  ADX(百度)=3 VK(俄羅斯)=4
behavior        用戶行爲:主動獲取廣告=1  ;被動獲取廣告=2
newstype        Type: 1- 圖文 2-圖集 3-段子 4-GIF 5-視頻 6-調查 7-純文 8-視頻+圖文  9-GIF+圖文  0-其餘
show_style        內容樣式:無圖(純文字)=6 一張大圖=1  三站小圖+文=4 一張小圖=2 一張大圖兩張小圖+文=3 圖集+文 = 5  一張大圖+文=11   GIF大圖+文=12  視頻(大圖)+文 = 13 來源於詳情頁相關推薦的商品,上報樣式都爲0(由於都是左文右圖)
View Code

⑤消息通知(notification)

action            動做:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展現(不重複上報,一天以內只報一次)=4
type            通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4
ap_time            客戶端彈出時間
content            備用字段
View Code

⑥用戶前臺活躍(active_foreground)

push_id        推送的消息的id,若是不是從推送消息打開,傳空
access        1.push 2.icon 3.其餘
View Code

⑦用戶後臺活躍(active_background)

active_source    1=upgrade,2=download(下載),3=plugin_upgrade
View Code

⑧ 評論(comment)

序號    字段名稱    字段描述    字段類型    長度    容許空    缺省值
1        comment_id    評論表            int        10,0        
2            userid    用戶id            int        10,00
3      p_comment_id    父級評論id(爲0則是一級評論,不爲0則是回覆)    int    10,04        content       評論內容            string    10005        addtime      建立時間            string6        other_id   評論的相關id        int        10,07     praise_count    點贊數量        int        10,00
8     reply_count    回覆數量        int        10,00
View Code

⑨收藏(favorites)

序號    字段名稱    字段描述    字段類型    長度    容許空    缺省值
1        id                主鍵        int        10,0        
2        course_id        商品id        int        10,00
3        userid            用戶ID        int        10,00
4        add_time        建立時間    string
View Code

10 點贊(praise)

序號    字段名稱    字段描述    字段類型    長度    容許空    缺省值
1        id            主鍵id            int        10,0        
2        userid        用戶id            int        10,03        target_id    點讚的對象id    int        10,04        type      點贊類型     1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊    int    10,05        add_time    添加時間        string
View Code

11 錯誤日

errorBrief    錯誤摘要
errorDetail    錯誤詳情
View Code

12啓動日誌數據start  action=1能夠算成前臺活躍

entry                入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5
open_ad_type        開屏廣告類型:  開屏原生廣告=1, 開屏插屏廣告=2
action                狀態:成功=1  失敗=2
loading_time        加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)
detail                失敗碼(沒有則上報空)
extend1                失敗的message(沒有則上報空)
View Code

sdk軟件開發工具
12個主題(1個appbase公共日誌)對應12張表(12張(12個bean對象,再加一個公共的即共13個bean對象)用戶行爲表), 1張啓動日誌表;  8張業務表; 數倉分4層; 20*4=80張表;
啓動日誌1張表-->離線和實時; 須要寫flume的攔截器
事件日誌kafka的事件event主題 11個; 分的越細越靈活,

啓動日誌-1類
事件日誌-11類

啓動日誌頁面:
1552739869506|{
"cm":
{"ln":"-62.5","sv":"V2.8.9","os":"8.2.7","g":"6N617W86@gmail.com","mid":"999","nw":"3G","l":"en","vc":"18","hw":"640*960","ar":"MX","uid":"999","t":"1552692232488","la":"-4.9","md":"HTC-8","vn":"1.0.3","ba":"HTC","sr":"I"},
"ap":"gmall",
"et":[{"ett":"1552655708510",
        "en":"display",
        "kv":{"goodsid":"245","action":"1","extend1":"2","place":"4","category":"20"}},
      {"ett":"1552683751477",
        "en":"ad",
        "kv":{"entry":"3","show_style":"3","action":"5","detail":"325","source":"2","behavior":"1","content":"1","newstype":"9"}},
      {"ett":"1552670223504",
        "en":"active_foreground",
        "kv":{"access":"","push_id":"3"}},
     {"ett":"1552735759451",
        "en":"active_background","kv":{"active_source":"2"}}]}

 

 

將生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷貝到hadoop101服務器上,
並同步到hadoop102的/opt/module路徑下 [kris@hadoop101 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 在hadoop102上執行jar程序 [kris@hadoop101 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain >/opt/module/test.log 在/tmp/logs路徑下查看生成的日誌文件 [kris@hadoop101 module]$ cd /tmp/logs/ [kris@hadoop101 logs]$ ls

Linux環境變量配置:

(1)修改/etc/profile文件:全部用戶的Shell都有權使用這些環境變量。
(2)修改~/.bashrc文件:針對某一個特定的用戶,若是你須要給某個用戶權限使用這些環境變量,你只須要修改其我的用戶主目錄下的.bashrc文件就能夠了。
(3)配置登陸遠程服務器當即source一下環境變量

[kris@hadoop101 ~]$ cat /etc/profile >> .bashrc [kris@hadoop102 ~]$ cat /etc/profile >> .bashrc [kris@hadoop103 ~]$ cat /etc/profile >> .bashrc

日誌生成集羣啓動腳本

[kris@hadoop101 bin]$ vim lg.sh 
#!/bin/bash
        for i in hadoop101 hadoop102
        do
            ssh $i "java -classpath /opt/module/logcollector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.
AppMain >/opt/module/test.log &"
        done

修改腳本執行權限
[kris@hadoop101 bin]$ chmod +x lg.sh
啓動腳本
[kris@hadoop101 module]$ lg.sh 
View Code

集羣時間同步修改腳本

在/home/kris/bin目錄下建立腳本dt.sh
[kris@hadoop101 bin]$ vim dt.sh
#!/bin/bash
log_date=$1
for i in hadoop101 hadoop102 hadoop103
do
        ssh $i "sudo date -s $log_date"
done        
修改腳本執行權限
[kris@hadoop101 bin]$ chmod 777 dt.sh
啓動腳本
[kris@hadoop101 bin]$ dt.sh 2019-2-10
    
View Code

集羣全部進程查看腳本; 在/home/kris/bin目錄下建立腳本xcall.sh

[kris@hadoop101 bin]$ vim xcall.sh
#!/bin/bash
for i in hadoop101 hadoop102 hadoop103
do
        echo ----------$i------------
        ssh $i "$*"
done

修改腳本執行權限
[kris@hadoop101 bin]$ chmod 777 xcall.sh
啓動腳本
[kris@hadoop101 bin]$ xcall.sh jps
    
View Code

3. 集羣的搭建

 

Hadoop安裝

            服務器hadoop101    服務器hadoop102    服務器hadoop103
HDFS           NameNode      DataNode          DataNode        
            DataNode                SecondaryNameNode Yarn        NodeManager Resourcemanager   NodeManager
                       NodeManager

http://www.javashuo.com/article/p-aktfcjha-cc.html

添加LZO支持包

輸入端採用壓縮DEFLATE(deflate)壓縮
mapper輸出以後採用LZO或snappy
reducer輸出以後gzip或bzip2

1)下載後的文件名是hadoop-lzo-master,它是一個zip格式的壓縮包,先進行解壓,而後用maven編譯。生成hadoop-lzo-0.4.202)將編譯好後的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/
[kris@hadoop101 software]$ mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-2.7.2/share/hadoop/common/ [kris@hadoop101 common]$ ls hadoop-lzo-0.4.20.jar 3)同步hadoop-lzo-0.4.20.jar到hadoop10三、hadoop104 [kris@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar

2 添加配置
1)core-site.xml增長配置支持LZO壓縮

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>

<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

</configuration>
View Code

2)同步core-site.xml到hadoop10二、hadoop103
[kris@hadoop101 hadoop]$ xsync core-site.xml

兩種壓縮方式配置一種便可

配置Hadoop支持Snappy壓縮

1)將編譯後支持Snappy壓縮的Hadoop jar包解壓縮,並將lib/native目錄中全部文件上傳到hadoop102的/opt/module/hadoop-2.7.2/lib/native目錄。

2)從新啓動Hadoop。

3)檢查支持的壓縮方式

[kris@hadoop101 native]$ hadoop checknative
hadoop:  true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so
zlib:    true /lib64/libz.so.1
snappy:  true /opt/module/hadoop-2.7.2/lib/native/libsnappy.so.1
lz4:     true revision:99
bzip2:   false

 

Zookeeper安裝

              服務器hadoop101    服務器hadoop102    服務器hadoop103
Zookeeper            Zookeeper        Zookeeper         Zookeeper

詳細安裝見:

http://www.javashuo.com/article/p-rqvxhabj-da.html

zookeeper集羣啓動腳本;

chmod 777 zk.sh

[kris@hadoop101 bin]$ vim zk.sh 
#!/bin/bash
case $1 in
"start"){
        for i in hadoop101 hadoop102 hadoop103
        do
        ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
        done
};;
"stop"){
        for i in hadoop101 hadoop102 hadoop103
        do
        ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
        done
};;
esac
View Code

Flume安裝

https://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html  可以使用ctrl+F搜索

            服務器hadoop101    服務器hadoop102   服務器hadoop103
Flume(採集日誌)    Flume          Flume    

詳細安裝見:

http://www.javashuo.com/article/p-cqzhvvcj-dm.html

TailDirSource是Flume 1.7提供的Source組件,在1.6中並無。

Flume直接讀log日誌的數據,log日誌的格式是app-yyyy-mm-dd.log。

Flume的具體配置以下:

       (1)在/opt/module/flume/conf目錄下建立file-flume-kafka.conf文件

[kris@hadoop101 conf]$ vim file-flume-kafka.conf

a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = logType
a1.sources.r1.selector.mapping.start = c1
a1.sources.r1.selector.mapping.event = c2

# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20

a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20

# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1

# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
View Code

Flume攔截器

本項目中自定義了兩個攔截器,分別是:ETL攔截器、日誌類型區分攔截器。

ETL攔截器主要用於,過濾時間戳不合法和json數據不完整的日誌

日誌類型區分攔截器主要用於,將啓動日誌和事件日誌區分開來,方便發往Kafka的不一樣topic。

攔截器打包以後,只須要單獨包,不須要將依賴的包上傳。依賴包在flume的lib目錄下面已經存在了。打包以後要放入flume的lib文件夾下面。

須要先將打好的包放入到hadoop101的/opt/module/flume/lib文件夾下面。

[kris@hadoop101 lib]$ ls | grep interceptor

  flume-interceptor-1.0-SNAPSHOT.jar

分發flume到hadoop10二、hadoop103

[kris@hadoop101 module]$ xsync flume/ [kris@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/file-flume-kafka.conf &

 

日誌採集Flume啓動中止腳本

roundValue:30s數據滾動一次;開發中通常1/h滾動一次 ;  logFile日誌保存30天;

 在/home/kris/bin目錄下建立腳本f1.sh;並添加執行權限;chmod +x f1.sh

[kris@hadoop101 bin]$ vim f1.sh 
#!/bin/bash
case $1 in
"start"){
        for i in hadoop101 hadoop102
        do
           echo "------------啓動 $i 採集flume數據-----------" 
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent -f /opt/module/flume/conf/file-flume-kafka.conf -n a1 -Dflume.r
oot.logger=INFO,LOGFILE >/dev/null 2>&1 &"
        done
};;
"stop"){
        for i in hadoop101 hadoop102
        do
           echo "------------中止 $i 採集flume數據------------"
        ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs kill"
        done
};;
esac
View Code

nohup,該命令能夠在你退出賬戶/關閉終端以後繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令。

/dev/null表明linux的空設備文件,全部往這個文件裏面寫入的內容都會丟失,俗稱「黑洞」。

Flume(hadoop103)消費Kafka數據寫到HDFS

1)在hadoop103的/opt/module/flume/conf目錄下建立kafka-flume-hdfs.conf文件

[kris@hadoop103 conf]$ vim kafka-flume-hdfs.conf  ;配置了不產生大量小文件!

## 組件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second

## 不要產生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制輸出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
View Code

日誌消費Flume啓動中止腳本

2)在/home/kris/bin目錄下建立腳本f2.sh;並chmod +x f2.sh

[kris@hadoop101 bin]$ vim f2.sh

#! /bin/bash

case $1 in
"start"){
        for i in hadoop103
        do
                echo "------------啓動 $i 消費flume------------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop103
        do
                echo "------------中止 $i 消費flume------------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill -9"
        done

};;
esac
View Code

 

最快消費(最大吞吐量),消費> 生產;kafka可對接ES等

 Kafka安裝

詳細安裝見:

http://www.javashuo.com/article/p-efqkajqh-en.html

kafka啓動關閉腳本:

#!/bin/bash
case $1 in
"start"){
        for i in hadoop101 hadoop102 hadoop103
        do
           echo "------------啓動 $i kafka----------------"
        ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/serve
r.properties"
        done
};;
"stop"){
        for i in hadoop101 hadoop102 hadoop103
        do
           echo "------------中止 $i kafka----------------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
        done
};;
esac
~                 
View Code

注意:啓動Kafka時要先開啓JMX端口,是用於後續KafkaManager監控。

Kafka Manager安裝

詳細見:

http://www.javashuo.com/article/p-qhmabbfl-et.html

啓動KafkaManager

[kris@hadoop101 kafka-manager-1.3.3.22]$ 
nohup bin/kafka-manager   -Dhttp.port=7456 >/opt/module/kafka-manager-1.3.3.22/start.log 2>&1 &

在瀏覽器中打開 http://hadoop101:7456

至此,就能夠查看整個Kafka集羣的狀態,包括:Topic的狀態、Brokers的狀態、Cosumer的狀態。

在kafka的/opt/module/kafka-manager-1.3.3.22/application.home_IS_UNDEFINED 目錄下面,能夠看到kafka-manager的日誌。

Kafka Manager啓動中止腳本

1)在/home/kris/bin目錄下建立腳本km.sh; chmod +x km.sh

[kris@hadoop101 bin]$ vim km.sh

#!/bin/bash
case $1 in
"start"){
        echo "---------啓動KafkaManager---------"
        nohup /opt/module/kafka-manager/bin/kafka-manager -Dhttp.port=7456 >/opt/module/kafka-manager/start.log 2>&1 &
};;
"stop"){
        echo "---------中止KafkaManager---------"
        ps -ef | grep ProdServerStart | grep -v grep | awk '{print $2}' | xargs kill
};;
esac
View Code

 

查看全部Kafka Topic
[kris@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list
bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_start  ##刪除主題
生產消息
[kris@hadoop101 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop101:9092 --topic topic_start
>hello world
>kris  kris

消費消息;能夠檢測下
[kris@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop101:2181 --from-beginning --topic topic_start

1)Kafka壓測

用Kafka官方自帶的腳本,對Kafka進行壓測。Kafka壓測時,能夠查看到哪一個地方出現了瓶頸(CPU,內存,網絡IO)。通常都是網絡IO達到瓶頸。

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

2)Kafka Producer壓力測試

(1)在/opt/module/kafka/bin目錄下面有這兩個文件。咱們來測試一下

[kris@hadoop101 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-recor 100000 --throughput 1000 --producer-props bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
5000 records sent, 1000.0 records/sec (0.10 MB/sec), 2.6 ms avg latency, 183.0 max latency.
5012 records sent, 1002.4 records/sec (0.10 MB/sec), 1.0 ms avg latency, 36.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.6 ms avg latency, 8.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.4 ms avg latency, 22.0 max latency.
5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.6 ms avg latency, 45.0 max latency.
5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.3 ms avg latency, 3.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 27.0 max latency.
5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 54.0 max latency.
5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 60.0 max latency.
5003 records sent, 1000.4 records/sec (0.10 MB/sec), 0.4 ms avg latency, 29.0 max latency.
5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 50.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 82.0 max latency.
5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.4 ms avg latency, 32.0 max latency.
5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.8 ms avg latency, 67.0 max latency.
5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 80.0 max latency.
5002 records sent, 1000.0 records/sec (0.10 MB/sec), 0.4 ms avg latency, 18.0 max latency.
5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.9 ms avg latency, 75.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.5 ms avg latency, 23.0 max latency.
5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.5 ms avg latency, 26.0 max latency.
100000 records sent, 999.950002 records/sec (0.10 MB/sec), 0.72 ms avg latency, 183.00 ms max latency, 0 ms 50th, 1 ms 95th, 3 ms 99th, 44 ms 99.9th

 測試生成了多少數據,消費了多少數據;每條信息大小,總共發送的條數;每秒多少條數據;

說明:record-size是一條信息有多大,單位是字節。num-records是總共發送多少條信息。throughput 是每秒多少條信息。

參數解析:本例中一共寫入10w條消息,每秒向Kafka寫入了0.10MB的數據,平均是1000條消息/秒,每次寫入的平均延遲爲0.72毫秒,最大的延遲爲183毫秒。

Kafka Consumer壓力測試

Consumer的測試,若是這四個指標(IO,CPU,內存,網絡)都不能改變,考慮增長分區數來提高性能。

[kris@hadoop103 kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_event --fetch-size 10000 --messages 10000000 --threads 1     
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-03-15 00:04:21:474, 2019-03-15 00:04:21:740, 1.1851, 4.4551, 1492, 5609.0226

參數說明:

--zookeeper 指定zookeeper的連接信息

--topic 指定topic的名稱

--fetch-size 指定每次fetch的數據的大小

--messages 總共要消費的消息個數

測試結果說明:

開始測試時間,結束測試時間;最大吞吐率1.1851MB/S;最近每秒消費4.4551MB/S;最大每秒消費1492條;平均每秒消費5609.0226條;

Kafka機器數量計算

Kafka機器數量(經驗公式)=2*(峯值生產速度*副本數/100)+1

先要預估一天大概產生多少數據,而後用Kafka自帶的生產壓測(只測試Kafka的寫入速度,保證數據不積壓),計算出峯值生產速度。再根據設定的副本數,就能預估出須要部署Kafka的數量。

好比咱們採用壓力測試測出寫入的速度是10M/s一臺,峯值的業務數據的速度是50M/s。副本數爲2。

Kafka機器數量=2*(50*2/100)+1=3臺

採集通道啓動/中止腳本

1)在/home/kris/bin目錄下建立腳本cluster.sh

[kris@hadoop101 bin]$ vim cluster.sh

#!/bin/bash
case $1 in
"start"){
        echo "-----------啓動集羣----------"
        /opt/module/hadoop-2.7.2/sbin/start-dfs.sh 
        ssh hadoop102 /opt/module/hadoop-2.7.2/sbin/start-yarn.sh
        zk.sh start
        f1.sh start
        kf.sh start
        sleep 4s;
        f2.sh start
        km.sh start
};;
"stop"){
        echo "------------中止集羣----------------"
        km.sh stop
        f2.sh stop
        kf.sh stop
        sleep 7s;
        f1.sh stop
        sleep 3s;
        zk.sh stop
        ssh hadoop102 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
        /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac
View Code

改時間從新啓動集羣,由於flume和kafka會去通訊看時間,時間誤差大就會掛掉

 Hive&Mysql的安裝

詳細安裝:只在1臺節點hadoop101上安裝便可

http://www.javashuo.com/article/p-pllflswg-ez.html

Hive運行引擎Tez的安裝配置

 

1)下載tez的依賴包:http://tez.apache.org
2)拷貝apache-tez-0.9.1-bin.tar.gz到hadoop102的/opt/module目錄
[kris@hadoop101 module]$ ls
apache-tez-0.9.1-bin.tar.gz
3)解壓縮apache-tez-0.9.1-bin.tar.gz
[kris@hadoop101 module]$ tar -zxvf apache-tez-0.9.1-bin.tar.gz
4)修更名稱
[kris@hadoop101 module]$ mv apache-tez-0.9.1-bin/ tez-0.9.1

在Hive中配置Tez 

1)進入到Hive的配置目錄:/opt/module/hive/conf
[kris@hadoop101 conf]$ pwd
/opt/module/hive/conf
2)在hive-env.sh文件中添加tez環境變量配置和依賴包環境變量配置
[kris@hadoop101 conf]$ vim hive-env.sh
添加以下配置
# Set HADOOP_HOME to point to a specific hadoop install directory
export HADOOP_HOME=/opt/module/hadoop-2.7.2

# Hive Configuration Directory can be controlled by:
export HIVE_CONF_DIR=/opt/module/hive/conf

# Folder containing extra libraries required for hive compilation/execution can be controlled by:
export TEZ_HOME=/opt/module/tez-0.9.1    #是你的tez的解壓目錄
export TEZ_JARS=""
for jar in `ls $TEZ_HOME |grep jar`; do
    export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/$jar
done
for jar in `ls $TEZ_HOME/lib`; do
    export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/lib/$jar
done

export HIVE_AUX_JARS_PATH=/opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar$TEZ_JARS
View Code

3)在hive-site.xml文件中添加以下配置,更改hive計算引擎

<property>
  <name>hive.execution.engine</name>
  <value>tez</value>
</property>

配置Tez

1)在Hive 的/opt/module/hive/conf下面建立一個tez-site.xml文件
[kris@hadoop101 conf]$ pwd
/opt/module/hive/conf
[kris@hadoop101 conf]$ vim tez-site.xml
添加以下內容
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
    <name>tez.lib.uris</name>    <value>${fs.defaultFS}/tez/tez-0.9.1,${fs.defaultFS}/tez/tez-0.9.1/lib</value>
</property>
<property>
    <name>tez.lib.uris.classpath</name>        <value>${fs.defaultFS}/tez/tez-0.9.1,${fs.defaultFS}/tez/tez-0.9.1/lib</value>
</property>
<property>
     <name>tez.use.cluster.hadoop-libs</name>
     <value>true</value>
</property>
<property>
     <name>tez.history.logging.service.class</name>        <value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value>
</property>
</configuration>
View Code

上傳Tez到集羣

1)將/opt/module/tez-0.9.1上傳到HDFS的/tez路徑
[kris@hadoop101 conf]$ hadoop fs -mkdir /tez
[kris@hadoop101 conf]$ hadoop fs -put /opt/module/tez-0.9.1/ /tez
[kris@hadoop101 conf]$ hadoop fs -ls /tez
/tez/tez-0.9.1

測試

1)啓動Hive
[kris@hadoop101 hive]$ bin/hive
2)建立LZO表
hive (default)> create table student(
id int,
name string);
3)向表中插入數據
hive (default)> insert into student values(1,"zhangsan");
4)若是沒有報錯就表示成功了
hive (default)> select * from student;
1       zhangsan

小結
1)運行Tez時檢查到用過多內存而被NodeManager殺死進程問題:

這種問題是從機上運行的Container試圖使用過多的內存,而被NodeManager kill掉了。

解決方法:

方案一:或者是關掉虛擬內存檢查。咱們選這個,修改yarn-site.xml;修改完以後要分發

<property>
     <name>yarn.nodemanager.vmem-check-enabled</name>
     <value>false</value>
</property>

 

方案二:mapred-site.xml中設置Map和Reduce任務的內存配置以下:(value中實際配置的內存須要根據本身機器內存大小及應用狀況進行修改)

<property>
  <name>mapreduce.map.memory.mb</name>
  <value>1536</value>
</property>
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx1024M</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>3072</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx2560M</value>
</property>
View Code
相關文章
相關標籤/搜索