這一章要講 Spark Streaming,講以前首先回顧下它的用法,具體用法請參照《Spark Streaming 編程指南》。html
val ssc = new StreamingContext(sparkConf, Seconds(1));
// 得到一個DStream負責鏈接 監聽端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);
// 對每一行數據執行Split操做
val words = lines.flatMap(_.split(" "));
// 統計word的數量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);
// 輸出結果
wordCounts.print();
ssc.start(); // 開始
ssc.awaitTermination(); // 計算完畢退出
複製代碼
一、首先實例化一個 StreamingContext編程
二、調用 StreamingContext 的 socketTextStreambash
三、對得到的 DStream 進行處理網絡
四、調用 StreamingContext 是 start 方法,而後等待socket
咱們看 StreamingContext 的 socketTextStream 方法吧。ide
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
複製代碼
一、StoageLevel 是 StorageLevel.MEMORY_AND_DISK_SER_2函數
二、使用 SocketReceiver 的 bytesToLines 把輸入流轉換成可遍歷的數據this
繼續看 socketStream 方法,它直接 new 了一個spa
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
複製代碼
繼續深刻挖掘 SocketInputDStream,追述一下它的繼承關係,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。線程
具體實現 ReceiverInputDStream 的類有好幾個,基本上都是從網絡端來數據的。
它實現了 ReceiverInputDStream 的 getReceiver 方法,實例化了一個 SocketReceiver 來接收數據。
SocketReceiver 的 onStart 方法裏面調用了 receive 方法,處理代碼以下:
socket = new Socket(host, port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
複製代碼
一、new 了一個 Socket 來接收數據,用 bytesToLines 方法把 InputStream 轉換成一行一行的字符串。
二、把每一行數據用 store 方法保存起來,store 方法是從 SocketReceiver 的父類 Receiver 繼承而來,內部實現是:
def store(dataItem: T) {
executor.pushSingle(dataItem)
}
複製代碼
executor 是 ReceiverSupervisor 類型,Receiver 的操做都是由它來處理。這裏先不深糾,後面咱們再說這個 pushSingle 的實現。
到這裏咱們知道 lines 的類型是 SocketInputDStream,而後對它是一頓的轉換,flatMap、map、reduceByKey、print,這些方法都不是 RDD 的那種方法,而是 DStream 獨有的。
講到上面這幾個方法,咱們開始轉入 DStream 了,flatMap、map、reduceByKey、print 方法都涉及到 DStream 的轉換,這和 RDD 的轉換是相似的。咱們講一下 reduceByKey 和 print。
reduceByKey 方法和 RDD 同樣,調用的 combineByKey 方法實現的,不同的是它直接 new 了一個 ShuffledDStream 了,咱們接着看一下它的實現吧。
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
}
複製代碼
在 compute 階段,對經過 Time 得到的 rdd 進行 reduceByKey 操做。接下來的 print 方法也是一個轉換:
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
複製代碼
打印前十個,超過 10 個打印 "..."。須要注意 register 方法。
ssc.graph.addOutputStream(this)
複製代碼
它會把代碼插入到當前的 DStream 添加到 outputStreams 裏面,後面輸出的時候若是沒有 outputStream 就不會有輸出,這個須要記住哦!
前戲結束以後,ssc.start() 高潮開始了。 start 方法很小,最核心的一句是 JobScheduler 的 start 方法。咱們得轉到 JobScheduler 方法上面去。
下面是 start 方法的代碼:
def start(): Unit = synchronized {
  // 接受到JobSchedulerEvent就處理事件
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()
}
複製代碼
一、啓動了一個 Actor 來處理 JobScheduler 的 JobStarted、JobCompleted、ErrorReported 事件。
二、啓動 StreamingListenerBus 做爲監聽器。
三、啓動 ReceiverTracker。
四、啓動 JobGenerator。
咱們接下來看看 ReceiverTracker 的 start 方法。
def start() = synchronized {if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
receiverExecutor.start()
}
}
複製代碼
一、首先判斷了一下 receiverInputStreams 不能爲空,那 receiverInputStreams 是怎麼時候寫入值的呢?答案在 SocketInputDStream 的父類 InputDStream 當中,當實例化 InputDStream 的時候會在 DStreamGraph 裏面添加 InputStream。
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
ssc.graph.addInputStream(this)
//....
}
複製代碼
二、實例化 ReceiverTrackerActor,它負責 RegisterReceiver(註冊 Receiver)、AddBlock、ReportError(報告錯誤)、DeregisterReceiver(註銷 Receiver)等事件的處理。
三、啓動 receiverExecutor(實際類是 ReceiverLauncher,這名字起得。。),它主要負責啓動 Receiver,start 方法裏面調用了 startReceivers 方法吧。
private def startReceivers() {
   // 對應着上面的那個例子,getReceiver方法得到是SocketReceiver
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
// 查看是否全部的receivers都有優先選擇機器,這個須要重寫Receiver的preferredLocation方法,目前只有FlumeReceiver重寫了
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
// 建立一個並行receiver集合的RDD, 把它們分散到各個worker節點上
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
// 在worker節點上啓動Receiver的方法,遍歷全部Receiver,而後啓動
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException("Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
// 運行這個重複的做業來確保全部的slave都已經註冊了,避免全部的receivers都到一個節點上
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
// 把receivers分發出去,啓動
ssc.sparkContext.runJob(tempRDD, startReceiver)
}
複製代碼
一、遍歷 receiverInputStreams 獲取全部的 Receiver。
二、查看這些 Receiver 是否全都有優先選擇機器。
三、把 SparkContext 的 makeRDD 方法把全部 Receiver 包裝到 ParallelCollectionRDD 裏面,並行度是 Receiver 的數量。
四、發個小任務給確保全部的 slave 節點都已經註冊了(這個小任務有點兒莫名其妙,感受怪怪的)。
五、提交做業,啓動全部 Receiver。
Spark 寫得實在是太巧妙了,竟然能夠把 Receiver 包裝在 RDD 裏面,當作是數據來處理!
啓動 Receiver 的時候,new 了一個 ReceiverSupervisorImpl,而後調的 start 方法,主要乾了這麼三件事情,代碼就不貼了。
一、啓動 BlockGenerator。
二、調用 Receiver 的 OnStart 方法,開始接受數據,並把數據寫入到 ReceiverSupervisor。
三、調用 onReceiverStart 方法,發送 RegisterReceiver 消息給 driver 報告本身啓動了。
ok,到了這裏,重點落到了 BlockGenerator。前面說到 SocketReceiver 把接受到的數據調用 ReceiverSupervisor 的 pushSingle 方法保存。
// 這是ReceiverSupervisorImpl的方法
def pushSingle(data: Any) {
blockGenerator += (data)
}
// 這是BlockGenerator的方法
def += (data: Any): Unit = synchronized {
currentBuffer += data
}
複製代碼
咱們看一下它的 start 方法吧。
def start() {
blockIntervalTimer.start()
blockPushingThread.start()
}
複製代碼
它啓動了一個定時器 RecurringTimer 和一個線程執行 keepPushingBlocks 方法。
先看 RecurringTimer 的實現:
while (!stopped) {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
}
複製代碼
每隔一段時間就執行 callback 函數,callback 函數是 new 的時候傳進來的,是 BlockGenerator 的 updateCurrentBuffer 方法。
private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.put(newBlock)
}
} catch {case t: Throwable =>
reportError("Error in block updating thread", t)
}
}
複製代碼
它 new 了一個 Block 出來,而後添加到 blocksForPushing 這個 ArrayBlockingQueue 隊列當中。
提到這裏,有兩個參數須要你們注意的:
spark.streaming.blockInterval 默認值是200
spark.streaming.blockQueueSize 默認值是10
複製代碼
這是前面提到的間隔時間和隊列的長度,間隔時間默認是 200 毫秒,隊列是最多能容納 10 個 Block,多了就要阻塞了。
咱們接下來看一下 BlockGenerator 另外啓動的那個線程執行的 keepPushingBlocks 方法到底在幹什麼?
private def keepPushingBlocks() {
    while(!stopped) {
Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
   // ...退出以前把剩下的也輸出去了
}
複製代碼
它在把 blocksForPushing 中的 block 不停的拿出來,調用 pushBlock 方法,這個方法屬於在實例化 BlockGenerator 的時候,從 ReceiverSupervisorImpl 傳進來的 BlockGeneratorListener 的。
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}, streamId, env.conf)
複製代碼
一、reportError,經過 actor 向 driver 發送錯誤報告消息 ReportError。
二、調用 pushArrayBuffer 保存數據。
下面是 pushArrayBuffer 方法:
def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}
複製代碼
一、把 Block 保存到 BlockManager 當中,序列化方式爲以前提到的 StorageLevel.MEMORY_AND_DISK_SER_2(內存不夠就寫入到硬盤,而且在 2 個節點上保存的方式)。
二、調用 reportPushedBlock 給 driver 發送 AddBlock 消息,報告新添加的 Block,ReceiverTracker 收到消息以後更新內部的 receivedBlockInfo 映射關係。
前面只講了數據的接收和保存,那數據是怎麼處理的呢?
以前一直講 ReceiverTracker,而忽略了以前的 JobScheduler 的 start 方法裏面最後啓動的 JobGenerator。
def start(): Unit = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent => processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
複製代碼
一、啓動一個 actor 處理 JobGeneratorEvent 事件。
二、若是是已經有 CheckPoint 了,就接着上次的記錄進行處理,不然就是第一次啓動。
咱們先看 startFirstTime 吧,CheckPoint 之後再說吧,有點兒小複雜。
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
}
複製代碼
一、timer.getStartTime 計算出來下一個週期的到期時間,計算公式:(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以當前的時間 / 除以間隔時間,再用 math.floor 求出它的上一個整數(即上一個週期的到期時間點),加上 1,再乘以週期就等於下一個週期的到期時間。
二、啓動 DStreamGraph,啓動時間 = startTime - graph.batchDuration。
三、啓動 Timer,咱們看看它的定義:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
複製代碼
到這裏就清楚了,DStreamGraph 的間隔時間就是 timer 的間隔時間,啓動時間要設置成比 Timer 早一個時間間隔,緣由再慢慢探究。
能夠看出來每隔一段時間,Timer 給 eventActor 發送 GenerateJobs 消息,咱們直接去看它的處理方法 generateJobs 吧,中間忽略了一步,你們本身看。
private def processEvent(event: JobGeneratorEvent) {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
複製代碼
下面是 generateJobs 方法。
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
複製代碼
一、DStreamGraph 生成 jobs。
二、從 stream 那裏獲取接收到的 Block 信息。
三、調用 submitJobSet 方法提交做業。
四、提交完做業以後,作一個 CheckPoint。
先看 DStreamGraph 是怎麼生成的 jobs。
def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
jobs
}
複製代碼
outputStreams 在這個例子裏面是 print 這個方法裏面添加的,這個在前面說了,咱們繼續看 DStream 的 generateJob。
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
複製代碼
一、調用 getOrCompute 方法得到 RDD
二、new 了一個方法去提交這個做業,缺什麼都不作
爲何呢?這是直接跳轉的錯誤,呵呵,由於這個 outputStream 是 print 方法返回的,它應該是 ForEachDStream,因此咱們應該看的是它裏面的 generateJob 方法。
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
複製代碼
這裏請你們千萬要注意,不要在這塊被卡住了。
咱們看看它這個 RDD 是怎麼出來的吧。
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
// 這個RDD已經被生成過了,直接用就是了
case Some(oldRDD) => Some(oldRDD)
// 還沒生成過,就調用compte函數生成一個
case None => {
if (isTimeValid(time)) {
compute(time) match {
case Some(newRDD) =>
         // 設置保存的級別
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
}
         // 若是如今須要,就作CheckPoint
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
}
         // 添加到generatedRDDs裏面去,能夠再次利用
generatedRDDs.put(time, newRDD)
Some(newRDD)
case None =>
None
}
} else {
None
}
}
}
}
複製代碼
從上面的方法能夠看出來它是經過每一個 DStream 本身實現的 compute 函數得出來的 RDD。咱們找到 SocketInputDStream,沒有 compute 函數,在父類 ReceiverInputDStream 裏面找到了。
override def compute(validTime: Time): Option[RDD[T]] = {
// 若是出現了時間比startTime早的話,就返回一個空的RDD,由於這個極可能是master掛了以後的錯誤恢復
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
複製代碼
經過 DStream 的 id 把 receiverTracker 當中把接收到的 block 信息所有拿出來,記錄到 ReceiverInputDStream 自身的receivedBlockInfo 這個 HashMap 裏面,就把 RDD 返回了,RDD 裏面實際包含的是 Block 的 id 的集合。
如今咱們就能夠回到以前 JobGenerator 的 generateJobs 方法,咱們就清楚它這句是提交的什麼了。
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
複製代碼
JobSet 是記錄 Job 的完成狀況的,直接看 submitJobSet 方法吧。
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
} else {
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
}
}
複製代碼
遍歷 jobSet 裏面的全部 jobs,經過 jobExecutor 這個線程池提交。咱們看一下 JobHandler 就知道了。
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run()
eventActor ! JobCompleted(job)
}
}
複製代碼
一、通知 eventActor 處理 JobStarted 事件。
二、運行 job。
三、通知 eventActor 處理 JobCompleted 事件。
這裏的重點是 job.run,事件處理只是更新相關的 job 信息。
def run() {
result = Try(func())
}
複製代碼
在遍歷 BlockRDD 的時候,在 compute 函數獲取該 Block(詳細請看 BlockRDD),而後對這個 RDD 的結果進行打印。
到這裏就算結束了,最後來個總結吧,圖例在下一章補上,這一章只是過程分析:
一、能夠有多個輸入,咱們能夠經過 StreamingContext 定義多個輸入,好比咱們監聽多個(host,ip),能夠給它們定義各自的處理邏輯和輸出,輸出方式不只限於 print 方法,還能夠有別的方法,saveAsTextFiles 和 saveAsObjectFiles。這塊的設計是支持共享 StreamingContext 的。
二、StreamingContext 啓動了 JobScheduler,JobScheduler 啓動 ReceiverTracker 和 JobGenerator。
三、ReceiverTracker 是經過把 Receiver 包裝成 RDD 的方式,發送到 Executor 端運行起來的,Receiver 起來以後向 ReceiverTracker 發送 RegisterReceiver 消息。
三、Receiver 把接收到的數據,經過 ReceiverSupervisor 保存。
四、ReceiverSupervisorImpl 把數據寫入到 BlockGenerator 的一個 ArrayBuffer 當中。
五、BlockGenerator 內部每一個一段時間(默認是 200 毫秒)就把這個 ArrayBuffer 構形成 Block 添加到 blocksForPushing 當中。
六、BlockGenerator 的另一條線程則不斷的把加入到 blocksForPushing 當中的 Block 寫入到 BlockManager 當中,並向 ReceiverTracker 發送 AddBlock 消息。
七、JobGenerator 內部有個定時器,按期生成 Job,經過 DStream 的 id,把 ReceiverTracker 接收到的 Block 信息從 BlockManager 上抓取下來進行處理,這個間隔時間是咱們在實例化 StreamingContext 的時候傳進去的那個時間,在這個例子裏面是 Seconds(1)。