大數據項目之電商數倉一(用戶行爲採集)

1、數據倉庫概念

數據倉庫(Data Warehouse)html

  是爲企業全部決策制定過程,提供全部系統數據支持的戰略集合。java

2、項目需求及架構設計

2.1 項目需求分析

  一、項目需求node

   1)用戶行爲數據採集平臺搭建linux

   2)業務數據採集平臺搭建git

   3)數據倉庫維度建模github

      4)分析:用戶、流量、會員、商品、銷售、地區、活動等電商核心主題,統計的報表指標近100。web

      5)採用即席查詢工具,隨時進行指標分析sql

      6)對集羣性能進行監控,發生異常須要報警apache

    7)元數據管理json

      8)質量監控

2.2 項目框架

2.2.1 技術選型

技術選型主要須要考慮的因素:數據量大小、業務需求、行業內經驗、技術成熟度、開發維護成本、總成本預算

  數據採集傳輸:FlumeKafkaSqoop、Logstash、DataX、

  數據存儲:MysqlHDFS、HBase、Redis、MongoDB

  數據計算:HiveTezSpark、Flink、Storm

  數據查詢:PrestoDruid、Impala、Kylin

  數據可視化:Echarts、Superset、QuickBI、DataV

  任務調度:Azkaban、Oozie

  集羣監控:Zabbix

  元數據管理:Atlas

  數據質量監控:Griffin

2.2.2 系統數據流程設計

2.2.3 框架版本選型

2.2.4 服務器選型

  服務器是選擇物理機仍是雲主機?

1)物理機:

  128G內存,20核物理CPU,40線程,8THDD和2TSSD硬盤,戴爾品牌單臺報價4萬出頭。通常物理機壽命5年左右

2)雲主機:

  以阿里云爲例,和上面大體相同配置,每一年5萬

2.2.5 集羣資源規劃設計

一、集羣規模

1)如何確認集羣規模?(按每臺服務器8T磁盤,128G內存)

(1)按天天日活躍用戶100萬,每人一天平均100條:100萬*100條 = 1億條

(2)每條日誌1K左右,天天1億條:100000000 / 1024 /1024 = 約100G

(3)半年內不擴容服務器來算:100G * 180 天 = 約18T

(4)保存3個副本:18T * 3 = 54T

(5)預留20%~30%Buffer=54T/0.7=77T

(6)須要約8T*10臺服務器

2)若是要考慮數倉分層?數據採用壓縮?須要從新計算

二、集羣服務器規劃

服務名稱

子服務

服務器

hadoop102

服務器

hadoop103

服務器

hadoop104

HDFS

NameNode

 

 

DataNode

SecondaryNameNode

 

 

Yarn

NodeManager

Resourcemanager

 

 

Zookeeper

Zookeeper Server

Flume(採集日誌)

Flume

 

Kafka

Kafka

Flume(消費Kafka)

Flume

 

 

Hive

Hive

 

 

MySQL

MySQL

 

 

Sqoop

Sqoop

 

 

Presto

Coordinator

 

 

Worker

 

Azkaban

AzkabanWebServer

 

 

AzkabanExecutorServer

 

 

Druid

Druid

Kylin

 

 

 

Hbase

HMaster

 

 

HRegionServer

Superset

 

 

 

Atlas

 

 

 

Solr

Jar

 

 

Griffin

 

 

 

服務數總計

 

19

9

9

3、數據生成模塊

3.1 埋點數據基本格式

公共字段:基本全部安卓手機都包含的字段

業務字段:埋點上報的字段,有具體的業務類型

下面就是一個示例,表示業務字段的上傳。

{

"ap":"xxxxx",//項目數據來源 app pc

"cm": {  //公共字段

      "mid": "",  // (String) 設備惟一標識

        "uid": "",  // (String) 用戶標識

        "vc": "1",  // (String) versionCode,程序版本號

        "vn": "1.0",  // (String) versionName,程序版本名

        "l": "zh",  // (String) language系統語言

        "sr": "",  // (String) 渠道號,應用從哪一個渠道來的

        "os": "7.1.1",  // (String) Android系統版本

        "ar": "CN",  // (String) area區域

        "md": "BBB100-1",  // (String) model手機型號

        "ba": "blackberry",  // (String) brand手機品牌

        "sv": "V2.2.1",  // (String) sdkVersion

        "g": "",  // (String) gmail

        "hw": "1620x1080",  // (String) heightXwidth,屏幕寬高

        "t": "1506047606608",  // (String) 客戶端日誌產生時的時間

        "nw": "WIFI",  // (String) 網絡模式

        "ln": 0,  // (double) lng經度

        "la": 0  // (double) lat 緯度

    },

"et":  [  //事件

            {

                "ett": "1506047605364",  //客戶端事件產生時間

                "en": "display",  //事件名稱

                "kv": {  //事件結果,以key-value形式自行定義

                    "goodsid": "236",

                    "action": "1",

                    "extend1": "1",

"place": "2",

"category": "75"

                }

            }

        ]

}

示例日誌(服務器時間戳 | 日誌):

1540934156385|{

    "ap": "gmall",

    "cm": {

        "uid": "1234",

        "vc": "2",

        "vn": "1.0",

        "la": "EN",

        "sr": "",

        "os": "7.1.1",

        "ar": "CN",

        "md": "BBB100-1",

        "ba": "blackberry",

        "sv": "V2.2.1",

        "g": "abc@gmail.com",

        "hw": "1620x1080",

        "t": "1506047606608",

        "nw": "WIFI",

        "ln": 0

    },

        "et": [

            {

                "ett": "1506047605364",  //客戶端事件產生時間

                "en": "display",  //事件名稱

                "kv": {  //事件結果,以key-value形式自行定義

                    "goodsid": "236",

                    "action": "1",

                    "extend1": "1",

"place": "2",

"category": "75"

                }

            },{

              "ett": "1552352626835",

              "en": "active_background",

              "kv": {

                   "active_source": "1"

              }

           }

        ]

    }

}

下面是各個埋點日誌格式。其中商品點擊屬於信息流的範疇

3.2 事件日誌數

 

 

 

3.2.1 商品列表頁(loading)

事件名稱:loading

標籤

含義

action

動做:開始加載=1,加載成功=2,加載失敗=3

loading_time

加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)

loading_way

加載類型:1-讀取緩存,2-從接口拉新數據
(加載成功才上報加載類型)

extend1

擴展字段 Extend1

extend2

擴展字段 Extend2

type

加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載)

type1

加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗)

 

3.2.2 商品點擊(display)

事件標籤:display

標籤

含義

action

動做:曝光商品=1,點擊商品=2,

goodsid

商品ID(服務端下發的ID)

place

順序(第幾條商品,第一條爲0,第二條爲1,如此類推)

extend1

曝光類型:1 - 首次曝光 2-重複曝光

category

分類ID(服務端定義的分類ID)

     

 

3.2.3 商品詳情頁(newsdetail)

事件標籤:newsdetail

標籤

含義

entry

頁面入口來源:應用首頁=一、push=二、詳情頁相關推薦=3

action

動做:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4

goodsid

商品ID(服務端下發的ID)

show_style

商品樣式:0、無圖、一、一張大圖、二、兩張圖、三、三張小圖、四、一張小圖、五、一張大圖兩張小圖

news_staytime

頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途劃出的時間超過10分鐘,則本次計時做廢,不上報本次數據。如未加載成功退出,則報空。

loading_time

加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間)

type1

加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗)

category

分類ID(服務端定義的分類ID)

     

 

3.2.4 廣告(ad)

事件名稱:ad

標籤

含義

entry

入口:商品列表頁=1  應用首頁=2 商品詳情頁=3

action

動做: 廣告展現=1 廣告點擊=2

contentType

Type: 1 商品 2 營銷活動  

displayMills

展現時長 毫秒數

itemId

商品id

activityId

營銷活動id

 

3.2.5 消息通知(notification)

事件標籤:notification

標籤

含義

action

動做:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展現(不重複上報,一天以內只報一次)=4

type

通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4

ap_time

客戶端彈出時間

content

備用字段

3.2.6 用戶後臺活躍(active_background)

事件標籤: active_background

標籤

含義

active_source

1=upgrade,2=download(下載),3=plugin_upgrade

3.2.7 評論(comment)

描述:評論表

序號

字段名稱

字段描述

字段類型

長度

容許空

缺省值

1

comment_id

評論表

int

10,0

 

 

2

userid

用戶id

int

10,0

0

3

p_comment_id

父級評論id(爲0則是一級評論,不爲0則是回覆)

int

10,0

 

4

content

評論內容

string

1000

 

5

addtime

建立時間

string

 

 

6

other_id

評論的相關id

int

10,0

 

7

praise_count

點贊數量

int

10,0

0

8

reply_count

回覆數量

int

10,0

0

 

3.2.8 收藏(favorites)

描述:收藏

序號

字段名稱

字段描述

字段類型

長度

容許空

缺省值

1

id

主鍵

int

10,0

 

 

2

course_id

商品id

int

10,0

0

3

userid

用戶ID

int

10,0

0

4

add_time

建立時間

string

 

 

3.2.9 點贊(praise)

描述:全部的點贊表

序號

字段名稱

字段描述

字段類型

長度

容許空

缺省值

1

id

主鍵id

int

10,0

 

 

2

userid

用戶id

int

10,0

 

3

target_id

點讚的對象id

int

10,0

 

4

type

點贊類型 1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊

int

10,0

 

5

add_time

添加時間

string

 

 

 

3.2.10 錯誤日誌

 

errorBrief

錯誤摘要

errorDetail

錯誤詳情

3.3 啓動日誌數據

事件標籤: start

標籤

含義

entry

入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5

open_ad_type

開屏廣告類型:  開屏原生廣告=1, 開屏插屏廣告=2

action

狀態:成功=1  失敗=2

loading_time

加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)

detail

失敗碼(沒有則上報空)

extend1

失敗的message(沒有則上報空)

en

日誌類型start

 

