Spark Streaming

1、spark的野心:java

一、取代mapreduce--->Batch Processingnode

二、spark  Sql   --->hiveapache

三、stream  processing  --->strom小程序

吞吐量比storm大,處理速度是storm的2到5倍,可是延遲是秒級別的api

sparkstream  是微批處理框架

sparkstream  是以時間進行分割的,的核心仍是批處理,有任務的啓停,最小批次是不能小於500毫秒(過小了反應不過來),通常是500---2000毫秒socket

2、Spark streaming基本概念ide

離散流(discretized stream)或DStream:這是Spark Streaming對內部持續的實時數據流的抽象描述,即咱們處理的一個實時數據流,在Spark Streaming中對應於一個DStream實例。函數

批數據(batch data):這是化整爲零的第一步,將實時流數據以時間片爲單位進行分批,將流處理轉化爲時間片數據的批處理。隨着持續時間的推移,這些處理結果就造成了對應的結果數據流了。性能

時間片或批處理時間間隔(batch interval):這是人爲地對流數據進行定量的標準,以時間片做爲咱們拆分流數據的依據。一個時間片的數據對應一個RDD實例。

窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,

滑動時間間隔:前一個窗口到後一個窗口所通過的時間長度。必須是批處理時間間隔的倍數

Input DStream:一個input DStream是一個特殊的DStream,將Spark Streaming鏈接到一個外部數據源來讀取數據

 

Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備。

3、性能

實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會通過Spark DAG圖分解以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),因此Spark Streaming可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景。

