宏觀上; 模塊調用關係圖
複雜問題簡單化; 清晰數據結構; 1天數據1層html
數據來源:
爬蟲 日誌採集系統 業務數據 財務系統java
數據去向:
報表系統、用戶畫像、推薦系統、機器學習、風控系統
node
① 數據採集平臺搭建
② 實現用戶行爲數據倉庫的分層搭建
③ 實現業務數據倉庫的分層搭建
④ 針對數據倉庫中的數據進行,留存、轉化率、GMV(天天交易額)、復購率、活躍等報表行爲;mysql
技術選型linux
採集:
方式一: log日誌--->flume--->kafka(API)--->hdfs; 方式二: Logstash(讀取日誌)-->ELK(存儲查詢)全文檢索引擎-sqoop
DataX導數據; mysql->sqoopweb
存儲:mysql(存儲業務--分析結果) ;ES(存、查都很快)<---->HBase(存快,分析慢); S3sql
計算:Tez(分析hive中指標)&hive; Flink--Sparkapache
查詢:Presto,Impala,Kylinjson
系統架構圖:bootstrap
日誌文件| mysql數據表--->分別由flume| sqoop處理--> 分別交給-->kafka| HDFS
由Yarn統一調度
Hive| Presto負責數據查詢;
Azkaban任務調度器
最後可視化展現;
系統數據流程:
Web/App埋點行爲數據--->log日誌服務器(友盟-第三方日誌服務器)--->logFile格式->Flume生產-->kafka(kafka(至關於路由池)能夠接實時數據、es等)--flume消費-->HDFS
業務交互-->mysql(業務服務器-->Nginx實現負載均衡)->sqoop-->>hdfs--->hive數倉-->把結果存儲到mysql
框架版本選型 產品 版本 Hadoop 2.7.2 Flume 1.7.0 Kafka 0.11.0.2 Kafka Manager 1.3.3.22 Hive 1.2.1 Sqoop 1.4.6 MySQL 5.6.24 Azkaban 2.5.0 Java 1.8 Zookeeper 3.4.10 Presto 0.189 集羣資源規劃設計 服務器hadoop101 服務器hadoop102 服務器hadoop103 HDFS NameNode DataNode DataNode
DataNode SecondaryNameNode Yarn NodeManager Resourcemanager NodeManager
NodeManager Zookeeper Zookeeper Zookeeper Zookeeper Flume(採集日誌) Flume Flume Kafka Kafka Kafka Kafka Flume(消費Kafka) Flume Hive Hive MySQL MySQL Presto Presto
埋點數據--想記錄的數據(web端、app端):
產品字段ap(產品字段能夠有多個app)
①公共字段 全部的事件都須要記錄的字段,公共的; <<-cm-->>AppBase
cm(公共字段基本全部安卓手機都包含的字段); cm公共字段;json對象
et事件; et事件字段:json數組
②業務字段(埋點上報的字段,有具體的業務類型, 有(用戶)具體的行爲;)
日誌格式:
時間戳|json字符串
cm:公共字段
et:事件(日誌)字段(用戶行爲--針對每個事件)
事件日誌的設計:
①商品列表頁(loading)
action 動做:開始加載=1,加載成功=2,加載失敗=3 loading_time 加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間) loading_way 加載類型:1-讀取緩存,2-從接口拉新數;(加載成功才上報加載類型) extend1 擴展字段 Extend1 extend2 擴展字段 Extend2 type 加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載) type1 加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗)
②商品點擊(display)
action 動做:曝光商品=1,點擊商品=2, goodsid 商品ID(服務端下發的ID) place 順序(第幾條商品,第一條爲0,第二條爲1,如此類推) extend1 曝光類型:1 - 首次曝光 2-重複曝光(沒有使用) category 分類ID(服務端定義的分類ID)
③商品詳情頁(newsdetail)詳情頁從哪來
entry 頁面入口來源:應用首頁=1、push=2、詳情頁相關推薦=3 action 動做:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4 goodsid 商品ID(服務端下發的ID) show_style 商品樣式:0、無圖、1、一張大圖、2、兩張圖、3、三張小圖、4、一張小圖、5、一張大圖兩張小圖 news_staytime 頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途劃出的時間超過10分鐘,則本次計時做廢,不上報本次數據。如未加載成功退出,則報空。 loading_time 加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間) type1 加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗) category 分類ID(服務端定義的分類ID)
④廣告(ad)
entry 入口:商品列表頁=1 應用首頁=2 商品詳情頁=3 action 動做:請求廣告=1 取緩存廣告=2 廣告位展現=3 廣告展現=4 廣告點擊=5 content 狀態:成功=1 失敗=2 detail 失敗碼(沒有則上報空) source 廣告來源:admob=1 facebook=2 ADX(百度)=3 VK(俄羅斯)=4 behavior 用戶行爲:主動獲取廣告=1 ;被動獲取廣告=2 newstype Type: 1- 圖文 2-圖集 3-段子 4-GIF 5-視頻 6-調查 7-純文 8-視頻+圖文 9-GIF+圖文 0-其餘 show_style 內容樣式:無圖(純文字)=6 一張大圖=1 三站小圖+文=4 一張小圖=2 一張大圖兩張小圖+文=3 圖集+文 = 5 一張大圖+文=11 GIF大圖+文=12 視頻(大圖)+文 = 13 來源於詳情頁相關推薦的商品,上報樣式都爲0(由於都是左文右圖)
⑤消息通知(notification)
action 動做:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展現(不重複上報,一天以內只報一次)=4 type 通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4 ap_time 客戶端彈出時間 content 備用字段
⑥用戶前臺活躍(active_foreground)
push_id 推送的消息的id,若是不是從推送消息打開,傳空 access 1.push 2.icon 3.其餘
⑦用戶後臺活躍(active_background)
active_source 1=upgrade,2=download(下載),3=plugin_upgrade
⑧ 評論(comment)
序號 字段名稱 字段描述 字段類型 長度 容許空 缺省值 1 comment_id 評論表 int 10,0 2 userid 用戶id int 10,0 √ 0 3 p_comment_id 父級評論id(爲0則是一級評論,不爲0則是回覆) int 10,0 √ 4 content 評論內容 string 1000 √ 5 addtime 建立時間 string √ 6 other_id 評論的相關id int 10,0 √ 7 praise_count 點贊數量 int 10,0 √ 0 8 reply_count 回覆數量 int 10,0 √ 0
⑨收藏(favorites)
序號 字段名稱 字段描述 字段類型 長度 容許空 缺省值 1 id 主鍵 int 10,0 2 course_id 商品id int 10,0 √ 0 3 userid 用戶ID int 10,0 √ 0 4 add_time 建立時間 string √
10 點贊(praise)
序號 字段名稱 字段描述 字段類型 長度 容許空 缺省值 1 id 主鍵id int 10,0 2 userid 用戶id int 10,0 √ 3 target_id 點讚的對象id int 10,0 √ 4 type 點贊類型 1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊 int 10,0 √ 5 add_time 添加時間 string √
11 錯誤日
errorBrief 錯誤摘要
errorDetail 錯誤詳情
12啓動日誌數據start action=1能夠算成前臺活躍
entry 入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5 open_ad_type 開屏廣告類型: 開屏原生廣告=1, 開屏插屏廣告=2 action 狀態:成功=1 失敗=2 loading_time 加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間) detail 失敗碼(沒有則上報空) extend1 失敗的message(沒有則上報空)
sdk軟件開發工具
12個主題(1個appbase公共日誌)對應12張表(12張(12個bean對象,再加一個公共的即共13個bean對象)用戶行爲表), 1張啓動日誌表; 8張業務表; 數倉分4層; 20*4=80張表;
啓動日誌1張表-->離線和實時; 須要寫flume的攔截器
事件日誌kafka的事件event主題 11個; 分的越細越靈活,
啓動日誌-1類
事件日誌-11類
啓動日誌頁面: 1552739869506|{ "cm": {"ln":"-62.5","sv":"V2.8.9","os":"8.2.7","g":"6N617W86@gmail.com","mid":"999","nw":"3G","l":"en","vc":"18","hw":"640*960","ar":"MX","uid":"999","t":"1552692232488","la":"-4.9","md":"HTC-8","vn":"1.0.3","ba":"HTC","sr":"I"}, "ap":"gmall", "et":[{"ett":"1552655708510", "en":"display", "kv":{"goodsid":"245","action":"1","extend1":"2","place":"4","category":"20"}}, {"ett":"1552683751477", "en":"ad", "kv":{"entry":"3","show_style":"3","action":"5","detail":"325","source":"2","behavior":"1","content":"1","newstype":"9"}}, {"ett":"1552670223504", "en":"active_foreground", "kv":{"access":"","push_id":"3"}}, {"ett":"1552735759451", "en":"active_background","kv":{"active_source":"2"}}]}
將生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷貝到hadoop101服務器上,
並同步到hadoop102的/opt/module路徑下 [kris@hadoop101 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 在hadoop102上執行jar程序 [kris@hadoop101 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain >/opt/module/test.log 在/tmp/logs路徑下查看生成的日誌文件 [kris@hadoop101 module]$ cd /tmp/logs/ [kris@hadoop101 logs]$ ls
Linux環境變量配置:
(1)修改/etc/profile文件:全部用戶的Shell都有權使用這些環境變量。
(2)修改~/.bashrc文件:針對某一個特定的用戶,若是你須要給某個用戶權限使用這些環境變量,你只須要修改其我的用戶主目錄下的.bashrc文件就能夠了。
(3)配置登陸遠程服務器當即source一下環境變量
[kris@hadoop101 ~]$ cat /etc/profile >> .bashrc [kris@hadoop102 ~]$ cat /etc/profile >> .bashrc [kris@hadoop103 ~]$ cat /etc/profile >> .bashrc
日誌生成集羣啓動腳本
[kris@hadoop101 bin]$ vim lg.sh #!/bin/bash for i in hadoop101 hadoop102 do ssh $i "java -classpath /opt/module/logcollector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient. AppMain >/opt/module/test.log &" done 修改腳本執行權限 [kris@hadoop101 bin]$ chmod +x lg.sh 啓動腳本 [kris@hadoop101 module]$ lg.sh
集羣時間同步修改腳本
在/home/kris/bin目錄下建立腳本dt.sh [kris@hadoop101 bin]$ vim dt.sh #!/bin/bash log_date=$1 for i in hadoop101 hadoop102 hadoop103 do ssh $i "sudo date -s $log_date" done 修改腳本執行權限 [kris@hadoop101 bin]$ chmod 777 dt.sh 啓動腳本 [kris@hadoop101 bin]$ dt.sh 2019-2-10
集羣全部進程查看腳本; 在/home/kris/bin目錄下建立腳本xcall.sh
[kris@hadoop101 bin]$ vim xcall.sh #!/bin/bash for i in hadoop101 hadoop102 hadoop103 do echo ----------$i------------ ssh $i "$*" done 修改腳本執行權限 [kris@hadoop101 bin]$ chmod 777 xcall.sh 啓動腳本 [kris@hadoop101 bin]$ xcall.sh jps
服務器hadoop101 服務器hadoop102 服務器hadoop103 HDFS NameNode DataNode DataNode
DataNode SecondaryNameNode Yarn NodeManager Resourcemanager NodeManager
NodeManager
http://www.javashuo.com/article/p-aktfcjha-cc.html
輸入端採用壓縮DEFLATE(deflate)壓縮
mapper輸出以後採用LZO或snappy
reducer輸出以後gzip或bzip2
1)下載後的文件名是hadoop-lzo-master,它是一個zip格式的壓縮包,先進行解壓,而後用maven編譯。生成hadoop-lzo-0.4.20。 2)將編譯好後的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/
[kris@hadoop101 software]$ mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-2.7.2/share/hadoop/common/ [kris@hadoop101 common]$ ls hadoop-lzo-0.4.20.jar 3)同步hadoop-lzo-0.4.20.jar到hadoop10三、hadoop104 [kris@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar
2 添加配置
1)core-site.xml增長配置支持LZO壓縮
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>io.compression.codecs</name> <value> org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.SnappyCodec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec </value> </property> <property> <name>io.compression.codec.lzo.class</name> <value>com.hadoop.compression.lzo.LzoCodec</value> </property> </configuration>
2)同步core-site.xml到hadoop10二、hadoop103
[kris@hadoop101 hadoop]$ xsync core-site.xml
兩種壓縮方式配置一種便可
1)將編譯後支持Snappy壓縮的Hadoop jar包解壓縮,並將lib/native目錄中全部文件上傳到hadoop102的/opt/module/hadoop-2.7.2/lib/native目錄。
2)從新啓動Hadoop。
3)檢查支持的壓縮方式
[kris@hadoop101 native]$ hadoop checknative hadoop: true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so zlib: true /lib64/libz.so.1 snappy: true /opt/module/hadoop-2.7.2/lib/native/libsnappy.so.1 lz4: true revision:99 bzip2: false
服務器hadoop101 服務器hadoop102 服務器hadoop103
Zookeeper Zookeeper Zookeeper Zookeeper
詳細安裝見:
http://www.javashuo.com/article/p-rqvxhabj-da.html
zookeeper集羣啓動腳本;
chmod 777 zk.sh
[kris@hadoop101 bin]$ vim zk.sh #!/bin/bash case $1 in "start"){ for i in hadoop101 hadoop102 hadoop103 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start" done };; "stop"){ for i in hadoop101 hadoop102 hadoop103 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop" done };; esac
https://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html 可以使用ctrl+F搜索
服務器hadoop101 服務器hadoop102 服務器hadoop103
Flume(採集日誌) Flume Flume
詳細安裝見:
http://www.javashuo.com/article/p-cqzhvvcj-dm.html
TailDirSource是Flume 1.7提供的Source組件,在1.6中並無。
Flume直接讀log日誌的數據,log日誌的格式是app-yyyy-mm-dd.log。
Flume的具體配置以下:
(1)在/opt/module/flume/conf目錄下建立file-flume-kafka.conf文件
[kris@hadoop101 conf]$ vim file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = logType
a1.sources.r1.selector.mapping.start = c1
a1.sources.r1.selector.mapping.event = c2
# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20
# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1
# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
本項目中自定義了兩個攔截器,分別是:ETL攔截器、日誌類型區分攔截器。
ETL攔截器主要用於,過濾時間戳不合法和json數據不完整的日誌
日誌類型區分攔截器主要用於,將啓動日誌和事件日誌區分開來,方便發往Kafka的不一樣topic。
攔截器打包以後,只須要單獨包,不須要將依賴的包上傳。依賴包在flume的lib目錄下面已經存在了。打包以後要放入flume的lib文件夾下面。
須要先將打好的包放入到hadoop101的/opt/module/flume/lib文件夾下面。
[kris@hadoop101 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar
分發flume到hadoop10二、hadoop103
[kris@hadoop101 module]$ xsync flume/ [kris@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/file-flume-kafka.conf &
日誌採集Flume啓動中止腳本
roundValue:30s數據滾動一次;開發中通常1/h滾動一次 ; logFile日誌保存30天;
在/home/kris/bin目錄下建立腳本f1.sh;並添加執行權限;chmod +x f1.sh
[kris@hadoop101 bin]$ vim f1.sh #!/bin/bash case $1 in "start"){ for i in hadoop101 hadoop102 do echo "------------啓動 $i 採集flume數據-----------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent -f /opt/module/flume/conf/file-flume-kafka.conf -n a1 -Dflume.r oot.logger=INFO,LOGFILE >/dev/null 2>&1 &" done };; "stop"){ for i in hadoop101 hadoop102 do echo "------------中止 $i 採集flume數據------------" ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs kill" done };; esac
nohup,該命令能夠在你退出賬戶/關閉終端以後繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令。
/dev/null表明linux的空設備文件,全部往這個文件裏面寫入的內容都會丟失,俗稱「黑洞」。
1)在hadoop103的/opt/module/flume/conf目錄下建立kafka-flume-hdfs.conf文件
[kris@hadoop103 conf]$ vim kafka-flume-hdfs.conf ;配置了不產生大量小文件!
## 組件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second
## 不要產生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制輸出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
日誌消費Flume啓動中止腳本
2)在/home/kris/bin目錄下建立腳本f2.sh;並chmod +x f2.sh
[kris@hadoop101 bin]$ vim f2.sh
#! /bin/bash case $1 in "start"){ for i in hadoop103 do echo "------------啓動 $i 消費flume------------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &" done };; "stop"){ for i in hadoop103 do echo "------------中止 $i 消費flume------------" ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill -9" done };; esac
最快消費(最大吞吐量),消費> 生產;kafka可對接ES等
詳細安裝見:
http://www.javashuo.com/article/p-efqkajqh-en.html
kafka啓動關閉腳本:
#!/bin/bash case $1 in "start"){ for i in hadoop101 hadoop102 hadoop103 do echo "------------啓動 $i kafka----------------" ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/serve r.properties" done };; "stop"){ for i in hadoop101 hadoop102 hadoop103 do echo "------------中止 $i kafka----------------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop" done };; esac ~
注意:啓動Kafka時要先開啓JMX端口,是用於後續KafkaManager監控。
詳細見:
http://www.javashuo.com/article/p-qhmabbfl-et.html
啓動KafkaManager
[kris@hadoop101 kafka-manager-1.3.3.22]$ nohup bin/kafka-manager -Dhttp.port=7456 >/opt/module/kafka-manager-1.3.3.22/start.log 2>&1 &
在瀏覽器中打開 http://hadoop101:7456
至此,就能夠查看整個Kafka集羣的狀態,包括:Topic的狀態、Brokers的狀態、Cosumer的狀態。
在kafka的/opt/module/kafka-manager-1.3.3.22/application.home_IS_UNDEFINED 目錄下面,能夠看到kafka-manager的日誌。
Kafka Manager啓動中止腳本
1)在/home/kris/bin目錄下建立腳本km.sh; chmod +x km.sh
[kris@hadoop101 bin]$ vim km.sh
#!/bin/bash case $1 in "start"){ echo "---------啓動KafkaManager---------" nohup /opt/module/kafka-manager/bin/kafka-manager -Dhttp.port=7456 >/opt/module/kafka-manager/start.log 2>&1 & };; "stop"){ echo "---------中止KafkaManager---------" ps -ef | grep ProdServerStart | grep -v grep | awk '{print $2}' | xargs kill };; esac
查看全部Kafka Topic [kris@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_start ##刪除主題 生產消息 [kris@hadoop101 kafka]$ bin/kafka-console-producer.sh \ --broker-list hadoop101:9092 --topic topic_start >hello world >kris kris 消費消息;能夠檢測下 [kris@hadoop102 kafka]$ bin/kafka-console-consumer.sh \ --zookeeper hadoop101:2181 --from-beginning --topic topic_start
用Kafka官方自帶的腳本,對Kafka進行壓測。Kafka壓測時,能夠查看到哪一個地方出現了瓶頸(CPU,內存,網絡IO)。通常都是網絡IO達到瓶頸。
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
(1)在/opt/module/kafka/bin目錄下面有這兩個文件。咱們來測試一下
[kris@hadoop101 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-recor 100000 --throughput 1000 --producer-props bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092 5000 records sent, 1000.0 records/sec (0.10 MB/sec), 2.6 ms avg latency, 183.0 max latency. 5012 records sent, 1002.4 records/sec (0.10 MB/sec), 1.0 ms avg latency, 36.0 max latency. 5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.6 ms avg latency, 8.0 max latency. 5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.4 ms avg latency, 22.0 max latency. 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.6 ms avg latency, 45.0 max latency. 5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.3 ms avg latency, 3.0 max latency. 5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 27.0 max latency. 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 54.0 max latency. 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 60.0 max latency. 5003 records sent, 1000.4 records/sec (0.10 MB/sec), 0.4 ms avg latency, 29.0 max latency. 5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 50.0 max latency. 5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 82.0 max latency. 5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.4 ms avg latency, 32.0 max latency. 5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.8 ms avg latency, 67.0 max latency. 5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 80.0 max latency. 5002 records sent, 1000.0 records/sec (0.10 MB/sec), 0.4 ms avg latency, 18.0 max latency. 5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.9 ms avg latency, 75.0 max latency. 5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.5 ms avg latency, 23.0 max latency. 5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.5 ms avg latency, 26.0 max latency. 100000 records sent, 999.950002 records/sec (0.10 MB/sec), 0.72 ms avg latency, 183.00 ms max latency, 0 ms 50th, 1 ms 95th, 3 ms 99th, 44 ms 99.9th
測試生成了多少數據,消費了多少數據;每條信息大小,總共發送的條數;每秒多少條數據;
說明:record-size是一條信息有多大,單位是字節。num-records是總共發送多少條信息。throughput 是每秒多少條信息。
參數解析:本例中一共寫入10w條消息,每秒向Kafka寫入了0.10MB的數據,平均是1000條消息/秒,每次寫入的平均延遲爲0.72毫秒,最大的延遲爲183毫秒。
Consumer的測試,若是這四個指標(IO,CPU,內存,網絡)都不能改變,考慮增長分區數來提高性能。
[kris@hadoop103 kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_event --fetch-size 10000 --messages 10000000 --threads 1 start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec 2019-03-15 00:04:21:474, 2019-03-15 00:04:21:740, 1.1851, 4.4551, 1492, 5609.0226
參數說明:
--zookeeper 指定zookeeper的連接信息
--topic 指定topic的名稱
--fetch-size 指定每次fetch的數據的大小
--messages 總共要消費的消息個數
測試結果說明:
開始測試時間,結束測試時間;最大吞吐率1.1851MB/S;最近每秒消費4.4551MB/S;最大每秒消費1492條;平均每秒消費5609.0226條;
Kafka機器數量(經驗公式)=2*(峯值生產速度*副本數/100)+1
先要預估一天大概產生多少數據,而後用Kafka自帶的生產壓測(只測試Kafka的寫入速度,保證數據不積壓),計算出峯值生產速度。再根據設定的副本數,就能預估出須要部署Kafka的數量。
好比咱們採用壓力測試測出寫入的速度是10M/s一臺,峯值的業務數據的速度是50M/s。副本數爲2。
Kafka機器數量=2*(50*2/100)+1=3臺
1)在/home/kris/bin目錄下建立腳本cluster.sh
[kris@hadoop101 bin]$ vim cluster.sh
#!/bin/bash case $1 in "start"){ echo "-----------啓動集羣----------" /opt/module/hadoop-2.7.2/sbin/start-dfs.sh ssh hadoop102 /opt/module/hadoop-2.7.2/sbin/start-yarn.sh zk.sh start f1.sh start kf.sh start sleep 4s; f2.sh start km.sh start };; "stop"){ echo "------------中止集羣----------------" km.sh stop f2.sh stop kf.sh stop sleep 7s; f1.sh stop sleep 3s; zk.sh stop ssh hadoop102 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh" /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh };; esac
改時間從新啓動集羣,由於flume和kafka會去通訊看時間,時間誤差大就會掛掉
詳細安裝:只在1臺節點hadoop101上安裝便可
http://www.javashuo.com/article/p-pllflswg-ez.html
Hive運行引擎Tez的安裝配置
1)下載tez的依賴包:http://tez.apache.org 2)拷貝apache-tez-0.9.1-bin.tar.gz到hadoop102的/opt/module目錄 [kris@hadoop101 module]$ ls apache-tez-0.9.1-bin.tar.gz 3)解壓縮apache-tez-0.9.1-bin.tar.gz [kris@hadoop101 module]$ tar -zxvf apache-tez-0.9.1-bin.tar.gz 4)修更名稱 [kris@hadoop101 module]$ mv apache-tez-0.9.1-bin/ tez-0.9.1
1)進入到Hive的配置目錄:/opt/module/hive/conf [kris@hadoop101 conf]$ pwd /opt/module/hive/conf 2)在hive-env.sh文件中添加tez環境變量配置和依賴包環境變量配置 [kris@hadoop101 conf]$ vim hive-env.sh 添加以下配置
# Set HADOOP_HOME to point to a specific hadoop install directory export HADOOP_HOME=/opt/module/hadoop-2.7.2 # Hive Configuration Directory can be controlled by: export HIVE_CONF_DIR=/opt/module/hive/conf # Folder containing extra libraries required for hive compilation/execution can be controlled by: export TEZ_HOME=/opt/module/tez-0.9.1 #是你的tez的解壓目錄 export TEZ_JARS="" for jar in `ls $TEZ_HOME |grep jar`; do export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/$jar done for jar in `ls $TEZ_HOME/lib`; do export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/lib/$jar done export HIVE_AUX_JARS_PATH=/opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar$TEZ_JARS
3)在hive-site.xml文件中添加以下配置,更改hive計算引擎
<property> <name>hive.execution.engine</name> <value>tez</value> </property>
配置Tez
1)在Hive 的/opt/module/hive/conf下面建立一個tez-site.xml文件 [kris@hadoop101 conf]$ pwd /opt/module/hive/conf [kris@hadoop101 conf]$ vim tez-site.xml 添加以下內容
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>tez.lib.uris</name> <value>${fs.defaultFS}/tez/tez-0.9.1,${fs.defaultFS}/tez/tez-0.9.1/lib</value> </property> <property> <name>tez.lib.uris.classpath</name> <value>${fs.defaultFS}/tez/tez-0.9.1,${fs.defaultFS}/tez/tez-0.9.1/lib</value> </property> <property> <name>tez.use.cluster.hadoop-libs</name> <value>true</value> </property> <property> <name>tez.history.logging.service.class</name> <value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value> </property> </configuration>
上傳Tez到集羣
1)將/opt/module/tez-0.9.1上傳到HDFS的/tez路徑 [kris@hadoop101 conf]$ hadoop fs -mkdir /tez [kris@hadoop101 conf]$ hadoop fs -put /opt/module/tez-0.9.1/ /tez [kris@hadoop101 conf]$ hadoop fs -ls /tez /tez/tez-0.9.1
測試
1)啓動Hive [kris@hadoop101 hive]$ bin/hive 2)建立LZO表 hive (default)> create table student( id int, name string); 3)向表中插入數據 hive (default)> insert into student values(1,"zhangsan"); 4)若是沒有報錯就表示成功了 hive (default)> select * from student; 1 zhangsan
小結
1)運行Tez時檢查到用過多內存而被NodeManager殺死進程問題:
這種問題是從機上運行的Container試圖使用過多的內存,而被NodeManager kill掉了。
解決方法:
方案一:或者是關掉虛擬內存檢查。咱們選這個,修改yarn-site.xml;修改完以後要分發
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
方案二:mapred-site.xml中設置Map和Reduce任務的內存配置以下:(value中實際配置的內存須要根據本身機器內存大小及應用狀況進行修改)
<property> <name>mapreduce.map.memory.mb</name> <value>1536</value> </property> <property> <name>mapreduce.map.java.opts</name> <value>-Xmx1024M</value> </property> <property> <name>mapreduce.reduce.memory.mb</name> <value>3072</value> </property> <property> <name>mapreduce.reduce.java.opts</name> <value>-Xmx2560M</value> </property>