{
    "action":"1",
    "ar":"MX",
    "ba":"HTC",
    "detail":"",
    "en":"start",
    "entry":"2",
    "extend1":"",
    "g":"43R2SEQX@gmail.com",
    "hw":"640*960",
    "l":"en",
    "la":"20.4",
    "ln":"-99.3",
    "loading_time":"2",
    "md":"HTC-2",
    "mid":"995",
    "nw":"4G",
    "open_ad_type":"2",
    "os":"8.1.2",
    "sr":"B",
    "sv":"V2.0.6",
    "t":"1561472502444",
    "uid":"995",
    "vc":"10",
    "vn":"1.3.4"
}

3.4 數據生成腳本

3.1.1 建立Mavne工程

1)建立 log-collector

GroupId : com.test

Project name : log-collector

2)建立一個包名:com.test.appclient

3)在com.test.appclient包下建立一個類,AppMain。

4)在pom.xml文件中添加以下內容

 

<!--版本號統一-->
<properties>
    <slf4j.version>1.7.20</slf4j.version>
    <logback.version>1.0.7</logback.version>
</properties>

<dependencies>
    <!--阿里巴巴開源json解析框架-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.51</version>
    </dependency>

    <!--日誌生成框架-->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
</dependencies>

<!--編譯打包插件-->
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin </artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.test.appclient.AppMain</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意:com.test.appclient.AppMain要和本身建的全類名一致。

3.1.2 公共字段Bean

1)建立包名:com.test.bean

2)在com.test.bean包下依次建立以下bean對象

package com.test.bean;
/**
 * 公共日誌
 */
public class AppBase{

    private String mid; // (String) 設備惟一標識
    private String uid; // (String) 用戶uid
    private String vc;  // (String) versionCode,程序版本號
    private String vn;  // (String) versionName,程序版本名
    private String l;   // (String) 系統語言
    private String sr;  // (String) 渠道號,應用從哪一個渠道來的。
    private String os;  // (String) Android系統版本
    private String ar;  // (String) 區域
    private String md;  // (String) 手機型號
    private String ba;  // (String) 手機品牌
    private String sv;  // (String) sdkVersion
    private String g;   // (String) gmail
    private String hw;  // (String) heightXwidth,屏幕寬高
    private String t;   // (String) 客戶端日誌產生時的時間
    private String nw;  // (String) 網絡模式
    private String ln;  // (double) lng經度
    private String la;  // (double) lat 緯度

    public String getMid() {
        return mid;
    }

    public void setMid(String mid) {
        this.mid = mid;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getVc() {
        return vc;
    }

    public void setVc(String vc) {
        this.vc = vc;
    }

    public String getVn() {
        return vn;
    }

    public void setVn(String vn) {
        this.vn = vn;
    }

    public String getL() {
        return l;
    }

    public void setL(String l) {
        this.l = l;
    }

    public String getSr() {
        return sr;
    }

    public void setSr(String sr) {
        this.sr = sr;
    }

    public String getOs() {
        return os;
    }

    public void setOs(String os) {
        this.os = os;
    }

    public String getAr() {
        return ar;
    }

    public void setAr(String ar) {
        this.ar = ar;
    }

    public String getMd() {
        return md;
    }

    public void setMd(String md) {
        this.md = md;
    }

    public String getBa() {
        return ba;
    }

    public void setBa(String ba) {
        this.ba = ba;
    }

    public String getSv() {
        return sv;
    }

    public void setSv(String sv) {
        this.sv = sv;
    }

    public String getG() {
        return g;
    }

    public void setG(String g) {
        this.g = g;
    }

    public String getHw() {
        return hw;
    }

    public void setHw(String hw) {
        this.hw = hw;
    }

    public String getT() {
        return t;
    }

    public void setT(String t) {
        this.t = t;
    }

    public String getNw() {
        return nw;
    }

    public void setNw(String nw) {
        this.nw = nw;
    }

    public String getLn() {
        return ln;
    }

    public void setLn(String ln) {
        this.ln = ln;
    }

    public String getLa() {
        return la;
    }

    public void setLa(String la) {
        this.la = la;
    }
}

3.1.3 啓動日誌Bean

package com.test.bean;
/**
 * 啓動日誌
 */
public class AppStart extends AppBase {

    private String entry;//入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5
    private String open_ad_type;//開屏廣告類型:  開屏原生廣告=1, 開屏插屏廣告=2
    private String action;//狀態:成功=1  失敗=2
    private String loading_time;//加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)
    private String detail;//失敗碼(沒有則上報空)
    private String extend1;//失敗的message(沒有則上報空)
    private String en;//啓動日誌類型標記

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getOpen_ad_type() {
        return open_ad_type;
    }

    public void setOpen_ad_type(String open_ad_type) {
        this.open_ad_type = open_ad_type;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getDetail() {
        return detail;
    }

    public void setDetail(String detail) {
        this.detail = detail;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getEn() {
        return en;
    }

    public void setEn(String en) {
        this.en = en;
    }
}

3.1.4 錯誤日誌Bean

package com.test.bean;
/**
 * 錯誤日誌
 */
public class AppErrorLog {

    private String errorBrief;    //錯誤摘要
    private String errorDetail;   //錯誤詳情

    public String getErrorBrief() {
        return errorBrief;
    }

    public void setErrorBrief(String errorBrief) {
        this.errorBrief = errorBrief;
    }

    public String getErrorDetail() {
        return errorDetail;
    }

    public void setErrorDetail(String errorDetail) {
        this.errorDetail = errorDetail;
    }
}

3.1.5 事件日誌Bean之商品詳情

package com.test.bean;
/**
 * 商品詳情
 */
public class AppNewsDetail {

    private String entry;//頁面入口來源:應用首頁=一、push=二、詳情頁相關推薦=3
    private String action;//動做:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4
    private String goodsid;//商品ID(服務端下發的ID)
    private String showtype;//商品樣式:0、無圖一、一張大圖二、兩張圖三、三張小圖四、一張小圖五、一張大圖兩張小圖    來源於詳情頁相關推薦的商品,上報樣式都爲0(由於都是左文右圖)
    private String news_staytime;//頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途劃出的時間超過10分鐘,則本次計時做廢,不上報本次數據。如未加載成功退出,則報空。
    private String loading_time;//加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間)
    private String type1;//加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗)
    private String category;//分類ID(服務端定義的分類ID)

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getGoodsid() {
        return goodsid;
    }

    public void setGoodsid(String goodsid) {
        this.goodsid = goodsid;
    }

    public String getShowtype() {
        return showtype;
    }

    public void setShowtype(String showtype) {
        this.showtype = showtype;
    }

    public String getNews_staytime() {
        return news_staytime;
    }

    public void setNews_staytime(String news_staytime) {
        this.news_staytime = news_staytime;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getType1() {
        return type1;
    }

    public void setType1(String type1) {
        this.type1 = type1;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}

3.1.6 事件日誌Bean之商品列表

package com.test.bean;
/**
 * 商品列表
 */
public class AppLoading {
    private String action;//動做:開始加載=1,加載成功=2,加載失敗=3
    private String loading_time;//加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)
    private String loading_way;//加載類型:1-讀取緩存,2-從接口拉新數據   (加載成功才上報加載類型)
    private String extend1;//擴展字段 Extend1
    private String extend2;//擴展字段 Extend2
    private String type;//加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載)
    private String type1;//加載失敗碼:把加載失敗狀態碼報回來(報空爲加載成功,沒有失敗)

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getLoading_way() {
        return loading_way;
    }

    public void setLoading_way(String loading_way) {
        this.loading_way = loading_way;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getExtend2() {
        return extend2;
    }

    public void setExtend2(String extend2) {
        this.extend2 = extend2;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getType1() {
        return type1;
    }

    public void setType1(String type1) {
        this.type1 = type1;
    }
}

3.1.7 事件日誌Bean之廣告

package com.test.bean;
/**
 * 廣告
 */
public class AppAd {

    private String entry;//入口:商品列表頁=1  應用首頁=2 商品詳情頁=3
    private String action;//動做: 廣告展現=1 廣告點擊=2
    private String contentType;//Type: 1 商品 2 營銷活動
    private String displayMills;//展現時長 毫秒數
    private String itemId; //商品id
    private String activityId; //營銷活動id


    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getActivityId() {
        return activityId;
    }

    public void setActivityId(String activityId) {
        this.activityId = activityId;
    }

    public String getContentType() {
        return contentType;
    }

    public void setContentType(String contentType) {
        this.contentType = contentType;
    }

    public String getDisplayMills() {
        return displayMills;
    }

    public void setDisplayMills(String displayMills) {
        this.displayMills = displayMills;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }
}

3.1.8 事件日誌Bean之商品點擊

package com.test.bean;
/**
 * 商品點擊日誌
 */
public class AppDisplay {

    private String action;//動做:曝光商品=1,點擊商品=2,
    private String goodsid;//商品ID(服務端下發的ID)
    private String place;//順序(第幾條商品,第一條爲0,第二條爲1,如此類推)
    private String extend1;//曝光類型:1 - 首次曝光 2-重複曝光(沒有使用)
    private String category;//分類ID(服務端定義的分類ID)

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getGoodsid() {
        return goodsid;
    }

    public void setGoodsid(String goodsid) {
        this.goodsid = goodsid;
    }

    public String getPlace() {
        return place;
    }

    public void setPlace(String place) {
        this.place = place;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}

3.1.9 事件日誌Bean之消息通知

package com.test.bean;
/**
 * 消息通知日誌
 */
public class AppNotification {
    private String action;//動做:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展現(不重複上報,一天以內只報一次)=4
    private String type;//通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4
    private String ap_time;//客戶端彈出時間
    private String content;//備用字段

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getAp_time() {
        return ap_time;
    }

    public void setAp_time(String ap_time) {
        this.ap_time = ap_time;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

3.1.10 事件日誌Bean之用戶後臺活躍

package com.test.bean;
/**
 * 用戶後臺活躍
 */
public class AppActive_background {
    private String active_source;//1=upgrade,2=download(下載),3=plugin_upgrade

    public String getActive_source() {
        return active_source;
    }

    public void setActive_source(String active_source) {
        this.active_source = active_source;
    }
}

3.1.11 事件日誌Bean之用戶評論

package com.test.bean;
/**
 * 評論
 */
public class AppComment {

    private int comment_id;//評論表
    private int userid;//用戶id
    private  int p_comment_id;//父級評論id(爲0則是一級評論,不爲0則是回覆)
    private String content;//評論內容
    private String addtime;//建立時間
    private int other_id;//評論的相關id
    private int praise_count;//點贊數量
    private int reply_count;//回覆數量

    public int getComment_id() {
        return comment_id;
    }

    public void setComment_id(int comment_id) {
        this.comment_id = comment_id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public int getP_comment_id() {
        return p_comment_id;
    }

    public void setP_comment_id(int p_comment_id) {
        this.p_comment_id = p_comment_id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getAddtime() {
        return addtime;
    }

    public void setAddtime(String addtime) {
        this.addtime = addtime;
    }

    public int getOther_id() {
        return other_id;
    }

    public void setOther_id(int other_id) {
        this.other_id = other_id;
    }

    public int getPraise_count() {
        return praise_count;
    }

    public void setPraise_count(int praise_count) {
        this.praise_count = praise_count;
    }

    public int getReply_count() {
        return reply_count;
    }

    public void setReply_count(int reply_count) {
        this.reply_count = reply_count;
    }
}

3.1.12 事件日誌Bean之用戶收藏

package com.test.bean;
/**
 * 收藏
 */
public class AppFavorites {
    private int id;//主鍵
    private int course_id;//商品id
    private int userid;//用戶ID
    private String add_time;//建立時間

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getCourse_id() {
        return course_id;
    }

    public void setCourse_id(int course_id) {
        this.course_id = course_id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public String getAdd_time() {
        return add_time;
    }

    public void setAdd_time(String add_time) {
        this.add_time = add_time;
    }
}

3.1.13 事件日誌Bean之用戶點贊

package com.test.bean;
/**
 * 點贊
 */
public class AppPraise {
    private int id; //主鍵id
    private int userid;//用戶id
    private int target_id;//點讚的對象id
    private int type;//點贊類型 1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊
    private String add_time;//添加時間

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public int getTarget_id() {
        return target_id;
    }

    public void setTarget_id(int target_id) {
        this.target_id = target_id;
    }

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public String getAdd_time() {
        return add_time;
    }

    public void setAdd_time(String add_time) {
        this.add_time = add_time;
    }
}

3.1.14 主函數

在AppMain類中添加以下內容:

package com.test.appclient;

import java.io.UnsupportedEncodingException;
import java.util.Random;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.test.bean.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 日誌行爲數據模擬
 */
public class AppMain {

    private final static Logger logger = LoggerFactory.getLogger(AppMain.class);
    private static Random rand = new Random();

    // 設備id
    private static int s_mid = 0;

    // 用戶id
    private static int s_uid = 0;

    // 商品id
    private static int s_goodsid = 0;

    public static void main(String[] args) {

        // 參數一:控制發送每條的延時時間,默認是0
        Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

        // 參數二:循環遍歷次數
        int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

        // 生成數據
        generateLog(delay, loop_len);
    }

    private static void generateLog(Long delay, int loop_len) {

        for (int i = 0; i < loop_len; i++) {

            int flag = rand.nextInt(2);

            switch (flag) {
                case (0):
                    //應用啓動
                    AppStart appStart = generateStart();
                    String jsonString = JSON.toJSONString(appStart);

                    //控制檯打印
                    logger.info(jsonString);
                    break;

                case (1):

                    JSONObject json = new JSONObject();

                    json.put("ap", "app");
                    json.put("cm", generateComFields());

                    JSONArray eventsArray = new JSONArray();

                    // 事件日誌
                    // 商品點擊,展現
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateDisplay());
                        json.put("et", eventsArray);
                    }

                    // 商品詳情頁
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewsDetail());
                        json.put("et", eventsArray);
                    }

                    // 商品列表頁
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewList());
                        json.put("et", eventsArray);
                    }

                    // 廣告
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateAd());
                        json.put("et", eventsArray);
                    }

                    // 消息通知
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNotification());
                        json.put("et", eventsArray);
                    }

                    // 用戶後臺活躍
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateBackground());
                        json.put("et", eventsArray);
                    }

                    //故障日誌
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateError());
                        json.put("et", eventsArray);
                    }

                    // 用戶評論
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateComment());
                        json.put("et", eventsArray);
                    }

