流式分析系統實現 之一

1、實驗介紹
     
咱們知道網站用戶訪問流量是不間斷的,基於網站的訪問日誌,即 Web log 分析是典型的流式實時計算應用場景。好比百度統計,它能夠作流量分析、來源分析、網站分析、轉化分析。另外還有特定場景分析,好比安全分析,用來識別 CC 攻擊、 SQL 注入分析、脫庫等。這裏咱們簡單實現一個相似於百度分析的系統。
 
1.1 實驗知識點
  • Python 模擬生成 Nginx 日誌
  • Spark Streaming 編程
  • 服務器訪問日誌分析方法
 
1.2 實驗環境
  • Spark 2.1.1
  • Python 2.7.6
  • Xfce 終端
2、實驗原理
百度統計(tongji.baidu.com)是百度推出的一款免費的專業網站流量分析工具,可以告訴用戶訪客是如何找到並瀏覽用戶的網站的,以及在網站上瀏覽了哪些頁面。這些信息能夠幫助用戶改善訪客在其網站上的使用體驗,不斷提高網站的投資回報率。
百度統計提供了幾十種圖形化報告,包括:趨勢分析、來源分析、頁面分析、訪客分析、定製分析等多種統計分析服務。
這裏咱們參考百度統計的功能,基於 Spark Streaming 簡單實現一個分析系統,使之包括如下分析功能。
    • 流量分析。一段時間內用戶網站的流量變化趨勢,針對不一樣的 IP 對用戶網站的流量進行細分。常見指標是總 PV 和各 IP 的PV。
    • 來源分析。各類搜索引擎來源給用戶網站帶來的流量狀況,須要精確到具體搜索引擎、具體關鍵詞。經過來源分析,用戶能夠及時瞭解哪一種類型的來源爲其帶來了更多訪客。常見指標是搜索引擎、關鍵詞和終端類型的 PV 。
    • 網站分析。各個頁面的訪問狀況,包括及時瞭解哪些頁面最吸引訪客以及哪些頁面最容易致使訪客流失,從而幫助用戶更有針對性地改善網站質量。常見指標是各頁面的 PV 。
2.1日誌實時採集
     
Web log 通常在 HTTP 服務器收集,好比 Nginx access 日誌文件。一個典型的方案是 Nginx 日誌文件 + Flume + Kafka + Spark Streaming,以下所述:
      1. 接收服務器用 Nginx ,根據負載能夠部署多臺,數據落地至本地日誌文件;
      2. 每一個 Nginx 節點上部署 Flume ,使用 tail -f 實時讀取 Nginx 日誌,發送至 KafKa 集羣;
      3. 專用的 Kafka 集羣用戶鏈接實時日誌與 Spark 集羣,詳細配置能夠參考 http://spark.apache.org/docs/2.1.1/streaming-kafka-integration.html ;
      4. Spark Streaming 程序實時消費 Kafka 集羣上的數據,實時分析,輸出;
      5. 結果寫入 MySQL 數據庫。
固然,還能夠進一步優化,好比 CGI 程序直接發日誌消息到 Kafka ,節省了寫訪問日誌的磁盤開銷。這裏主要專一 Spark Streaming 的應用,因此咱們不作詳細論述。
 
2.1流式分析系統實現
     
咱們簡單模擬一下數據收集和發送的環節,用一個 Python 腳本隨機生成 Nginx 訪問日誌,並經過腳本的方式自動上傳至 HDFS ,而後移動至指定目錄。 Spark Streaming 程序監控 HDFS 目錄,自動處理新的文件。
生成 Nginx 日誌的 Python 代碼以下,保存爲文件 sample_web_log.py 。
# ! /usr/bin/env python
# encoding=utf8

import random
import time

