// Two socket receivers (ports 9998 and 9997 on host "master"); received blocks are stored
// serialized, spilling to disk when memory is short.
val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)
// Merge both receiver streams into a single DStream.
val lines = lines1.union(lines2)
// Processing logic: a simple word count.
// NOTE: `repartition` returns a NEW DStream and never modifies `lines`, so a bare
// `lines.repartition(100)` statement whose result is discarded has no effect. The repartition
// to 100 partitions is therefore applied inline, and its result is what flatMap consumes.
val words = lines.repartition(100).flatMap(_.split(" "))
// Passing a partitioner explicitly decides both the number of ShuffledRDD partitions and the
// partitioning scheme; when omitted, Spark falls back to its default parallelism (typically the
// number of cores). 10 partitions = shuffle parallelism; tune to the cluster's resources.
val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount example: Spark Streaming consuming real-time data sent by a TCP server,
 * with Kryo configured as the Spark serializer.
 *
 * 1. Start a Netcat server on the `master` host:
 *    `$ nc -lk 9998` (if the `nc` command is not available, install it with `yum install -y nc`)
 *
 * 2. Run the Spark Streaming application on the cluster with the command below
 *    (adjust the `--class` package to the package this object is declared in):
 *    spark-submit --class com.twq.wordcount.KryoNetworkWordCount \
 *    --master spark://master:7077 \
 *    --deploy-mode client \
 *    --driver-memory 512m \
 *    --executor-memory 512m \
 *    --total-executor-cores 4 \
 *    --executor-cores 2 \
 *    /home/hadoop-twq/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar
 */
object KryoNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KryoNetworkWordCount")
    // Use Kryo (spark.serializer = KryoSerializer) instead of the default Java serialization.
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Register custom data types with Kryo through the registrator defined in this file.
    // The class name is derived from the class itself, so it stays correct whatever package
    // this file lives in; a hand-written name that does not match makes Spark fail when it
    // tries to load the registrator.
    sparkConf.set("spark.kryo.registrator", classOf[ClickTrackerKryoRegistrator].getName)

    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))

    // If a batch interval carries little data and there are no window-style operations,
    // in-memory storage without disk spill is enough. The _SER variant keeps received blocks
    // serialized with the configured serializer (Kryo here) to reduce memory use.
    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_ONLY_SER)

    // Processing logic: a simple word count.
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Print the results to the console.
    wordCounts.print()

    // Start the streaming computation.
    ssc.start()

    // Block until the streaming application terminates.
    ssc.awaitTermination()
  }
}

/**
 * Kryo registrator referenced by `spark.kryo.registrator` above; registers the custom
 * record types that should be serialized by Kryo.
 */
class ClickTrackerKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TrackerLog])
  }
}

/** Example custom record type registered with Kryo (not used by the word-count pipeline above). */
case class TrackerLog(id: String, name: String)