                    // 用戶收藏
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateFavorites());
                        json.put("et", eventsArray);
                    }

                    // 用戶點贊
                    if (rand.nextBoolean()) {
                        eventsArray.add(generatePraise());
                        json.put("et", eventsArray);
                    }

                    //時間
                    long millis = System.currentTimeMillis();

                    //控制檯打印
                    logger.info(millis + "|" + json.toJSONString());
                    break;
            }

            // 延遲
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 公共字段設置
     */
    private static JSONObject generateComFields() {

        AppBase appBase = new AppBase();

        //設備id
        appBase.setMid(s_mid + "");
        s_mid++;

        // 用戶id
        appBase.setUid(s_uid + "");
        s_uid++;

        // 程序版本號 5,6等
        appBase.setVc("" + rand.nextInt(20));

        //程序版本名 v1.1.1
        appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // 安卓系統版本
        appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        // 語言  es,en,pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appBase.setL("es");
                break;
            case (1):
                appBase.setL("en");
                break;
            case (2):
                appBase.setL("pt");
                break;
        }

        // 渠道號   從哪一個渠道來的
        appBase.setSr(getRandomChar(1));

        // 區域
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appBase.setAr("BR");
            case 1:
                appBase.setAr("MX");
        }

        // 手機品牌 ba ,手機型號 md,就取2位數字了
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setBa("Sumsung");
                appBase.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appBase.setBa("Huawei");
                appBase.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appBase.setBa("HTC");
                appBase.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // 嵌入sdk的版本
        appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appBase.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // 屏幕寬高 hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appBase.setHw("640*960");
                break;
            case 1:
                appBase.setHw("640*1136");
                break;
            case 2:
                appBase.setHw("750*1134");
                break;
            case 3:
                appBase.setHw("1080*1920");
                break;
        }

        // 客戶端產生日誌時間
        long millis = System.currentTimeMillis();
        appBase.setT("" + (millis - rand.nextInt(99999999)));

        // 手機網絡模式 3G,4G,WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setNw("3G");
                break;
            case 1:
                appBase.setNw("4G");
                break;
            case 2:
                appBase.setNw("WIFI");
                break;
        }

        // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′
        // 經度
        appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // 緯度
        appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        return (JSONObject) JSON.toJSON(appBase);
    }

    /**
     * 商品展現事件
     */
    private static JSONObject generateDisplay() {

        AppDisplay appDisplay = new AppDisplay();

        boolean boolFlag = rand.nextInt(10) < 7;

        // 動做:曝光商品=1,點擊商品=2,
        if (boolFlag) {
            appDisplay.setAction("1");
        } else {
            appDisplay.setAction("2");
        }

        // 商品id
        String goodsId = s_goodsid + "";
        s_goodsid++;

        appDisplay.setGoodsid(goodsId);

        // 順序  設置成6條吧
        int flag = rand.nextInt(6);
        appDisplay.setPlace("" + flag);

        // 曝光類型
        flag = 1 + rand.nextInt(2);
        appDisplay.setExtend1("" + flag);

        // 分類
        flag = 1 + rand.nextInt(100);
        appDisplay.setCategory("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay);

        return packEventJson("display", jsonObject);
    }

    /**
     * 商品詳情頁
     */
    private static JSONObject generateNewsDetail() {

        AppNewsDetail appNewsDetail = new AppNewsDetail();

        // 頁面入口來源
        int flag = 1 + rand.nextInt(3);
        appNewsDetail.setEntry(flag + "");

        // 動做
        appNewsDetail.setAction("" + (rand.nextInt(4) + 1));

        // 商品id
        appNewsDetail.setGoodsid(s_goodsid + "");

        // 商品來源類型
        flag = 1 + rand.nextInt(3);
        appNewsDetail.setShowtype(flag + "");

        // 商品樣式
        flag = rand.nextInt(6);
        appNewsDetail.setShowtype("" + flag);

        // 頁面停留時長
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setNews_staytime(flag + "");

        // 加載時長
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setLoading_time(flag + "");

        // 加載失敗碼
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appNewsDetail.setType1("102");
                break;
            case 2:
                appNewsDetail.setType1("201");
                break;
            case 3:
                appNewsDetail.setType1("325");
                break;
            case 4:
                appNewsDetail.setType1("433");
                break;
            case 5:
                appNewsDetail.setType1("542");
                break;
            default:
                appNewsDetail.setType1("");
                break;
        }

        // 分類
        flag = 1 + rand.nextInt(100);
        appNewsDetail.setCategory("" + flag);

        JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail);

        return packEventJson("newsdetail", eventJson);
    }

    /**
     * 商品列表
     */
    private static JSONObject generateNewList() {

        AppLoading appLoading = new AppLoading();

        // 動做
        int flag = rand.nextInt(3) + 1;
        appLoading.setAction(flag + "");

        // 加載時長
        flag = rand.nextInt(10) * rand.nextInt(7);
        appLoading.setLoading_time(flag + "");

        // 失敗碼
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appLoading.setType1("102");
                break;
            case 2:
                appLoading.setType1("201");
                break;
            case 3:
                appLoading.setType1("325");
                break;
            case 4:
                appLoading.setType1("433");
                break;
            case 5:
                appLoading.setType1("542");
                break;
            default:
                appLoading.setType1("");
                break;
        }

        // 頁面  加載類型
        flag = 1 + rand.nextInt(2);
        appLoading.setLoading_way("" + flag);

        // 擴展字段1
        appLoading.setExtend1("");

        // 擴展字段2
        appLoading.setExtend2("");

        // 用戶加載類型
        flag = 1 + rand.nextInt(3);
        appLoading.setType("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading);

        return packEventJson("loading", jsonObject);
    }

    /**
     * 廣告相關字段
     */
    private static JSONObject generateAd() {

        AppAd appAd = new AppAd();

        // 入口
        int flag = rand.nextInt(3) + 1;
        appAd.setEntry(flag + "");

        // 動做
        flag = rand.nextInt(5) + 1;
        appAd.setAction(flag + "");

        // 內容類型類型
        flag = rand.nextInt(6)+1;
        appAd.setContentType(flag+ "");

        // 展現樣式
        flag = rand.nextInt(120000)+1000;
        appAd.setDisplayMills(flag+"");

        flag=rand.nextInt(1);
        if(flag==1){
            appAd.setContentType(flag+"");
            flag =rand.nextInt(6);
            appAd.setItemId(flag+ "");
        }else{
            appAd.setContentType(flag+"");
            flag =rand.nextInt(1)+1;
            appAd.setActivityId(flag+ "");
        }

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd);

        return packEventJson("ad", jsonObject);
    }

    /**
     * 啓動日誌
     */
    private static AppStart generateStart() {

        AppStart appStart = new AppStart();

        //設備id
        appStart.setMid(s_mid + "");
        s_mid++;

        // 用戶id
        appStart.setUid(s_uid + "");
        s_uid++;

        // 程序版本號 5,6等
        appStart.setVc("" + rand.nextInt(20));

        //程序版本名 v1.1.1
        appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // 安卓系統版本
        appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        //設置日誌類型
        appStart.setEn("start");

        //    語言  es,en,pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appStart.setL("es");
                break;
            case (1):
                appStart.setL("en");
                break;
            case (2):
                appStart.setL("pt");
                break;
        }

        // 渠道號   從哪一個渠道來的
        appStart.setSr(getRandomChar(1));

        // 區域
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appStart.setAr("BR");
            case 1:
                appStart.setAr("MX");
        }

        // 手機品牌 ba ,手機型號 md,就取2位數字了
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setBa("Sumsung");
                appStart.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appStart.setBa("Huawei");
                appStart.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appStart.setBa("HTC");
                appStart.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // 嵌入sdk的版本
        appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appStart.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // 屏幕寬高 hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appStart.setHw("640*960");
                break;
            case 1:
                appStart.setHw("640*1136");
                break;
            case 2:
                appStart.setHw("750*1134");
                break;
            case 3:
                appStart.setHw("1080*1920");
                break;
        }

        // 客戶端產生日誌時間
        long millis = System.currentTimeMillis();
        appStart.setT("" + (millis - rand.nextInt(99999999)));

        // 手機網絡模式 3G,4G,WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setNw("3G");
                break;
            case 1:
                appStart.setNw("4G");
                break;
            case 2:
                appStart.setNw("WIFI");
                break;
        }

        // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′
        // 經度
        appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // 緯度
        appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        // 入口
        flag = rand.nextInt(5) + 1;
        appStart.setEntry(flag + "");

        // 開屏廣告類型
        flag = rand.nextInt(2) + 1;
        appStart.setOpen_ad_type(flag + "");

        // 狀態
        flag = rand.nextInt(10) > 8 ? 2 : 1;
        appStart.setAction(flag + "");

        // 加載時長
        appStart.setLoading_time(rand.nextInt(20) + "");

        // 失敗碼
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appStart.setDetail("102");
                break;
            case 2:
                appStart.setDetail("201");
                break;
            case 3:
                appStart.setDetail("325");
                break;
            case 4:
                appStart.setDetail("433");
                break;
            case 5:
                appStart.setDetail("542");
                break;
            default:
                appStart.setDetail("");
                break;
        }

        // 擴展字段
        appStart.setExtend1("");

        return appStart;
    }
    /**
     * 消息通知
     */
    private static JSONObject generateNotification() {

        AppNotification appNotification = new AppNotification();

        int flag = rand.nextInt(4) + 1;

        // 動做
        appNotification.setAction(flag + "");

        // 通知id
        flag = rand.nextInt(4) + 1;
        appNotification.setType(flag + "");

        // 客戶端彈時間
        appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        // 備用字段
        appNotification.setContent("");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification);

        return packEventJson("notification", jsonObject);
    }

    /**
     * 後臺活躍
     */
    private static JSONObject generateBackground() {

        AppActive_background appActive_background = new AppActive_background();

        // 啓動源
        int flag = rand.nextInt(3) + 1;
        appActive_background.setActive_source(flag + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background);

        return packEventJson("active_background", jsonObject);
    }

    /**
     * 錯誤日誌數據
     */
    private static JSONObject generateError() {

        AppErrorLog appErrorLog = new AppErrorLog();

        String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"};        //錯誤摘要
        String[] errorDetails = {"java.lang.NullPointerException\\n    " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"};        //錯誤詳情

        //錯誤摘要
        appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]);
        //錯誤詳情
        appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog);

        return packEventJson("error", jsonObject);
    }

    /**
     * 爲各個事件類型的公共字段(時間、事件類型、Json數據)拼接
     */
    private static JSONObject packEventJson(String eventName, JSONObject jsonObject) {

        JSONObject eventJson = new JSONObject();

        eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        eventJson.put("en", eventName);
        eventJson.put("kv", jsonObject);

        return eventJson;
    }

    /**
     * 獲取隨機字母組合
     *
     * @param length 字符串長度
     */
    private static String getRandomChar(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {
            // 字符串
            str.append((char) (65 + random.nextInt(26)));// 取得大寫字母
        }

        return str.toString();
    }

    /**
     * 獲取隨機字母數字組合
     * @param length 字符串長度
     */
    private static String getRandomCharAndNumr(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {

            boolean b = random.nextBoolean();

            if (b) { // 字符串
                // int choice = random.nextBoolean() ? 65 : 97; 取得65大寫字母仍是97小寫字母
                str.append((char) (65 + random.nextInt(26)));// 取得大寫字母
            } else { // 數字
                str.append(String.valueOf(random.nextInt(10)));
            }
        }

        return str.toString();
    }

    /**
     * 收藏
     */
    private static JSONObject generateFavorites() {

        AppFavorites favorites = new AppFavorites();

        favorites.setCourse_id(rand.nextInt(10));
        favorites.setUserid(rand.nextInt(10));
        favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites);

        return packEventJson("favorites", jsonObject);
    }

    /**
     * 點贊
     */
    private static JSONObject generatePraise() {

        AppPraise praise = new AppPraise();

        praise.setId(rand.nextInt(10));
        praise.setUserid(rand.nextInt(10));
        praise.setTarget_id(rand.nextInt(10));
        praise.setType(rand.nextInt(4) + 1);
        praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(praise);

        return packEventJson("praise", jsonObject);
    }

    /**
     * 評論
     */
    private static JSONObject generateComment() {

        AppComment comment = new AppComment();

        comment.setComment_id(rand.nextInt(10));
        comment.setUserid(rand.nextInt(10));
        comment.setP_comment_id(rand.nextInt(5));

        comment.setContent(getCONTENT());
        comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        comment.setOther_id(rand.nextInt(10));
        comment.setPraise_count(rand.nextInt(1000));
        comment.setReply_count(rand.nextInt(200));

        JSONObject jsonObject = (JSONObject) JSON.toJSON(comment);

        return packEventJson("comment", jsonObject);
    }

    /**
     * 生成單個漢字
     */
    private static char getRandomChar() {

        String str = "";
        int hightPos; //
        int lowPos;

        Random random = new Random();

        //隨機生成漢子的兩個字節
        hightPos = (176 + Math.abs(random.nextInt(39)));
        lowPos = (161 + Math.abs(random.nextInt(93)));

        byte[] b = new byte[2];
        b[0] = (Integer.valueOf(hightPos)).byteValue();
        b[1] = (Integer.valueOf(lowPos)).byteValue();

        try {
            str = new String(b, "GBK");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            System.out.println("錯誤");
        }

        return str.charAt(0);
    }

    /**
     * 拼接成多個漢字
     */
    private static String getCONTENT() {

        StringBuilder str = new StringBuilder();

        for (int i = 0; i < rand.nextInt(100); i++) {
            str.append(getRandomChar());
        }

        return str.toString();
    }
}

