性能:Transform層面

數據處理的並行度

一、BlockRDD的分區數
(1)經過Receiver接受數據的特色決定
(2)也能夠本身經過repartition設置
二、ShuffleRDD的分區數
(1)默認的分區數爲spark.default.parallelism(core的大小)
(2)經過咱們本身設置決定
 val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)

    val lines = lines1.union(lines2)

    lines.repartition(100)  //經過repartition設置  

    //處理的邏輯,就是簡單的進行word count
    val words = lines.repartition(100).flatMap(_.split(" "))
     //本身設置決定ShuffleRDD的分區數 以及分區算法,默認是core的數量 
    val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))   //併發度是10個分區,根據集羣資源狀況調節

數據的序列化

兩種須要序列化的數據:
一、輸入數據
默認是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存儲在executor上的內存中(以序列化的方式存儲在內存中,內存不夠放在DISK中)
二、Streaming操做中產生的緩存RDD
默認是以StorageLevel.MEMORY_ONLY_SER的形式存儲的內存中
使用Kryo序列化機制,比Java序列化機制性能好
 
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * WordCount程序,Spark Streaming消費TCP Server發過來的實時數據的例子:
  *
  * 一、在master服務器上啓動一個Netcat server
  * `$ nc -lk 9998` (若是nc命令無效的話,咱們能夠用yum install -y nc來安裝nc)
  *
  * 二、用下面的命令在在集羣中將Spark Streaming應用跑起來
  * spark-submit --class com.twq.wordcount.JavaNetworkWordCount \
  * --master spark://master:7077 \
  * --deploy-mode client \
  * --driver-memory 512m \
  * --executor-memory 512m \
  * --total-executor-cores 4 \
  * --executor-cores 2 \
  * /home/hadoop-twq/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar
  */
object KryoNetworkWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("KryoNetworkWordCount")

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")    //指定spark.serializer.KryoSerializer
    sparkConf.set("spark.kryo.registrator", "com.twq.spark.rdd.example.ClickTrackerKryoRegistrator")  //  自定義的數據類型經過Kryo序列化

    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))

    //若是一個batchInterval中的數據量不大,而且沒有window等操做,則能夠使用MEMORY_ONLY
    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_ONLY_SER)

    //處理的邏輯,就是簡單的進行word count
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    //將結果輸出到控制檯
    wordCounts.print()

    //啓動Streaming處理流
    ssc.start()

    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}

class ClickTrackerKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TrackerLog])
  }
}

case class TrackerLog(id: String, name: String)

  

內存調優

一、須要內存大小
和transform類型有關係
數據存儲的級別
 
二、GC
driver端和executor端都使用CMS垃圾收集器
CMS(Concurrent Mark Sweep 標記清除算法)收集器是一種以獲取最短回收停頓時間爲目標的收集器
(經過--driver-java-options和spark.executor.extraJavaOptions)
相關文章
相關標籤/搜索