This blog series distills and shares cases taken from real production environments, together with Spark source-code walkthroughs and guidance for commercial practice; please keep following the series. Copyright notice: this Spark source-code analysis and commercial-practice series belongs entirely to the author (秦凱新); reproduction is prohibited, while learning from it is welcome.
Spark Streaming is a micro-batch streaming platform: the user chooses a batch interval, and within each batch the received data is additionally cut into blocks at a default interval of 200 ms (spark.streaming.blockInterval). Spark Streaming packages the data stream into batches, turns each batch into RDDs whose blocks are managed by the BlockManager, and finally submits the work as jobs over a DAG (directed acyclic graph).
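For reference, both intervals can be set explicitly; a minimal sketch, where the application name and the 2-second batch interval are arbitrary choices for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Batch interval: how often a batch (one RDD per input stream) is produced; chosen by the user.
// Block interval: how often received data is cut into blocks within a batch; defaults to 200 ms.
val conf = new SparkConf()
  .setAppName("IntervalDemo")
  .set("spark.streaming.blockInterval", "200ms") // the default value, shown here only for illustration
val ssc = new StreamingContext(conf, Seconds(2))

The StreamingContext is the entry point for all of this; its Scaladoc summarizes the programming model: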
* Main entry point for Spark Streaming functionality. It provides methods used to create
* [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either
* created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf
* configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
* The associated SparkContext can be accessed using `context.sparkContext`. After
* creating and transforming DStreams, the streaming computation can be started and stopped
* using `context.start()` and `context.stop()`, respectively.
* `context.awaitTermination()` allows the current thread to wait for the termination
* of the context by `stop()` or by an exception.
A basic Spark Streaming example
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on the target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc').
    // Note that skipping replication in the storage level is fine only when running locally;
    // replication is necessary in a distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
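To try the example locally, start a text source in one terminal with nc -lk 9999, then run the job against it, for example with ./bin/run-example streaming.NetworkWordCount localhost 9999 from a Spark distribution; every second the job prints the counts of the words typed into nc during that batch.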
How a StreamingContext creates its SparkContext
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
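The SparkConf constructor above always builds a brand-new SparkContext. Alternatively, an existing SparkContext can be handed in directly, and the context.sparkContext mentioned in the Scaladoc returns that same instance; a small sketch, with arbitrary names:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc  = new SparkContext(new SparkConf().setAppName("ReuseExistingContext"))
val ssc = new StreamingContext(sc, Seconds(1)) // reuse an already running SparkContext
assert(ssc.sparkContext eq sc)                 // the StreamingContext exposes the same SparkContext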
The socketTextStream dependency chain
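ssc.socketTextStream in the example produces a SocketInputDStream, whose inheritance chain leads back to the template discussed below: SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream. The entry point in StreamingContext looks roughly like this (paraphrased from the Spark source; the exact signature may differ slightly between versions):

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}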
(1) The abstract parent template, DStream, in the words of its own Scaladoc:
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see
* org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
* DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
* etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
* transforming existing DStreams using operations such as `map`,
* `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
* periodically generates a RDD, either from live data or by transforming the RDD generated by a
* parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
* operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
* `join`. These operations are automatically available on any DStream of pairs
* (e.g., DStream[(Int, Int)] through implicit conversions.
*
* A DStream internally is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
(2) Excerpts from the DStream source code
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  validateAtInit()

  // =======================================================================
  // Methods that should be implemented by subclasses of DStream
  // =======================================================================

  /** Time interval after which the DStream generates an RDD */
  def slideDuration: Duration

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]

  /** Method that generates an RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]

  // =======================================================================
  // Methods and fields available on all DStreams
  // =======================================================================
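To make the contract of these three abstract members concrete, here is a minimal, purely illustrative subclass, not taken from the Spark code base, that emits one constant RDD per batch; it assumes it is compiled inside the org.apache.spark.streaming.dstream package so that the private[streaming] members used below are visible:

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}

// Illustrative only: emits the same collection as an RDD once per batch.
class ConstantDStream[T: ClassTag](ssc_ : StreamingContext, values: Seq[T])
  extends DStream[T](ssc_) {

  // No parent DStreams: this stream sits at the root of the lineage
  override def dependencies: List[DStream[_]] = List()

  // One new RDD per batch interval
  override def slideDuration: Duration = ssc.graph.batchDuration

  // Called by the framework for each batch time to build that batch's RDD
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(ssc.sparkContext.parallelize(values))
}

Back in the DStream template itself, the remaining state is held in a handful of fields: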
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
// Time zero for the DStream
private[streaming] var zeroTime: Time = null
// Duration for which the DStream will remember each RDD created
private[streaming] var rememberDuration: Duration = null
// Storage level of the RDDs in the stream
private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
* Get the RDD corresponding to the given time; either retrieve it from cache
* or compute-and-cache it.
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
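Both branches above are switched on from user code: persist(...) sets storageLevel, and checkpoint(...) sets checkpointDuration. A small usage sketch, reusing the wordCounts stream from the earlier example; the directory and interval are arbitrary:

ssc.checkpoint("hdfs:///tmp/streaming-ckpt")                  // required before DStream checkpointing; path is an example
val cached = wordCounts.persist(StorageLevel.MEMORY_ONLY_SER) // sets storageLevel, so getOrCompute persists each RDD
cached.checkpoint(Seconds(10))                                // sets checkpointDuration; must be a multiple of the slide duration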
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* to generate their own jobs.
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
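The default implementation above only materializes the RDD with an empty function. ForeachDStream, which the output operations discussed next create, overrides generateJob so that the user-supplied function actually runs on each batch's RDD; paraphrased from the Spark source, details may differ slightly between versions:

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}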
The output operations currently available on a DStream are print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles and foreachRDD. Each of these output operations creates a ForeachDStream object and registers it in the outputStreams member of DStreamGraph.
final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
Within StreamingContext, the DStreamGraph is a key member, dedicated to tracking these action-style output operations.
private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    _cp.graph.setContext(this)
    _cp.graph.restoreCheckpointData()
    _cp.graph
  } else {
    require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(_batchDur)
    newGraph
  }
}
So how are the dependencies between the individual DStream objects and their operators finally strung together into one chain? DStreamGraph backtracks from its outputStreams to generate Jobs, and the chain of operators is only really executed once the StreamingContext has been started.
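Concretely, for every batch time the DStreamGraph walks its outputStreams and asks each of them for a job; paraphrased from DStreamGraph.generateJobs, with minor details varying by version:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}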
As the code below shows, the print method of the abstract DStream template defines a foreachFunc and hands it to foreachRDD; foreachRDD wraps it in a ForeachDStream, and that ForeachDStream is ultimately registered with the StreamingContext's DStreamGraph.
The print method
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
The saveAsTextFiles method
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsTextFile(file)
  }
  this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}
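Both print and saveAsTextFiles funnel into foreachRDD, which wraps the supplied function in a ForeachDStream and immediately registers it; paraphrased from the DStream source:

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForeachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}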
The register method
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
With all output streams registered, StreamingContext.start() sets the machinery in motion; the decisive line is scheduler.start():

private[streaming] val scheduler = new JobScheduler(this)

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
            scheduler.start() // <= the key step
          }
          state = StreamingContextState.ACTIVE
          scheduler.listenerBus.post(
            StreamingListenerStreamingStarted(System.currentTimeMillis()))
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      logDebug("Adding shutdown hook") // force eager creation of logger
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
On the driver: start the receiverTracker => responsible for receiving data, caching it, and generating blocks.
On the driver: start the jobGenerator => responsible for initializing the DStreamGraph, converting DStreams into RDDs, generating jobs, and submitting them for execution.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start() // <= the key step: an event loop listening for events from all sides

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams // <= the key step
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)   // <= the key step
  inputInfoTracker = new InputInfoTracker(ssc) // <= the key step: manages all input streams and statistics about the data they receive
  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }
  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  receiverTracker.start() // <= the key step
  jobGenerator.start()    // <= the key step
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
private var eventLoop: EventLoop[JobSchedulerEvent] = null
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
The event loop handles three kinds of JobSchedulerEvent, posted as each job starts, finishes, or when an error is reported: JobStarted, JobCompleted and ErrorReported.
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
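The jobGenerator.start() call seen earlier is what ultimately feeds this machinery: the JobGenerator has its own event loop, driven by a RecurringTimer that fires once per batch interval and posts a GenerateJobs event; paraphrased from JobGenerator, with details varying by version:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

Handling GenerateJobs then calls graph.generateJobs(time), shown earlier, and submits the resulting JobSet to the JobScheduler, whose JobHandler in turn posts the JobStarted and JobCompleted events processed above.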
This article has concentrated on dissecting the StreamingContext startup flow and the DStream template source code. It was written without reference to any other online blog; it is hereby declared original content, and reproduction or commercial use is prohibited.
秦凱新 (Qin Kaixin), Shenzhen, 2018