Flume + Kafka + Spark Streaming + HBase + Visualization (Part 3)

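Spark Streaming
  1). Overview: scalable, highly available, and fault tolerant
    A one-stop solution
  2). Principle
    Coarse-grained: Spark Streaming receives a live data stream, splits it into batches, and hands each batch to the Spark engine for processing. The stream becomes a sequence of RDDs, processed as micro-batches.
    Fine-grained: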
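The coarse-grained view can be illustrated without Spark at all. The following is a minimal plain-Scala sketch, not Spark's implementation: `Event`, `toBatches`, and the millisecond timestamps are hypothetical names introduced here for illustration. It assigns each event to the batch interval in which it arrived, which is roughly what the batch interval passed to a StreamingContext controls.

```scala
// Plain-Scala sketch of micro-batching; no Spark dependency.
object MicroBatchSketch {

  // Hypothetical event type: an arrival time in milliseconds plus a payload.
  final case class Event(timestampMs: Long, payload: String)

  // Assign each event to the batch whose interval contains its arrival time,
  // then return the non-empty batches ordered by batch index.
  def toBatches(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Seq[String])] = {
    require(batchIntervalMs > 0, "batch interval must be positive")
    events
      .groupBy(e => e.timestampMs / batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (batchIndex, es) =>
        (batchIndex, es.sortBy(_.timestampMs).map(_.payload))
      }
  }
}
```

For example, with a 2000 ms interval, events arriving at 100 ms and 1900 ms land in batch 0, while an event at 2000 ms opens batch 1. Unlike this sketch, Spark Streaming also produces a (possibly empty) RDD for intervals in which nothing arrived.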
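  3). Core concepts
  StreamingContext:
    After start(), no new streaming computations can be set up or added to the context
    After stop(), the context cannot be restarted
    Only one StreamingContext can be active in a JVM at a time
    stop(false) stops only the StreamingContext and leaves the SparkContext running; stop() stops both
    val ssc = new StreamingContext(conf, Seconds(2))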
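  DStream:
    Created from another DStream or from a source
    A series of RDDs, one per batch
    DStream operations are carried out as RDD operations underneath
  Input DStreams & Receivers:
    Except for file-system sources, every input DStream is associated with a Receiver (ReceiverInputDStream)
    For a receiver-based DStream, the number of threads must be greater than the number of receivers (for example local[2] for one receiver); otherwise no thread is left to process the data
  Transformations (such as transform) and output operations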
 
  4). Stateful operator: updateStateByKey
  
package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * Stateful WordCount with Spark Streaming,
 * implemented with updateStateByKey.
 */

object statefulWordCount {

  def main(args: Array[String]): Unit = {

    val conf  = new SparkConf().setMaster("local[2]").setAppName("updateStateByKey for wordCount")
    val ssc = new StreamingContext(conf, Seconds(4))

    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("/opt/datas/spark_data/updateStateByKey1")

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" ")).map((_, 1))
    val wordCount = words.updateStateByKey[Int](updateFunc _)

    wordCount.print()


    ssc.start()
    ssc.awaitTermination()

  }

  /** Update function for updateStateByKey.
    * currentValues: the counts for this key in the current batch
    * preValues: the accumulated count from previous batches (None if the key is new)
    *
    * Returns Some(total) to keep the key's state.
    */
  def updateFunc(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {

    val newCount = currentValues.sum
    val preCount = preValues.getOrElse(0)
    Some(newCount + preCount)
  }
}
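
To see what the update function does across batches, here is a minimal plain-Scala sketch that replays a few batches by hand. It is a simulation under stated assumptions, not Spark code: `applyBatch` and `run` are hypothetical helpers introduced for illustration, and the state is an ordinary Map rather than a checkpointed DStream. It relies on one behavior of updateStateByKey: the update function is called for every key that has new values or existing state, and a key whose function returns None is dropped.

```scala
// Plain-Scala simulation of updateStateByKey semantics; no Spark dependency.
object StatefulCountSketch {

  // Same shape as the update function passed to updateStateByKey.
  def updateFunc(currentValues: Seq[Int], previous: Option[Int]): Option[Int] =
    Some(currentValues.sum + previous.getOrElse(0))

  // Apply one batch of (word, 1) pairs to the running state.
  def applyBatch(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    // Group the new values by key, as Spark does before calling the update function.
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Every key with new values or existing state is updated.
    val keys = (state.keySet ++ grouped.keySet).toSeq
    keys.flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v).toList
    }.toMap
  }

  // Fold a sequence of batches (each a list of words) into the final counts.
  def run(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, words) =>
      applyBatch(state, words.map((_, 1)))
    }
}
```

Calling `StatefulCountSketch.run(Seq(Seq("a", "b", "a"), Seq("b")))` carries the count of "a" over from the first batch even though "a" does not appear in the second, which is the difference from a plain per-batch reduceByKey.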

  

  5). Blacklist filtering
  
package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
  * Blacklist filtering
  */

object filterBlackListSparkStreaming {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("filter black list")
    val ssc = new StreamingContext(conf, Seconds(5))

    /**
      * Convert the blacklist into an RDD of (name, true) pairs
      * so that it can be joined with each batch.
      */
    val blackList = List("ls","zs")
    val rddBlackList = ssc.sparkContext.parallelize(blackList).map(x => (x, true))

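    val logs = ssc.socketTextStream("localhost", 9999)
      // key each line by its second comma-separated field (the name)
      .map(x => (x.split(",")(1), x))
      .transform(rdd => {
        rdd.leftOuterJoin(rddBlackList)
          // keep only records whose name has no match in the blacklist
          .filter(x => !x._2._2.getOrElse(false))
          .map(x => x._2._1)
      })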

    logs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
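
The join-then-filter logic is easier to check on ordinary collections. The sketch below mirrors the same three steps (key by name, left outer join against the blacklist, drop matches) in plain Scala; `filterLogs` is a hypothetical helper introduced for illustration, and the sample line format `date,name` is an assumption based on the `split(",")(1)` call in the listing above.

```scala
// Plain-Scala sketch of the blacklist filter; no Spark dependency.
object BlacklistFilterSketch {

  // Keep only the lines whose second comma-separated field is not blacklisted.
  // Like the streaming version, this assumes every line contains at least one comma.
  def filterLogs(lines: Seq[String], blackList: Set[String]): Seq[String] = {
    // Equivalent of parallelize(blackList).map(x => (x, true)).
    val flags: Map[String, Boolean] = blackList.map(name => name -> true).toMap
    lines
      .map(line => (line.split(",")(1), line))                      // key by name
      .map { case (name, line) => (name, (line, flags.get(name))) } // left outer join
      .filter { case (_, (_, flag)) => !flag.getOrElse(false) }     // drop blacklisted names
      .map { case (_, (line, _)) => line }                          // restore the original line
  }
}
```

With the blacklist `Set("ls", "zs")`, the input lines `20180808,zs`, `20180808,ls`, and `20180808,ww` reduce to the single line `20180808,ww`. Because the blacklist here is tiny, a lookup in a local Set would also work in Spark; the join form matters when the blacklist is itself a large RDD.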

  

  Flume can be integrated with Spark Streaming in two ways; the code is as follows:
  
package com.spark.test

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils

object flumeSparkStreaming {
  
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("flumeDstream")
    val ssc = new StreamingContext(conf, Seconds(5))

    /** First approach (push-based)
      * Define flumeDstream with createStream.
      * Start Spark Streaming first, then start Flume, and finally send input over telnet to drive the word count.
      */
    val flumeDstream = FlumeUtils.createStream(ssc, "bigdata", 3333)
    val wordCnt = flumeDstream.map(x => new String(x.event.getBody.array()).trim())
        .flatMap(_.split(" "))
        .map((_,1)).reduceByKey(_ + _)

    wordCnt.print()

    
    /** Second approach (pull-based)
      * createPollingStream
      * The Flume sink must be reconfigured to org.apache.spark.streaming.flume.sink.SparkSink.
      * The rest of the configuration is unchanged. Start Flume first; it buffers the data, and Spark Streaming then pulls it.
      */
//    val XXX = FlumeUtils.createPollingStream(ssc, "bigdata", 3333)
//      .map(x => new String(x.event.getBody.array()).trim)
//      .flatMap(_.split(" "))
//      .map((_, 1))
//      .reduceByKey(_ + _)

    ssc.start()
    ssc.awaitTermination()
  }
}
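
Both approaches decode the event body with `new String(x.event.getBody.array())`, which uses the platform default charset and reads the whole backing array of the ByteBuffer. The following is a minimal sketch of a more defensive decode, assuming the bodies are UTF-8 text; `decodeBody` and `wordCount` are hypothetical helpers introduced for illustration, and the code uses only java.nio, so it runs without Flume or Spark.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Plain-Scala sketch of decoding event bodies and counting words.
object EventBodySketch {

  // Decode only the readable bytes (from position to limit) as UTF-8.
  // duplicate() keeps the caller's buffer position untouched.
  def decodeBody(body: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(body.duplicate()).toString.trim

  // Same steps as the streaming word count, applied to one batch of bodies.
  def wordCount(bodies: Seq[ByteBuffer]): Map[String, Int] =
    bodies
      .map(decodeBody)
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

In the streaming job, the same function could replace the inline conversion, for example `flumeDstream.map(x => EventBodySketch.decodeBody(x.event.getBody))`. The difference only shows up when a buffer is a slice of a larger array or when the text is not in the JVM's default charset.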