Spark 實踐——基於 Spark Streaming 的實時日誌分析系統

本文基於《Spark 最佳實踐》第6章 Spark 流式計算。php

咱們知道網站用戶訪問流量是不間斷的,基於網站的訪問日誌,即 Web log 分析是典型的流式實時計算應用場景。好比百度統計,它能夠作流量分析、來源分析、網站分析、轉化分析。另外還有特定場景分析,好比安全分析,用來識別 CC 攻擊、 SQL 注入分析、脫庫等。這裏咱們簡單實現一個相似於百度分析的系統。html

代碼見 https://github.com/libaoquan95/WebLogAnalysepython

1.模擬生成 web log 記錄

在日誌中,每行表明一條訪問記錄,典型格式以下:nginx

46.156.87.72 - - [2018-05-15 06:00:30] "GET /upload.php HTTP/1.1" 200 0 "http://www.baidu.com/s?wd=spark" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-"

分別表明:訪問 ip,時間戳,訪問頁面,響應狀態,搜索引擎索引,訪問 Agent。git

簡單模擬一下數據收集和發送的環節,用一個 Python 腳本隨機生成 Nginx 訪問日誌,爲了方便起見,不使用 HDFS,使用單機文件系統。github

首先,新建文件夾用於存放日誌文件web

$ mkdir Documents/nginx
$ mkdir Documents/nginx/log
$ mkdir Documents/nginx/log/tmp

而後,使用 Python 腳本隨機生成 Nginx 訪問日誌,併爲腳本設置執行權限, 代碼見 sample_web_log.pysql

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import time


class WebLogGeneration(object):

    # 類屬性,由全部類的對象共享
    site_url_base = "http://www.xxx.com/"

    # 基本構造函數
    def __init__(self):
        #  前面7條是IE,因此大概瀏覽器類型70%爲IE ,接入類型上,20%爲移動設備,分別是7和8條,5% 爲空
        self.user_agent_dist = {0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",
                                0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",
                                0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
                                0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",
                                0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",
                                0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
                                1:" ",}
        self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]
        self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]
        self.http_refer = [ "http://www.baidu.com/s?wd={query}","http://www.google.cn/search?q={query}","http://www.sogou.com/web?query={query}","http://one.cn.yahoo.com/s?p={query}","http://cn.bing.com/search?q={query}"]
        self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]


    def sample_ip(self):
        slice = random.sample(self.ip_slice_list, 4) #從ip_slice_list中隨機獲取4個元素,做爲一個片段返回
        return  ".".join([str(item) for item in slice])  #  todo


    def sample_url(self):
        return  random.sample(self.url_path_list,1)[0]


    def sample_user_agent(self):
        dist_uppon = random.uniform(0, 1)
        return self.user_agent_dist[float('%0.1f' % dist_uppon)]


    # 主要搜索引擎referrer參數
    def sample_refer(self):
        if random.uniform(0, 1) > 0.2:  # 只有20% 流量有refer
            return "-"

        refer_str=random.sample(self.http_refer,1)
        query_str=random.sample(self.search_keyword,1)
        return refer_str[0].format(query=query_str[0])

    def sample_one_log(self,count = 3):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
        while count >1:
            query_log = "{ip} - - [{local_time}] \"GET /{url} HTTP/1.1\" 200 0 \"{refer}\" \"{user_agent}\" \"-\"".format(ip=self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())
            print query_log
            count = count -1

if __name__ == "__main__":
    web_log_gene = WebLogGeneration()

    #while True:
    #    time.sleep(random.uniform(0, 3))
    web_log_gene.sample_one_log(random.uniform(10, 100))

設置可執行權限的方法以下shell

$ chmod +x sample_web_log.py

以後,編寫 bash 腳本,自動生成日誌記錄,並賦予可執行權限,代碼見 genLog.shapache

#!/bin/bash

while [ 1 ]; do
    ./sample_web_log.py > test.log

    tmplog="access.`date +'%s'`.log"
    cp test.log streaming/tmp/$tmplog
    mv streaming/tmp/$tmplog streaming/
    echo "`date +"%F %T"` generating $tmplog succeed"
    sleep 1
done

賦予權限

$ chmod +x genLog.sh

執行 genLog.sh 查看效果,輸入 ctrl+c 終止。

$ ./genLog.sh

2.流式分析

建立 Scala 腳本,代碼見 genLog.sh

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val batch = 10  // 計算週期(秒)
//val conf = new SparkConf().setAppName("WebLogAnalyse").setMaster("local")
//val ssc = new StreamingContext(conf, Seconds(batch))
val ssc = new StreamingContext(sc, Seconds(batch))
val input = "file:///home/libaoquan/Documents/nginx/log"  // 文件流
val lines = ssc.textFileStream(input)

// 計算總PV
lines.count().print()

// 各個ip的pv
lines.map(line => (line.split(" ")(0), 1)).reduceByKey(_+_).print()

// 獲取搜索引擎信息
val urls = lines.map(_.split("\"")(3))

// 先輸出搜索引擎和查詢關鍵詞,避免統計搜索關鍵詞時重複計算
// 輸出(host, query_keys)
val searchEnginInfo = urls.map( url  => {
  // 搜索引擎對應的關鍵字索引
  val searchEngines = Map(
  "www.google.cn" -> "q",
  "www.yahoo.com" -> "p",
  "cn.bing.com" -> "q",
  "www.baidu.com" -> "wd",
  "www.sogou.com" -> "query"
  )
  val temp = url.split("/")
  // Array(http:, "", www.baidu.com, s?wd=hadoop)
  if(temp.length > 2){
  val host = temp(2)
  if(searchEngines.contains(host)){
    val q = url.split("//?")
    if(q.length > 0) {
      val query = q(1)
      val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host) + "=") == 0)
      if (arr_search_q.length > 0) {
        (host, arr_search_q(0).split('=')(1))
      } else {
        (host, "")
      }
    } else{
      ("", "")
    }
  } else{
    ("", "")
  }
  } else{
  ("", "")
  }
})

// 搜索引擎pv
searchEnginInfo.filter(_._1.length > 0).map(i => (i._1, 1)).reduceByKey(_+_).print()

// 關鍵字pv
searchEnginInfo.filter(_._2.length > 0).map(i => (i._2, 1)).reduceByKey(_+_).print()

// 終端pv
lines.map(_.split("\"")(5)).map(agent => {
  val types = Seq("iPhone", "Android")
  var r = "Default"
  for (t <- types) {
  if (agent.indexOf(t) != -1)
    r = t
  }
  (r, 1)
}).reduceByKey(_ + _).print()

// 各頁面pv
lines.map(line => (line.split("\"")(1).split(" ")(1), 1)).reduceByKey(_+_).print()

ssc.start()
ssc.awaitTermination()

3.執行

同時開啓兩個終端,分別執行 genLog.sh 生成日誌文件和執行 WebLogAnalyse.scala 腳本進行流式分析。

執行 genLog.sh

$ ./genLog.sh

執行 WebLogAnalyse.scala, 使用 spark-shell 執行 scala 腳本

$ spark-shell --executor-memory 5g --driver-memory 1g --master local  < WebLogAnalyse.scala

效果以下,左邊是 WebLogAnalyse.scala,右邊是 genLog.sh

相關文章
相關標籤/搜索