擴展性與吞吐量:Spark目前在EC2上已可以線性擴展到100個節點(每一個節點4Core),能夠以數秒的延遲處理6GB/s的數據量(60M(60M條記錄) records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所作的測試,在Grep這個測試中,Spark Streaming中的每一個節點的吞吐量是670k records/s,而Storm是115k records/s

 

4、實戰:

=======================================================================================================================

一、第一個sparkstreaming 小程序(不能實現累加)

導入必要的jar包

 

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by root on 2016/7/24.
  */
object sparkstream {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    var conf = new SparkConf().setAppName("mysparkStream").setMaster("local[2]")
    var ssc = new StreamingContext(conf ,Seconds(5))
    var input = ssc.socketTextStream("node11",9999,StorageLevel.MEMORY_AND_DISK)
    input.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 

在node12 上安裝nc服務  

yum -y install nc

啓動服務 端口9999  

nc -lk  9999

而後輸入數據上面的程序就能夠計算了

截屏:

============================================================================================================================

Spark Streaming 是Spark核心API的一個擴展,能夠實現高吞吐量的、具有容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括Kafka、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets

 

spark中的算子:

transform函數:

sparkstreaming  中的數據類型都是DStream類型的,當DStream類型的數據的操做算子不能知足需求時 transform函數能夠將DStream轉化爲rdd類型進行操做,操做完成以後再轉化爲DStream類型返回;

updateStateByKey:記錄以前數據的狀態,和如今的數據進行計算

=======================================================================================================================

例子二、wordcount實例  (updateStateByKey函數的使用)

package com.kafka.test
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.dstream.DStream

object WordCount {
  def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    var conf = new SparkConf().setAppName("mysparkStream").setMaster("local[2]")
    var ssc = new StreamingContext(conf ,Seconds(5))
        ssc.checkpoint("E:/wordcount");
        val updateFunc = (values :Seq[Int],state:Option[Int])=> {
          val  currentCount = values.sum;
          val   previousCount = state.getOrElse(0);
          Some(currentCount+previousCount);
        }
    var lines: ReceiverInputDStream[String] = ssc.socketTextStream("node04",9999,StorageLevel.MEMORY_AND_DISK)
    var words: DStream[(String, Int)] =lines.flatMap { x => x.split(" ") }.map { x => (x,1) };
    
    val stateDstream = words.updateStateByKey[Int](updateFunc);
    stateDstream.print();
    ssc.start();
    ssc.awaitTermination()
  }
}

一樣要啓動 nc  -lk   9999 端口 ,而後向其中添加數據

=======================================================================================================================

窗口理解:

Spark Streaming還提供了窗口的計算,它容許你經過滑動窗口對數據進行轉換,窗口轉換操做以下:

==================================================================================================================

例子三、窗口

package com.kafka.test
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.storage.StorageLevel
import org.netlib.util.Second
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.Durations

object WindowBaseTopWord {
  
  def main(args: Array[String]) {
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("WindowBasedTopWord").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Durations.seconds(5)) //這裏的5秒是指切分RDD的間隔
    ssc.checkpoint("E:/wordcount_checkpoint") //設置docheckpoint目錄,沒有會自動建立


    val words: ReceiverInputDStream[String] = ssc.socketTextStream("node04",9999)
    val pairs = words.flatMap(_.split(" ")).map(x => (x,1))
    pairs.foreachRDD(rdd => {
      println("--------------split RDD begin--------------")
      rdd.foreach(println)
      println("--------------split RDD end--------------")
    })
    /*
    reduceByKeyAndWindow(reduceFunc,invReduceFunc,windowDuration,slideDuration)
    reduceFunc:用於計算window框住的RDDS
    invReduceFunc:用於優化的函數,減小window滑動中去計算重複的數據,經過「_-_」便可優化
    windowDuration:表示window框住的時間長度,如本例5秒切分一次RDD,框20秒,就會保留最近4次切分的RDD
    slideDuration:表示window滑動的時間長度,即每隔多久執行本計算


    本例5秒切分一次RDD,每次滑動10秒,window框住20秒的RDDS,即:每10秒計算最近20秒切分的RDDS,中間有10秒重複,
    經過invReduceFunc參數進行去重優化
     */
    val pairsWindow = pairs.reduceByKeyAndWindow(_+_,_-_,Durations.seconds(20),Durations.seconds(10))
    val sortDstream = pairsWindow.transform(rdd => {
      val sortRdd = rdd.map(t => (t._2,t._1)).sortByKey(false).map(t => (t._2,t._1))  //降序排序
      val more = sortRdd.take(3)  //取前3個輸出
      println("--------------print top 3 begin--------------")
      more.foreach(println)
      println("--------------print top 3 end--------------")
      sortRdd
    })
    sortDstream.print()


    ssc.start()
    ssc.awaitTermination()
  }

}

nc  -lk  9999

=======================================================================================================

例子四、SparkStreaming 從kafka中讀取數據計算單詞數(不是總數)

package com.kafka.test;

import java.util.HashMap;
import java.util.Map;


import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import scala.actors.threadpool.Arrays;

public class KafkaStreamingWordCount {

    public static void main(String[] args) {
         Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);
        //接收數據的地址和端口
        String zkQuorum = "192.168.47.11:2181,192.168.47.12:2181,192.168.47.13:2181";
        //話題所在的組
        String group = "1";
        //話題名稱以「,」分隔
        String topics = "test3,test2";
        //每一個話題的分片數
        int numThreads = 2;
        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
//        jssc.checkpoint("checkpoint"); //設置檢查點
        //存放話題跟分片的映射關係
        Map<String, Integer> topicmap = new HashMap<>();
        String[] topicsArr = topics.split(",");
        int n = topicsArr.length;
        for(int i=0;i<n;i++){
            topicmap.put(topicsArr[i], numThreads);
        }
        //從Kafka中獲取數據轉換成RDD
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
        //從話題中過濾所需數據
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {

            @Override
            public Iterable<String> call(Tuple2<String, String> arg0)
                    throws Exception {
                return Arrays.asList(arg0._2.split(" "));
            }
        });
        //對其中的單詞進行統計
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        //打印結果
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();

    }

}

這裏須要啓動kafka集羣,並向其topic中實時注入數據

相關文章
相關標籤/搜索