class WebLogGeneration(object):

    #類屬性,由全部類的對象共享
    site_url_base = "http://www.xxx.com/"

    #基本構造函數
    def __init__(self):
        # 前面7條是IE,因此大概瀏覽器類型70%爲IE,接入類型上,20%爲移動設備,分別是7和8條,5%爲空
        # https://github.com/mssola/user_agent/blob/master/all_test.go
        self.user_agent_dist = {0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",
                                0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",
                                0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
                                0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",
                                0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",
                                0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
                                1:" ",}
        self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]
        self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]
        self.http_refer = ["http://www.baidu.com/s?wd={query}","http://www.google.cn/search?q={query}","http://www.sogou.com/web?query={query}",
                           "http://one.cn.yahoo.com/s?p={query}","http://cn.bing.com/search?q={query}"]
        self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]

    def sample_ip(self):
        slice = random.sample(self.ip_slice_list,4) #從ip_slice_list中隨機獲取4個元素,做爲一個片段返回
        return ".".join([str(item) for item in slice])

    def sample_url(self):
        return random.sample(self.url_path_list,1)[0]

    def sample_user_agent(self):
        dist_uppon = random.uniform(0,1)
        return self.user_agent_dist[float('%0.1f' % dist_uppon)]

    #主要搜索引擎referrer參數
    def sample_refer(self):
        if random.uniform(0,1) > 0.2: #只有20% 流量有refer
            return "-"

        refer_str = random.sample(self.http_refer,1)
        query_str = random.sample(self.search_keyword,1)
        return refer_str[0].format(query=query_str[0])

    def sample_one_log(self,count=3):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
        while count > 1:
            query_log = "{ip} - - [{local_time}] \"GET /{url} HTTP/1.1\" 200 0 \"{refer}\" \"{user_agent}\" \"-\"".format(ip
                          =self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())
            print query_log
            count = count -1
if __name__ == "__main__":
    web_log_gene = WebLogGeneration()

    #while True:
    #   time.sleep(random.uniform(0,3))
    web_log_gene.sample_one_log(random.uniform(10,100))
View Code

這是一條日誌的示例,爲一行形式,各字段間用空格分隔,字符串類型的值用雙引號包圍:php

46.202.124.63 - - [2015-11-26 09:54:27] "GET /view.php HTTP/1.1" 200 0 "http://www.google.cn/search?q=hadoop" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-"
而後須要一個簡單的腳原本調用上面的腳本以隨機生成日誌,上傳至 HDFS ,而後移動到目標目錄:
#!/bin/bash
echo "Hello World !"
# HDFS命令
HDFS="hadoop fs"

# Streaming 程序監聽的目錄,注意跟後面Streaming程序的配置要保持一致
streaming_dir="/spark/streaming"

#清空舊數據
#su hdfs <<EOF
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1
$HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1
#EOF
#一直運行
while [ 1 ];do
    python sample_web_log.py > test.log
    # 給日誌文件加上時間戳,避免重名
    tmplog="access.`date +'%s'`.log"
    $HDFS -put test.log ${streaming_dir}/tmp/$tmplog
    $HDFS -mv          ${streaming_dir}/tmp/$tmplog ${streaming_dir}/
    echo "`date +"%F %T"` put $tmplog to HDFS succeed"
    sleep 1
done
#EOF
View Code

Spark Streaming 程序代碼以下所示,能夠在 bin/spark-shell 交互式環境下運行,若是要以 Spark 程序的方式運行,按註釋中的說明調整一下 StreamingContext 的生成方式便可。啓動 bin/spark-shell 時,爲了不因 DEBUG 日誌信息太多而影響觀察輸出,能夠將 DEBUG 日誌重定向至文件,屏幕上只顯示主要輸出,方法是 ./bin/spark-shell 2>spark-shell-debug.log:html

// 導入類
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 設計計算的週期,單位秒
val batch = 10

/*
 * 這是bin/spark-shell交互式模式下建立StreamingContext的方法
 * 非交互式請使用下面的方法來建立
 */
val ssc = new StreamingContext(sc, Seconds(batch))

/*
// 非交互式下建立StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/


/*
 * 建立輸入DStream,是文本文件目錄類型
 * 本地模式下也可使用本地文件系統的目錄,好比 file:///home/spark/streaming
 */
val lines = ssc.textFileStream("hdfs:///spark/streaming")


/*
 * 下面是統計各項指標,調試時能夠只進行部分統計,方便觀察結果
 */


// 1. 總PV
lines.count().print()


// 2. 各IP的PV,按PV倒序//   空格分隔的第一個字段就是IP
lines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => {
  rdd.map(ip_pv => (ip_pv._2, ip_pv._1)).
  sortByKey(false).
  map(ip_pv => (ip_pv._2, ip_pv._1))
}).print()


// 3. 搜索引擎PV
val refer = lines.map(_.split("\"")(3))

// 先輸出搜索引擎和查詢關鍵詞,避免統計搜索關鍵詞時重複計算// 輸出(host, query_keys)
val searchEnginInfo = refer.map(r => {

    val f = r.split('/')

    val searchEngines = Map(
        "www.google.cn" -> "q",
        "www.yahoo.com" -> "p",
        "cn.bing.com" -> "q",
        "www.baidu.com" -> "wd",
        "www.sogou.com" -> "query"
    )

    if (f.length > 2) {
        val host = f(2)

        if (searchEngines.contains(host)) {
            val query = r.split('?')(1)
            if (query.length > 0) {
                val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                if (arr_search_q.length > 0)
                    (host, arr_search_q(0).split('=')(1))
                else
                    (host, "")
            } else {
                (host, "")
            }
        } else
            ("", "")
    } else
        ("", "")

})

