數據倉庫(Data Warehouse)html
是爲企業全部決策制定過程,提供全部系統數據支持的戰略集合。java
一、項目需求node
1)用戶行爲數據採集平臺搭建linux
2)業務數據採集平臺搭建git
3)數據倉庫維度建模github
4)分析:用戶、流量、會員、商品、銷售、地區、活動等電商核心主題,統計的報表指標近100。web
5)採用即席查詢工具,隨時進行指標分析sql
6)對集羣性能進行監控,發生異常須要報警apache
7)元數據管理json
8)質量監控
技術選型主要須要考慮的因素:數據量大小、業務需求、行業內經驗、技術成熟度、開發維護成本、總成本預算
數據採集傳輸:Flume、Kafka、Sqoop、Logstash、DataX、
數據存儲:Mysql、HDFS、HBase、Redis、MongoDB
數據計算:Hive、Tez、Spark、Flink、Storm
數據查詢:Presto、Druid、Impala、Kylin
數據可視化:Echarts、Superset、QuickBI、DataV
任務調度:Azkaban、Oozie
集羣監控:Zabbix
元數據管理:Atlas
數據質量監控:Griffin
服務器是選擇物理機仍是雲主機?
1)物理機:
128G內存,20核物理CPU,40線程,8THDD和2TSSD硬盤,戴爾品牌單臺報價4萬出頭。通常物理機壽命5年左右。
2)雲主機:
以阿里云爲例,和上面大體相同配置,每一年5萬。
(1)按天天日活躍用戶100萬,每人一天平均100條:100萬*100條 = 1億條
(2)每條日誌1K左右,天天1億條:100000000 / 1024 /1024 = 約100G
(3)半年內不擴容服務器來算:100G * 180 天 = 約18T
(4)保存3個副本:18T * 3 = 54T
(5)預留20%~30%Buffer=54T/0.7=77T
(6)須要約8T*10臺服務器
服務名稱 |
子服務 |
服務器 hadoop102 |
服務器 hadoop103 |
服務器 hadoop104 |
HDFS |
NameNode |
√ |
|
|
DataNode |
√ |
√ |
√ |
|
SecondaryNameNode |
|
|
√ |
|
Yarn |
NodeManager |
√ |
√ |
√ |
Resourcemanager |
|
√ |
|
|
Zookeeper |
Zookeeper Server |
√ |
√ |
√ |
Flume(採集日誌) |
Flume |
√ |
√ |
|
Kafka |
Kafka |
√ |
√ |
√ |
Flume(消費Kafka) |
Flume |
|
|
√ |
Hive |
Hive |
√ |
|
|
MySQL |
MySQL |
√ |
|
|
Sqoop |
Sqoop |
√ |
|
|
Presto |
Coordinator |
√ |
|
|
Worker |
|
√ |
√ |
|
Azkaban |
AzkabanWebServer |
√ |
|
|
AzkabanExecutorServer |
√ |
|
|
|
Druid |
Druid |
√ |
√ |
√ |
Kylin |
|
√ |
|
|
Hbase |
HMaster |
√ |
|
|
HRegionServer |
√ |
√ |
√ |
|
Superset |
|
√ |
|
|
Atlas |
|
√ |
|
|
Solr |
Jar |
√ |
|
|
Griffin |
|
√ |
|
|
服務數總計 |
|
19 |
9 |
9 |
公共字段:基本全部安卓手機都包含的字段
業務字段:埋點上報的字段,有具體的業務類型
下面就是一個示例,表示業務字段的上傳。
{
"ap":"xxxxx",//項目數據來源 app pc
"cm": { //公共字段
"mid": "", // (String) 設備惟一標識
"uid": "", // (String) 用戶標識
"vc": "1", // (String) versionCode,程序版本號
"vn": "1.0", // (String) versionName,程序版本名
"l": "zh", // (String) language系統語言
"sr": "", // (String) 渠道號,應用從哪一個渠道來的。
"os": "7.1.1", // (String) Android系統版本
"ar": "CN", // (String) area區域
"md": "BBB100-1", // (String) model手機型號
"ba": "blackberry", // (String) brand手機品牌
"sv": "V2.2.1", // (String) sdkVersion
"g": "", // (String) gmail
"hw": "1620x1080", // (String) heightXwidth,屏幕寬高
"t": "1506047606608", // (String) 客戶端日誌產生時的時間
"nw": "WIFI", // (String) 網絡模式
"ln": 0, // (double) lng經度
"la": 0 // (double) lat 緯度
},
"et": [ //事件
{
"ett": "1506047605364", //客戶端事件產生時間
"en": "display", //事件名稱
"kv": { //事件結果,以key-value形式自行定義
"goodsid": "236",
"action": "1",
"extend1": "1",
"place": "2",
"category": "75"
}
}
]
}
示例日誌(服務器時間戳 | 日誌):
1540934156385|{
"ap": "gmall",
"cm": {
"uid": "1234",
"vc": "2",
"vn": "1.0",
"la": "EN",
"sr": "",
"os": "7.1.1",
"ar": "CN",
"md": "BBB100-1",
"ba": "blackberry",
"sv": "V2.2.1",
"g": "abc@gmail.com",
"hw": "1620x1080",
"t": "1506047606608",
"nw": "WIFI",
"ln": 0
},
"et": [
{
"ett": "1506047605364", //客戶端事件產生時間
"en": "display", //事件名稱
"kv": { //事件結果,以key-value形式自行定義
"goodsid": "236",
"action": "1",
"extend1": "1",
"place": "2",
"category": "75"
}
},{
"ett": "1552352626835",
"en": "active_background",
"kv": {
"active_source": "1"
}
}
]
}
}
下面是各個埋點日誌格式。其中商品點擊屬於信息流的範疇
事件名稱:loading
標籤 |
含義 |
action |
動做:開始加載=1,加載成功=2,加載失敗=3 |
loading_time |
加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間) |
loading_way |
加載類型:1-讀取緩存,2-從接口拉新數據 |
extend1 |
擴展字段 Extend1 |
extend2 |
擴展字段 Extend2 |
type |
加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載) |
type1 |
加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗) |
事件標籤:display
標籤 |
含義 |
|
action |
動做:曝光商品=1,點擊商品=2, |
|
goodsid |
商品ID(服務端下發的ID) |
|
place |
順序(第幾條商品,第一條爲0,第二條爲1,如此類推) |
|
extend1 |
曝光類型:1 - 首次曝光 2-重複曝光 |
|
category |
分類ID(服務端定義的分類ID) |
|
事件標籤:newsdetail
標籤 |
含義 |
|
entry |
頁面入口來源:應用首頁=一、push=二、詳情頁相關推薦=3 |
|
action |
動做:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4 |
|
goodsid |
商品ID(服務端下發的ID) |
|
show_style |
商品樣式:0、無圖、一、一張大圖、二、兩張圖、三、三張小圖、四、一張小圖、五、一張大圖兩張小圖 |
|
news_staytime |
頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途劃出的時間超過10分鐘,則本次計時做廢,不上報本次數據。如未加載成功退出,則報空。 |
|
loading_time |
加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間) |
|
type1 |
加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗) |
|
category |
分類ID(服務端定義的分類ID) |
|
事件名稱:ad
標籤 |
含義 |
entry |
入口:商品列表頁=1 應用首頁=2 商品詳情頁=3 |
action |
動做: 廣告展現=1 廣告點擊=2 |
contentType |
Type: 1 商品 2 營銷活動 |
displayMills |
展現時長 毫秒數 |
itemId |
商品id |
activityId |
營銷活動id |
事件標籤:notification
標籤 |
含義 |
action |
動做:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展現(不重複上報,一天以內只報一次)=4 |
type |
通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4 |
ap_time |
客戶端彈出時間 |
content |
備用字段 |
事件標籤: active_background
標籤 |
含義 |
active_source |
1=upgrade,2=download(下載),3=plugin_upgrade |
描述:評論表
序號 |
字段名稱 |
字段描述 |
字段類型 |
長度 |
容許空 |
缺省值 |
1 |
comment_id |
評論表 |
int |
10,0 |
|
|
2 |
userid |
用戶id |
int |
10,0 |
√ |
0 |
3 |
p_comment_id |
父級評論id(爲0則是一級評論,不爲0則是回覆) |
int |
10,0 |
√ |
|
4 |
content |
評論內容 |
string |
1000 |
√ |
|
5 |
addtime |
建立時間 |
string |
|
√ |
|
6 |
other_id |
評論的相關id |
int |
10,0 |
√ |
|
7 |
praise_count |
點贊數量 |
int |
10,0 |
√ |
0 |
8 |
reply_count |
回覆數量 |
int |
10,0 |
√ |
0 |
描述:收藏
序號 |
字段名稱 |
字段描述 |
字段類型 |
長度 |
容許空 |
缺省值 |
1 |
id |
主鍵 |
int |
10,0 |
|
|
2 |
course_id |
商品id |
int |
10,0 |
√ |
0 |
3 |
userid |
用戶ID |
int |
10,0 |
√ |
0 |
4 |
add_time |
建立時間 |
string |
|
√ |
|
描述:全部的點贊表
序號 |
字段名稱 |
字段描述 |
字段類型 |
長度 |
容許空 |
缺省值 |
1 |
id |
主鍵id |
int |
10,0 |
|
|
2 |
userid |
用戶id |
int |
10,0 |
√ |
|
3 |
target_id |
點讚的對象id |
int |
10,0 |
√ |
|
4 |
type |
點贊類型 1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊 |
int |
10,0 |
√ |
|
5 |
add_time |
添加時間 |
string |
|
√ |
|
errorBrief |
錯誤摘要 |
errorDetail |
錯誤詳情 |
事件標籤: start
標籤 |
含義 |
entry |
入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5 |
open_ad_type |
開屏廣告類型: 開屏原生廣告=1, 開屏插屏廣告=2 |
action |
狀態:成功=1 失敗=2 |
loading_time |
加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間) |
detail |
失敗碼(沒有則上報空) |
extend1 |
失敗的message(沒有則上報空) |
en |
日誌類型start |
{
"action":"1",
"ar":"MX",
"ba":"HTC",
"detail":"",
"en":"start",
"entry":"2",
"extend1":"",
"g":"43R2SEQX@gmail.com",
"hw":"640*960",
"l":"en",
"la":"20.4",
"ln":"-99.3",
"loading_time":"2",
"md":"HTC-2",
"mid":"995",
"nw":"4G",
"open_ad_type":"2",
"os":"8.1.2",
"sr":"B",
"sv":"V2.0.6",
"t":"1561472502444",
"uid":"995",
"vc":"10",
"vn":"1.3.4"
}
1)建立 log-collector
GroupId : com.test
Project name : log-collector
2)建立一個包名:com.test.appclient
3)在com.test.appclient包下建立一個類,AppMain。
4)在pom.xml文件中添加以下內容
<!--版本號統一--> <properties> <slf4j.version>1.7.20</slf4j.version> <logback.version>1.0.7</logback.version> </properties> <dependencies> <!--阿里巴巴開源json解析框架--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.51</version> </dependency> <!--日誌生成框架--> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> </dependencies> <!--編譯打包插件--> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin </artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.test.appclient.AppMain</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
注意:com.test.appclient.AppMain要和本身建的全類名一致。
1)建立包名:com.test.bean
2)在com.test.bean包下依次建立以下bean對象
package com.test.bean; /** * 公共日誌 */ public class AppBase{ private String mid; // (String) 設備惟一標識 private String uid; // (String) 用戶uid private String vc; // (String) versionCode,程序版本號 private String vn; // (String) versionName,程序版本名 private String l; // (String) 系統語言 private String sr; // (String) 渠道號,應用從哪一個渠道來的。 private String os; // (String) Android系統版本 private String ar; // (String) 區域 private String md; // (String) 手機型號 private String ba; // (String) 手機品牌 private String sv; // (String) sdkVersion private String g; // (String) gmail private String hw; // (String) heightXwidth,屏幕寬高 private String t; // (String) 客戶端日誌產生時的時間 private String nw; // (String) 網絡模式 private String ln; // (double) lng經度 private String la; // (double) lat 緯度 public String getMid() { return mid; } public void setMid(String mid) { this.mid = mid; } public String getUid() { return uid; } public void setUid(String uid) { this.uid = uid; } public String getVc() { return vc; } public void setVc(String vc) { this.vc = vc; } public String getVn() { return vn; } public void setVn(String vn) { this.vn = vn; } public String getL() { return l; } public void setL(String l) { this.l = l; } public String getSr() { return sr; } public void setSr(String sr) { this.sr = sr; } public String getOs() { return os; } public void setOs(String os) { this.os = os; } public String getAr() { return ar; } public void setAr(String ar) { this.ar = ar; } public String getMd() { return md; } public void setMd(String md) { this.md = md; } public String getBa() { return ba; } public void setBa(String ba) { this.ba = ba; } public String getSv() { return sv; } public void setSv(String sv) { this.sv = sv; } public String getG() { return g; } public void setG(String g) { this.g = g; } public String getHw() { return hw; } public void setHw(String hw) { this.hw = hw; } public String getT() { return t; } public void setT(String t) { this.t = t; } public String getNw() { return nw; } public void setNw(String nw) { this.nw = nw; } public String getLn() { return ln; } public void setLn(String ln) { this.ln = ln; } public String getLa() { return la; } public void setLa(String la) { this.la = la; } }
package com.test.bean; /** * 啓動日誌 */ public class AppStart extends AppBase { private String entry;//入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5 private String open_ad_type;//開屏廣告類型: 開屏原生廣告=1, 開屏插屏廣告=2 private String action;//狀態:成功=1 失敗=2 private String loading_time;//加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間) private String detail;//失敗碼(沒有則上報空) private String extend1;//失敗的message(沒有則上報空) private String en;//啓動日誌類型標記 public String getEntry() { return entry; } public void setEntry(String entry) { this.entry = entry; } public String getOpen_ad_type() { return open_ad_type; } public void setOpen_ad_type(String open_ad_type) { this.open_ad_type = open_ad_type; } public String getAction() { return action; } public void setAction(String action) { this.action = action; } public String getLoading_time() { return loading_time; } public void setLoading_time(String loading_time) { this.loading_time = loading_time; } public String getDetail() { return detail; } public void setDetail(String detail) { this.detail = detail; } public String getExtend1() { return extend1; } public void setExtend1(String extend1) { this.extend1 = extend1; } public String getEn() { return en; } public void setEn(String en) { this.en = en; } }
package com.test.bean; /** * 錯誤日誌 */ public class AppErrorLog { private String errorBrief; //錯誤摘要 private String errorDetail; //錯誤詳情 public String getErrorBrief() { return errorBrief; } public void setErrorBrief(String errorBrief) { this.errorBrief = errorBrief; } public String getErrorDetail() { return errorDetail; } public void setErrorDetail(String errorDetail) { this.errorDetail = errorDetail; } }
package com.test.bean; /** * 商品詳情 */ public class AppNewsDetail { private String entry;//頁面入口來源:應用首頁=一、push=二、詳情頁相關推薦=3 private String action;//動做:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4 private String goodsid;//商品ID(服務端下發的ID) private String showtype;//商品樣式:0、無圖一、一張大圖二、兩張圖三、三張小圖四、一張小圖五、一張大圖兩張小圖 來源於詳情頁相關推薦的商品,上報樣式都爲0(由於都是左文右圖) private String news_staytime;//頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途劃出的時間超過10分鐘,則本次計時做廢,不上報本次數據。如未加載成功退出,則報空。 private String loading_time;//加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間) private String type1;//加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗) private String category;//分類ID(服務端定義的分類ID) public String getEntry() { return entry; } public void setEntry(String entry) { this.entry = entry; } public String getAction() { return action; } public void setAction(String action) { this.action = action; } public String getGoodsid() { return goodsid; } public void setGoodsid(String goodsid) { this.goodsid = goodsid; } public String getShowtype() { return showtype; } public void setShowtype(String showtype) { this.showtype = showtype; } public String getNews_staytime() { return news_staytime; } public void setNews_staytime(String news_staytime) { this.news_staytime = news_staytime; } public String getLoading_time() { return loading_time; } public void setLoading_time(String loading_time) { this.loading_time = loading_time; } public String getType1() { return type1; } public void setType1(String type1) { this.type1 = type1; } public String getCategory() { return category; } public void setCategory(String category) { this.category = category; } }
package com.test.bean; /** * 商品列表 */ public class AppLoading { private String action;//動做:開始加載=1,加載成功=2,加載失敗=3 private String loading_time;//加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間) private String loading_way;//加載類型:1-讀取緩存,2-從接口拉新數據 (加載成功才上報加載類型) private String extend1;//擴展字段 Extend1 private String extend2;//擴展字段 Extend2 private String type;//加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載) private String type1;//加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗) public String getAction() { return action; } public void setAction(String action) { this.action = action; } public String getLoading_time() { return loading_time; } public void setLoading_time(String loading_time) { this.loading_time = loading_time; } public String getLoading_way() { return loading_way; } public void setLoading_way(String loading_way) { this.loading_way = loading_way; } public String getExtend1() { return extend1; } public void setExtend1(String extend1) { this.extend1 = extend1; } public String getExtend2() { return extend2; } public void setExtend2(String extend2) { this.extend2 = extend2; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getType1() { return type1; } public void setType1(String type1) { this.type1 = type1; } }
package com.test.bean; /** * 廣告 */ public class AppAd { private String entry;//入口:商品列表頁=1 應用首頁=2 商品詳情頁=3 private String action;//動做: 廣告展現=1 廣告點擊=2 private String contentType;//Type: 1 商品 2 營銷活動 private String displayMills;//展現時長 毫秒數 private String itemId; //商品id private String activityId; //營銷活動id public String getEntry() { return entry; } public void setEntry(String entry) { this.entry = entry; } public String getAction() { return action; } public void setAction(String action) { this.action = action; } public String getActivityId() { return activityId; } public void setActivityId(String activityId) { this.activityId = activityId; } public String getContentType() { return contentType; } public void setContentType(String contentType) { this.contentType = contentType; } public String getDisplayMills() { return displayMills; } public void setDisplayMills(String displayMills) { this.displayMills = displayMills; } public String getItemId() { return itemId; } public void setItemId(String itemId) { this.itemId = itemId; } }
package com.test.bean; /** * 商品點擊日誌 */ public class AppDisplay { private String action;//動做:曝光商品=1,點擊商品=2, private String goodsid;//商品ID(服務端下發的ID) private String place;//順序(第幾條商品,第一條爲0,第二條爲1,如此類推) private String extend1;//曝光類型:1 - 首次曝光 2-重複曝光(沒有使用) private String category;//分類ID(服務端定義的分類ID) public String getAction() { return action; } public void setAction(String action) { this.action = action; } public String getGoodsid() { return goodsid; } public void setGoodsid(String goodsid) { this.goodsid = goodsid; } public String getPlace() { return place; } public void setPlace(String place) { this.place = place; } public String getExtend1() { return extend1; } public void setExtend1(String extend1) { this.extend1 = extend1; } public String getCategory() { return category; } public void setCategory(String category) { this.category = category; } }
package com.test.bean; /** * 消息通知日誌 */ public class AppNotification { private String action;//動做:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展現(不重複上報,一天以內只報一次)=4 private String type;//通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4 private String ap_time;//客戶端彈出時間 private String content;//備用字段 public String getAction() { return action; } public void setAction(String action) { this.action = action; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getAp_time() { return ap_time; } public void setAp_time(String ap_time) { this.ap_time = ap_time; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
package com.test.bean; /** * 用戶後臺活躍 */ public class AppActive_background { private String active_source;//1=upgrade,2=download(下載),3=plugin_upgrade public String getActive_source() { return active_source; } public void setActive_source(String active_source) { this.active_source = active_source; } }
package com.test.bean; /** * 評論 */ public class AppComment { private int comment_id;//評論表 private int userid;//用戶id private int p_comment_id;//父級評論id(爲0則是一級評論,不爲0則是回覆) private String content;//評論內容 private String addtime;//建立時間 private int other_id;//評論的相關id private int praise_count;//點贊數量 private int reply_count;//回覆數量 public int getComment_id() { return comment_id; } public void setComment_id(int comment_id) { this.comment_id = comment_id; } public int getUserid() { return userid; } public void setUserid(int userid) { this.userid = userid; } public int getP_comment_id() { return p_comment_id; } public void setP_comment_id(int p_comment_id) { this.p_comment_id = p_comment_id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public String getAddtime() { return addtime; } public void setAddtime(String addtime) { this.addtime = addtime; } public int getOther_id() { return other_id; } public void setOther_id(int other_id) { this.other_id = other_id; } public int getPraise_count() { return praise_count; } public void setPraise_count(int praise_count) { this.praise_count = praise_count; } public int getReply_count() { return reply_count; } public void setReply_count(int reply_count) { this.reply_count = reply_count; } }
package com.test.bean; /** * 收藏 */ public class AppFavorites { private int id;//主鍵 private int course_id;//商品id private int userid;//用戶ID private String add_time;//建立時間 public int getId() { return id; } public void setId(int id) { this.id = id; } public int getCourse_id() { return course_id; } public void setCourse_id(int course_id) { this.course_id = course_id; } public int getUserid() { return userid; } public void setUserid(int userid) { this.userid = userid; } public String getAdd_time() { return add_time; } public void setAdd_time(String add_time) { this.add_time = add_time; } }
package com.test.bean; /** * 點贊 */ public class AppPraise { private int id; //主鍵id private int userid;//用戶id private int target_id;//點讚的對象id private int type;//點贊類型 1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊 private String add_time;//添加時間 public int getId() { return id; } public void setId(int id) { this.id = id; } public int getUserid() { return userid; } public void setUserid(int userid) { this.userid = userid; } public int getTarget_id() { return target_id; } public void setTarget_id(int target_id) { this.target_id = target_id; } public int getType() { return type; } public void setType(int type) { this.type = type; } public String getAdd_time() { return add_time; } public void setAdd_time(String add_time) { this.add_time = add_time; } }
在AppMain類中添加以下內容:
package com.test.appclient; import java.io.UnsupportedEncodingException; import java.util.Random; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.test.bean.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 日誌行爲數據模擬 */ public class AppMain { private final static Logger logger = LoggerFactory.getLogger(AppMain.class); private static Random rand = new Random(); // 設備id private static int s_mid = 0; // 用戶id private static int s_uid = 0; // 商品id private static int s_goodsid = 0; public static void main(String[] args) { // 參數一:控制發送每條的延時時間,默認是0 Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L; // 參數二:循環遍歷次數 int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000; // 生成數據 generateLog(delay, loop_len); } private static void generateLog(Long delay, int loop_len) { for (int i = 0; i < loop_len; i++) { int flag = rand.nextInt(2); switch (flag) { case (0): //應用啓動 AppStart appStart = generateStart(); String jsonString = JSON.toJSONString(appStart); //控制檯打印 logger.info(jsonString); break; case (1): JSONObject json = new JSONObject(); json.put("ap", "app"); json.put("cm", generateComFields()); JSONArray eventsArray = new JSONArray(); // 事件日誌 // 商品點擊,展現 if (rand.nextBoolean()) { eventsArray.add(generateDisplay()); json.put("et", eventsArray); } // 商品詳情頁 if (rand.nextBoolean()) { eventsArray.add(generateNewsDetail()); json.put("et", eventsArray); } // 商品列表頁 if (rand.nextBoolean()) { eventsArray.add(generateNewList()); json.put("et", eventsArray); } // 廣告 if (rand.nextBoolean()) { eventsArray.add(generateAd()); json.put("et", eventsArray); } // 消息通知 if (rand.nextBoolean()) { eventsArray.add(generateNotification()); json.put("et", eventsArray); } // 用戶後臺活躍 if (rand.nextBoolean()) { eventsArray.add(generateBackground()); json.put("et", eventsArray); } //故障日誌 if (rand.nextBoolean()) { eventsArray.add(generateError()); json.put("et", eventsArray); } // 用戶評論 if (rand.nextBoolean()) { eventsArray.add(generateComment()); json.put("et", eventsArray); } // 用戶收藏 if (rand.nextBoolean()) { eventsArray.add(generateFavorites()); json.put("et", eventsArray); } // 用戶點贊 if (rand.nextBoolean()) { eventsArray.add(generatePraise()); json.put("et", eventsArray); } //時間 long millis = System.currentTimeMillis(); //控制檯打印 logger.info(millis + "|" + json.toJSONString()); break; } // 延遲 try { Thread.sleep(delay); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 公共字段設置 */ private static JSONObject generateComFields() { AppBase appBase = new AppBase(); //設備id appBase.setMid(s_mid + ""); s_mid++; // 用戶id appBase.setUid(s_uid + ""); s_uid++; // 程序版本號 5,6等 appBase.setVc("" + rand.nextInt(20)); //程序版本名 v1.1.1 appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10)); // 安卓系統版本 appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10)); // 語言 es,en,pt int flag = rand.nextInt(3); switch (flag) { case (0): appBase.setL("es"); break; case (1): appBase.setL("en"); break; case (2): appBase.setL("pt"); break; } // 渠道號 從哪一個渠道來的 appBase.setSr(getRandomChar(1)); // 區域 flag = rand.nextInt(2); switch (flag) { case 0: appBase.setAr("BR"); case 1: appBase.setAr("MX"); } // 手機品牌 ba ,手機型號 md,就取2位數字了 flag = rand.nextInt(3); switch (flag) { case 0: appBase.setBa("Sumsung"); appBase.setMd("sumsung-" + rand.nextInt(20)); break; case 1: appBase.setBa("Huawei"); appBase.setMd("Huawei-" + rand.nextInt(20)); break; case 2: appBase.setBa("HTC"); appBase.setMd("HTC-" + rand.nextInt(20)); break; } // 嵌入sdk的版本 appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10)); // gmail appBase.setG(getRandomCharAndNumr(8) + "@gmail.com"); // 屏幕寬高 hw flag = rand.nextInt(4); switch (flag) { case 0: appBase.setHw("640*960"); break; case 1: appBase.setHw("640*1136"); break; case 2: appBase.setHw("750*1134"); break; case 3: appBase.setHw("1080*1920"); break; } // 客戶端產生日誌時間 long millis = System.currentTimeMillis(); appBase.setT("" + (millis - rand.nextInt(99999999))); // 手機網絡模式 3G,4G,WIFI flag = rand.nextInt(3); switch (flag) { case 0: appBase.setNw("3G"); break; case 1: appBase.setNw("4G"); break; case 2: appBase.setNw("WIFI"); break; } // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′ // 經度 appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + ""); // 緯度 appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + ""); return (JSONObject) JSON.toJSON(appBase); } /** * 商品展現事件 */ private static JSONObject generateDisplay() { AppDisplay appDisplay = new AppDisplay(); boolean boolFlag = rand.nextInt(10) < 7; // 動做:曝光商品=1,點擊商品=2, if (boolFlag) { appDisplay.setAction("1"); } else { appDisplay.setAction("2"); } // 商品id String goodsId = s_goodsid + ""; s_goodsid++; appDisplay.setGoodsid(goodsId); // 順序 設置成6條吧 int flag = rand.nextInt(6); appDisplay.setPlace("" + flag); // 曝光類型 flag = 1 + rand.nextInt(2); appDisplay.setExtend1("" + flag); // 分類 flag = 1 + rand.nextInt(100); appDisplay.setCategory("" + flag); JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay); return packEventJson("display", jsonObject); } /** * 商品詳情頁 */ private static JSONObject generateNewsDetail() { AppNewsDetail appNewsDetail = new AppNewsDetail(); // 頁面入口來源 int flag = 1 + rand.nextInt(3); appNewsDetail.setEntry(flag + ""); // 動做 appNewsDetail.setAction("" + (rand.nextInt(4) + 1)); // 商品id appNewsDetail.setGoodsid(s_goodsid + ""); // 商品來源類型 flag = 1 + rand.nextInt(3); appNewsDetail.setShowtype(flag + ""); // 商品樣式 flag = rand.nextInt(6); appNewsDetail.setShowtype("" + flag); // 頁面停留時長 flag = rand.nextInt(10) * rand.nextInt(7); appNewsDetail.setNews_staytime(flag + ""); // 加載時長 flag = rand.nextInt(10) * rand.nextInt(7); appNewsDetail.setLoading_time(flag + ""); // 加載失敗碼 flag = rand.nextInt(10); switch (flag) { case 1: appNewsDetail.setType1("102"); break; case 2: appNewsDetail.setType1("201"); break; case 3: appNewsDetail.setType1("325"); break; case 4: appNewsDetail.setType1("433"); break; case 5: appNewsDetail.setType1("542"); break; default: appNewsDetail.setType1(""); break; } // 分類 flag = 1 + rand.nextInt(100); appNewsDetail.setCategory("" + flag); JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail); return packEventJson("newsdetail", eventJson); } /** * 商品列表 */ private static JSONObject generateNewList() { AppLoading appLoading = new AppLoading(); // 動做 int flag = rand.nextInt(3) + 1; appLoading.setAction(flag + ""); // 加載時長 flag = rand.nextInt(10) * rand.nextInt(7); appLoading.setLoading_time(flag + ""); // 失敗碼 flag = rand.nextInt(10); switch (flag) { case 1: appLoading.setType1("102"); break; case 2: appLoading.setType1("201"); break; case 3: appLoading.setType1("325"); break; case 4: appLoading.setType1("433"); break; case 5: appLoading.setType1("542"); break; default: appLoading.setType1(""); break; } // 頁面 加載類型 flag = 1 + rand.nextInt(2); appLoading.setLoading_way("" + flag); // 擴展字段1 appLoading.setExtend1(""); // 擴展字段2 appLoading.setExtend2(""); // 用戶加載類型 flag = 1 + rand.nextInt(3); appLoading.setType("" + flag); JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading); return packEventJson("loading", jsonObject); } /** * 廣告相關字段 */ private static JSONObject generateAd() { AppAd appAd = new AppAd(); // 入口 int flag = rand.nextInt(3) + 1; appAd.setEntry(flag + ""); // 動做 flag = rand.nextInt(5) + 1; appAd.setAction(flag + ""); // 內容類型類型 flag = rand.nextInt(6)+1; appAd.setContentType(flag+ ""); // 展現樣式 flag = rand.nextInt(120000)+1000; appAd.setDisplayMills(flag+""); flag=rand.nextInt(1); if(flag==1){ appAd.setContentType(flag+""); flag =rand.nextInt(6); appAd.setItemId(flag+ ""); }else{ appAd.setContentType(flag+""); flag =rand.nextInt(1)+1; appAd.setActivityId(flag+ ""); } JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd); return packEventJson("ad", jsonObject); } /** * 啓動日誌 */ private static AppStart generateStart() { AppStart appStart = new AppStart(); //設備id appStart.setMid(s_mid + ""); s_mid++; // 用戶id appStart.setUid(s_uid + ""); s_uid++; // 程序版本號 5,6等 appStart.setVc("" + rand.nextInt(20)); //程序版本名 v1.1.1 appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10)); // 安卓系統版本 appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10)); //設置日誌類型 appStart.setEn("start"); // 語言 es,en,pt int flag = rand.nextInt(3); switch (flag) { case (0): appStart.setL("es"); break; case (1): appStart.setL("en"); break; case (2): appStart.setL("pt"); break; } // 渠道號 從哪一個渠道來的 appStart.setSr(getRandomChar(1)); // 區域 flag = rand.nextInt(2); switch (flag) { case 0: appStart.setAr("BR"); case 1: appStart.setAr("MX"); } // 手機品牌 ba ,手機型號 md,就取2位數字了 flag = rand.nextInt(3); switch (flag) { case 0: appStart.setBa("Sumsung"); appStart.setMd("sumsung-" + rand.nextInt(20)); break; case 1: appStart.setBa("Huawei"); appStart.setMd("Huawei-" + rand.nextInt(20)); break; case 2: appStart.setBa("HTC"); appStart.setMd("HTC-" + rand.nextInt(20)); break; } // 嵌入sdk的版本 appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10)); // gmail appStart.setG(getRandomCharAndNumr(8) + "@gmail.com"); // 屏幕寬高 hw flag = rand.nextInt(4); switch (flag) { case 0: appStart.setHw("640*960"); break; case 1: appStart.setHw("640*1136"); break; case 2: appStart.setHw("750*1134"); break; case 3: appStart.setHw("1080*1920"); break; } // 客戶端產生日誌時間 long millis = System.currentTimeMillis(); appStart.setT("" + (millis - rand.nextInt(99999999))); // 手機網絡模式 3G,4G,WIFI flag = rand.nextInt(3); switch (flag) { case 0: appStart.setNw("3G"); break; case 1: appStart.setNw("4G"); break; case 2: appStart.setNw("WIFI"); break; } // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′ // 經度 appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + ""); // 緯度 appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + ""); // 入口 flag = rand.nextInt(5) + 1; appStart.setEntry(flag + ""); // 開屏廣告類型 flag = rand.nextInt(2) + 1; appStart.setOpen_ad_type(flag + ""); // 狀態 flag = rand.nextInt(10) > 8 ? 2 : 1; appStart.setAction(flag + ""); // 加載時長 appStart.setLoading_time(rand.nextInt(20) + ""); // 失敗碼 flag = rand.nextInt(10); switch (flag) { case 1: appStart.setDetail("102"); break; case 2: appStart.setDetail("201"); break; case 3: appStart.setDetail("325"); break; case 4: appStart.setDetail("433"); break; case 5: appStart.setDetail("542"); break; default: appStart.setDetail(""); break; } // 擴展字段 appStart.setExtend1(""); return appStart; } /** * 消息通知 */ private static JSONObject generateNotification() { AppNotification appNotification = new AppNotification(); int flag = rand.nextInt(4) + 1; // 動做 appNotification.setAction(flag + ""); // 通知id flag = rand.nextInt(4) + 1; appNotification.setType(flag + ""); // 客戶端彈時間 appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); // 備用字段 appNotification.setContent(""); JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification); return packEventJson("notification", jsonObject); } /** * 後臺活躍 */ private static JSONObject generateBackground() { AppActive_background appActive_background = new AppActive_background(); // 啓動源 int flag = rand.nextInt(3) + 1; appActive_background.setActive_source(flag + ""); JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background); return packEventJson("active_background", jsonObject); } /** * 錯誤日誌數據 */ private static JSONObject generateError() { AppErrorLog appErrorLog = new AppErrorLog(); String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"}; //錯誤摘要 String[] errorDetails = {"java.lang.NullPointerException\\n " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"}; //錯誤詳情 //錯誤摘要 appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]); //錯誤詳情 appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]); JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog); return packEventJson("error", jsonObject); } /** * 爲各個事件類型的公共字段(時間、事件類型、Json數據)拼接 */ private static JSONObject packEventJson(String eventName, JSONObject jsonObject) { JSONObject eventJson = new JSONObject(); eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + ""); eventJson.put("en", eventName); eventJson.put("kv", jsonObject); return eventJson; } /** * 獲取隨機字母組合 * * @param length 字符串長度 */ private static String getRandomChar(Integer length) { StringBuilder str = new StringBuilder(); Random random = new Random(); for (int i = 0; i < length; i++) { // 字符串 str.append((char) (65 + random.nextInt(26)));// 取得大寫字母 } return str.toString(); } /** * 獲取隨機字母數字組合 * @param length 字符串長度 */ private static String getRandomCharAndNumr(Integer length) { StringBuilder str = new StringBuilder(); Random random = new Random(); for (int i = 0; i < length; i++) { boolean b = random.nextBoolean(); if (b) { // 字符串 // int choice = random.nextBoolean() ? 65 : 97; 取得65大寫字母仍是97小寫字母 str.append((char) (65 + random.nextInt(26)));// 取得大寫字母 } else { // 數字 str.append(String.valueOf(random.nextInt(10))); } } return str.toString(); } /** * 收藏 */ private static JSONObject generateFavorites() { AppFavorites favorites = new AppFavorites(); favorites.setCourse_id(rand.nextInt(10)); favorites.setUserid(rand.nextInt(10)); favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites); return packEventJson("favorites", jsonObject); } /** * 點贊 */ private static JSONObject generatePraise() { AppPraise praise = new AppPraise(); praise.setId(rand.nextInt(10)); praise.setUserid(rand.nextInt(10)); praise.setTarget_id(rand.nextInt(10)); praise.setType(rand.nextInt(4) + 1); praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); JSONObject jsonObject = (JSONObject) JSON.toJSON(praise); return packEventJson("praise", jsonObject); } /** * 評論 */ private static JSONObject generateComment() { AppComment comment = new AppComment(); comment.setComment_id(rand.nextInt(10)); comment.setUserid(rand.nextInt(10)); comment.setP_comment_id(rand.nextInt(5)); comment.setContent(getCONTENT()); comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); comment.setOther_id(rand.nextInt(10)); comment.setPraise_count(rand.nextInt(1000)); comment.setReply_count(rand.nextInt(200)); JSONObject jsonObject = (JSONObject) JSON.toJSON(comment); return packEventJson("comment", jsonObject); } /** * 生成單個漢字 */ private static char getRandomChar() { String str = ""; int hightPos; // int lowPos; Random random = new Random(); //隨機生成漢子的兩個字節 hightPos = (176 + Math.abs(random.nextInt(39))); lowPos = (161 + Math.abs(random.nextInt(93))); byte[] b = new byte[2]; b[0] = (Integer.valueOf(hightPos)).byteValue(); b[1] = (Integer.valueOf(lowPos)).byteValue(); try { str = new String(b, "GBK"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); System.out.println("錯誤"); } return str.charAt(0); } /** * 拼接成多個漢字 */ private static String getCONTENT() { StringBuilder str = new StringBuilder(); for (int i = 0; i < rand.nextInt(100); i++) { str.append(getRandomChar()); } return str.toString(); } }
Logback主要用於在磁盤和控制檯打印日誌。
Logback具體使用:
1)在resources文件夾下建立logback.xml文件。
2)在logback.xml文件中填寫以下配置
<?xml version="1.0" encoding="UTF-8"?> <configuration debug="false"> <!--定義日誌文件的存儲地址 勿在 LogBack 的配置中使用相對路徑 --> <property name="LOG_HOME" value="/tmp/logs/" /> <!-- 控制檯輸出 --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!--格式化輸出:%d表示日期,%thread表示線程名,%-5level:級別從左顯示5個字符寬度%msg:日誌消息,%n是換行符 --> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> <!-- 按照天天生成日誌文件。存儲事件日誌 --> <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- <File>${LOG_HOME}/app.log</File>設置日誌不超過${log.max.size}時的保存路徑,注意,若是是web項目會保存到Tomcat的bin目錄 下 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--日誌文件輸出的文件名 --> <FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern> <!--日誌文件保留天數 --> <MaxHistory>30</MaxHistory> </rollingPolicy> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%msg%n</pattern> </encoder> <!--日誌文件最大的大小 --> <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <MaxFileSize>10MB</MaxFileSize> </triggeringPolicy> </appender> <!--異步打印日誌--> <appender name ="ASYNC_FILE" class= "ch.qos.logback.classic.AsyncAppender"> <!-- 不丟失日誌.默認的,若是隊列的80%已滿,則會丟棄TRACT、DEBUG、INFO級別的日誌 --> <discardingThreshold >0</discardingThreshold> <!-- 更改默認的隊列的深度,該值會影響性能.默認值爲256 --> <queueSize>512</queueSize> <!-- 添加附加的appender,最多隻能添加一個 --> <appender-ref ref = "FILE"/> </appender> <!-- 日誌輸出級別 --> <root level="INFO"> <appender-ref ref="STDOUT" /> <appender-ref ref="ASYNC_FILE" /> <appender-ref ref="error" /> </root> </configuration>
見大數據軟件安裝之Hadoop(Apache)(數據存儲及計算)
若HDFS存儲空間緊張,須要對DataNode進行磁盤擴展。
1)在DataNode節點增長磁盤並進行掛載。
2)在hdfs-site.xml文件中配置多目錄,注意新掛載磁盤的訪問權限問題。
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
</property>
3)增長磁盤後,保證每一個目錄數據均衡
開啓數據均衡命令:bin/start-balancer.sh -threshold 10
對於參數10,表明的是集羣中各個節點的磁盤空間利用率相差不超過10%,可根據實際狀況調整。
中止數據均衡命令:bin/stop-banlancer.sh
1)hadoop自己並不支持壓縮,故須要使用twitter提供的hadoop-lzo開源組件。hadoop-lzo需依賴hadoop和lzo進行編譯,編譯步驟以下。
lzo需依賴hadoop和lzo進行編譯,編譯步驟以下。
Hadoop支持LZO 0. 環境準備 maven(下載安裝,配置環境變量,修改sitting.xml加阿里雲鏡像) gcc-c++ zlib-devel autoconf automake libtool 經過yum安裝便可,yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool 1. 下載、安裝並編譯LZO wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz tar -zxvf lzo-2.10.tar.gz cd lzo-2.10 ./configure -prefix=/usr/local/hadoop/lzo/ make make install 2. 編譯hadoop-lzo源碼 2.1 下載hadoop-lzo的源碼,下載地址:https://github.com/twitter/hadoop-lzo/archive/master.zip 2.2 解壓以後,修改pom.xml <hadoop.current.version>2.7.2</hadoop.current.version> 2.3 聲明兩個臨時環境變量 export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 2.4 編譯 進入hadoop-lzo-master,執行maven編譯命令 mvn package -Dmaven.test.skip=true 2.5 進入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即編譯成功
2)將編譯好後的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/
[test@hadoop102 common]$ pwd
/opt/module/hadoop-2.7.2/share/hadoop/common
[test@hadoop102 common]$ ls
hadoop-lzo-0.4.20.jar
3)同步hadoop-lzo-0.4.20.jar 到hadoop10三、hadoop104
[test@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar
4)core-site.xml增長配置支持LZO壓縮
<configuration>
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>
5)同步core-site.xml到hadoop10三、hadoop104
[test@hadoop102 hadoop]$ xsync core-site.xml
6)啓動及查看集羣
[test@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[test@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh
1)建立LZO文件的索引,LZO壓縮文件的可切片特性依賴其索引,故咱們須要手動爲LZO壓縮文件建立索引。若無索引,則LZO文件的切片只有一個。
hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo
2)測試
(1)將bigtable.lzo(150M)上傳到集羣的根目錄
[test@hadoop102 module]$ hadoop fs -mkdir /input
[test@hadoop102 module]$ hadoop fs -put bigtable.lzo /input
(2)對上傳的LZO文件建索引
[test@hadoop102 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
測試內容:向HDFS集羣寫10個128M的文件
[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
19/05/02 11:45:23 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
19/05/02 11:45:23 INFO fs.TestDFSIO: Date & time: Thu May 02 11:45:23 CST 2019
19/05/02 11:45:23 INFO fs.TestDFSIO: Number of files: 10
19/05/02 11:45:23 INFO fs.TestDFSIO: Total MBytes processed: 1280.0
19/05/02 11:45:23 INFO fs.TestDFSIO: Throughput mb/sec: 10.69751115716984
19/05/02 11:45:23 INFO fs.TestDFSIO: Average IO rate mb/sec: 14.91699504852295
19/05/02 11:45:23 INFO fs.TestDFSIO: IO rate std deviation: 11.160882132355928
19/05/02 11:45:23 INFO fs.TestDFSIO: Test exec time sec: 52.315
測試內容:讀取HDFS集羣10個128M的文件
[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
19/05/02 11:56:36 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
19/05/02 11:56:36 INFO fs.TestDFSIO: Date & time: Thu May 02 11:56:36 CST 2019
19/05/02 11:56:36 INFO fs.TestDFSIO: Number of files: 10
19/05/02 11:56:36 INFO fs.TestDFSIO: Total MBytes processed: 1280.0
19/05/02 11:56:36 INFO fs.TestDFSIO: Throughput mb/sec: 16.001000062503905
19/05/02 11:56:36 INFO fs.TestDFSIO: Average IO rate mb/sec: 17.202795028686523
19/05/02 11:56:36 INFO fs.TestDFSIO: IO rate std deviation: 4.881590515873911
19/05/02 11:56:36 INFO fs.TestDFSIO: Test exec time sec: 49.116
19/05/02 11:56:36 INFO fs.TestDFSIO:
[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean
(1)使用RandomWriter來產生隨機數,每一個節點運行10個Map任務,每一個Map產生大約1G大小的二進制隨機數
[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar randomwriter random-data
(2)執行Sort程序
[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar sort random-data sorted-data
(3)驗證數據是否真正排好序了
[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data
1)HDFS參數調優hdfs-site.xml
dfs.namenode.handler.count=20 * log2(Cluster Size),好比集羣規模爲8臺時,此參數設置爲60
The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
NameNode有一個工做線程池,用來處理不一樣DataNode的併發心跳以及客戶端併發的元數據操做。對於大集羣或者有大量客戶端的集羣來講,一般須要增大參數dfs.namenode.handler.count的默認值10。設置該值的通常原則是將其設置爲集羣大小的天然對數乘以20,即20logN,N爲集羣大小。
2)YARN參數調優yarn-site.xml
(1)情景描述:總共7臺機器,天天幾億條數據,數據源->Flume->Kafka->HDFS->Hive
面臨問題:數據統計主要用HiveSQL,沒有數據傾斜,小文件已經作了合併處理,開啓的JVM重用,並且IO沒有阻塞,內存用了不到50%。可是仍是跑的很是慢,並且數據量洪峯過來時,整個集羣都會宕掉。基於這種狀況有沒有優化方案。
(2)解決辦法:
內存利用率不夠。這個通常是Yarn的2個配置形成的,單個任務能夠申請的最大內存大小,和Hadoop單個節點可用內存大小。調節這兩個參數能提升系統內存的利用率。
(a)yarn.nodemanager.resource.memory-mb
表示該節點上YARN可以使用的物理內存總量,默認是8192(MB),注意,若是你的節點內存資源不夠8GB,則須要調減少這個值,而YARN不會智能的探測節點的物理內存總量。
(b)yarn.scheduler.maximum-allocation-mb
單個任務可申請的最多物理內存量,默認是8192(MB)。
3)Hadoop宕機
(1)若是MR形成系統宕機。此時要控制Yarn同時運行的任務數,和每一個任務申請的最大內存。調整參數:yarn.scheduler.maximum-allocation-mb(單個任務可申請的最多物理內存量,默認是8192MB)
(2)若是寫入文件過量形成NameNode宕機。那麼調高Kafka的存儲大小,控制從Kafka到HDFS的寫入速度。高峯期的時候用Kafka進行緩存,高峯期過去數據同步會自動跟上。
集羣規劃
|
服務器hadoop102 |
服務器hadoop103 |
服務器hadoop104 |
Zookeeper |
Zookeeper |
Zookeeper |
Zookeeper |
1)在hadoop102的/home/test/bin目錄下建立腳本
[test@hadoop102 bin]$ vim zk.sh
在腳本中編寫以下內容
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
done
};;
esac
2)增長腳本執行權限
[test@hadoop102 bin]$ chmod 777 zk.sh
3)Zookeeper集羣啓動腳本
[test@hadoop102 module]$ zk.sh start
4)Zookeeper集羣中止腳本
[test@hadoop102 module]$ zk.sh stop
1)修改/etc/profile文件:用來設置系統環境參數,好比$PATH. 這裏面的環境變量是對系統內全部用戶生效。使用bash命令,須要source /etc/profile一下。
2)修改~/.bashrc文件:針對某一個特定的用戶,環境變量的設置只對該用戶本身有效。使用bash命令,只要以該用戶身份運行命令行就會讀取該文件。
3)把/etc/profile裏面的環境變量追加到~/.bashrc目錄
[test@hadoop102 ~]$ cat /etc/profile >> ~/.bashrc
[test@hadoop103 ~]$ cat /etc/profile >> ~/.bashrc
[test@hadoop104 ~]$ cat /etc/profile >> ~/.bashrc
4)說明
登陸式Shell,採用用戶名好比test登陸,會自動加載/etc/profile
非登陸式Shell,採用ssh 好比ssh hadoop103登陸,不會自動加載/etc/profile,會自動加載~/.bashrc
儘可能將環境變量 部署在 /etc/profile.d/env.sh
1)代碼參數說明
// 參數一:控制發送每條的延時時間,默認是0
Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
// 參數二:循環遍歷次數
int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
2)將生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷貝到hadoop102服務器/opt/module上,並同步到hadoop103的/opt/module路徑下,
[test@hadoop102 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar
3)在hadoop102上執行jar程序
[test@hadoop102 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain >/opt/module/test.log
說明1:
java -classpath 須要在jar包後面指定全類名;
java -jar 須要查看一下解壓的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全類名。若是有能夠用java -jar,若是沒有就須要用到java -classpath
說明2:/dev/null表明linux的空設備文件,全部往這個文件裏面寫入的內容都會丟失,俗稱「黑洞」。
標準輸入0:從鍵盤得到輸入 /proc/self/fd/0
標準輸出1:輸出到屏幕(即控制檯) /proc/self/fd/1
錯誤輸出2:輸出到屏幕(即控制檯) /proc/self/fd/2
4)在/tmp/logs路徑下查看生成的日誌文件
[test@hadoop102 module]$ cd /tmp/logs/
[test@hadoop102 logs]$ ls
app-2020-03-10.log
1)在/home/test/bin目錄下建立腳本lg.sh
[test@hadoop102 bin]$ vim lg.sh
2)在腳本中編寫以下內容
#! /bin/bash
for i in hadoop102 hadoop103
do
ssh $i "java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain $1 $2 >/dev/null 2>&1 &"
done
3)修改腳本執行權限
[test@hadoop102 bin]$ chmod 777 lg.sh
4)啓動腳本
[test@hadoop102 module]$ lg.sh
5)分別在hadoop10二、hadoop103的/tmp/logs目錄上查看生成的數據
[test@hadoop102 logs]$ ls
app-2020-03-10.log
[test@hadoop103 logs]$ ls
app-2020-03-10.log
1)在/home/test/bin目錄下建立腳本dt.sh
[test@hadoop102 bin]$ vim dt.sh
2)在腳本中編寫以下內容
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh -t $i "sudo date -s $1"
done
注意:ssh -t 一般用於ssh遠程執行sudo命令
3)修改腳本執行權限
[test@hadoop102 bin]$ chmod 777 dt.sh
4)啓動腳本
[test@hadoop102 bin]$ dt.sh 2020-03-10
1)在/home/test/bin目錄下建立腳本xcall.sh
[test@hadoop102 bin]$ vim xcall.sh
2)在腳本中編寫以下內容
#! /bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo --------- $i ----------
ssh $i "$*"
done
3)修改腳本執行權限
[test@hadoop102 bin]$ chmod 777 xcall.sh
4)啓動腳本
[test@hadoop102 bin]$ xcall.sh jps
集羣規劃:
|
服務器hadoop102 |
服務器hadoop103 |
服務器hadoop104 |
Flume(採集日誌) |
Flume |
Flume |
|
1)Source
(1)Taildir Source相比Exec Source、Spooling Directory Source的優點
TailDir Source:斷點續傳、多目錄。Flume1.6之前須要本身自定義Source記錄每次讀取文件位置,實現斷點續傳。
Exec Source能夠實時蒐集數據,可是在Flume不運行或者Shell命令出錯的狀況下,數據將會丟失。
Spooling Directory Source監控目錄,不支持斷點續傳。
(2)batchSize大小如何設置?
答:Event 1K左右時,500-1000合適(默認爲100)
2)Channel
採用Kafka Channel,省去了Sink,提升了效率。
注意在Flume1.7之前,Kafka Channel不多有人使用,由於發現parseAsFlumeEvent這個配置起不了做用。也就是不管parseAsFlumeEvent配置爲true仍是false,都會轉爲Flume Event。
這樣的話,形成的結果是,會始終都把Flume的headers中的信息混合着內容一塊兒寫入Kafka的消息中,這顯然不是我所須要的,我只是須要把內容寫入便可。
1)Flume 配置分析
Flume直接讀log日誌的數據,log日誌的格式是app-yyyy-mm-dd.log。
2)Flume的配置以下:
(1)在/opt/module/flume/conf目錄下建立file-flume-kafka.conf文件
[test@hadoop102 conf]$ vim file-flume-kafka.conf
在文件配置以下內容
# 組件定義
a1.sources=r1
a1.channels=c1 c2
# taildir方式數據
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json #記錄日誌讀取位置
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ #讀取日誌位置
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.test.flume.interceptor.LogETLInterceptor$Builder #ETL攔截器
a1.sources.r1.interceptors.i2.type = com.test.flume.interceptor.LogTypeInterceptor$Builder #日誌類型攔截器
a1.sources.r1.selector.type = multiplexing # 根據日誌類型分數據
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start #日誌類型是start ,數據發往channel1
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event #日誌類型是event,數據發往channel2
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
注意:com.test.flume.interceptor.LogETLInterceptor和com.test.flume.interceptor.LogTypeInterceptor是自定義的攔截器的全類名。須要根據用戶自定義的攔截器作相應修改。
本項目中自定義了兩個攔截器,分別是:ETL攔截器、日誌類型區分攔截器。
ETL攔截器主要用於,過濾時間戳不合法和Json數據不完整的日誌
日誌類型區分攔截器主要用於,將啓動日誌和事件日誌區分開來,方便發往Kafka的不一樣Topic。
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Flume ETL攔截器LogETLInterceptor
package com.test.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; public class LogETLInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 1 獲取數據 byte[] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8")); // 2 判斷數據類型並向Header中賦值 if (log.contains("start")) { if (LogUtils.validateStart(log)){ return event; } }else { if (LogUtils.validateEvent(log)){ return event; } } // 3 返回校驗結果 return null; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); if (intercept1 != null){ interceptors.add(intercept1); } } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogETLInterceptor(); } @Override public void configure(Context context) { } } }
package com.test.flume.interceptor; import org.apache.commons.lang.math.NumberUtils; public class LogUtils { public static boolean validateEvent(String log) { // 服務器時間 | json // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]} // 1 切割 String[] logContents = log.split("\\|"); // 2 校驗 if(logContents.length != 2){ return false; } //3 校驗服務器時間 if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){ return false; } // 4 校驗json if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){ return false; } return true; } public static boolean validateStart(String log) { // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"} if (log == null){ return false; } // 校驗json if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){ return false; } return true; } } 5)Flume日誌類型區分攔截器LogTypeInterceptor package com.test.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; public class LogTypeInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 區分日誌類型: body header // 1 獲取body數據 byte[] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8")); // 2 獲取header Map<String, String> headers = event.getHeaders(); // 3 判斷數據類型並向Header中賦值 if (log.contains("start")) { headers.put("topic","topic_start"); }else { headers.put("topic","topic_event"); } return event; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); interceptors.add(intercept1); } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogTypeInterceptor(); } @Override public void configure(Context context) { } } }
攔截器打包以後,只須要單獨包,不須要將依賴的包上傳。打包以後要放入Flume的lib文件夾下面。
1)在/home/test/bin目錄下建立腳本f1.sh
[test@hadoop102 bin]$ vim f1.sh
在腳本中填寫以下內容
#! /bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 do echo " --------啓動 $i 採集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &" done };; "stop"){ for i in hadoop102 hadoop103 do echo " --------中止 $i 採集flume-------" ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac
說明1:nohup,該命令能夠在你退出賬戶/關閉終端以後繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令。
說明2:awk 默認分隔符爲空格
說明3:xargs 表示取出前面命令運行的結果,做爲後面命令的輸入參數。
2)增長腳本執行權限
[test@hadoop102 bin]$ chmod 777 f1.sh
3)f1集羣啓動腳本
[test@hadoop102 module]$ f1.sh start
4)f1集羣中止腳本
[test@hadoop102 module]$ f1.sh stop
集羣規劃:
|
服務器hadoop102 |
服務器hadoop103 |
服務器hadoop104 |
Kafka |
Kafka |
Kafka |
Kafka |
1)在/home/test/bin目錄下建立腳本kf.sh
[test@hadoop102 bin]$ vim kf.sh
在腳本中填寫以下內容
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------啓動 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------中止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
done
};;
esac
2)增長腳本執行權限
[test@hadoop102 bin]$ chmod 777 kf.sh
3)kf集羣啓動腳本
[test@hadoop102 module]$ kf.sh start
4)kf集羣中止腳本
[test@hadoop102 module]$ kf.sh stop
[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
進入到/opt/module/kafka/目錄下分別建立:啓動日誌主題、事件日誌主題。
1)建立啓動日誌主題
[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
2)建立事件日誌主題
[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions 1 --topic topic_event
1)刪除啓動日誌主題
[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_start
2)刪除事件日誌主題
[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_event
[test@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic topic_start
>hello world
>test test
[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_start
--from-beginning:會把主題中以往全部的數據都讀取出來。根據業務場景選擇是否增長該配置。
[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic topic_start
1)Kafka壓測
用Kafka官方自帶的腳本,對Kafka進行壓測。Kafka壓測時,能夠查看到哪一個地方出現了瓶頸(CPU,內存,網絡IO)。通常都是網絡IO達到瓶頸。
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
2)Kafka Producer壓力測試
(1)在/opt/module/kafka/bin目錄下面有這兩個文件。咱們來測試一下
[test@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
說明:
record-size是一條信息有多大,單位是字節。
num-records是總共發送多少條信息。
throughput 是每秒多少條信息,設成-1,表示不限流,可測出生產者最大吞吐量。
(2)Kafka會打印下面的信息
100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.
參數解析:本例中一共寫入10w條消息,吞吐量爲9.14 MB/sec,每次寫入的平均延遲爲187.68毫秒,最大的延遲爲424.00毫秒。
3)Kafka Consumer壓力測試
Consumer的測試,若是這四個指標(IO,CPU,內存,網絡)都不能改變,考慮增長分區數來提高性能。
[test@hadoop102 kafka]$
bin/kafka-consumer-perf-test.sh --zookeeper hadoop102:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1
參數說明:
--zookeeper 指定zookeeper的連接信息
--topic 指定topic的名稱
--fetch-size 指定每次fetch的數據的大小
--messages 總共要消費的消息個數
測試結果說明:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153
開始測試時間,測試結束數據,共消費數據9.5368MB,吞吐量2.0714MB/s,共消費100010條,平均每秒消費21722.4153條。
Kafka機器數量(經驗公式)=2*(峯值生產速度*副本數/100)+1
先拿到峯值生產速度,再根據設定的副本數,就能預估出須要部署Kafka的數量。
好比咱們的峯值生產速度是50M/s。副本數爲2。
Kafka機器數量=2*(50*2/100)+ 1=3臺
集羣規劃
|
服務器hadoop102 |
服務器hadoop103 |
服務器hadoop104 |
Flume(消費Kafka) |
|
|
Flume |
1)Flume配置分析
2)Flume的具體配置以下:
(1)在hadoop104的/opt/module/flume/conf目錄下建立kafka-flume-hdfs.conf文件
[test@hadoop104 conf]$ vim kafka-flume-hdfs.conf
在文件配置以下內容
## 組件定義
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1 #kafka start主題源數據
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
## source2 #kafka event主題源數據
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1 #start主題數據輸出到HDFS路徑
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
##sink2 #event 主題數據輸出到HDFS路徑若是hadoop和flume不在一臺服務器須要在路徑前邊增長hdfs://hadoop102:9000/
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## 不要產生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10 #生成文件大小設定
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制輸出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream #支持LZO數據壓縮設置
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
1)FileChannel和MemoryChannel區別
MemoryChannel傳輸數據速度更快,但由於數據保存在JVM的堆內存中,Agent進程掛掉會致使數據丟失,適用於對數據質量要求不高的需求。
FileChannel傳輸速度相對於Memory慢,但數據安全保障高,Agent進程掛掉也能夠從失敗中恢復數據。
2)FileChannel優化
經過配置dataDirs指向多個路徑,每一個路徑對應不一樣的硬盤,增大Flume吞吐量。
官方說明以下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也儘可能配置在不一樣硬盤對應的目錄中,保證checkpoint壞掉後,能夠快速使用backupCheckpointDir恢復數據
3)Sink:HDFS Sink
(1)HDFS存入大量小文件,有什麼影響?
元數據層面:每一個小文件都有一份元數據,其中包括文件路徑,文件名,全部者,所屬組,權限,建立時間等,這些信息都保存在Namenode內存中。因此小文件過多,會佔用Namenode服務器大量內存,影響Namenode性能和使用壽命
計算層面:默認狀況下MR會對每一個小文件啓用一個Map任務計算,很是影響計算性能。同時也影響磁盤尋址時間。
(2)HDFS小文件處理
官方默認的這三個參數配置寫入HDFS後會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合做用,效果以下:
(1)文件在達到128M時會滾動生成新文件
(2)文件建立超3600秒時會滾動生成新文件
1)在/home/test/bin目錄下建立腳本f2.sh
[test@hadoop102 bin]$ vim f2.sh
#! /bin/bash case $1 in "start"){ for i in hadoop104 do echo " --------啓動 $i 消費flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &" done };; "stop"){ for i in hadoop104 do echo " --------中止 $i 消費flume-------" ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac
2)增長腳本執行權限
[test@hadoop102 bin]$ chmod 777 f2.sh
3)f2集羣啓動腳本
[test@hadoop102 module]$ f2.sh start
4)f2集羣中止腳本
[test@hadoop102 module]$ f2.sh stop
1)問題描述:若是啓動消費Flume拋出以下異常
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
2)解決方案步驟:
(1)在hadoop102服務器的/opt/module/flume/conf/flume-env.sh文件中增長以下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
(2)同步配置到hadoop10三、hadoop104服務器
[test@hadoop102 conf]$ xsync flume-env.sh
3)Flume內存參數設置及優化
JVM heap通常設置爲4G或更高,部署在單獨的服務器上(4核8線程16G內存)
-Xmx與-Xms最好設置一致,減小內存抖動帶來的性能影響,若是設置不一致容易致使頻繁fullgc。
-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大容許的尺寸,按需分配。若是不設置一致,容易在初始化時,因爲內存不夠,頻繁觸發fullgc。
1)在/home/test/bin目錄下建立腳本cluster.sh
[test@hadoop102 bin]$ vim cluster.sh
#! /bin/bash
case $1 in
"start"){
echo " -------- 啓動 集羣 -------"
echo " -------- 啓動 hadoop集羣 -------"
/opt/module/hadoop-2.7.2/sbin/start-dfs.sh
ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"
#啓動 Zookeeper集羣
zk.sh start
sleep 4s;
#啓動 Flume採集集羣
f1.sh start
#啓動 Kafka採集集羣
kf.sh start
sleep 6s;
#啓動 Flume消費集羣
f2.sh start
};;
"stop"){
echo " -------- 中止 集羣 -------"
#中止 Flume消費集羣
f2.sh stop
#中止 Kafka採集集羣
kf.sh stop
sleep 6s;
#中止 Flume採集集羣
f1.sh stop
#中止 Zookeeper集羣
zk.sh stop
echo " -------- 中止 hadoop集羣 -------"
ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac
2)增長腳本執行權限
[test@hadoop102 bin]$ chmod 777 cluster.sh
3)cluster集羣啓動腳本
[test@hadoop102 module]$ cluster.sh start
4)cluster集羣中止腳本
[test@hadoop102 module]$ cluster.sh stop
數據倉庫的輸入數據源和輸出系統分別是什麼?
輸入系統:埋點產生的用戶給行爲數據、JavaEE後臺產生的業務數據。
輸出系統:報表系統、用戶畫像系統、推薦系統
1)Apache:運維麻煩,組件間兼容性須要本身調研。(通常大廠使用,技術實力雄厚,有專業的運維人員)(建議使用)
2)CDH:國內使用最多的版本,但CM不開源,但其實對中、小公司使用來講沒有影響
3)HDP:開源,能夠進行二次開發,可是沒有CDH穩定,國內使用較少
1)Linux經常使用命令
序號 |
命令 |
命令解釋 |
1 |
top |
查看內存 |
2 |
df -h |
查看磁盤存儲狀況 |
3 |
iotop |
查看磁盤IO讀寫(yum install iotop安裝) |
4 |
iotop -o |
直接查看比較高的磁盤讀寫程序 |
5 |
netstat -tunlp | grep 端口號 |
查看端口占用狀況 |
6 |
uptime |
查看報告系統運行時長及平均負載 |
7 |
ps aux |
查看進程 |
2)Shell經常使用工具
awk、sed、cut、sort
1)Hadoop默認不支持LZO壓縮,若是須要支持LZO壓縮,須要添加jar包,並在hadoop的cores-site.xml文件中添加相關壓縮配置。
見 項目經驗之LZO建立索引
2)Hadoop經常使用端口號
50070 hdfs,8088 mr任務,19888 歷史服務器,9000 客戶端訪問集羣
3)Hadoop配置文件以及簡單的Hadoop集羣搭建
core-site.xml hadoop-env.sh
hdfs-site.xml yarn-env.sh
yarn-site.xml mapred-env.sh
mapred-site.xml slaves
4)HDFS讀流程和寫流程
5)MapReduce的Shuffle過程及Hadoop優化(包括:壓縮、小文件、集羣優化)
Shuffer在map方法以後,reduce方法以前
數據出來後首先進入getpartition(),而後進入還原緩衝區,還原緩衝區一側存數據,一側存索引,到達80%進行反向溢寫,
還原緩衝區默認大小是100M。溢寫過程當中(進行排序,按照快排的手段排序,對key的索引排序,按照字典順序排),溢寫 以前要進行各類排序,排完序以後把溢寫文件存進來(產生大量溢寫文件),對溢寫文件進行歸併排序,歸併完以後按照指定分 區存好數據。等待reduce端來拉去數據,拉取本身指定分區數據,拉取過來先放到內存,內存不夠溢寫到磁盤,無論事內存 仍是磁盤數據都進行歸併,歸併過程中進行分組排序,最後進入到對應的reduce方法裏去。
Shuffer優化 還原緩衝區默認大小是100 調到200M ;設置到90%溢寫(減小溢寫文件個數,起到優化做用);
溢寫文件能夠提早採用一次combiner(前提條件是求和);默認一次歸併個數是10個,能夠調到20個-30個;
爲了減小磁盤IO在map端對數據採用壓縮;有幾個地方能夠壓縮Map輸入端、Map輸出端、Reduce輸出端能夠進行壓縮;
6)Yarn的Job提交流程
7)Yarn的默認調度器、調度器分類、以及他們之間的區別
默認是FIFO調度器
FIFO調度器、容量調度器、公平調度器
FIFO調度器:先進先出
選型:
對併發度要求搞,且錢的公司:公平調度器(中、大公司)
對併發度要求不是過高,且不是特別錢:容量(中小公司)
容量調度器:默認只一個default隊列,在開發時會用多個隊列
技術框架:hive、spark、flink
業務建立隊列:登錄註冊、購物車、用戶行爲、業務數據。。。分開放的好處是解耦、下降風險
8)HDFS存儲多目錄
9)Hadoop參數調優
10)項目經驗之基準測試
1)選舉機制
半數機制,安裝奇數臺服務器
10臺服務器安裝幾個zookeeper:3臺。
20臺服務器安裝幾個zookeeper:5臺。
100臺服務器安裝幾個zookeeper:11臺。
不是越多越好,也不是越少越好。若是多,通訊時間常,效率低;如太少,可靠性差。
2)經常使用命令
ls、get、create
1)Flume組成,Put事務,Take事務
Taildir Source:斷點續傳、多目錄。Flume1.6之前須要本身自定義Source記錄每次讀取文件位置,實現斷點續傳。
File Channel:數據存儲在磁盤,宕機數據能夠保存。可是傳輸速率慢。適合對數據傳輸可靠性要求高的場景,好比,金融行業。
Memory Channel:數據存儲在內存中,宕機數據丟失。傳輸速率快。適合對數據傳輸可靠性要求不高的場景,好比,普通的日誌數據。
Kafka Channel:減小了Flume的Sink階段,提升了傳輸效率。
Source到Channel是Put事務
Channel到Sink是Take事務
2)Flume攔截器
(1)攔截器注意事項
項目中自定義了:ETL攔截器和區分類型攔截器。
採用兩個攔截器的優缺點:優勢,模塊化開發和可移植性;缺點,性能會低一些
(2)自定義攔截器步驟
a)實現 Interceptor
b)重寫四個方法
c)靜態內部類,實現Interceptor.Builder
3)Flume Channel選擇器
4)Flume 監控器
Ganglia
5)Flume採集數據會丟失嗎?
不會,Channel存儲能夠存儲在File中,數據傳輸自身有事務。
6)Flume內存
開發中在flume-env.sh中設置JVM heap爲4G或更高,部署在單獨的服務器上(4核8線程16G內存)
-Xmx與-Xms最好設置一致,減小內存抖動帶來的性能影響,若是設置不一致容易致使頻繁fullgc。
-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大容許的尺寸,按需分配。若是不設置一致,容易在初始化時,因爲內存不夠,頻繁觸發fullgc。
7)FileChannel優化
經過配置dataDirs指向多個路徑,每一個路徑對應不一樣的硬盤,增大Flume吞吐量。
官方說明以下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也儘可能配置在不一樣硬盤對應的目錄中,保證checkpoint壞掉後,能夠快速使用backupCheckpointDir恢復數據
8)Sink:HDFS Sink小文件處理
(1)HDFS存入大量小文件,有什麼影響?
元數據層面:每一個小文件都有一份元數據,其中包括文件路徑,文件名,全部者,所屬組,權限,建立時間等,這些信息都保存在Namenode內存中。因此小文件過多,會佔用Namenode服務器大量內存,影響Namenode性能和使用壽命
計算層面:默認狀況下MR會對每一個小文件啓用一個Map任務計算,很是影響計算性能。同時也影響磁盤尋址時間。
(2)HDFS小文件處理
官方默認的這三個參數配置寫入HDFS後會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合做用,效果以下:
(1)文件在達到128M時會滾動生成新文件
(2)文件建立超3600秒時會滾動生成新文件
舉例:在2018-01-01 05:23的時侯sink接收到數據,那會產生以下tmp文件:
1)Kafka壓測
Kafka官方自帶壓力測試腳本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka壓測時,能夠查看到哪一個地方出現了瓶頸(CPU,內存,網絡IO)。通常都是網絡IO達到瓶頸。
2)Kafka的機器數量
Kafka機器數量=2*(峯值生產速度*副本數/100)+1
3)Kafka的日誌保存時間
3天
4)Kafka的硬盤大小
天天的數據量*3天
5)Kafka監控
公司本身開發的監控器;
開源的監控器:KafkaManager、KafkaMonitor
6)Kakfa分區數。
(1)建立一個只有1個分區的topic
(2)測試這個topic的producer吞吐量和consumer吞吐量。
(3)假設他們的值分別是Tp和Tc,單位能夠是MB/s。
(4)而後假設總的目標吞吐量是Tt,那麼分區數=Tt / min(Tp,Tc)
例如:producer吞吐量=10m/s;consumer吞吐量=50m/s,指望吞吐量100m/s;
分區數=100 / 10 =10分區
分區數通常設置爲:3-10個
7)副本數設定
通常咱們設置成2個或3個,不少企業設置爲2個。
8)多少個Topic
一般狀況:多少個日誌類型就多少個Topic。也有對日誌類型進行合併的。
9)Kafka丟不丟數據
Ack=0,producer不等待kafka broker的ack,一直生產數據。
Ack=1,leader數據落盤就發送ack,producer收到ack才繼續生產數據。
Ack=-1,ISR中的全部副本數據羅盤才發送ack,producer收到ack才繼續生產數據。
10)Kafka的ISR副本同步隊列
ISR(In-Sync Replicas),副本同步隊列。ISR中包括Leader和Follower。若是Leader進程掛掉,會在ISR隊列中選擇一個服務做爲新的Leader。有replica.lag.max.messages(延遲條數)和replica.lag.time.max.ms(延遲時間)兩個參數決定一臺服務是否能夠加入ISR副本隊列,在0.10版本移除了replica.lag.max.messages參數,防止服務頻繁的進去隊列。
任意一個維度超過閾值都會把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會先存放在OSR中。
11)Kafka分區分配
Range和RoundRobin
12)Kafka中數據量計算
天天總數據量100g,天天產生1億條日誌, 10000萬/24/60/60=1150條/每秒鐘
平均每秒鐘:1150條
低谷每秒鐘:400條
高峯每秒鐘:1150條*(2-20倍)=2300條-23000條
每條日誌大小:0.5k-2k(取1k)
每秒多少數據量:2.0M-20MB
13) Kafka掛掉
(1)Flume記錄
(2)日誌有記錄
(3)短時間沒事
14)Kafka消息數據積壓,Kafka消費能力不足怎麼處理?
(1)若是是Kafka消費能力不足,則能夠考慮增長Topic的分區數,而且同時提高消費組的消費者數量,消費者數=分區數。(二者缺一不可)
(2)若是是下游的數據處理不及時:提升每批次拉取的數量。批次拉取數據過少(拉取數據/處理時間<生產速度),使處理的數據小於生產的數據,也會形成數據積壓。
15)Kafka冪等性
Kafka0.11版本引入了冪等性,冪等性配合at least once語義能夠實現exactly once語義。但只能保證單次會話的冪等。
16)Kafka事務
Kafka0.11版本引入Kafka的事務機制,其能夠保證生產者發往多個分區的一批數據的原子性。