大數據Spark+Kafka實時數據分析案例

本案例利用Spark+Kafka實時分析男女生每秒購物人數,利用Spark Streaming實時處理用戶購物日誌,而後利用websocket將數據實時推送給瀏覽器,最後瀏覽器將接收到的數據實時展示,案例的總體框架圖以下:

此處輸入圖片的描述
下面分析詳細分析下上述步驟:
javascript

  1. 應用程序將購物日誌發送給Kafka,topic爲」sex」,由於這裏只是統計購物男女生人數,因此只須要發送購物日誌中性別屬性便可。這裏採用模擬的方式發送購物日誌,即讀取購物日誌數據,每間隔相同的時間發送給Kafka。
  2. 接着利用Spark Streaming從Kafka主題」sex」讀取並處理消息。這裏按滑動窗口的大小按順序讀取數據,例如能夠按每5秒做爲窗口大小讀取一次數據,而後再處理數據。
  3. Spark將處理後的數據發送給Kafka,topic爲」result」。
  4. 而後利用Flask搭建一個web應用程序,接收Kafka主題爲」result」的消息。
  5. 利用Flask-SocketIO將數據實時推送給客戶端。
  6. 客戶端瀏覽器利用js框架socketio實時接收數據,而後利用js可視化庫hightlights.js庫動態展現。

至此,本案例的總體架構已介紹完畢。css

1、實驗環境準備

實驗系統和軟件要求

Ubuntu: 16.04
Spark: 2.1.0
Scala: 2.11.8
kafka: 0.8.2.2
Python: 3.x(3.0以上版本)
Flask: 0.12.1
Flask-SocketIO: 2.8.6
kafka-python: 1.3.3html

系統和軟件的安裝

Spark安裝(前續文檔已經安裝)

Kafka安裝

Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣機來提供實時的消費。下面介紹有關Kafka的簡單安裝和使用, 簡單介紹參考KAFKA簡介, 想全面瞭解Kafka,請訪問Kafka的官方博客。
我選擇的是kafka_2.11-0.10.1.0.tgz(注意,此處版本號,在後面spark使用時是有要求的,見集成指南)版本。java

sudo tar -zxf kafka_2.11-0.10.1.0.tgz -C /usr/local cd /usr/local sudo mv kafka_2.11-0.10.1.0/ ./kafka sudo chown -R hadoop ./kafka

接下來在Ubuntu系統環境下測試簡單的實例。Mac系統請本身按照安裝的位置,切換到相應的指令。按順序執行以下命令:python

cd /usr/local/kafka # 進入kafka所在的目錄 bin/zookeeper-server-start.sh config/zookeeper.properties

命令執行後不會返回Shell命令輸入狀態,zookeeper就會按照默認的配置文件啓動服務,請千萬不要關閉當前終端.啓動新的終端,輸入以下命令:nginx

cd /usr/local/kafka bin/kafka-server-start.sh config/server.properties

kafka服務端就啓動了,請千萬不要關閉當前終端。啓動另一個終端,輸入以下命令:web

cd /usr/local/kafka bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab

topic是發佈消息發佈的category,以單節點的配置建立了一個叫dblab的topic.能夠用list列出全部建立的topics,來查看剛纔建立的主題是否存在。算法

bin/kafka-topics.sh --list --zookeeper localhost:2181 

能夠在結果中查看到dblab這個topic存在。接下來用producer生產點數據:shell

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

並嘗試輸入以下信息:apache

hello hadoop hello xmu hadoop world

而後再次開啓新的終端或者直接按CTRL+C退出。而後使用consumer來接收數據,輸入以下命令:

cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning 

即可以看到剛纔產生的三條信息。說明kafka安裝成功。

Python安裝

Ubuntu16.04系統自帶Python2.7和Python3.5,本案例直接使用Ubuntu16.04自帶Python3.5;

Python依賴庫

案例主要使用了兩個Python庫,Flask和Flask-SocketIO,這兩個庫的安裝很是簡單,請啓動進入Ubuntu系統,打開一個命令行終端。
Python之因此強大,其中一個緣由是其豐富的第三方庫。pip則是python第三方庫的包管理工具。Python3對應的包管理工具是pip3。所以,須要首先在Ubuntu系統中安裝pip3,命令以下:

sudo apt-get install python3-pip

安裝完pip3之後,可使用以下Shell命令完成Flask和Flask-SocketIO這兩個Python第三方庫的安裝以及與Kafka相關的Python庫的安裝:

pip3 install flask pip3 install flask-socketio pip3 install kafka-python

這些安裝好的庫在咱們的程序文件的開頭能夠直接用來引用。好比下面的例子。

from flask import Flask from flask_socketio import SocketIO from kafka import KafkaConsumer

from import 跟直接import的區別舉個例子來講明。
import socket的話,要用socket.AF_INET,由於AF_INET這個值在socket的名稱空間下。
from socket import* 是把socket下的全部名字引入當前名稱空間。

2、數據處理和Python操做Kafka

本案例採用的數據集壓縮包爲data_format.zip點擊這裏下載data_format.zip數據集,該數據集壓縮包是淘寶2015年雙11前6個月(包含雙11)的交易數據(交易數據有偏移,可是不影響實驗的結果),裏面包含3個文件,分別是用戶行爲日誌文件user_log.csv 、回頭客訓練集train.csv 、回頭客測試集test.csv. 在這個案例中只是用user_log.csv這個文件,下面列出文件user_log.csv的數據格式定義:
用戶行爲日誌user_log.csv,日誌中的字段定義以下:

  1. user_id | 買家id
  2. item_id | 商品id
  3. cat_id | 商品類別id
  4. merchant_id | 賣家id
  5. brand_id | 品牌id
  6. month | 交易時間:月
  7. day | 交易事件:日
  8. action | 行爲,取值範圍{0,1,2,3},0表示點擊,1表示加入購物車,2表示購買,3表示關注商品
  9. age_range | 買家年齡分段:1表示年齡<18,2表示年齡在[18,24],3表示年齡在[25,29],4表示年齡在[30,34],5表示年齡在[35,39],6表示年齡在[40,49],7和8表示年齡>=50,0和NULL則表示未知
  10. gender | 性別:0表示女性,1表示男性,2和NULL表示未知
  11. province| 收穫地址省份

數據具體格式以下:

user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,內蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,內蒙古
328862,1086186,1271,1253,1049,08,29,0,0,2,浙江

這個案例實時統計每秒中男女生購物人數,所以針對每條購物日誌,咱們只須要獲取gender便可,而後發送給Kafka,接下來Spark Streaming再接收gender進行處理。

數據預處理

接着能夠寫以下Python代碼,文件名爲producer.py:(具體的工程文件結構參照步驟一)

mkdir -p ~/kafka-exp/scripts cd ~/kafka-exp/scripts vim producer.py

添加如入內容:

# coding: utf-8 import csv import time from kafka import KafkaProducer # 實例化一個KafkaProducer示例,用於向Kafka投遞消息 producer = KafkaProducer(bootstrap_servers='localhost:9092') # 打開數據文件 csvfile = open("../data/user_log.csv","r") # 生成一個可用於讀取csv文件的reader reader = csv.reader(csvfile) for line in reader: gender = line[9] # 性別在每行日誌代碼的第9個元素 if gender == 'gender': continue # 去除第一行表頭 time.sleep(0.1) # 每隔0.1秒發送一行數據 # 發送數據,topic爲'sex' producer.send('sex',line[9].encode('utf8'))

上述代碼很簡單,首先是先實例化一個Kafka生產者。而後讀取用戶日誌文件,每次讀取一行,接着每隔0.1秒發送給Kafka,這樣1秒發送10條購物日誌。這裏發送給Kafka的topic爲’sex’。

Python操做Kafka

咱們能夠寫一個KafkaConsumer測試數據是否投遞成功,代碼以下,文件名爲consumer.py

from kafka import KafkaConsumer consumer = KafkaConsumer('sex') for msg in consumer: print((msg.value).decode('utf8'))

在開啓上述KafkaProducer和KafkaConsumer以前,須要先開啓Kafka。而後再開兩個終端,分別用做發佈消息與消費消息,執行命令以下:

cd ~/kafka-exp/scripts python3 producer.py #啓動生產者發送消息給Kafaka

打開另一個命令行 終端窗口,消費消息,執行以下命令:

cd ~/kafka-exp/scripts python3 consumer.py #啓動消費者從Kafaka接收消息