// 輸出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print()


// 4. 關鍵詞PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print()


// 5. 終端類型PV
lines.map(_.split("\"")(5)).map(agent => {
    val types = Seq("iPhone", "Android")
    var r = "Default"
    for (t <- types) {
        if (agent.indexOf(t) != -1)
            r = t
    }
    (r, 1)
}).reduceByKey(_ + _).print()


// 6. 各頁面PV
lines.map(line => {(line.split("\"")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print()



// 啓動計算,等待執行結束(出錯或Ctrl-C退出)
ssc.start()
ssc.awaitTermination()
View Code

打開兩個終端,一個調用上面的 bash 腳本模擬提交日誌,一個在交互式環境下運行上面的 Streaming 程序。你能夠看到各項指標的輸出,好比某個批次下的輸出爲(依次對應上面的 6 個計算項):python

  1.總PVgit

    ------------------------------------------- github

    Time: 1448533850000 ms web

    ------------------------------------------- sql

    44374shell

  2.各IP的PV,按PV倒序數據庫

    ------------------------------------------- apache

    Time: 1448533850000 ms

    -------------------------------------------

    (72.63.87.30,30)

    (63.72.46.55,30)

    (98.30.63.10,29)

    (72.55.63.46,29)

    (63.29.10.30,29)

    (29.30.63.46,29)

    (55.10.98.87,27)

    (46.29.98.30,27)

    (72.46.63.30,27)

    (87.29.55.10,26)

  3.搜索引擎PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (cn.bing.com,1745)

    (www.baidu.com,1773)

    (www.google.cn,1793)

    (www.sogou.com,1845)

  4.關鍵詞PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (spark,1426)

    (hadoop,1455)

    (spark sql,1429)

    (spark mlib,1426)

    (hive,1420)

  5.終端類型PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (Android,4281)

    (Default,35745)

    (iPhone,4348)

  6.各頁面PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (/edit.php,6435)

    (/admin/login.php,6271)

    (/login.php,6320)

    (/upload.php,6278)

    (/list.php,6411)

    (/index.html,6309)

    (/view.php,6350)

 

查看數據更直觀的作法是用圖形來展現,常見作法是將結果寫入外部 DB ,而後經過一些圖形化報表展現系統展現出來。好比對於終端類型,咱們能夠用餅圖展現,如圖6-11所示。

圖6-11 終端類型分佈圖示例(另見彩插圖6-11)

對於連續的數據,咱們也能夠用拆線圖來展現趨勢。好比某頁面的PV,如圖6-12所示。

除了常規的每一個固定週期進行一次統計,咱們還能夠對連續多個週期的數據進行統計。以統計總 PV 爲例,上面的示例是每 10 秒統計一次,可能還須要每分鐘統計一次,至關於 6 個 10 秒的週期。咱們能夠利用窗口方法實現,不一樣的代碼以下:

  // 窗口方法必須配置checkpint,能夠這樣配置: ssc.checkpoint("hdfs:///spark/checkpoint")

  // 這是常規每10秒一個週期的PV統計 lines.count().print()

  // 這是每分鐘(連續多個週期)一次的PV統計 lines.countByWindow(Seconds(batch*6), Seconds(batch*6)).print()

使用相同的辦法運行程序以後,咱們首先會看到連續 6 次 10 秒週期的 PV 統計輸出:

  -------------------------------------------

  Time: 1448535090000 ms

  -------------------------------------------

  1101

  -------------------------------------------

  Time: 1448535100000 ms

  -------------------------------------------

  816

  -------------------------------------------

  Time: 1448535110000 ms

  -------------------------------------------

  892

  -------------------------------------------

  Time: 1448535120000 ms

  -------------------------------------------

  708

  -------------------------------------------

  Time: 1448535130000 ms

  -------------------------------------------

  881

  -------------------------------------------

  Time: 1448535140000 ms

  -------------------------------------------

  872

在這以後,有一個 1 分鐘週期的 PV 統計輸出,它的值恰好是上面 6 次計算結果的總和:

  ------------------------------------------- 
  Time: 1448535140000 ms 
  -------------------------------------------
  5270 

3、開發準備
3.1 準備生成日誌的Python代碼
3.1.1 編輯代碼
首先把sample_web_log.py代碼放到集羣中,上傳到
3.1.2 修改代碼的執行權限
     chmod +x sample_web_log.py
     chown hdfs:hdfs sample_web_log.py
     cp sample_web_log.py /var/lib/hadoop-hdfs/device-report
 
3.2 啓動 Spark Shell
    接下來須要啓動 Spark Shell 來定製 Streaming 任務。爲了不因 DEBUG 日誌信息太多而影響觀察輸出,能夠將 DEBUG 日誌重定向至文件,屏幕上只顯示主要輸出。
     請經過如下代碼來啓動Spark Shell。啓動須要耗費必定的時間,請耐心等待。
     spark-shell 2>spark-shell-debug.log
     等到出現 scala>提示符時,就代表已經成功啓動Spark Sheel了。
請不要關閉運行Spark Shell的終端,其餘任何的終端命令請在新打開的終端運行。
4、具體步驟
4.1 建立日誌目錄
      在 hdfs spark 目錄下新建 streaming 目錄,並增設 tmp 臨時文件夾。
4.2 經過bash腳本生成日誌
     touch log.sh
     vim log.sh
     

log.sh 文件中須要填入如下內容:

#!/bin/bash
echo "Hello World !"
# HDFS命令
HDFS="hadoop fs"

# Streaming 程序監聽的目錄,注意跟後面Streaming程序的配置要保持一致
streaming_dir="/spark/streaming"

#清空舊數據
#su hdfs <<EOF
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1
$HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1
#EOF
#一直運行
while [ 1 ];do
    python sample_web_log.py > test.log
    # 給日誌文件加上時間戳,避免重名
    tmplog="access.`date +'%s'`.log"
    $HDFS -put test.log ${streaming_dir}/tmp/$tmplog
    $HDFS -mv          ${streaming_dir}/tmp/$tmplog ${streaming_dir}/
    echo "`date +"%F %T"` put $tmplog to HDFS succeed"
    sleep 1
done
#EOF
View Code
同時須要修改該腳本文件的執行權限。
chmod +x log.sh
 
進入spark-shell 2>spark-shell-debug.log,執行如下代碼。
/*
 * 這是bin/spark-shell交互式模式下建立StreamingContext的方法
 * 非交互式請使用下面的方法來建立
 */
val ssc = new StreamingContext(sc,Seconds(batch))

/*
// 非交互式下建立StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/


/*
 * 建立輸入DStream,是文本文件目錄類型
 * 本地模式下也可使用本地文件系統的目錄,好比 file:///home/spark/streaming
 */
val lines = ssc.textFileStream("hdfs:///spark/streaming")

/*
 * 下面是統計各項指標,調試時能夠只進行部分統計,方便觀察結果
 */


//1.總pv
lines.count().print()

//2. 各IP的PV,按PV倒序
// 空格分隔的第一個字段就是IP
lines.map(line => {(line.split(" ")(0),1)}).reduceByKey(_ + _).transform(rdd => {
    rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).
    sortByKey(false).
    map(ip_pv => (ip_pv._2,ip_pv._1))
}).print()

//3.搜索引擎PV
val refer = lines.map(_.split("\"")(3))

//先輸出搜索引擎和查詢關鍵詞,避免統計搜索關鍵詞時重複計算
//輸出(host,query_keys)
val searchEnginInfo = refer.map(r => {
    val f = r.split('/')
    val searchEngines = Map(
        "www.google.cn" -> "q",
        "www.yahoo.com" -> "p",
        "cn.bing.com" -> "q",
        "www.baidu.com" -> "wd",
        "www.sogou.com" -> "query"
    )

    if (f.length > 2) {
        val host = f(2)

        if(searchEngines.contains(host)) {
            val query = r.split('?')(1)
            if(query.length > 0) {
                val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                if(arr_search_q.length > 0)
                    (host,arr_search_q(0).split('=')(1))
                else
                    (host,"")
            } else {
                (host,"")
            }
        } else
            ("","")
    } else
        ("","")
})

//輸出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1,1)}).reduceByKey(_ + _).print()

//4.關鍵詞PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2,1)}).reduceByKey(_ + _).print()

//5.終端類型PV
lines.map(_.split("\"")(5)).map(agent => {
    val types = Seq("iPhone","Android")
    var r = "Default"
    for (t <- types) {
        if(agent.indexOf(t) != -1)
            r = t
    }
    (r,1)
}).reduceByKey(_ + _).print()

//6.各頁面PV
lines.map(line => {(line.split("\"")(1).split(" ")(1),1)}).reduceByKey(_ + _).print()

//啓動計算,等待執行結束(出錯或Ctrl+C退出)
ssc.start()
View Code
在別一個終端中執行log.sh文件(genLog.sh是同一個文件
而後看spark終端中的信息
相關文章
相關標籤/搜索