Spark Streaming Hands-On Tutorial

1. Spark Streaming Overview

Streaming is a data-transfer technique that turns the data a client receives into a steady, continuous stream and outputs it without interruption, so the audio and video the user perceives stay smooth and the user can start viewing the content before the whole file has finished transferring.

Common stream-processing frameworks:

- Apache Storm

- Spark Streaming

- Apache Samza

All three are open-source distributed real-time computing systems with low latency, scalability, and fault tolerance. What they have in common is that they let you distribute the code that processes a data stream across a set of fault-tolerant machines and run it in parallel. They also all expose simple APIs that hide the complexity of the underlying layers.

For a comparison of real-time computing frameworks, see: http://www.csdn.net/article/2015-03-09/2824135

Spark Streaming is an extension of the Spark Core API: a distributed, high-throughput, fault-tolerant system for processing live data streams.


Spark Streaming processes data batch by batch, so it is only a near-real-time system; under the hood it is still a batch application built on Spark Core.


2. A Simple Spark Streaming Example

Reference: http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html

Step 1. Run the following command in a shell:

$ nc -lk 9999

Step 2. Open another shell and run:

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

Step 3. Type some space-separated words into the first shell; the second shell shows a running word count of that input in near real time:


Step 4. From the example above we can distill the Spark Streaming programming model:

// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
 
// Initialize the StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Define where the data is read from

val lines = ssc.socketTextStream("localhost", 9999)

// The actual word-count logic

val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
 
// Start Spark Streaming
ssc.start()
ssc.awaitTermination()
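
Because each one-second batch is materialized as an ordinary RDD (the micro-batch point made in section 1), you can also drop down to plain Spark Core operations per batch. A minimal, hypothetical addition to the example above, which would be placed before ssc.start():

// Each micro-batch arrives as a regular RDD[(String, Int)], so ordinary
// Spark Core transformations and actions are available per batch.
wordCounts.foreachRDD { (rdd, time) =>
  val top3 = rdd.sortBy(_._2, ascending = false).take(3)
  println(s"Top 3 words at $time: ${top3.mkString(", ")}")
}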

Step 5. Two ways to initialize a StreamingContext:

1) From a SparkConf; typically used when developing in an IDE (e.g., IntelliJ IDEA).

2) From an existing SparkContext; generally used when testing in spark-shell (see the sketch below).

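A minimal sketch of both options (the app name and batch interval here are illustrative; only one StreamingContext may be active per JVM, so pick one):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Option 1: from a SparkConf -- typical for a standalone application built in an IDE.
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingInit")
val ssc = new StreamingContext(conf, Seconds(1))

// Option 2: from an existing SparkContext -- typical in spark-shell, where `sc` already exists.
// val ssc = new StreamingContext(sc, Seconds(1))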

Step 6. Reading data from HDFS with Spark Streaming

6.1) Code:

// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Initialize the StreamingContext (from the existing SparkContext `sc`)
val ssc = new StreamingContext(sc, Seconds(1))

// Define where the data is read from
val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")

// The actual word-count logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// Start Spark Streaming
ssc.start()
ssc.awaitTermination()

6.2) Running the code above in spark-shell:

Create the HDFS directory that Spark Streaming will read from:

$ bin/hdfs dfs -mkdir hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/

Prepare the test data:

$ cat /opt/datas/wc.input
hadoop
hdfs yarn mapreduce zookeeper
hive
sqoop flume oozie hue
hbase
storm scala kafka spark

Start spark-shell and run the code above by hand:

$ bin/spark-shell --master local[2]

scala> import org.apache.spark._

import org.apache.spark._

scala> import org.apache.spark.streaming._

import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.StreamingContext._

scala> val ssc = new StreamingContext(sc, Seconds(1))

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@714e203a

scala> val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")

17/07/12 16:56:40 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@3d18ac9

lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@74462773

scala> val words = lines.flatMap(_.split(" "))

words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@55322d12

scala> val pairs = words.map(word => (word, 1))

pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@4d0fc96d

scala> val wordCounts = pairs.reduceByKey(_ + _)

wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@34e46a44

scala> wordCounts.print()

// Running the following starts the streaming computation

scala> ssc.start()

scala> ssc.awaitTermination()

Open another shell terminal and upload the test data into hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/ on HDFS (textFileStream only picks up files that appear in the directory after the stream has started):

$ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1

We can then see Spark Streaming's output in the spark-shell terminal, like this:

-------------------------------------------

Time: 1499850053000 ms

-------------------------------------------

(scala,1)

(hive,1)

(oozie,1)

(mapreduce,1)

(zookeeper,1)

(hue,1)

(yarn,1)

(kafka,1)

(sqoop,1)

(spark,1)

...

6.3) A simpler way to test

As we have seen, developing this way means pasting the code into the shell line by line, which is tedious. Is there a way to load all of the code at once? There is: below we test loading a Scala file from within spark-shell.

First, create a file to hold the code above:

$ cat /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala

// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Initialize the StreamingContext (from the existing SparkContext `sc`)
val ssc = new StreamingContext(sc, Seconds(1))

// Define where the data is read from
val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")

// The actual word-count logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// Start Spark Streaming
ssc.start()
ssc.awaitTermination()

Delete all files under hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/:

$ bin/hdfs dfs -rm hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/*

Start a spark-shell:

$ bin/spark-shell --master local[2]

Run the Scala file from inside spark-shell with the :load command:

scala> :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala
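
As a further shortcut, the script can usually also be loaded when the shell starts, since spark-shell passes the -i option through to the underlying Scala REPL (assuming your build supports it):

$ bin/spark-shell --master local[2] -i /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala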

From another terminal, upload the test file into the target directory:

$ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1