3.1.15 配置日誌打印Logback

Logback主要用於在磁盤和控制檯打印日誌

Logback具體使用:

1)在resources文件夾下建立logback.xml文件。

2)在logback.xml文件中填寫以下配置

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
   <!--定義日誌文件的存儲地址 勿在 LogBack 的配置中使用相對路徑 -->
   <property name="LOG_HOME" value="/tmp/logs/" />

   <!-- 控制檯輸出 -->
   <appender name="STDOUT"
      class="ch.qos.logback.core.ConsoleAppender">
      <encoder
         class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <!--格式化輸出:%d表示日期,%thread表示線程名,%-5level:級別從左顯示5個字符寬度%msg:日誌消息,%n是換行符 -->
         <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
      </encoder>
   </appender>
   
   <!-- 按照天天生成日誌文件。存儲事件日誌 -->
   <appender name="FILE"
      class="ch.qos.logback.core.rolling.RollingFileAppender">
      <!-- <File>${LOG_HOME}/app.log</File>設置日誌不超過${log.max.size}時的保存路徑,注意,若是是web項目會保存到Tomcat的bin目錄 下 -->  
      <rollingPolicy
         class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
         <!--日誌文件輸出的文件名 -->
         <FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern>
         <!--日誌文件保留天數 -->
         <MaxHistory>30</MaxHistory>
      </rollingPolicy>
      <encoder
         class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <pattern>%msg%n</pattern>
      </encoder>
      <!--日誌文件最大的大小 -->
      <triggeringPolicy
         class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
         <MaxFileSize>10MB</MaxFileSize>
      </triggeringPolicy>
   </appender>

    <!--異步打印日誌-->
    <appender name ="ASYNC_FILE" class= "ch.qos.logback.classic.AsyncAppender">
        <!-- 不丟失日誌.默認的,若是隊列的80%已滿,則會丟棄TRACT、DEBUG、INFO級別的日誌 -->
        <discardingThreshold >0</discardingThreshold>
        <!-- 更改默認的隊列的深度,該值會影響性能.默認值爲256 -->
        <queueSize>512</queueSize>
        <!-- 添加附加的appender,最多隻能添加一個 -->
        <appender-ref ref = "FILE"/>
    </appender>

    <!-- 日誌輸出級別 -->
   <root level="INFO">
      <appender-ref ref="STDOUT" />
      <appender-ref ref="ASYNC_FILE" />
      <appender-ref ref="error" />
   </root>
