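This blog series summarizes and shares case studies drawn from real production environments, together with Spark source-code walkthroughs and practical guidance; please keep following the series. Copyright notice: this series on Spark source-code analysis and commercial practice belongs to the author (秦凱新). Reproduction is prohibited; reading it for study is welcome.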
/**
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by defining the functions `onStart()` and `onStop()`. `onStart()`
 * should define the setup steps necessary to start receiving data,
 * and `onStop()` should define the cleanup steps necessary to stop receiving data.
 * Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
 * or stopped completely by `stop(...)`.
 */
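_supervisor: the ReceiverSupervisor. Once started, it is in charge of data storage on the Executor side (examined in detail in the next section).
store(): as the name suggests, stores the incoming data stream in batches.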
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  /**
   * This method is called by the system when the receiver is started. This function
   * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
   * This function must be non-blocking, so receiving the data must occur on a different
   * thread. Received data can be stored with Spark by calling `store(data)`.
   *
   * If there are errors in threads started here, then following options can be done
   * (i) `reportError(...)` can be called to report the error to the driver.
   * The receiving of data will continue uninterrupted.
   * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
   * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
   * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
   * immediately, and then `onStart()` after a delay.
   */
  def onStart(): Unit

  /**
   * This method is called by the system when the receiver is stopped. All resources
   * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
   */
  def onStop(): Unit

  /** Override this to specify a preferred location (hostname). */
  def preferredLocation: Option[String] = None

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   */
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def store(dataBuffer: ArrayBuffer[T]) {
    supervisor.pushArrayBuffer(dataBuffer, None, None)
  }

  /**
   * Store an ArrayBuffer of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   */
  def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
    supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
  }

  /** Store an iterator of received data as a data block into Spark's memory. */
  def store(dataIterator: Iterator[T]) {
    supervisor.pushIterator(dataIterator, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   */
  def store(dataIterator: java.util.Iterator[T], metadata: Any) {
    supervisor.pushIterator(dataIterator.asScala, Some(metadata), None)
  }

  /** Store an iterator of received data as a data block into Spark's memory. */
  def store(dataIterator: java.util.Iterator[T]) {
    supervisor.pushIterator(dataIterator.asScala, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   */
  def store(dataIterator: Iterator[T], metadata: Any) {
    supervisor.pushIterator(dataIterator, Some(metadata), None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory. Note
   * that the data in the ByteBuffer must be serialized using the same serializer
   * that Spark is configured to use.
   */
  def store(bytes: ByteBuffer) {
    supervisor.pushBytes(bytes, None, None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   */
  def store(bytes: ByteBuffer, metadata: Any) {
    supervisor.pushBytes(bytes, Some(metadata), None)
  }

  /** Report exceptions in receiving data. */
  def reportError(message: String, throwable: Throwable) {
    supervisor.reportError(message, throwable)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` will be reported to the driver.
   */
  def restart(message: String) {
    supervisor.restartReceiver(message)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` and `exception` will be reported to the driver.
   */
  def restart(message: String, error: Throwable) {
    supervisor.restartReceiver(message, Some(error))
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread.
   */
  def restart(message: String, error: Throwable, millisecond: Int) {
    supervisor.restartReceiver(message, Some(error), millisecond)
  }

  /** Stop the receiver completely. */
  def stop(message: String) {
    supervisor.stop(message, None)
  }

  /** Stop the receiver completely due to an exception */
  def stop(message: String, error: Throwable) {
    supervisor.stop(message, Some(error))
  }

  /** Check if the receiver has started or not. */
  def isStarted(): Boolean = {
    supervisor.isReceiverStarted()
  }

  /**
   * Check if receiver has been marked for stopping. Use this to identify when
   * the receiving of data should be stopped.
   */
  def isStopped(): Boolean = {
    supervisor.isReceiverStopped()
  }

  /**
   * Get the unique identifier the receiver input stream that this
   * receiver is associated with.
   */
  def streamId: Int = id

  /*
   * =================
   * Private methods
   * =================
   */

  /** Identifier of the stream this receiver is associated with. */
  private var id: Int = -1

  /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
  @transient private var _supervisor: ReceiverSupervisor = null

  /** Set the ID of the DStream that this receiver is associated with. */
  private[streaming] def setReceiverId(_id: Int) {
    id = _id
  }

  /** Attach Network Receiver executor to this receiver. */
  private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
    assert(_supervisor == null)
    _supervisor = exec
  }

  /** Get the attached supervisor. */
  private[streaming] def supervisor: ReceiverSupervisor = {
    assert(_supervisor != null,
      "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
        "some computation in the receiver before the Receiver.onStart() has been called.")
    _supervisor
  }
}
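To make the contract above concrete, here is a minimal sketch of a custom receiver that reads text lines from a socket. The class name `LineReceiver`, the thread name, and the host and port parameters are illustrative assumptions, not part of the source being analyzed; only `onStart()`, `onStop()`, `store(...)`, `restart(...)` and `isStopped()` come from the `Receiver` class shown above. It assumes Spark Streaming is on the classpath.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical example receiver: one text line per stored item.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart() must not block, so the actual reading runs on its own thread.
  override def onStart(): Unit = {
    val worker = new Thread("line-receiver") {
      override def run(): Unit = receive()
    }
    worker.setDaemon(true)
    worker.start()
  }

  // Nothing to release here: the reading thread polls isStopped() and exits by itself.
  override def onStop(): Unit = ()

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // single items are grouped into blocks by the supervisor
        line = reader.readLine()
      }
      // The stream ended although nobody asked us to stop: ask for a restart.
      if (!isStopped()) restart("Connection closed, trying to reconnect")
    } catch {
      case e: Exception => restart(s"Error receiving data from $host:$port", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}
```

Such a receiver would typically be passed to `StreamingContext.receiverStream(...)`; the tracker then calls `setReceiverId` and ships it to an Executor, as the following sections describe.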
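Set up the communication endpoint so that the state of the whole Streaming application can be monitored.
launchReceivers(): sends StartAllReceivers to the tracker's own endpoint, which then builds a receiver RDD, treats it as a Job, and has SparkContext submit that Job, so that the Receiver finally starts on an Executor.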
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // key step: launch the receivers
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
Acting as the messenger, launchReceivers() exists to send the StartAllReceivers(receivers) message.
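private def launchReceivers(): Unit = {
  // Key step: take one Receiver from each input stream and tag it with the stream id
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // key step: message the tracker's own endpoint
}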
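The "send a message to your own endpoint" idea can be sketched without Spark. The following is a toy model only: `MiniEndpoint`, its in-memory inbox, and the `drain()` method are hypothetical stand-ins for Spark's RPC layer, and the message classes merely borrow the names used above.

```scala
import scala.collection.mutable

object MiniEndpoint {
  sealed trait Message
  final case class StartAllReceivers(streamIds: Seq[Int]) extends Message
  final case class RestartReceiver(streamId: Int) extends Message

  // Hypothetical endpoint: send() only enqueues, the work happens when the inbox is drained.
  final class Endpoint {
    private val inbox = mutable.Queue.empty[Message]
    val started = mutable.ArrayBuffer.empty[Int]

    // Returns immediately, like a fire-and-forget send.
    def send(msg: Message): Unit = inbox.enqueue(msg)

    // Message handler, processed one message at a time.
    private def receive(msg: Message): Unit = msg match {
      case StartAllReceivers(ids) => ids.foreach(id => started += id)
      case RestartReceiver(id)    => started += id
    }

    // Stands in for the message loop that a real RPC environment would run.
    def drain(): Unit = while (inbox.nonEmpty) receive(inbox.dequeue())
  }
}
```

The point of the pattern is that the caller of `send` never runs the start-up logic itself; all state changes happen inside the handler, one message at a time.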
/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

  @volatile private var active: Boolean = true

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      // Key step: the scheduling policy recommends the best locations, which decides
      // on which Executor each receiver is started
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId) // key step
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        // Key step: loop over all receivers and start each one at its best location
        startReceiver(receiver, executors)
      }
    case RestartReceiver(receiver) =>
      // Old scheduled executors minus the ones that are not active any more
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
          // Try global scheduling again
          oldScheduledExecutors
        } else {
          val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
          // Clear "scheduledLocations" to indicate we are going to do local scheduling
          val newReceiverInfo = oldReceiverInfo.copy(
            state = ReceiverState.INACTIVE, scheduledLocations = None)
          receiverTrackingInfos(receiver.streamId) = newReceiverInfo
          schedulingPolicy.rescheduleReceiver(
            receiver.streamId,
            receiver.preferredLocation,
            receiverTrackingInfos,
            getExecutors)
        }
      // Assume there is one receiver restarting at one time, so we don't need to update
      // receiverTrackingInfos
      startReceiver(receiver, scheduledLocations)
    case c: CleanupOldBlocks =>
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    // Remote messages
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
  }
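What "recommending a location for each receiver" means can be illustrated with a deliberately simplified policy. This is not Spark's scheduling algorithm: `MiniScheduling`, `ReceiverSpec`, and the rule "honor the preferred host if it is alive, otherwise go round-robin" are assumptions made purely for illustration.

```scala
object MiniScheduling {
  // Hypothetical, stripped-down view of a receiver: its stream id and optional preferred host.
  final case class ReceiverSpec(streamId: Int, preferredHost: Option[String])

  /**
   * Toy policy: use the preferred host when it is among the live executors,
   * otherwise hand out executors round-robin. Returns streamId -> candidate locations.
   */
  def scheduleReceivers(
      receivers: Seq[ReceiverSpec],
      executors: Seq[String]): Map[Int, Seq[String]] = {
    if (executors.isEmpty) {
      // No executor known yet: leave the location list empty and let the job run anywhere.
      receivers.map(r => r.streamId -> Seq.empty[String]).toMap
    } else {
      var next = 0
      receivers.map { r =>
        val host = r.preferredHost.filter(executors.contains).getOrElse {
          val chosen = executors(next % executors.length)
          next += 1
          chosen
        }
        r.streamId -> Seq(host)
      }.toMap
    }
  }
}
```

An empty location list corresponds to the `scheduledLocations.isEmpty` branch in `startReceiver` shown further down, where the receiver RDD is created without preferred locations.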
startReceiverFunc: the func applied to receiverRDD, executed mainly on the Executor. Its body creates and starts a ReceiverSupervisorImpl, which is the component that actually receives the data. Internally it wraps BlockGenerator:
private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
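Inside BlockGenerator, blockIntervalTimer and blockPushingThread together handle the whole process of turning the real-time data stream into Blocks, batch by batch.
The core code of startReceiver: from receiverRDD to Job submission, and finally ReceiverSupervisorImpl.start() on the executor side.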
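The division of labour between a timer and a pushing thread can be sketched as a two-stage buffer. This is a simplified model under stated assumptions, not Spark's BlockGenerator: the class `MiniBlockGenerator` and its method names are hypothetical, and the two stages are driven by explicit calls here instead of a real timer and thread so that the behaviour is easy to follow.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for a block generator.
final class MiniBlockGenerator[T](queueSize: Int) {
  // Stage 1: items received since the last interval tick.
  private var currentBuffer = new ArrayBuffer[T]
  // Stage 2: finished blocks waiting to be pushed to storage.
  private val blocksForPushing = new ArrayBlockingQueue[Vector[T]](queueSize)

  /** Called by the receiving side for every single item. */
  def addData(item: T): Unit = synchronized { currentBuffer += item }

  /** What an interval timer would call: swap the buffer and queue it as one block. */
  def updateCurrentBuffer(): Unit = {
    val block = synchronized {
      val full = currentBuffer
      currentBuffer = new ArrayBuffer[T]
      full
    }
    // put() blocks when the queue is full, which slows the producer down.
    if (block.nonEmpty) blocksForPushing.put(block.toVector)
  }

  /** What a pushing thread would call in a loop: take the next finished block, if any. */
  def pollBlock(): Option[Vector[T]] = Option(blocksForPushing.poll())
}
```

For example, adding 1, 2, 3, ticking once, adding 4 and ticking again yields two blocks, `Vector(1, 2, 3)` and `Vector(4)`; a tick with an empty buffer produces no block.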
/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(ThreadUtils.sameThread)
  logInfo(s"Receiver ${receiver.streamId} started")
}
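The "keep restarting the job until the tracker is stopped" logic at the end of startReceiver is a general pattern that can be shown with plain Scala futures. The sketch below is an illustration under assumptions: `RestartLoop`, `superviseJob`, and its parameters are hypothetical names, and a direct recursive call stands in for sending `RestartReceiver` to the endpoint.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object RestartLoop {
  /**
   * Runs `job` again and again, whether it succeeds or fails, for as long as
   * `shouldRun()` returns true. The returned future completes with the number
   * of runs once `shouldRun()` turns false.
   */
  def superviseJob(job: Int => Unit, shouldRun: () => Boolean)(
      implicit ec: ExecutionContext): Future[Int] = {
    val finished = Promise[Int]()
    val runs = new AtomicInteger(0)

    def launch(): Unit = {
      if (!shouldRun()) {
        // Counterpart of the "tracker is stopping" branch: finish instead of restarting.
        finished.trySuccess(runs.get())
      } else {
        val attempt = runs.incrementAndGet()
        Future(job(attempt)).onComplete {
          case Success(_) => launch() // job ended normally: start it again
          case Failure(_) => launch() // job failed: start it again as well
        }
      }
    }

    launch()
    finished.future
  }
}
```

Both completion cases lead to a restart, which mirrors the `Success` and `Failure` branches above; the only way out of the loop is the stop condition.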
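Architecture diagram of the receiver RDD inside ReceiverTracker
The figure below shows in detail how ReceiverTracker implements the Job submission flow for the receiver RDD.
In the end, the Driver-side ReceiverTracker is responsible for starting the receivers on different Executors, so that each receiver can continuously store incoming data in batches and hand it over to BlockManager for management.
秦凱新, Shenzhen, 2018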