In Spark, stream processing is where programs most easily go wrong, and it is also one of Spark's current technical bottlenecks. To build an excellent Spark distribution, optimizing stream processing is therefore essential.
Judging from the trend of Spark's evolution, Spark GraphX and machine learning have already matured quite well. Improving them is important, but not the most important thing. What matters most is still stream processing, and at the heart of stream processing is combining it with machine learning and graph computation into one integrated stack that truly delivers "one stack to rule them all".
1. Stream processing is where errors occur most easily.
2. Stream processing combined with graph computation and machine learning will unleash enormous potential.
3. It enables building complex real-time data processing applications.
Stream processing is in fact an application built on top of Spark Core.
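To make that concrete, here is a minimal sketch intended for spark-shell (where sc is the pre-built SparkContext; the 30-second interval is just an illustrative value), showing that a StreamingContext simply wraps an ordinary SparkContext and adds a batch interval, which is what "an application on top of Spark Core" means in practice:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// a StreamingContext wraps the existing SparkContext and only adds a batch interval;
// every 30-second batch is turned into ordinary Spark Core jobs on that same sc
val ssc = new StreamingContext(sc, Seconds(30))
// ssc.sparkContext eq sc  => true: the streaming layer reuses the core context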
1: Code example
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by hadoop on 2016/4/18.
 * Background: in an ad-click billing system we filter out clicks from black-listed users online,
 * so that advertisers are protected and only valid ad clicks are billed.
 * Sina Weibo: http://www.weibo.com/ilovepains
 */
object OnlineBlanckListFilter extends App {
  //val basePath = "hdfs://master:9000/streaming"
  val conf = new SparkConf().setAppName("SparkStreamingOnHDFS")
  if (args.length == 0) conf.setMaster("spark://Master:7077")
  val ssc = new StreamingContext(conf, Seconds(30))

  // (name, isBlackListed): only entries flagged true are filtered out
  val blackList = Array(("hadoop", true), ("mahout", true), ("spark", false))
  val backListRDD = ssc.sparkContext.parallelize(blackList)

  val adsClickStream =
    ssc.socketTextStream("192.168.74.132", 9000, StorageLevel.MEMORY_AND_DISK_SER_2)

  // each log line is "timestamp userName"; key the stream by user name
  val rdd = adsClickStream.map { ads => (ads.split(" ")(1), ads) }

  val validClicked = rdd.transform { userClickRDD =>
    val joinedBlackRDD = userClickRDD.leftOuterJoin(backListRDD)
    // keep only clicks whose user is not flagged as black-listed
    joinedBlackRDD.filter(joinedItem => !joinedItem._2._2.getOrElse(false))
  }

  validClicked.map(_._2._1).print()

  ssc.start()
  ssc.awaitTermination()
}
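For readers who want to poke at the filtering logic without a cluster or a socket source, here is a minimal sketch of the same leftOuterJoin-based filter on plain RDDs in local mode (the click lines and the extra "flink" user are made-up illustration data, not from the original example):

import org.apache.spark.{SparkConf, SparkContext}

object BlackListFilterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BlackListFilterSketch").setMaster("local[2]"))

    // (name, isBlackListed): same shape as blackList in the streaming example above
    val blackListRDD = sc.parallelize(Seq(("hadoop", true), ("mahout", true), ("spark", false)))

    // simulated click-log lines in the "timestamp userName" format the streaming job expects
    val clicks = sc.parallelize(Seq("1461000000 hadoop", "1461000001 spark", "1461000002 flink"))

    val validClicks = clicks
      .map(line => (line.split(" ")(1), line))                   // key by user name
      .leftOuterJoin(blackListRDD)                               // attach the black-list flag if present
      .filter { case (_, (_, flag)) => !flag.getOrElse(false) }  // drop users flagged true
      .map { case (_, (line, _)) => line }

    validClicks.collect().foreach(println)  // expect the spark and flink clicks to remain
    sc.stop()
  }
}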
2: Observing the job status in the Spark Streaming UI
A question: there are 5 jobs in total here. Jobs 2, 3 and 4 are the jobs triggered by our code, so where do Job 0 and Job 1 come from?
Let's look at the UI for Job 0:
We find that this job exists as soon as our application starts. So what is this job for?
Let's start from the source code.
/**
 * Class: ReceiverTracker
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
Let's focus on the runDummySparkJob method:
/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
The comment explains the purpose of this job clearly: it is run to make the fullest use of the cluster's resources and to avoid all data receiving being concentrated on a single node.
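The two configuration keys named in the TODO do exist in Spark; here is a hedged sketch of setting them on the SparkConf from the example above (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SparkStreamingOnHDFS")
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")        // wait until 80% of the expected executors have registered
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")  // but never wait longer than 30 seconds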
Now let's turn to Job 1:
Let's look at the details of this job:
The data source for this run is nc -lk 9000 (running on the worker2 node).
We suspect this is where the data is received.
Let's look for an explanation in the source code:
/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
We see this sentence in the source: "Create the RDD using the scheduledLocations to run the receiver in a Spark job".
This shows that the data receiver runs on a Worker node as a task, which means Spark Streaming can make the most of the cluster's resources: every node can receive data. Once data arrives, it is stored in the BlockManager (to be analyzed further in later source-code walkthroughs).
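To see what "the receiver runs as a task and hands data to the BlockManager" looks like from the API side, here is a minimal sketch of a custom Receiver (a hypothetical ConstantReceiver producing a fixed string; it is not part of the example above). Everything passed to store() goes through the ReceiverSupervisorImpl created in startReceiver and ends up in the BlockManager of the worker running this receiver task:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ConstantReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    // receivers normally spawn a thread so that onStart() returns quickly
    new Thread("constant-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("hello from receiver")  // -> ReceiverSupervisorImpl -> BlockManager on this worker
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // nothing to clean up: the isStopped() check ends the loop above
  }
}

// usage: ssc.receiverStream(new ConstantReceiver).print()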
A question: the data has been collected, so where does the actual computation happen? Let's first check the Web UI:
Here we can see our code's logic. The jobs are still submitted to the cluster by the JobScheduler class through a thread pool.
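As a side note, the size of that thread pool is governed by the existing configuration key spark.streaming.concurrentJobs (default 1, i.e. one output-operation job at a time); a hedged sketch of raising it:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SparkStreamingOnHDFS")
  .set("spark.streaming.concurrentJobs", "2")  // allow two streaming jobs to run concurrently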
Here is the job submission source code:
def run() {
  try {
    val formattedTime = UIUtils.formatBatchTime(
      job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
    val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
    val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

    ssc.sc.setJobDescription(
      s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
    ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

    // We need to assign `eventLoop` to a temp variable. Otherwise, because
    // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
    // it's possible that when `post` is called, `eventLoop` happens to null.
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
}
By tracing through the source we find that job.run invokes a function passed in from outside:
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
In other words, jobs are submitted to the cluster through DStream's generateJob method (DStreamGraph's generateJobs call triggers the generateJob method of the DStream class). The DStreamGraph records the logical transformation relationships between DStreams; those transformations are eventually traced back to produce RDD instances, which form the RDD DAG. The trigger is the batch interval we defined ourselves: getOrCompute is called to trace back the DStream transformations and generate the RDD instances and the RDD DAG.
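From the user's side, the clearest way to see this per-batch materialization is an output operation such as foreachRDD: the transformations only describe the DStream lineage, and on every batch interval generateJobs/getOrCompute turn that lineage into a concrete RDD that the foreachRDD body receives. A minimal sketch, assuming ssc is the StreamingContext from the example above:

// transformations: they only record the DStream lineage (the "space" dimension)
val lines  = ssc.socketTextStream("192.168.74.132", 9000)
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

// output operation: for every batch, `rdd` is the RDD materialized by getOrCompute(time)
counts.foreachRDD { rdd =>
  println(s"distinct words in this batch: ${rdd.count()}")
}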
Attached below is an overall Spark Streaming flow diagram.
What conclusions can we draw from all this?
1. Data collection happens on Worker nodes in the Spark cluster; the data receiver (Receiver) runs as a job to receive the data!
2. The real computation also happens on Workers: the cluster distributes tasks to the various nodes, i.e. (in this run) data is received on one node while the computation runs on other nodes.
3. The various jobs in Spark Streaming cooperate with one another; why it is done this way will be analyzed later.
4. Data locality: above we saw that the tasks' locality level is PROCESS_LOCAL, which means the data is in memory. Since our data lives on a single node, it necessarily has to travel across the network (a shuffle is needed to move the data to the computing nodes); each time data is collected it is split into blocks and distributed to the computing nodes.
5. Spark Streaming generates jobs in units of time; essentially it is batch processing with a time dimension added, as the sketch after this list illustrates.
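A minimal sketch of point 5: the same word-count logic written once as a one-off batch job on an RDD and once on a DStream. Spark Streaming simply re-applies the batch version to each interval's data (the HDFS path is hypothetical; sc and ssc are assumed to be the contexts from earlier):

// one-off batch processing on an RDD
val batchCounts = sc.textFile("hdfs://Master:9000/streaming/input")  // hypothetical input path
  .flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

// the same operators on a DStream: applied again and again, once per batch interval
val streamCounts = ssc.socketTextStream("192.168.74.132", 9000)
  .flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)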
3: Grasping the essence of Spark Streaming in an instant
A DStream is an unbounded collection, with no limit on its size.
A DStream embodies the notion of space and time: as time passes, it keeps producing RDDs inside.
Once the time is fixed, we are locked onto operations in the space dimension.
The space dimension is the processing logic itself.
The operations on DStreams make up the DStreamGraph, as illustrated in the figure below:
In the figure above, every foreach triggers a job, which traces back through the dependencies from back to front to form a DAG, as shown in the next figure:
Once the space dimension is determined, as time keeps advancing, RDD graphs are instantiated over and over again and jobs are triggered to execute them, which is exactly the generateJobs method described above.
Let's read a passage from the official documentation again with this in mind:
Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
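A minimal sketch of that last sentence, using queueStream to hand the StreamingContext an explicit queue of RDDs (ssc is assumed to be the StreamingContext from earlier); each batch interval dequeues one RDD, making "a DStream is represented as a sequence of RDDs" literal:

import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()
val dstream  = ssc.queueStream(rddQueue)
dstream.print()                                    // prints the contents of one queued RDD per batch

ssc.start()
for (i <- 1 to 3) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 10)  // one RDD per batch interval
  }
  Thread.sleep(1000)
}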