</configuration>

3.1.16 Maven打jar包

4、數據採集模塊

4.1 Hadoop安裝

 見大數據軟件安裝之Hadoop(Apache)(數據存儲及計算)

4.1.1 項目經驗之HDFS存儲多目錄

若HDFS存儲空間緊張,須要對DataNode進行磁盤擴展。

1)在DataNode節點增長磁盤並進行掛載。

 

 

 2)在hdfs-site.xml文件中配置多目錄,注意新掛載磁盤的訪問權限問題。

<property>

    <name>dfs.datanode.data.dir</name>

    <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>

</property>

3)增長磁盤後,保證每一個目錄數據均衡

  開啓數據均衡命令:bin/start-balancer.sh -threshold 10

  對於參數10,表明的是集羣中各個節點的磁盤空間利用率相差不超過10%,可根據實際狀況調整。

  中止數據均衡命令:bin/stop-banlancer.sh

4.1.2 項目經驗之LZO壓縮配置

1)hadoop自己並不支持壓縮,故須要使用twitter提供的hadoop-lzo開源組件。hadoop-lzo需依賴hadoop和lzo進行編譯,編譯步驟以下。

lzo需依賴hadoop和lzo進行編譯,編譯步驟以下。

Hadoop支持LZO

0. 環境準備
maven(下載安裝,配置環境變量,修改sitting.xml加阿里雲鏡像)
gcc-c++
zlib-devel
autoconf
automake
libtool
經過yum安裝便可,yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool

1. 下載、安裝並編譯LZO

wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz

tar -zxvf lzo-2.10.tar.gz

cd lzo-2.10

./configure -prefix=/usr/local/hadoop/lzo/

make

make install

2. 編譯hadoop-lzo源碼

2.1 下載hadoop-lzo的源碼,下載地址:https://github.com/twitter/hadoop-lzo/archive/master.zip
2.2 解壓以後,修改pom.xml
    <hadoop.current.version>2.7.2</hadoop.current.version>
2.3 聲明兩個臨時環境變量
     export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
     export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 
2.4 編譯
    進入hadoop-lzo-master,執行maven編譯命令
    mvn package -Dmaven.test.skip=true
2.5 進入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即編譯成功

2)將編譯好後的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/

[test@hadoop102 common]$ pwd

/opt/module/hadoop-2.7.2/share/hadoop/common

[test@hadoop102 common]$ ls

hadoop-lzo-0.4.20.jar

3)同步hadoop-lzo-0.4.20.jar 到hadoop10三、hadoop104

[test@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar

4)core-site.xml增長配置支持LZO壓縮

<configuration>

 <property>

  <name>io.compression.codecs</name>

  <value>

  org.apache.hadoop.io.compress.GzipCodec,

  org.apache.hadoop.io.compress.DefaultCodec,

  org.apache.hadoop.io.compress.BZip2Codec,

  org.apache.hadoop.io.compress.SnappyCodec,

  com.hadoop.compression.lzo.LzoCodec,

  com.hadoop.compression.lzo.LzopCodec

  </value>

 </property>

<property>

    <name>io.compression.codec.lzo.class</name>

    <value>com.hadoop.compression.lzo.LzoCodec</value>

</property>

</configuration>

5)同步core-site.xml到hadoop10三、hadoop104

[test@hadoop102 hadoop]$ xsync core-site.xml

6)啓動及查看集羣

[test@hadoop102 hadoop-2.7.2]$  sbin/start-dfs.sh

[test@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh

4.1.3 項目經驗之LZO建立索引

1)建立LZO文件的索引,LZO壓縮文件的可切片特性依賴其索引,故咱們須要手動爲LZO壓縮文件建立索引。若無索引,則LZO文件的切片只有一個。

hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo

2)測試

(1)將bigtable.lzo(150M)上傳到集羣的根目錄

[test@hadoop102 module]$ hadoop fs -mkdir /input

[test@hadoop102 module]$ hadoop fs -put bigtable.lzo /input

(2)對上傳的LZO文件建索引

[test@hadoop102 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo

4.1.4 項目經驗之基準測試

1)測試HDFS寫性能

測試內容:向HDFS集羣寫10個128M的文件

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB

19/05/02 11:45:23 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write

19/05/02 11:45:23 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:45:23 CST 2019

19/05/02 11:45:23 INFO fs.TestDFSIO:        Number of files: 10

19/05/02 11:45:23 INFO fs.TestDFSIO: Total MBytes processed: 1280.0

19/05/02 11:45:23 INFO fs.TestDFSIO:      Throughput mb/sec: 10.69751115716984

19/05/02 11:45:23 INFO fs.TestDFSIO: Average IO rate mb/sec: 14.91699504852295

19/05/02 11:45:23 INFO fs.TestDFSIO:  IO rate std deviation: 11.160882132355928

19/05/02 11:45:23 INFO fs.TestDFSIO:     Test exec time sec: 52.315

2)測試HDFS讀性能

測試內容:讀取HDFS集羣10個128M的文件

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB

19/05/02 11:56:36 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read

19/05/02 11:56:36 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:56:36 CST 2019

19/05/02 11:56:36 INFO fs.TestDFSIO:        Number of files: 10

19/05/02 11:56:36 INFO fs.TestDFSIO: Total MBytes processed: 1280.0

19/05/02 11:56:36 INFO fs.TestDFSIO:      Throughput mb/sec: 16.001000062503905

19/05/02 11:56:36 INFO fs.TestDFSIO: Average IO rate mb/sec: 17.202795028686523

19/05/02 11:56:36 INFO fs.TestDFSIO:  IO rate std deviation: 4.881590515873911

19/05/02 11:56:36 INFO fs.TestDFSIO:     Test exec time sec: 49.116

19/05/02 11:56:36 INFO fs.TestDFSIO:

3)刪除測試生成數據

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean

4)使用Sort程序評測MapReduce

(1)使用RandomWriter來產生隨機數,每一個節點運行10個Map任務,每一個Map產生大約1G大小的二進制隨機數

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar randomwriter random-data

(2)執行Sort程序

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar sort random-data sorted-data

(3)驗證數據是否真正排好序了

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

4.1.5 項目經驗之Hadoop參數調優

1)HDFS參數調優hdfs-site.xml

dfs.namenode.handler.count=20 * log2(Cluster Size),好比集羣規模爲8臺時,此參數設置爲60

The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.

NameNode有一個工做線程池,用來處理不一樣DataNode的併發心跳以及客戶端併發的元數據操做。對於大集羣或者有大量客戶端的集羣來講,一般須要增大參數dfs.namenode.handler.count的默認值10。設置該值的通常原則是將其設置爲集羣大小的天然對數乘以20,即20logN,N爲集羣大小。

2)YARN參數調優yarn-site.xml

(1)情景描述:總共7臺機器,天天幾億條數據,數據源->Flume->Kafka->HDFS->Hive

面臨問題:數據統計主要用HiveSQL,沒有數據傾斜,小文件已經作了合併處理,開啓的JVM重用,並且IO沒有阻塞,內存用了不到50%。可是仍是跑的很是慢,並且數據量洪峯過來時,整個集羣都會宕掉。基於這種狀況有沒有優化方案。

(2)解決辦法:

內存利用率不夠。這個通常是Yarn的2個配置形成的,單個任務能夠申請的最大內存大小,和Hadoop單個節點可用內存大小。調節這兩個參數能提升系統內存的利用率。

(a)yarn.nodemanager.resource.memory-mb

表示該節點上YARN可以使用的物理內存總量,默認是8192(MB),注意,若是你的節點內存資源不夠8GB,則須要調減少這個值,而YARN不會智能的探測節點的物理內存總量。

(b)yarn.scheduler.maximum-allocation-mb

單個任務可申請的最多物理內存量,默認是8192(MB)。

3)Hadoop宕機

(1)若是MR形成系統宕機。此時要控制Yarn同時運行的任務數,和每一個任務申請的最大內存。調整參數:yarn.scheduler.maximum-allocation-mb(單個任務可申請的最多物理內存量,默認是8192MB)

(2)若是寫入文件過量形成NameNode宕機。那麼調高Kafka的存儲大小,控制從Kafka到HDFS的寫入速度。高峯期的時候用Kafka進行緩存,高峯期過去數據同步會自動跟上。

4.2 Zookeeper安裝

大數據軟件安裝之ZooKeeper監控 

集羣規劃

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Zookeeper

Zookeeper

Zookeeper

Zookeeper

4.2.1 ZK集羣啓動中止腳本

1)在hadoop102的/home/test/bin目錄下建立腳本

[test@hadoop102 bin]$ vim zk.sh

       在腳本中編寫以下內容

#! /bin/bash

 

case $1 in

"start"){

   for i in hadoop102 hadoop103 hadoop104

   do

      ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"

   done

};;

"stop"){

   for i in hadoop102 hadoop103 hadoop104

   do

      ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"

   done

};;

"status"){

   for i in hadoop102 hadoop103 hadoop104

   do

      ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"

   done

};;

esac

2)增長腳本執行權限

[test@hadoop102 bin]$ chmod 777 zk.sh

3)Zookeeper集羣啓動腳本

[test@hadoop102 module]$ zk.sh start

