第10課:Spark Streaming源碼解讀之流數據不斷接收全生命週期完全研究和思考

  

本期內容:apache

1, 數據接收架構設計模式設計模式

2, 數據接收源碼完全研究架構

 

Spark Streaming應用程序有如下特色:ide

1,不斷持續接收數據oop

3, Receiver和Driver不在同一節點中性能

 

接收數據,存儲數據,彙報數據的metedata給Driverspa

數據接收的過程相似於MVC線程

Mode:Driver架構設計

View:Receiver設計

Control:ReceiverSupervisorImpl

Receiver的啓動由ReceiverSupervisorImpl來控制,Receiver接收到數據交給ReceiverSupervisorImpl來存儲。

RDD中的元素必需要實現序列化,才能將RDD序列化給Executor端。Receiver就實現了Serializable接口。

// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }

 

@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

 

處理Receiver接收到的數據,存儲數據並彙報給Driver,Receiver是一條一條的接收數據的。

/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

經過限定數據存儲速度來實現限流

接收數據,合併成buffer,放入block隊列

在ReceiverSupervisorImpl啓動會調用BlockGenerator對象的start方法。

那麼BlockGenerator類是用來幹什麼的呢?從源碼上註釋能夠說明該類來把一個Receiver接收到的數據合併到一個Block而後寫入到BlockManager中。該類內部有兩個線程,一個是週期性把數據生成一批對象,而後把先前的一批數據封裝成Block。另外一個線程時把Block寫入到BlockManager中。

 

BlockGenerator類繼承自ReateLimiter類,說明咱們不能限定接收數據的速度,可是能夠限定存儲數據的速度,轉過來就限定流動的速度。

BlockGenerator類有一個定時器(默認每200ms將接收到的數據合併成block)和一個線程(把block寫入到BlockManager),200ms會產生一個Block,即1秒鐘生成5個Partition。過小則生成的數據片中數據過小,致使一個Task處理的數據少,性能差。實際經驗獲得不要低於50ms。

那BlockGenerator是怎麼被建立的?

 

BlockGenerator類中的定時器會回調updateCurrentBuffer方法。

Receiver不斷的接收數據,BlockGenerator類經過一個定時器,把Receiver接收到的數據,把多條合併成Block,再放入到Block隊列中。

 

運行在Executor端的ReceiverSupervisorImpl須要與Driver端的ReceiverRacker進行通訊,傳遞元數據信息metedata,其中ReceiverSupervisorImpl經過RPC的名稱獲取到ReceiverRacker的遠程調用。

在ReceiverTracker調用start方法啓動的時候,會以ReceiverTracker的名稱建立RPC通訊體。ReceiverSupervisorImpl就是和這個RPC通訊體進行消息交互的。

在ReceiverTrackerEndpoint接收到ReceiverSupervisorImpl發送的註冊消息,把其RpcEndpoint保存起來。

對應的Executor端的ReceiverSupervisorImpl也會建立Rpc消息通訊體,來接收來自Driver端ReceiverTacker的消息。

BlockGenerator類中的線程每隔10ms從隊列中獲取Block,寫入到BlockManager中。

相關文章
相關標籤/搜索