Streaming is a data-transfer technique that turns the data a client receives into a steady, continuous flow, delivered without interruption, so that the audio and video the user perceives remain smooth and the user can start viewing a file before the whole transfer has completed.
Common stream-processing frameworks:
- Apache Storm
- Spark Streaming
- Apache Samza
All three are open-source distributed real-time computing systems that offer low latency, scalability, and fault tolerance. What they have in common is that they let you distribute the tasks of your data-flow code across a set of fault-tolerant machines and run them in parallel. In addition, they all provide simple APIs that hide the complexity of the underlying layers.
For a comparison of real-time computing frameworks, see: http://www.csdn.net/article/2015-03-09/2824135
Spark Streaming is an extension of the Spark Core API: a distributed, high-throughput, fault-tolerant real-time data processing system.
Spark Streaming processes data batch by batch, so it is only a near-real-time system; under the hood it is essentially a batch application built on Spark Core.
Reference: http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html
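To make the micro-batch model concrete, here is a minimal sketch (not from the official guide; the socket source, host, and port are chosen just for illustration). It uses foreachRDD to expose what happens underneath: every batch interval, the data received in that interval becomes an ordinary RDD that plain Spark Core code can process.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("MicroBatchSketch")
// Each batch covers one second of received input
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)

// A DStream is a sequence of RDDs, one per batch interval;
// foreachRDD hands each batch to plain Spark Core code.
lines.foreachRDD { (rdd, time) =>
  // rdd.count() is an ordinary RDD action running on Spark Core
  println(s"Batch at $time contains ${rdd.count()} lines")
}

ssc.start()
ssc.awaitTermination()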
1. Run the following command in a shell:
$ nc -lk 9999
2. Open another shell and run:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
3. Type some space-separated words into the first terminal; the second shell shows a running word count of that input in real time.
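For example, typing "hello world hello" into the nc terminal produces output of roughly the following form in the second shell (the input and the timestamp are illustrative):

-------------------------------------------
Time: 1499850053000 ms
-------------------------------------------
(hello,2)
(world,1)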
4. From the example above we can distill the Spark Streaming programming model:
// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Initialize the StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Define where to read data from
val lines = ssc.socketTextStream("localhost", 9999)

// The actual processing logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// Start Spark Streaming
ssc.start()
ssc.awaitTermination()
5. Two ways to initialize a StreamingContext (see the sketch after this list):
1) From a SparkConf, typically used when developing an application in an IDE (e.g. IDEA).
2) From an existing SparkContext object, typically used for testing in spark-shell.
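A minimal sketch of both options (the master, app name, and one-second batch interval are illustrative; only one StreamingContext can be active at a time, so pick one of the two):

// 1) From a SparkConf: typical for a standalone application developed in an IDE
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingInit")
val ssc = new StreamingContext(conf, Seconds(1))

// 2) From an existing SparkContext: typical in spark-shell, where sc already exists
// val ssc = new StreamingContext(sc, Seconds(1))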
6. Spark Streaming reading data from HDFS
6.1) Code:
// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Initialize the StreamingContext (sc is the SparkContext provided by spark-shell)
val ssc = new StreamingContext(sc, Seconds(1))

// Define where to read data from
val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")

// The actual processing logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// Start Spark Streaming
ssc.start()
ssc.awaitTermination()
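Note that textFileStream only picks up files that appear in the monitored directory after the streaming computation has started, which is why the test data is uploaded in a separate step after ssc.start() in the walkthrough below.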
6.2) Running the code above in spark-shell:
Create the HDFS directory that Spark Streaming will read from:
$ bin/hdfs dfs -mkdir hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/
Prepare the data:
$ cat /opt/datas/wc.input
hadoop
hdfs yarn mapreduce zookeeper
hive
sqoop flume oozie hue
hbase
storm scala kafka spark
Start spark-shell and run the code above line by line:
$ bin/spark-shell --master local[2]
scala> import org.apache.spark._
import org.apache.spark._
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._
scala> val ssc = new StreamingContext(sc, Seconds(1))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@714e203a
scala> val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")
17/07/12 16:56:40 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@3d18ac9
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@74462773
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@55322d12
scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@4d0fc96d
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@34e46a44
scala> wordCounts.print()
// Running the following starts the streaming computation
scala> ssc.start()
scala> ssc.awaitTermination()
In another shell, upload the test data to the hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/ directory on HDFS:
$ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1
The spark-shell terminal should now show Spark Streaming output like the following:
-------------------------------------------
Time: 1499850053000 ms
-------------------------------------------
(scala,1)
(hive,1)
(oozie,1)
(mapreduce,1)
(zookeeper,1)
(hue,1)
(yarn,1)
(kafka,1)
(sqoop,1)
(spark,1)
...
6.3) A simpler way to test
As you can see, developing this way requires entering the code line by line, which is tedious. Is there a way to load all the code at once? Of course: below we test it by loading a Scala file into spark-shell.
First create a file containing the code above:
$ cat /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala
// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Initialize the StreamingContext
val ssc = new StreamingContext(sc, Seconds(1))
// Define where to read data from
val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")
// The actual processing logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// Start Spark Streaming
ssc.start()
ssc.awaitTermination()
Delete all files under the hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/ directory:
$ bin/hdfs dfs -rm hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/*
Start a spark-shell:
$ bin/spark-shell --master local[2]
Run the Scala file inside spark-shell with the :load command:
scala> :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala
From another terminal, upload a file to the target directory:
$ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1