4)Zookeeper集羣中止腳本

[test@hadoop102 module]$ zk.sh stop

4.2.2 項目經驗之Linux環境變量

1)修改/etc/profile文件:用來設置系統環境參數,好比$PATH. 這裏面的環境變量是對系統內全部用戶生效。使用bash命令,須要source  /etc/profile一下。

2)修改~/.bashrc文件:針對某一個特定的用戶,環境變量的設置只對該用戶本身有效。使用bash命令,只要以該用戶身份運行命令行就會讀取該文件。

3)把/etc/profile裏面的環境變量追加到~/.bashrc目錄

[test@hadoop102 ~]$ cat /etc/profile >> ~/.bashrc

[test@hadoop103 ~]$ cat /etc/profile >> ~/.bashrc

[test@hadoop104 ~]$ cat /etc/profile >> ~/.bashrc

4)說明

登陸式Shell,採用用戶名好比test登陸,會自動加載/etc/profile

非登陸式Shell,採用ssh 好比ssh hadoop103登陸,不會自動加載/etc/profile,會自動加載~/.bashrc

儘可能將環境變量 部署在 /etc/profile.d/env.sh

4.3 日誌生成

4.3.1 日誌啓動

1)代碼參數說明

// 參數一:控制發送每條的延時時間,默認是0

Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

// 參數二:循環遍歷次數

int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

2)將生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷貝到hadoop102服務器/opt/module上,並同步到hadoop103的/opt/module路徑下,

[test@hadoop102 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

3)在hadoop102上執行jar程序

[test@hadoop102 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain  >/opt/module/test.log

說明1:

java -classpath 須要在jar包後面指定全類名;

java -jar 須要查看一下解壓的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全類名。若是有能夠用java -jar,若是沒有就須要用到java -classpath

說明2:/dev/null表明linux的空設備文件,全部往這個文件裏面寫入的內容都會丟失,俗稱「黑洞」。

標準輸入0:從鍵盤得到輸入 /proc/self/fd/0

標準輸出1:輸出到屏幕(即控制檯) /proc/self/fd/1

錯誤輸出2:輸出到屏幕(即控制檯) /proc/self/fd/2

4)在/tmp/logs路徑下查看生成的日誌文件

[test@hadoop102 module]$ cd /tmp/logs/

[test@hadoop102 logs]$ ls

app-2020-03-10.log

4.3.2 集羣日誌生成啓動腳本

1)在/home/test/bin目錄下建立腳本lg.sh

[test@hadoop102 bin]$ vim lg.sh

       2)在腳本中編寫以下內容

#! /bin/bash

 

   for i in hadoop102 hadoop103

   do

      ssh $i "java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain $1 $2 >/dev/null 2>&1 &"

   done

3)修改腳本執行權限

[test@hadoop102 bin]$ chmod 777 lg.sh

4)啓動腳本

[test@hadoop102 module]$ lg.sh

5)分別在hadoop10二、hadoop103的/tmp/logs目錄上查看生成的數據

[test@hadoop102 logs]$ ls

app-2020-03-10.log

[test@hadoop103 logs]$ ls

app-2020-03-10.log

4.3.3 集羣時間同步修改腳本

1)在/home/test/bin目錄下建立腳本dt.sh

[test@hadoop102 bin]$ vim dt.sh

       2)在腳本中編寫以下內容

#!/bin/bash

 

for i in hadoop102 hadoop103 hadoop104

do

        echo "========== $i =========="

        ssh -t $i "sudo date -s $1"

done

注意:ssh -t 一般用於ssh遠程執行sudo命令

3)修改腳本執行權限

[test@hadoop102 bin]$ chmod 777 dt.sh

4)啓動腳本

[test@hadoop102 bin]$ dt.sh 2020-03-10

4.3.4 集羣全部進程查看腳本

1)在/home/test/bin目錄下建立腳本xcall.sh

[test@hadoop102 bin]$ vim xcall.sh

       2)在腳本中編寫以下內容

#! /bin/bash

 

for i in hadoop102 hadoop103 hadoop104

do

        echo --------- $i ----------

        ssh $i "$*"

done

3)修改腳本執行權限

[test@hadoop102 bin]$ chmod 777 xcall.sh

4)啓動腳本

[test@hadoop102 bin]$ xcall.sh jps

4.4 採集日誌Flume

4.4.1 日誌採集Flume安裝

見 大數據軟件安裝之Flume(日誌採集)

集羣規劃:

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Flume(採集日誌)

Flume

Flume

 

4.4.2 項目經驗之Flume組件

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的優點

TailDir Source:斷點續傳、多目錄。Flume1.6之前須要本身自定義Source記錄每次讀取文件位置,實現斷點續傳。

Exec Source能夠實時蒐集數據,可是在Flume不運行或者Shell命令出錯的狀況下,數據將會丟失。

Spooling Directory Source監控目錄,不支持斷點續傳。

(2)batchSize大小如何設置?

答:Event 1K左右時,500-1000合適(默認爲100)

2)Channel

採用Kafka Channel,省去了Sink,提升了效率。

注意在Flume1.7之前,Kafka Channel不多有人使用,由於發現parseAsFlumeEvent這個配置起不了做用。也就是不管parseAsFlumeEvent配置爲true仍是false,都會轉爲Flume Event。

這樣的話,形成的結果是,會始終都把Flume的headers中的信息混合着內容一塊兒寫入Kafka的消息中,這顯然不是我所須要的,我只是須要把內容寫入便可。

4.4.3 日誌採集Flume配置

1)Flume 配置分析

Flume直接讀log日誌的數據,log日誌的格式是app-yyyy-mm-dd.log。

2)Flume的配置以下:

(1)在/opt/module/flume/conf目錄下建立file-flume-kafka.conf文件

[test@hadoop102 conf]$ vim file-flume-kafka.conf

在文件配置以下內容

# 組件定義

a1.sources=r1

a1.channels=c1 c2

 

# taildir方式數據

# configure source

a1.sources.r1.type = TAILDIR

a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json  #記錄日誌讀取位置

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /tmp/logs/app.+   #讀取日誌位置

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1 c2

 

#interceptor

a1.sources.r1.interceptors =  i1 i2

a1.sources.r1.interceptors.i1.type = com.test.flume.interceptor.LogETLInterceptor$Builder  #ETL攔截器

a1.sources.r1.interceptors.i2.type = com.test.flume.interceptor.LogTypeInterceptor$Builder   #日誌類型攔截器

 

a1.sources.r1.selector.type = multiplexing  # 根據日誌類型分數據

a1.sources.r1.selector.header = topic

a1.sources.r1.selector.mapping.topic_start = c1

a1.sources.r1.selector.mapping.topic_event = c2

 

# configure channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.channels.c1.kafka.topic = topic_start    #日誌類型是start ,數據發往channel1

a1.channels.c1.parseAsFlumeEvent = false

a1.channels.c1.kafka.consumer.group.id = flume-consumer 

 

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.channels.c2.kafka.topic = topic_event    #日誌類型是event,數據發往channel2

a1.channels.c2.parseAsFlumeEvent = false

a1.channels.c2.kafka.consumer.group.id = flume-consumer

            注意:com.test.flume.interceptor.LogETLInterceptor和com.test.flume.interceptor.LogTypeInterceptor是自定義的攔截器的全類名。須要根據用戶自定義的攔截器作相應修改。

4.4.4 Flume的ETL和分類型攔截器

本項目中自定義了兩個攔截器,分別是:ETL攔截器、日誌類型區分攔截器。

ETL攔截器主要用於,過濾時間戳不合法和Json數據不完整的日誌

日誌類型區分攔截器主要用於,將啓動日誌和事件日誌區分開來,方便發往Kafka的不一樣Topic。

1)建立Maven工程flume-interceptor

2)建立包名:com.test.flume.interceptor

3)在pom.xml文件中添加以下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4)在com.test.flume.interceptor包下建立LogETLInterceptor類名

Flume ETL攔截器LogETLInterceptor

package com.test.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 1 獲取數據
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2 判斷數據類型並向Header中賦值
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)){
                return event;
            }
        }else {
            if (LogUtils.validateEvent(log)){
                return event;
            }
        }

        // 3 返回校驗結果
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event intercept1 = intercept(event);

            if (intercept1 != null){
                interceptors.add(intercept1);
            }
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

5)Flume日誌過濾工具類

package com.test.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateEvent(String log) {
        // 服務器時間 | json
        // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}

        // 1 切割
        String[] logContents = log.split("\\|");

        // 2 校驗
        if(logContents.length != 2){
            return false;
        }

        //3 校驗服務器時間
        if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
            return false;
        }

        // 4 校驗json
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
            return false;
        }

        return true;
    }

    public static boolean validateStart(String log) {
 // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"}

        if (log == null){
            return false;
        }

        // 校驗json
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
            return false;
        }

        return true;
    }
}
5)Flume日誌類型區分攔截器LogTypeInterceptor
package com.test.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 區分日誌類型:   body  header
        // 1 獲取body數據
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2 獲取header
        Map<String, String> headers = event.getHeaders();

        // 3 判斷數據類型並向Header中賦值
        if (log.contains("start")) {
            headers.put("topic","topic_start");
        }else {
            headers.put("topic","topic_event");
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event intercept1 = intercept(event);

            interceptors.add(intercept1);
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements  Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

6)打包

攔截器打包以後,只須要單獨包,不須要將依賴的包上傳。打包以後要放入Flume的lib文件夾下面。

4.4.5 日誌採集Flume啓動中止腳本

1)在/home/test/bin目錄下建立腳本f1.sh

[test@hadoop102 bin]$ vim f1.sh

       在腳本中填寫以下內容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------啓動 $i 採集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &"
        done
};;    
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------中止 $i 採集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac

說明1:nohup,該命令能夠在你退出賬戶/關閉終端以後繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令

說明2:awk 默認分隔符爲空格

說明3:xargs 表示取出前面命令運行的結果,做爲後面命令的輸入參數。