運行上面這條命令之後,這時,你會看到屏幕上會輸出一行又一行的數字,相似下面的樣子:

2 1 1 1 .......

3、Spark Streaming實時處理數據

本案例在於實時統計每秒中男女生購物人數,而Spark Streaming接收的數據爲1,1,0,2…,其中0表明女性,1表明男性,因此對於2或者null值,則不考慮。其實經過分析,能夠發現這個就是典型的wordcount問題,並且是基於Spark流計算。女生的數量,即爲0的個數,男生的數量,即爲1的個數。

所以利用Spark Streaming接口reduceByKeyAndWindow,設置窗口大小爲1,滑動步長爲1,這樣統計出的0和1的個數即爲每秒男生女生的人數。

Spark準備工做

Kafka和Flume等高級輸入源,須要依賴獨立的庫(jar文件)。按照咱們前面安裝好的Spark版本,這些jar包都不在裏面,爲了證實這一點,咱們如今能夠測試一下。請打開一個新的終端,而後啓動spark-shell:

cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7 ./bin/spark-shell

啓動成功後,在spark-shell中執行下面import語句:

scala> import org.apache.spark.streaming.kafka010._ <console>:25: error: object kafka is not a member of package org.apache.spark.streaming import org.apache.spark.streaming.kafka010._ ^

你能夠看到,立刻會報錯,由於找不到相關的jar包。而後咱們退出spark-shell。
根據Spark官網的說明,對於Spark2.3.0版本,若是要使用Kafka,則須要下載spark-streaming-kafka-0-10_2.11相關jar包。
如今請在Linux系統中,打開一個火狐瀏覽器,請點擊這裏訪問Spark官網,裏面有提供spark-streaming-kafka-0-10_2.11-2.3.0.jar文件的下載,其中,2.11表示scala的版本,2.3.0表示Spark版本號。下載後的文件會被默認保存在當前Linux登陸用戶的下載目錄下,本教程統一使用hadoop用戶名登陸Linux系統,因此,咱們就把這個文件複製到Spark目錄的jars目錄下。請新打開一個終端,輸入下面命令:

mkdir /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka cp ./spark-streaming-kafka-0-10_2.11-2.3.0.jar /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka

下面還要繼續把Kafka安裝目錄的libs目錄下的全部jar文件複製到「/usr/local/spark/jars/kafka」目錄下,請在終端中執行下面命令:

cd /usr/local/kafka/libs ls cp ./* /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka

創建Spark項目

以前有不少教程都有說明如何建立Spark項目,這裏再次說明。首先在/usr/local/spark/mycode新建項目主目錄kafka,而後在kafka目錄下新建scala文件存放目錄以及scala工程文件

mkdir -p /usr/local/spark/mycode/kafka/src/main/scala

接着在src/main/scala文件下建立兩個文件,一個是用於設置日誌,一個是項目工程主文件,設置日誌文件爲StreamingExamples.scala

package org.apache.spark.examples.streaming import org.apache.spark.internal.Logging import org.apache.log4j.{Level, Logger} /** Utility functions for Spark Streaming examples. */ object StreamingExamples extends Logging { /** Set reasonable logging levels for streaming if the user has not configured log4j. */ def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { // We first log something to initialize Spark's default logging, then we override the // logging level. logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }

這個文件不作過多解釋,由於這只是一個輔助文件,下面着重介紹工程主文件,文件名爲KafkaTest.scala

package org.apache.spark.examples.streaming import java.util.HashMap import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.json4s._ import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.write import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.Interval import org.apache.spark.streaming.kafka010._ object KafkaWordCount { implicit val formats = DefaultFormats//數據格式化時須要 def main(args: Array[String]): Unit={ if (args.length < 3) { System.err.println("Usage: KafkaWordCount <brokers> <groupId> <topics>") System.exit(1) } StreamingExamples.setStreamingLogLevels() val Array(brokers, groupId, topics) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint("checkpoint") val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]) val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Get the lines, split them into words, count the words and print val lines = messages.map(_.value) val words = lines.flatMap(_.split(" "))//將輸入的每行用空格分割成一個個word // 對每一秒的輸入數據進行reduce,而後將reduce後的數據發送給Kafka val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => { if(rdd.count !=0 ){ val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092") props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // 實例化一個Kafka生產者 val producer = new KafkaProducer[String, String](props) // rdd.colect即將rdd中數據轉化爲數組,而後write函數將rdd內容轉化爲json格式 val str = write(rdd.collect) // 封裝成Kafka消息,topic爲"result" val message = new ProducerRecord[String, String]("result", null, str) // 給Kafka發送消息 producer.send(message) } }) ssc.start() ssc.awaitTermination() } }

上述代碼註釋已經也很清楚了,下面在簡要說明下:

  1. 首先按每秒的頻率讀取Kafka消息;
  2. 而後對每秒的數據執行wordcount算法,統計出0的個數,1的個數,2的個數;
  3. 最後將上述結果封裝成json發送給Kafka。

另外,須要注意,上面代碼中有一行以下代碼:

ssc.checkpoint(".") 

這行代碼表示把檢查點文件寫入分佈式文件系統HDFS,因此必定要事先啓動Hadoop。若是沒有啓動Hadoop,則後面運行時會出現「拒絕鏈接」的錯誤提示。若是你尚未啓動Hadoop,則能夠如今在Ubuntu終端中,使用以下Shell命令啓動Hadoop:

cd /usr/local/hadoop #這是hadoop的安裝目錄 ./sbin/start-dfs.sh

另外,若是不想把檢查點寫入HDFS,而是直接把檢查點寫入本地磁盤文件(這樣就不用啓動Hadoop),則能夠對ssc.checkpoint()方法中的文件路徑進行指定,好比下面這個例子:

ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint")

checkpoint的意思就是創建檢查點,相似於快照,例如在spark計算裏面 計算流程DAG特別長,服務器須要將整個DAG計算完成得出結果,可是若是在這很長的計算流程中忽然中間算出的數據丟失了,spark又會根據RDD的依賴關係從頭至尾計算一遍,這樣子就很費性能,固然咱們能夠將中間的計算結果經過cache或者persist放到內存或者磁盤中,可是這樣也不能保證數據徹底不會丟失,存儲的這個內存出問題了或者磁盤壞了,也會致使spark從頭再根據RDD計算一遍,因此就有了checkpoint,其中checkpoint的做用就是將DAG中比較重要的中間數據作一個檢查點將結果存儲到一個高可用的地方(一般這個地方就是HDFS裏面)

運行項目

編寫好程序以後,下面介紹下如何打包運行程序。在/usr/local/spark/mycode/kafka目錄下新建文件simple.sbt,輸入以下內容:

name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.3.0" libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"

而後,便可編譯打包程序,輸入以下命令

/usr/local/sbt/sbt package

打包成功以後,接下來編寫運行腳本,在/usr/local/spark/mycode/kafka目錄下新建startup.sh文件,輸入以下內容:

/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/*:/usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar 127.0.0.1:9092 1 sex 

其中最後四個爲輸入參數,含義以下

  1. 127.0.0.1:9092爲brokerer地址
  2. 1 爲consumer group標籤
  3. sex爲消費者接收的topic
    最後在/usr/local/spark/mycode/kafka目錄下,運行以下命令便可執行剛編寫好的Spark Streaming程序
sh startup.sh

程序運行成功以後,下面經過以前的KafkaProducer和KafkaConsumer來檢測程序。

測試程序

下面開啓以前編寫的KafkaProducer投遞消息,而後將KafkaConsumer中接收的topic改成result,驗證是否能接收topic爲result的消息,更改以後的KafkaConsumer爲

from kafka import KafkaConsumer consumer = KafkaConsumer('result') for msg in consumer: print((msg.value).decode('utf8'))

在同時開啓Spark Streaming項目,KafkaProducer以及KafkaConsumer以後,能夠在KafkaConsumer運行窗口,出現如下相似數據:

[{"0":1},{"2":3},{"1":6}]
[{"0":5},{"2":2},{"1":3}]
[{"0":3},{"2":3},{"1":4}]
.......

4、結果展現

接下來作的事是,利用Flask-SocketIO實時推送數據,socket.io.js實時獲取數據,highlights.js展現數據。

Flask-SocketIO實時推送數據

將介紹如何利用Flask-SocketIO將結果實時推送到瀏覽器。
下載代碼,用python3.5 運行 app.py便可:

python app.py
相關文章
相關標籤/搜索