Lesson 12: Spark Streaming Source Code Walkthrough of Executor Fault Tolerance and Data Safety

The goals of this post:
1. A detailed look at the Executor-side WAL mechanism
2. Message replay with Kafka

Considerations regarding data safety:

  1. Spark Streaming continuously receives data, continuously generates jobs, and continuously submits those jobs to the cluster. This raises a very important issue: data safety.
  2. Spark Streaming is built on top of Spark Core. If the data itself can be made safe, then, since the jobs Spark Streaming generates are based on RDDs, even if something goes wrong at run time, Spark Streaming can rely on Spark Core's fault-tolerance mechanisms to recover automatically.
  3. Executor fault tolerance is therefore mainly about fault tolerance for the data.
  4. Why is fault tolerance for the computation not considered here? Because the computation runs on Spark Core, whose own fault tolerance already makes it safe and reliable.

Executor fault-tolerance approaches:
1. The simplest approach is replication, relying on the underlying BlockManager's block replication. This is the default.
2. Do not replicate the received data, but support data replay, i.e. the data can be read again from the source.


Replication via the BlockManager:

  1. By default there are two replicas in memory: when Spark Streaming's Receiver stores the data it receives, it specifies the StorageLevel MEMORY_AND_DISK_SER_2. The underlying storage is delegated to the BlockManager, and the BlockManager's semantics ensure that when two replicas are requested they are generally both kept in memory, so the data ends up on at least two Executors. The StorageLevel class is shown below, followed by a short usage sketch.
/**
 * :: DeveloperApi ::
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
 * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
 * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
 * to replicate the RDD partitions on multiple nodes.
 *
 * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
 * for commonly useful storage levels. To create your own storage level object, use the
 * factory method of the singleton object (`StorageLevel(...)`).
 */
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
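A minimal usage sketch, assuming an existing StreamingContext named ssc: it passes the (default) two-replica level explicitly, and also builds a custom level through the StorageLevel(...) factory mentioned in the scaladoc above.

import org.apache.spark.storage.StorageLevel

// Default level for socketTextStream is already MEMORY_AND_DISK_SER_2 (two replicas);
// passing it explicitly makes the replication choice visible.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

// A custom level built via the StorageLevel(...) factory:
// arguments are useDisk, useMemory, useOffHeap, deserialized, replication.
val memoryOnlyTwoReplicas = StorageLevel(false, true, false, false, 2)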
2. The receivedBlockHandler (of type ReceivedBlockHandler) is created as follows:
private val receivedBlockHandler: ReceivedBlockHandler = {
  // To enable the WAL, a checkpoint directory is required.
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    // The storageLevel here is the one passed in when the Receiver was constructed.
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
3. The WAL mechanism is not enabled by default; it is controlled by the configuration keys below (a short sketch of enabling it follows the code).
/** A helper class with utility functions related to the WriteAheadLog interface */
private[streaming] object WriteAheadLogUtils extends Logging {
  val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
  val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
  val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
    "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
  val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
  val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"

  val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
  val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
    "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
  val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
  val DRIVER_WAL_BATCHING_CONF_KEY = "spark.streaming.driver.writeAheadLog.allowBatching"
  val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = "spark.streaming.driver.writeAheadLog.batchingTimeout"
  val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"

  val DEFAULT_ROLLING_INTERVAL_SECS = 60
  val DEFAULT_MAX_FAILURES = 3

  def enableReceiverLog(conf: SparkConf): Boolean = {
    conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
  }
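A minimal sketch of enabling the receiver WAL, assuming you control the application's SparkConf; the checkpoint path is a placeholder.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("WALDemo")
  .setMaster("local[2]") // at least one core for the receiver, one for processing
  // Enable the receiver-side write-ahead log (disabled by default).
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// The WAL is written under the checkpoint directory, so a checkpoint dir is mandatory.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path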
4. For example, the source of socketTextStream:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    // The storageLevel is initialized here, defaulting to two replicas.
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
5. The source of BlockManagerBasedBlockHandler:
/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */
private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)
  extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
          tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
          tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId, numRecords)
  }

  def cleanupOldBlocks(threshTime: Long) {
    // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
    // of BlockRDDs.
  }
}
6. The concrete storage goes through putIterator:
def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(values != null, "Values is null")
  doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
7. The relevant part of doPut:
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
  case b: ByteBufferValues if putLevel.replication > 1 =>
    // Duplicate doesn't copy the bytes, but just creates a wrapper
    val bufferView = b.buffer.duplicate()
    Future {
      // This is a blocking action and should run in futureExecutionContext which is a cached
      // thread pool.
      // replicate() copies the data to other nodes.
      replicate(blockId, bufferView, putLevel)
    }(futureExecutionContext)
  case _ => null
}
8. The source of replicate, which copies the data to another node:
/**
 * Replicate block to another node. Note that this is a blocking call that returns after
 * the block has been replicated.
 */
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
  val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
  val numPeersToReplicateTo = level.replication - 1
  val peersForReplication = new ArrayBuffer[BlockManagerId]
  val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
  val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
  val tLevel = StorageLevel(
    level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
  val startTime = System.currentTimeMillis
  val random = new Random(blockId.hashCode)

The WAL approach
1. Before anything else is done, the received data is written to a log, and that log lives under the checkpoint directory. If the job fails, it can be recovered from this log.

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    // There may be several receivers, so the streamId is needed here.
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    // The BlockManager path relies on RDD-based fault tolerance, so no WAL is needed here.
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
2. WriteAheadLogBasedBlockHandler, which implements ReceivedBlockHandler:
/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */
private[streaming] class WriteAheadLogBasedBlockHandler(
    blockManager: BlockManager,
    streamId: Int,
    storageLevel: StorageLevel,
    conf: SparkConf,
    hadoopConf: Configuration,
    checkpointDir: String,
    clock: Clock = new SystemClock
  ) extends ReceivedBlockHandler with Logging {
3. With the WAL there is no need to raise the replication factor to 2: the WAL is written into the checkpoint directory, the checkpoint is kept on HDFS, and HDFS already keeps three replicas by default.
private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  // Force serialized storage with a single replica.
  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}
4. When a block is stored, the data is written to the WAL and to the BlockManager at the same time.
/**
 * This implementation stores the block into the block manager as well as a write ahead log.
 * It does this in parallel, using Scala Futures, and returns only after the block has
 * been stored in both places.
 */
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]
  // Serialize the block so that it can be inserted into both
  val serializedBlock = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.dataSerialize(blockId, arrayBuffer.iterator)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
      numRecords = countIterator.count
      serializedBlock
    case ByteBufferBlock(byteBuffer) =>
      byteBuffer
    case _ =>
      throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
  }
5. The data is stored into the BlockManager:
// Store the block in block manager
val storeInBlockManagerFuture = Future {
  val putResult =
    blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
}
6. In parallel, the write method stores the block into the log (a small sketch of the parallel-futures pattern follows the code):
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
  // The block itself must be serializable; it is written as a ByteBuffer.
  writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
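To make the "in parallel, using Scala Futures" idea concrete, here is a minimal standalone sketch of the same pattern; writeToBlockManager and writeToWAL are hypothetical stand-ins, not the actual Spark methods.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical stand-ins for the two storage paths.
def writeToBlockManager(data: Array[Byte]): Unit = { /* put bytes into the BlockManager */ }
def writeToWAL(data: Array[Byte]): Unit = { /* append bytes to the write-ahead log */ }

def storeBoth(data: Array[Byte]): Unit = {
  val inBlockManager = Future { writeToBlockManager(data) } // started concurrently
  val inWAL          = Future { writeToWAL(data) }          // started concurrently
  // Block until both writes have completed, mirroring storeBlock above.
  Await.result(inBlockManager.zip(inWAL), 30.seconds)
}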
7. The WAL is written sequentially and the data is immutable, so reading only requires following a pointer (where the record sits and how long it is). That is why the WAL is very fast. The abstract WriteAheadLog API is shown below, followed by a short usage sketch.
/**
 * :: DeveloperApi ::
 *
 * This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
 * to save the received data (by receivers) and associated metadata to a reliable storage, so that
 * they can be recovered after driver failures. See the Spark documentation for more information
 * on how to plug in your own custom implementation of a write ahead log.
 */
@org.apache.spark.annotation.DeveloperApi
public abstract class WriteAheadLog {

  // The record handle contains all the information needed for reading and writing;
  // the time is used as an index.
  /**
   * Write the record to the log and return a record handle, which contains all the information
   * necessary to read back the written record. The time is used to the index the record,
   * such that it can be cleaned later. Note that implementations of this abstract class must
   * ensure that the written data is durable and readable (using the record handle) by the
   * time this function returns.
   */
  // The returned WriteAheadLogRecordHandle is later used to read the record back.
  abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);

  /**
   * Read a written record based on the given record handle.
   */
  abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);

  /**
   * Read and return an iterator of all the records that have been written but not yet cleaned up.
   */
  abstract public Iterator<ByteBuffer> readAll();

  /**
   * Clean all the records that are older than the threshold time. It can wait for
   * the completion of the deletion.
   */
  // Cleans up records and files older than the threshold time.
  abstract public void clean(long threshTime, boolean waitForCompletion);

  /**
   * Close this log and release any resources.
   */
  abstract public void close();
}
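A minimal usage sketch of this API, assuming wal is some concrete WriteAheadLog instance (for example the file-based one); it writes a record, reads it back through the handle, and cleans up old records.

import java.nio.ByteBuffer
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

def walRoundTrip(wal: WriteAheadLog): Unit = {
  val record = ByteBuffer.wrap("hello wal".getBytes("UTF-8"))

  // write() must have made the record durable by the time it returns; the handle is
  // the "pointer" (path, offset and length in the file-based implementation).
  val handle: WriteAheadLogRecordHandle = wal.write(record, System.currentTimeMillis())

  // Random-access read using the handle.
  val readBack: ByteBuffer = wal.read(handle)
  println(s"read back ${readBack.remaining()} bytes")

  // Drop everything older than one hour, without waiting for the deletion to finish.
  wal.clean(System.currentTimeMillis() - 3600 * 1000, false)

  wal.close()
}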
8. The implementation of WriteAheadLogRecordHandle is FileBasedWriteAheadLogSegment.

9. path: which log file the record lives in; offset: the position (index) within that file; length: the number of bytes. From these three fields the data can be located exactly.

/** Class for representing a segment of data in a write ahead log file */
private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle
10. The implementations of WriteAheadLog are shown in the figure below:

[Figure: WriteAheadLog implementations]
11. FileBasedWriteAheadLog manages the WAL files:

/**
 * This class manages write ahead log files.
 *
 *  - Writes records (bytebuffers) to periodically rotating log files.
 *  - Recovers the log files and the reads the recovered records upon failures.
 *  - Cleans up old log files.
 *
 * Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write
 * and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read.
 *
 * @param logDirectory Directory when rotating log files will be created.
 * @param hadoopConf Hadoop configuration for reading/writing log files.
 */
private[streaming] class FileBasedWriteAheadLog(
12. The data is written directly to the checkpoint directory on HDFS (a generic sketch of the retry pattern used here follows the code):
/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
  var fileSegment: FileBasedWriteAheadLogSegment = null
  var failures = 0
  var lastException: Exception = null
  var succeeded = false
  while (!succeeded && failures < maxFailures) {
    try {
      // getLogWriter returns the current writer, rotating files as needed.
      fileSegment = getLogWriter(time).write(byteBuffer)
      if (closeFileAfterWrite) {
        resetWriter()
      }
      succeeded = true
    } catch {
      case ex: Exception =>
        lastException = ex
        logWarning("Failed to write to write ahead log")
        resetWriter()
        failures += 1
    }
  }
  if (fileSegment == null) {
    logError(s"Failed to write to write ahead log after $failures failures")
    throw lastException
  }
  fileSegment
}
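The write path above is essentially a bounded retry loop. As a standalone illustration, a hypothetical generic helper with the same shape might look like this:

import scala.util.control.NonFatal

// Hypothetical helper mirroring the retry logic in FileBasedWriteAheadLog.write:
// try the operation up to maxFailures times and rethrow the last exception if all attempts fail.
def retry[T](maxFailures: Int)(op: => T): T = {
  var failures = 0
  var lastException: Throwable = null
  while (failures < maxFailures) {
    try {
      return op
    } catch {
      case NonFatal(ex) =>
        lastException = ex
        failures += 1
    }
  }
  throw lastException
}

// Usage sketch: retry(3) { writer.write(byteBuffer) }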
13. Depending on the time and the rolling interval, records are written to different files, so there will be many small files:
/** Get the current log writer while taking care of rotation */
private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
  if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    resetWriter()
    currentLogPath.foreach {
      pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
    }
    currentLogWriterStartTime = currentTime
    currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    val newLogPath = new Path(logDirectory,
      timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    currentLogPath = Some(newLogPath.toString)
    currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
  }
  currentLogWriter
}
14. The read path:
/**
 * A random access reader for reading write ahead log files written using
 * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info,
 * this reads the record (ByteBuffer) from the log file.
 */
private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
  extends Closeable {

  private val instream = HdfsUtils.getInputStream(path, conf)

  // the file may be deleted as we're opening the stream
  private var closed = (instream == null)

  def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
    assertOpen()
    // First seek to the offset recorded in the segment (the "pointer").
    instream.seek(segment.offset)
    val nextLength = instream.readInt()
    HdfsUtils.checkState(nextLength == segment.length,
      s"Expected message length to be ${segment.length}, but was $nextLength")
    val buffer = new Array[Byte](nextLength)
    instream.readFully(buffer)
    ByteBuffer.wrap(buffer)
  }

The second approach is to support data replay. In real-world development Kafka is used directly, because then no fault-tolerant storage or extra replicas are needed on the Spark side.
Kafka can be consumed in two ways: the Receiver approach and the Direct approach.
Receiver approach: the data's metadata, i.e. the offsets, is handed to ZooKeeper to manage. After a failure, Kafka re-reads from the stored offset: since the consumer crashed while processing, it never sent an ACK to ZooKeeper, so ZooKeeper considers the data not yet consumed. In practice, however, the Direct approach is used more and more: it operates on the offsets directly and manages them itself.

  1. DirectKafkaInputDStream looks up the latest offsets and puts the offset ranges into each batch.
  2. Every time a batch is generated, latestLeaderOffsets is called to fetch the most recent offsets; subtracting the previous offsets from these gives the range for this batch, which determines exactly which data to read (a usage sketch of the Direct API follows the code below).
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else {
    o.right.get
  }
}
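A usage sketch of the Direct approach, assuming the Spark 1.x spark-streaming-kafka artifact and an existing StreamingContext named ssc; broker addresses and the topic name are placeholders. The offset ranges of each batch can be read through HasOffsetRanges, which is how an application manages offsets itself.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.foreachRDD { rdd =>
  // Each batch carries its own offset ranges; the application can persist them
  // (e.g. in ZooKeeper or a database) to manage offsets on its own.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
}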

