spark版本定製十八:Spark Streaming中空RDD處理及流處理程序優雅的中止

本期內容:sql

一、Spark Streaming中RDD的空處理數據庫

二、StreamingContext程序的中止apache

1、Spark Streaming中RDD的空處理

案例代碼:
 
Scala代碼:
 
package com.dt.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 使用Scala開發集羣運行的Spark Streaming的foreachRDD把處理後的數據寫入外部存儲系統中
 *
 * 
 * 背景描述:在廣告點擊計費系統中,咱們在線過濾掉黑名單的點擊,進而保護廣告商的利益,只進行有效的廣告點擊計費
 *               或者在防刷評分(或者流量)系統,過濾掉無效的投票或者評分或者流量;
 * 實現技術:使用transform Api直接基於RDD編程,進行join操做
  *
  */
object OnlineForeachRDD2DB {
     def main(args: Array[String]){

       val conf =  new SparkConf() //建立SparkConf對象
      conf.setAppName( "OnlineWordcount") //設置應用程序的名稱,在程序運行的監控界面能夠看到名稱
      conf.setMaster( "spark://Master:7077") //此時,程序在Spark集羣
      /**
        * 設置batchDuration時間間隔來控制Job生成的頻率而且建立Spark Streaming執行的入口
         */
       val ssc =  new StreamingContext(conf, Seconds(300))
     
       val lines = ssc.socketTextStream( "Master", 9999)

       val words = lines.flatMap(line => line.split( " "))

       val wordCounts = words.map(word => (word,1)).reduceByKey(_ + _)

      wordCounts.foreachRDD{ rdd =>

        /**
        * 例如:rdd爲空,rdd爲空會產生什麼問題呢?
          *     rdd沒有任何元素,可是也會作作foreachPartition,也會進行寫數據庫的操做或者把數據寫到HDFS上,
          *         rdd裏面沒有任何記錄,可是還會獲取計算資源,而後計算一下,消耗計算資源,這個時候純屬浪費資源,
          *         因此必須對空rdd進行處理;
          *
        *         例如:使用rdd.count()>0,可是rdd.count()會觸發一個Job;
          *             使用rdd.isEmpty()的時候,take也會觸發Job;
          *             def isEmpty(): Boolean = withScope {
        *                   partitions.length == 0 || take(1).length == 0
        *             }
        *
        *              rdd.partitions.isEmpty裏判斷的是length是否等於0,就表明是否有partition
        *              def isEmpty: Boolean = { length == 0 }
        *
        *
        *             注:rdd.isEmpty()和rdd.partitions.isEmpty是兩種概念;
          */

       rdd.partitions.isEmpty
     if(rdd.isEmpty()) {
        rdd.foreachPartition{ partitonOfRecord =>
           val connection = ConnectionPool.getConnection()
          partitonOfRecord.foreach(record => {
             val sql =  "insert into streaming_itemcount(item,rcount) values('" + record._1 +  "'," + record._2 +  ")"
             val stmt = connection.createStatement()
            stmt.executeUpdate(sql)
            stmt.close()
          })
          ConnectionPool.returnConnection(connection)
        }}
      }
 
     ssc.start()
      ssc.awaitTermination()
    }
}
 

2、StreamingContext程序的中止

第一種中止方式是無論接受到數據是否處理完成,直接被中止掉,第二種方式是接受到數據所有處理完成才中止掉,通常採用第二種方式。
 
第一種中止方式:
 
/**
 * Stop the execution of the streams immediately (does not wait for all received data
 * to be processed). By default, if ` stopSparkContextis not specified, the underlying
 * SparkContext will also be stopped. This implicit behavior can be configured using the
 * SparkConf configuration spark.streaming.stopSparkContextByDefault.
 *
 * 把streams的執行直接中止掉(並不會等待全部接受到的數據處理完成),默認狀況下SparkContext也會被中止掉,
 * 隱式的行爲能夠作配置,配置參數爲spark.streaming.stopSparkContextByDefault。
  *
 * @param stopSparkContext  If true, stops the associated SparkContext. The underlying SparkContext
 *                         will be stopped regardless of whether this StreamingContext has been
 *                         started.
 */
def stop(
         stopSparkContext: Boolean = conf.getBoolean( "spark.streaming.stopSparkContextByDefault"true)
        ): Unit = synchronized {
 stop(stopSparkContext,  false)
}
 
 
第二種中止方式:
 
/**
 * Stop the execution of the streams, with option of ensuring all received data
 * has been processed.
 *
 * 全部接受到的數據所有被處理完成,才把streams的執行中止掉
  *
 * @param stopSparkContext  if true, stops the associated SparkContext. The underlying SparkContext
 *                         will be stopped regardless of whether this StreamingContext has been
 *                         started.
 * @param stopGracefully  if true, stops gracefully by waiting for the processing of all
 *                       received data to be completed
 */
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
  var shutdownHookRefToRemove: AnyRef =  null
 if (AsynchronousListenerBus.withinListenerThread.value) {
   throw new SparkException( "Cannot stop StreamingContext within listener thread of" +
    " AsynchronousListenerBus")
 }
 synchronized {
   try {
   state  match {
     case INITIALIZED =>
     logWarning( "StreamingContext has not been started yet")
     case STOPPED =>
     logWarning( "StreamingContext has already been stopped")
     case ACTIVE =>
     scheduler.stop(stopGracefully)
      // Removing the streamingSource to de-register the metrics on stop()
     env.metricsSystem.removeSource(streamingSource)
     uiTab.foreach(_.detach())
     StreamingContext.setActiveContext( null)
     waiter.notifyStop()
      if (shutdownHookRef !=  null) {
      shutdownHookRefToRemove = shutdownHookRef
      shutdownHookRef =  null
     }
     logInfo( "StreamingContext stopped successfully")
   }
  }  finally {
    // The state should always be Stopped after calling `stop()`, even if we haven't started yet
   state = STOPPED
  }
 }
  if (shutdownHookRefToRemove !=  null) {
  ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
 }
  // Even if we have already stopped, we still need to attempt to stop the SparkContext because
 // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
  if (stopSparkContext) sc.stop()
}
 

特別感謝王家林老師的獨具一格的講解:編程

王家林老師名片:微信

中國Spark第一人less

新浪微博:http://weibo.com/ilovepainssocket

微信公衆號:DT_Sparkui

博客:http://blog.sina.com.cn/ilovepainsthis

QQ:1740415547spa

YY課堂:天天20:00現場授課頻道68917580

相關文章
相關標籤/搜索