2)增長腳本執行權限

[test@hadoop102 bin]$ chmod 777 f1.sh

3)f1集羣啓動腳本

[test@hadoop102 module]$ f1.sh start

4)f1集羣中止腳本

[test@hadoop102 module]$ f1.sh stop

4.5 Kafka安裝

4.5.1 Kafka集羣安裝

見 大數據安裝之Kafka(用於實時處理的消息隊列)

集羣規劃:

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Kafka

Kafka

Kafka

Kafka

4.5.2 Kafka集羣啓動中止腳本

1)在/home/test/bin目錄下建立腳本kf.sh

[test@hadoop102 bin]$ vim kf.sh

       在腳本中填寫以下內容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------啓動 $i Kafka-------"
                ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
        done
};;
"stop"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------中止 $i Kafka-------"
                ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
        done
};;
esac

2)增長腳本執行權限

[test@hadoop102 bin]$ chmod 777 kf.sh

3)kf集羣啓動腳本

[test@hadoop102 module]$ kf.sh start

4)kf集羣中止腳本

[test@hadoop102 module]$ kf.sh stop

4.5.3 查看Kafka Topic列表

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

4.5.4 建立Kafka Topic

進入到/opt/module/kafka/目錄下分別建立:啓動日誌主題、事件日誌主題。

1)建立啓動日誌主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions 1 --topic topic_start

2)建立事件日誌主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions 1 --topic topic_event

4.5.5 刪除Kafka Topic

1)刪除啓動日誌主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_start

2)刪除事件日誌主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_event

4.5.6 Kafka生產消息

[test@hadoop102 kafka]$ bin/kafka-console-producer.sh \

--broker-list hadoop102:9092 --topic topic_start

>hello world

>test  test

4.5.7 Kafka消費消息

[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --from-beginning --topic topic_start

--from-beginning:會把主題中以往全部的數據都讀取出來。根據業務場景選擇是否增長該配置。

4.5.8 查看Kafka Topic詳情

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \

--describe --topic topic_start

4.5.9 項目經驗之Kafka壓力測試

1)Kafka壓測

用Kafka官方自帶的腳本,對Kafka進行壓測。Kafka壓測時,能夠查看到哪一個地方出現了瓶頸(CPU,內存,網絡IO)。通常都是網絡IO達到瓶頸

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

2)Kafka Producer壓力測試

(1)在/opt/module/kafka/bin目錄下面有這兩個文件。咱們來測試一下

[test@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

說明:

record-size是一條信息有多大,單位是字節。

num-records是總共發送多少條信息。

throughput 是每秒多少條信息,設成-1,表示不限流,可測出生產者最大吞吐量。

(2)Kafka會打印下面的信息

100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.

參數解析:本例中一共寫入10w條消息,吞吐量爲9.14 MB/sec,每次寫入的平均延遲爲187.68毫秒,最大的延遲爲424.00毫秒。

3)Kafka Consumer壓力測試

Consumer的測試,若是這四個指標(IO,CPU,內存,網絡)都不能改變,考慮增長分區數來提高性能。

[test@hadoop102 kafka]$

bin/kafka-consumer-perf-test.sh --zookeeper hadoop102:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1

參數說明:

--zookeeper 指定zookeeper的連接信息

--topic 指定topic的名稱

--fetch-size 指定每次fetch的數據的大小

--messages 總共要消費的消息個數

測試結果說明:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec

2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153

開始測試時間,測試結束數據,共消費數據9.5368MB,吞吐量2.0714MB/s,共消費100010條,平均每秒消費21722.4153條。

4.5.10 項目經驗之Kafka機器數量計算

Kafka機器數量(經驗公式)=2*(峯值生產速度*副本數/100)+1

先拿到峯值生產速度,再根據設定的副本數,就能預估出須要部署Kafka的數量。

好比咱們的峯值生產速度是50M/s。副本數爲2。

Kafka機器數量=2*(50*2/100)+ 1=3臺

 

4.6 消費Kafka數據Flume

 

集羣規劃

 

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Flume(消費Kafka)

 

 

Flume

4.6.1 日誌消費Flume配置

1)Flume配置分析

2)Flume的具體配置以下:

       (1)在hadoop104的/opt/module/flume/conf目錄下建立kafka-flume-hdfs.conf文件

[test@hadoop104 conf]$ vim kafka-flume-hdfs.conf

在文件配置以下內容

## 組件定義
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1   #kafka start主題源數據
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2   #kafka event主題源數據
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1   
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1    #start主題數據輸出到HDFS路徑
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
 
##sink2           #event 主題數據輸出到HDFS路徑若是hadoop和flume不在一臺服務器須要在路徑前邊增長hdfs://hadoop102:9000/
a1.sinks.k2.type = hdfs     
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

## 不要產生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10    #生成文件大小設定
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制輸出文件是原生文件。      
a1.sinks.k1.hdfs.fileType = CompressedStream      #支持LZO數據壓縮設置
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

4.6.2 項目經驗之Flume組件

1)FileChannel和MemoryChannel區別

MemoryChannel傳輸數據速度更快,但由於數據保存在JVM的堆內存中,Agent進程掛掉會致使數據丟失,適用於對數據質量要求不高的需求。

FileChannel傳輸速度相對於Memory慢,但數據安全保障高,Agent進程掛掉也能夠從失敗中恢復數據。

2)FileChannel優化

經過配置dataDirs指向多個路徑,每一個路徑對應不一樣的硬盤,增大Flume吞吐量。

官方說明以下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也儘可能配置在不一樣硬盤對應的目錄中,保證checkpoint壞掉後,能夠快速使用backupCheckpointDir恢復數據

3)Sink:HDFS Sink

(1)HDFS存入大量小文件,有什麼影響?

元數據層面:每一個小文件都有一份元數據,其中包括文件路徑,文件名,全部者,所屬組,權限,建立時間等,這些信息都保存在Namenode內存中。因此小文件過多,會佔用Namenode服務器大量內存,影響Namenode性能和使用壽命

計算層面:默認狀況下MR會對每一個小文件啓用一個Map任務計算,很是影響計算性能。同時也影響磁盤尋址時間。

       (2)HDFS小文件處理

官方默認的這三個參數配置寫入HDFS後會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合做用,效果以下:

(1)文件在達到128M時會滾動生成新文件

(2)文件建立超3600秒時會滾動生成新文件

4.6.3 日誌消費Flume啓動中止腳本

1)在/home/test/bin目錄下建立腳本f2.sh

[test@hadoop102 bin]$ vim f2.sh

#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------啓動 $i 消費flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------中止 $i 消費flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac

2)增長腳本執行權限

[test@hadoop102 bin]$ chmod 777 f2.sh

3)f2集羣啓動腳本

[test@hadoop102 module]$ f2.sh start

4)f2集羣中止腳本

[test@hadoop102 module]$ f2.sh stop

4.6.4 項目經驗之Flume內存優化

1)問題描述:若是啓動消費Flume拋出以下異常

ERROR hdfs.HDFSEventSink: process failed

java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解決方案步驟:

(1)在hadoop102服務器的/opt/module/flume/conf/flume-env.sh文件中增長以下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop10三、hadoop104服務器

[test@hadoop102 conf]$ xsync flume-env.sh

3)Flume內存參數設置及優化

JVM heap通常設置爲4G或更高,部署在單獨的服務器上(4核8線程16G內存)

-Xmx與-Xms最好設置一致,減小內存抖動帶來的性能影響,若是設置不一致容易致使頻繁fullgc。

-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大容許的尺寸,按需分配。若是不設置一致,容易在初始化時,因爲內存不夠,頻繁觸發fullgc。

 

4.7 採集通道啓動/中止腳本

1)在/home/test/bin目錄下建立腳本cluster.sh

[test@hadoop102 bin]$ vim cluster.sh

#! /bin/bash

case $1 in
"start"){
    echo " -------- 啓動 集羣 -------"

    echo " -------- 啓動 hadoop集羣 -------"
    /opt/module/hadoop-2.7.2/sbin/start-dfs.sh 
    ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"

    #啓動 Zookeeper集羣
    zk.sh start

sleep 4s;

    #啓動 Flume採集集羣
    f1.sh start

    #啓動 Kafka採集集羣
    kf.sh start

sleep 6s;

    #啓動 Flume消費集羣
    f2.sh start

    };;
"stop"){
    echo " -------- 中止 集羣 -------"


    #中止 Flume消費集羣
    f2.sh stop

    #中止 Kafka採集集羣
    kf.sh stop

    sleep 6s;

    #中止 Flume採集集羣
    f1.sh stop

    #中止 Zookeeper集羣
    zk.sh stop

    echo " -------- 中止 hadoop集羣 -------"
    ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
    /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh 
};;
esac

2)增長腳本執行權限

[test@hadoop102 bin]$ chmod 777 cluster.sh

3)cluster集羣啓動腳本

[test@hadoop102 module]$ cluster.sh start

4)cluster集羣中止腳本

[test@hadoop102 module]$ cluster.sh stop

5、總結

5.1 數倉概念總結

數據倉庫的輸入數據源和輸出系統分別是什麼?

輸入系統:埋點產生的用戶給行爲數據、JavaEE後臺產生的業務數據。

輸出系統:報表系統、用戶畫像系統、推薦系統

5.2 項目需求及架構總結

5.2.1 集羣規模計算

 

5.2.2 框架版本選型

1)Apache:運維麻煩,組件間兼容性須要本身調研。(通常大廠使用,技術實力雄厚,有專業的運維人員)(建議使用)

2)CDH:國內使用最多的版本,但CM不開源,但其實對中、小公司使用來講沒有影響

3)HDP:開源,能夠進行二次開發,可是沒有CDH穩定,國內使用較少

5.2.3 服務器選型

5.3 數據採集模塊總結

5.3.1 Linxu&Shell相關總結

1)Linux經常使用命令

序號

命令

命令解釋

1

top

查看內存

2

df -h

查看磁盤存儲狀況

3

iotop

查看磁盤IO讀寫(yum install iotop安裝)

4

iotop -o

直接查看比較高的磁盤讀寫程序

5

netstat -tunlp | grep 端口號

查看端口占用狀況

6

uptime

查看報告系統運行時長及平均負載

7

ps  aux

查看進程

2)Shell經常使用工具

awk、sed、cut、sort

5.3.2 Hadoop相關總結

1)Hadoop默認不支持LZO壓縮,若是須要支持LZO壓縮,須要添加jar包,並在hadoop的cores-site.xml文件中添加相關壓縮配置。

見  項目經驗之LZO建立索引

2)Hadoop經常使用端口號

50070 hdfs,8088 mr任務,19888 歷史服務器,9000 客戶端訪問集羣

3)Hadoop配置文件以及簡單的Hadoop集羣搭建

core-site.xml hadoop-env.sh
hdfs-site.xml yarn-env.sh
yarn-site.xml mapred-env.sh
mapred-site.xml slaves

4)HDFS讀流程和寫流程

5)MapReduce的Shuffle過程及Hadoop優化(包括:壓縮、小文件、集羣優化)

Shuffer在map方法以後,reduce方法以前
數據出來後首先進入getpartition(),而後進入還原緩衝區,還原緩衝區一側存數據,一側存索引,到達80%進行反向溢寫,
還原緩衝區默認大小是100M。溢寫過程當中(進行排序,按照快排的手段排序,對key的索引排序,按照字典順序排),溢寫 以前要進行各類排序,排完序以後把溢寫文件存進來(產生大量溢寫文件),對溢寫文件進行歸併排序,歸併完以後按照指定分 區存好數據。等待reduce端來拉去數據,拉取本身指定分區數據,拉取過來先放到內存,內存不夠溢寫到磁盤,無論事內存 仍是磁盤數據都進行歸併,歸併過程中進行分組排序,最後進入到對應的reduce方法裏去。
Shuffer優化 還原緩衝區默認大小是100 調到200M ;設置到90%溢寫(減小溢寫文件個數,起到優化做用);
溢寫文件能夠提早採用一次combiner(前提條件是求和);默認一次歸併個數是10個,能夠調到20個-30個;
爲了減小磁盤IO在map端對數據採用壓縮;有幾個地方能夠壓縮Map輸入端、Map輸出端、Reduce輸出端能夠進行壓縮;

6)Yarn的Job提交流程

7)Yarn的默認調度器、調度器分類、以及他們之間的區別

默認是FIFO調度器
FIFO調度器、容量調度器、公平調度器
FIFO調度器:先進先出
選型:
對併發度要求搞,且錢的公司:公平調度器(中、大公司)
對併發度要求不是過高,且不是特別錢:容量(中小公司)
容量調度器:默認只一個default隊列,在開發時會用多個隊列
技術框架:hive、spark、flink
業務建立隊列:登錄註冊、購物車、用戶行爲、業務數據。。。分開放的好處是解耦、下降風險

8)HDFS存儲多目錄

9)Hadoop參數調優

10)項目經驗之基準測試

5.3.3 Zookeeper相關總結

1)選舉機制   

半數機制,安裝奇數臺服務器
10臺服務器安裝幾個zookeeper:3臺。
20臺服務器安裝幾個zookeeper:5臺。
100臺服務器安裝幾個zookeeper:11臺。
不是越多越好,也不是越少越好。若是多,通訊時間常,效率低;如太少,可靠性差。

2)經常使用命令

       ls、get、create

5.3.4 Flume相關總結

1)Flume組成,Put事務,Take事務

       Taildir Source:斷點續傳、多目錄。Flume1.6之前須要本身自定義Source記錄每次讀取文件位置,實現斷點續傳。

       File Channel:數據存儲在磁盤,宕機數據能夠保存。可是傳輸速率慢。適合對數據傳輸可靠性要求高的場景,好比,金融行業。

       Memory Channel:數據存儲在內存中,宕機數據丟失。傳輸速率快。適合對數據傳輸可靠性要求不高的場景,好比,普通的日誌數據。

       Kafka Channel:減小了Flume的Sink階段,提升了傳輸效率。          

       Source到Channel是Put事務

       Channel到Sink是Take事務

2)Flume攔截器

       (1)攔截器注意事項

              項目中自定義了:ETL攔截器和區分類型攔截器。

採用兩個攔截器的優缺點:優勢,模塊化開發和可移植性;缺點,性能會低一些

       (2)自定義攔截器步驟

a)實現 Interceptor

b)重寫四個方法

  • initialize 初始化
  • public Event intercept(Event event) 處理單個Event
  • public List<Event> intercept(List<Event> events) 處理多個Event,在這個方法中調用Event intercept(Event event)
  • close 方法

c)靜態內部類,實現Interceptor.Builder

3)Flume Channel選擇器

 

4)Flume 監控器

Ganglia

5)Flume採集數據會丟失嗎?

不會,Channel存儲能夠存儲在File中,數據傳輸自身有事務。

6)Flume內存

開發中在flume-env.sh中設置JVM heap爲4G或更高,部署在單獨的服務器上(4核8線程16G內存)

-Xmx與-Xms最好設置一致,減小內存抖動帶來的性能影響,若是設置不一致容易致使頻繁fullgc。

-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大容許的尺寸,按需分配。若是不設置一致,容易在初始化時,因爲內存不夠,頻繁觸發fullgc。

7)FileChannel優化

經過配置dataDirs指向多個路徑,每一個路徑對應不一樣的硬盤,增大Flume吞吐量。

官方說明以下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也儘可能配置在不一樣硬盤對應的目錄中,保證checkpoint壞掉後,能夠快速使用backupCheckpointDir恢復數據

8)Sink:HDFS Sink小文件處理

(1)HDFS存入大量小文件,有什麼影響?

元數據層面:每一個小文件都有一份元數據,其中包括文件路徑,文件名,全部者,所屬組,權限,建立時間等,這些信息都保存在Namenode內存中。因此小文件過多,會佔用Namenode服務器大量內存,影響Namenode性能和使用壽命

計算層面:默認狀況下MR會對每一個小文件啓用一個Map任務計算,很是影響計算性能。同時也影響磁盤尋址時間。

       (2)HDFS小文件處理

官方默認的這三個參數配置寫入HDFS後會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合做用,效果以下:

(1)文件在達到128M時會滾動生成新文件

(2)文件建立超3600秒時會滾動生成新文件

舉例:在2018-01-01 05:23的時侯sink接收到數據,那會產生以下tmp文件:

 5.3.5 Kafka相關總結

 

 

 

 

 

 

 

 

 

 

 

 

 

1)Kafka壓測

Kafka官方自帶壓力測試腳本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka壓測時,能夠查看到哪一個地方出現了瓶頸(CPU,內存,網絡IO)。通常都是網絡IO達到瓶頸。

2)Kafka的機器數量

Kafka機器數量=2*(峯值生產速度*副本數/100)+1

3)Kafka的日誌保存時間

3天

4)Kafka的硬盤大小

天天的數據量*3天

5)Kafka監控

公司本身開發的監控器;

開源的監控器:KafkaManager、KafkaMonitor

6)Kakfa分區數。

(1)建立一個只有1個分區的topic

(2)測試這個topic的producer吞吐量和consumer吞吐量。

(3)假設他們的值分別是Tp和Tc,單位能夠是MB/s。

(4)而後假設總的目標吞吐量是Tt,那麼分區數=Tt / min(Tp,Tc)

例如:producer吞吐量=10m/s;consumer吞吐量=50m/s,指望吞吐量100m/s;

分區數=100 / 10 =10分區

分區數通常設置爲:3-10個

7)副本數設定

通常咱們設置成2個或3個,不少企業設置爲2個。

8)多少個Topic

     一般狀況:多少個日誌類型就多少個Topic。也有對日誌類型進行合併的。

9)Kafka丟不丟數據

Ack=0,producer不等待kafka broker的ack,一直生產數據。

Ack=1,leader數據落盤就發送ack,producer收到ack才繼續生產數據。

Ack=-1,ISR中的全部副本數據羅盤才發送ack,producer收到ack才繼續生產數據。

10)Kafka的ISR副本同步隊列

ISR(In-Sync Replicas),副本同步隊列。ISR中包括Leader和Follower。若是Leader進程掛掉,會在ISR隊列中選擇一個服務做爲新的Leader。有replica.lag.max.messages(延遲條數)和replica.lag.time.max.ms(延遲時間)兩個參數決定一臺服務是否能夠加入ISR副本隊列,在0.10版本移除了replica.lag.max.messages參數,防止服務頻繁的進去隊列。

任意一個維度超過閾值都會把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會先存放在OSR中。

11)Kafka分區分配

Range和RoundRobin

12)Kafka中數據量計算

天天總數據量100g,天天產生1億條日誌, 10000萬/24/60/60=1150條/每秒鐘

平均每秒鐘:1150條

低谷每秒鐘:400條

高峯每秒鐘:1150條*(2-20倍)=2300條-23000條

每條日誌大小:0.5k-2k(取1k)

每秒多少數據量:2.0M-20MB

13) Kafka掛掉

(1)Flume記錄

(2)日誌有記錄

(3)短時間沒事

14)Kafka消息數據積壓,Kafka消費能力不足怎麼處理?

(1)若是是Kafka消費能力不足,則能夠考慮增長Topic的分區數,而且同時提高消費組的消費者數量,消費者數=分區數。(二者缺一不可)

(2)若是是下游的數據處理不及時:提升每批次拉取的數量。批次拉取數據過少(拉取數據/處理時間<生產速度),使處理的數據小於生產的數據,也會形成數據積壓。

15)Kafka冪等性

Kafka0.11版本引入了冪等性,冪等性配合at least once語義能夠實現exactly once語義。但只能保證單次會話的冪等。

16)Kafka事務

Kafka0.11版本引入Kafka的事務機制,其能夠保證生產者發往多個分區的一批數據的原子性。

相關文章
相關標籤/搜索