import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) // 能夠經過 ssc.sparkContext 來訪問 SparkContext // 或者經過已經存在的 SparkContext 來建立 StreamingContext import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
初始化完Context以後:html
1) 定義消息輸入源來建立DStreamsjava
2) 定義DStreams的轉化操做和輸出操做sql
3) 經過streamingContext.start()來啓動消息採集和處理數據庫
4) 等待程序終止,能夠經過streamingContext.awaitTermination()來設置apache
5) 經過stramingContext.stop()來手動終止處理程序bootstrap
StreamingContext和SparkContext什麼關係?windows
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
注意:api
StreamingContext一旦啓動,對DStreams的操做就不能修改了緩存
在同一時間一個JVM中只有一個StreamingContext能夠啓動服務器
stop()方法將同時中止SparkContext,能夠傳入參數stopSparkContext用於只中止StreamingContext
在Spark1.4版本後,如何優雅的中止SparkStreaming而不丟失數據,經過設置 sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true") 便可。在StreamingContext的start方法中已經註冊了Hook方法
Discretized Stream是Spark Streaming的基礎抽象,表明持續性的數據流和通過各類Spark原語操做後的結果數據流。在內部實現上,DStream是一系列連續的RDD來表示。每一個RDD含有一段時間間隔內的數據,以下圖:
對數據的操做也是按照RDD爲單位來進行的
計算過程由Spark engine來完成
Spark Streaming原生支持一些不一樣的數據源。一些「核心」數據源已經被打包到Spark Streaming的Maven工件中,而其餘的一些則能夠經過spark- streaming-kafka等附加工件獲取。每一個接收器都以Spark執行器程序中一個長期運行的任務的形式運行,所以會佔據分配給應用的CPU核心。此外,咱們還須要有可用的CPU核心來處理數據。這意味着若是要運行多個接收器,就必須至少有和接收器數目相同的核心數,還要加上用來完成計算所須要的核心數。例如,若是咱們想要在流計算應用中運行10個接收器,那麼至少須要爲應用分配11個CPU核心。因此若是在本地模式運行,不要使用local或者local[1]
Socket數據流前面的例子已經看到過
文件數據流:可以讀取全部HDFS API兼容的文件系統文件,經過fileStream方法進行讀取
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming將會監控dataDirectory目錄並不斷處理移動進來的文件,記住目前不支持嵌套目錄
1) 文件須要有相同的數據格式
2) 文件進入dataDirectory的方式須要經過移動或者重命名來實現
3) 一旦文件移動進目錄,則不能再修改,即使修改了也不會讀取新數據
若是文件比較簡單,則可使用streamingContext.textFileStream(dataDirectory)方法來讀取文件。文件流不須要接收器,不須要單獨分配CPU核
經過繼承Receiver,並實現onStart、onStop方法來自定義數據源採集
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { // Connect to host:port socket = new Socket(host, port) // Until stopped or connection broken continue reading val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again") } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } } }
能夠經過 streamingContext.receiverStream(<instance of custom receiver>)來使用自定義的數據採集源
// Assuming ssc is the StreamingContext val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port)) val words = lines.flatMap(_.split(" ")) ...
測試過程當中,能夠經過使用streamingContext.queueStream(queueOfRDDs)來建立DStream,每個推送到這個隊列中的RDD,都會做爲一個 DStream處理
import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object QueueRdd { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd") val ssc = new StreamingContext(conf, Seconds(1)) // Create the queue through which RDDs can be pushed to // a QueueInputDStream //建立 RDD 隊列 val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]() // Create the QueueInputDStream and use it do some processing // 建立 QueueInputDStream val inputStream = ssc.queueStream(rddQueue) //處理隊列中的 RDD 數據 val mappedStream = inputStream.map(x => (x % 10, 1)) val reducedStream = mappedStream.reduceByKey(_ + _) //打印結果 reducedStream.print() //啓動計算 ssc.start() // Create and push some RDDs into for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) //經過程序中止 StreamingContext 的運行 //ssc.stop() } } }
除核心數據源外,還能夠用附加數據源接收器來從一些知名數據獲取系統中接收的數據,這些接收器都做爲Spark Streaming的組件進行獨立打包了。 它們仍然是Spark的一部分,不過你須要在構建文件中添加額外的包才能使用它們。現有的接收器包括Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及ZeroMQ。能夠經過添加與Spark版本匹配的Maven工件spark-streaming-[projectname]_2.10來引入這些附加接收器
在工程中須要引入Maven工件spark-streaming-kafka_2.10來使用它。包內提供的KafkaUtils對象能夠在StreamingContext和JavaStreamingContext 中以你的Kafka消息建立出DStream。因爲KafkaUtils能夠訂閱多個主題,所以它建立出的DStream由成對的主題和消息組成。要建立出一個流數據,須要使用 StreamingContext實例、一個由逗號隔開的ZooKeeper主機列表字符串、消費者組的名字(惟一名字),以及一個從主題到針對這個主題的接收器線程數的映射表來調用createStream()方法
import org.apache.spark.streaming.kafka._ ... // 建立一個從主題到接收器線程數的映射表 val topics = List(("pandas", 1), ("logs", 1)).toMap val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics) topicLines.map(_._2)
演示SparkStreaming如何從Kafka讀取消息,若是經過鏈接池方法把消息處理完成後再寫回Kafka
kafka Connection Pool程序:
mport java.util.Properties import org.apache.commons.pool2.impl.DefaultPooledObject import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} case class KafkaProducerProxy(brokerList: String, producerConfig: Properties = new Properties, defaultTopic: Option[String] = None, producer: Option[KafkaProducer[String, String]] = None) { type Key = String type Val = String require(brokerList == null || !brokerList.isEmpty, "Must set broker list") private val p = producer getOrElse { var props:Properties= new Properties(); props.put("bootstrap.servers", brokerList); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); new KafkaProducer[String,String](props) } private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = { val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic"))) require(!t.isEmpty, "Topic must not be empty") key match { case Some(k) => new ProducerRecord(t, k, value) case _ => new ProducerRecord(t, value) } } def send(key: Key, value: Val, topic: Option[String] = None) { p.send(toMessage(value, Option(key), topic)) } def send(value: Val, topic: Option[String]) { send(null, value, topic) } def send(value: Val, topic: String) { send(null, value, Option(topic)) } def send(value: Val) { send(null, value, None) } def shutdown(): Unit = p.close() } abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable { def newInstance(): KafkaProducerProxy } class BaseKafkaProducerFactory(brokerList: String, config: Properties = new Properties, defaultTopic: Option[String] = None) extends KafkaProducerFactory(brokerList, config, defaultTopic) { override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic) } class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory) extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable { override def create(): KafkaProducerProxy = factory.newInstance() override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj) override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = { p.getObject.shutdown() super.destroyObject(p) } }
KafkaStreaming main:
import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.api.java.function.VoidFunction import org.apache.spark.rdd.RDD import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} object createKafkaProducerPool{ def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = { val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic)) val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory) val poolConfig = { val c = new GenericObjectPoolConfig val maxNumProducers = 10 c.setMaxTotal(maxNumProducers) c.setMaxIdle(maxNumProducers) c } new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig) } } object KafkaStreaming{ def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) //建立 topic val brobrokers = "172.16.148.150:9092,172.16.148.151:9092,172.16.148.152:9092" val sourcetopic="source"; val targettopic="target"; //建立消費者組 var group="con-consumer-group" //消費者配置 val kafkaParam = Map( "bootstrap.servers" -> brobrokers,//用於初始化連接到集羣的地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], //用於標識這個消費者屬於哪一個消費團體 "group.id" -> group, / //若是沒有初始化偏移量或者當前的偏移量不存在任何服務器上,可使用這個配置屬性 //可使用這個配置,latest 自動重置偏移量爲最新的偏移量 "auto.offset.reset" -> "latest", //若是是 true,則這個消費者的偏移量會在後臺自動提交 "enable.auto.commit" -> (false: java.lang.Boolean) ); //ssc.sparkContext.broadcast(pool) //建立 DStream,返回接收到的輸入數據 var stream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String ](Array(sourcetopic),kafkaParam)) //每個 stream 都是一個 ConsumerRecord stream.map(s =>("id:" + s.key(),">>>>:"+s.value())).foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { // Get a producer from the shared pool val pool = createKafkaProducerPool(brobrokers, targettopic) val p = pool.borrowObject() partitionOfRecords.foreach {message => System.out.println(message._2); p.send(message._2,Option(targettopic))} // Returning the producer to the pool also shuts it down pool.returnObject(p) }) }) ssc.start() ssc.awaitTermination() } }
Spark對於Kafka的鏈接主要有兩種方式,一種是DirectKafkaInputDStream,另一種是KafkaInputDStream。DirectKafkaInputDStream只在driver端接收數據,因此繼承了InputDStream,是沒有receivers的
主要經過KafkaUtils#createDirectStream以及KafkaUtils#createStream這兩個API來建立,除了要傳入的參數不一樣外,接收kafka數據的節點、拉取數據的時機也徹底不一樣
KafkaUtils#createStream【Receiver-based】
這種方法使用一個Receiver來接收數據。在該Receiver的實現中使用了Kafka high-level consumer API。Receiver從kafka接收的數據將被存儲到Spark executor中,隨後啓動的job將處理這些數據
在默認配置下,該方法失敗後會丟失數據(保存在executor內存裏的數據在application失敗後就沒了),若要保證數據不丟失,須要啓用WAL(即預寫日誌至HDFS、S3等),這樣再失敗後能夠從日誌文件中恢復數據
在該函數中,會新建一個KafkaInputDStream對象,KafkaInputDStream繼承於ReceiverInputDStream。KafkaInputDStream實現了getReceiver方法,返回接收器的實例:
def getReceiver(): Receiver[(K, V)] = { if (!useReliableReceiver) { //< 不啓用 WAL new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } else { //< 啓用 WAL new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } }
根據是否啓用 WAL,receiver分爲KafkaReceiver和ReliableKafkaReceiver
下圖描述了KafkaReceiver接收數據的具體流程:
須要注意的點:
Kafka Topic的partitions與RDD的partitions沒有直接關係,不能一一對應。若是增長topic的partition個數的話僅僅會增長單個Receiver接收數據的線程數。事實上,使用這種方法只會在一個executor上啓用一個Receiver,該Receiver包含一個線程池,線程池的線程個數與全部topics的partitions個數總和一致,每條線程接收一個topic的一個partition的數據。而並不會增長處理數據時的並行度。
對於一個 topic,可使用多個groupid相同的input DStream來使用多個Receivers來增長並行度,而後union他們;對於多個topics,除了能夠用上個辦法增長並行度外,還能夠對不一樣的topic使用不一樣的input DStream而後union他們來增長並行度
若是啓用了WAL,爲能將接收到的數據將以log的方式在指定的存儲系統備份一份,須要指定輸入數據的存儲等級爲StorageLevel.MEMORY_AND_DISK_SER或StorageLevel.MEMORY_AND_DISK_SER_2
KafkaUtils#createDirectStream【WithOut Receiver】
自Spark-1.3.0起,提供了不須要Receiver的方法。替代了使用receivers來接收數據,該方法按期查詢每一個topic+partition的lastest offset,並據此決定每一個batch要接收的offsets範圍
KafkaUtils#createDirectStream調用中,會新建DirectKafkaInputDStream,DirectKafkaInputDStream#compute(validTime: Time)會從kafka拉取數據並生成RDD,流程以下:
如上圖所示,該函數主要作了如下三個事情:
1. 肯定要接收的partitions的offsetRange,以做爲第2步建立的RDD的數據來源
2. 建立RDD並執行count操做,使RDD真實具備數據
3. 以streamId、數據條數,offsetRanges信息初始化inputInfo並添加到JobScheduler中
進一步看KafkaRDD的getPartitions實現:
override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map {case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }
從上面的代碼能夠很明顯看到,KafkaRDD的partition數據與Kafka topic的某個partition的o.fromOffset至o.untilOffset數據是相對應的,也就是說KafkaRDD的partition與Kafka partition是一一對應的
該方式相比使用Receiver的方式有如下好處:
簡化並行:再也不須要建立多個kafka input DStream而後再union這些input DStream。使用directStream,Spark Streaming會建立與Kafka partitions相同數量的paritions的RDD,RDD的partition與Kafka的partition一一對應,這樣更易於理解及調優
高效:在方式一中要保證數據零丟失須要啓用WAL(預寫日誌),這會佔用更多空間。而在方式二中,能夠直接從Kafka指定的topic的指定offsets處恢復數據,不須要使用WAL
剛好一次語義保證:基於Receiver方式使用了Kafka的high level API來在Zookeeper中存儲已消費的offsets。這在某些狀況下會致使一些數據被消費兩次,好比streaming app在處理某個batch內已接受到的數據的過程當中掛掉,可是數據已經處理了一部分,但這種狀況下沒法將已處理數據的offsets更新到 Zookeeper中,下次重啓時,這批數據將再次被消費且處理。基於direct的方式,使用kafka的簡單api,Spark Streaming本身就負責追蹤消費的offset,並保存在 checkpoint中。Spark本身必定是同步的,所以能夠保證數據是消費一次且僅消費一次。這種方式中,只要將output操做和保存offsets操做封裝成一個原子操做就能避免失敗後的重複消費和處理,從而達到剛好一次的語義(Exactly-once)
經過以上分析,咱們能夠對這兩種方式的區別作一個總結:
1. createStream會使用Receiver;而createDirectStream不會
2. createStream使用的Receiver會分發到某個executor上去啓動並接受數據;而createDirectStream直接在driver上接收數據
3. createStream 使用 Receiver 源源不斷的接收數據並把數據交給ReceiverSupervisor 處理最終存儲爲 blocks 做爲 RDD 的輸入,從 kafka拉取數據與計算消費數據相互獨立;而 createDirectStream 會在每一個 batch拉取數據並就地消費,到下個 batch 再次拉取消費,周而復始,從 kafka拉取數據與計算消費數據是連續的,沒有獨立開
4. createStream中建立的KafkaInputDStream每一個batch所對應的RDD的partition不與Kafka partition一一對應;而createDirectStream中建立的 DirectKafkaInputDStream每一個batch所對應的RDD的partition與Kafka partition一一對應
Spark提供兩個不一樣的接收器來使用Apache Flume。 兩個接收器簡介以下
推式接收器該接收器以Avro數據池的方式工做,由Flume向其中推數據
拉式接收器該接收器能夠從自定義的中間數據池中拉數據,而其餘進程可使用Flume把數據推動該中間數據池
兩種方式都須要從新配置Flume,並在某個節點配置的端口上運行接收器(不是已有的 Spark 或者 Flume 使用的端口)。要使用其中任何一種方法,都須要在工程中引入Maven工件spark-streaming-flume_2.10
推式接收器的方法設置起來很容易,可是它不使用事務來接收數據。在這種方式中,接收器以Avro數據池的方式工做,咱們須要配置Flume來把數據發到 Avro數據池。咱們提供的FlumeUtils對象會把接收器配置在一個特定的工做節點的主機名及端口號上。這些設置必須和Flume配置相匹配
雖然這種方式很簡潔,但缺點是沒有事務支持。這會增長運行接收器的工做節點發生錯誤時丟失少許數據的概率。不只如此,若是運行接收器的工做節點發生故障,系統會嘗試從另外一個位置啓動接收器,這時須要從新配置Flume才能將數據發給新的工做節點。這樣配置會比較麻煩
較新的方式是拉式接收器(在 Spark 1.1 中引入),它設置了一個專用的Flume數據池供Spark Streaming讀取,並讓接收器主動從數據池中拉取數據。這種方式的優勢在於彈性較好,Spark Streaming經過事務從數據池中讀取並複製數據。在收到事務完成的通知前,這些數據還保留在數據池中
咱們須要先把自定義數據池配置爲Flume的第三方插件。安裝插件的最新方法請參考Flume文檔的相關部分(https://flume.apache.org/FlumeUserGuide.html#installing-third-party-plugins)。 因爲插件是用Scala寫的,所以須要把插件自己以及Scala庫都添加到Flume插件中
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume-sink_2.11</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.11</version> </dependency>
當把自定義Flume數據池添加到一個節點上以後,就須要配置Flume來把數據推送到這個數據池中
a1.sinks = spark a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.spark.hostname = receiver-hostname a1.sinks.spark.port = port-used-for-sync-not-spark-port a1.sinks.spark.channel = memoryChannel
等到數據已經在數據池中緩存起來,就能夠調用FlumeUtils來讀取數據了
val events = FlumeUtils.createPollingStream(ssc,receiverHostname,receiverPort)
DStream上的原語與RDD的相似,分爲Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操做中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各類Window相關的原語
DStream的轉化操做能夠分爲無狀態(stateless)和有狀態(stateful)兩種
在無狀態轉化操做中,每一個批次的處理不依賴於以前批次的數據。常見的RDD轉化操做,例如 map()、filter()、reduceByKey() 等,都是無狀態轉化操做
相對地,有狀態轉化操做須要使用以前批次的數據或者是中間結果來計算當前批次的數據。有狀態轉化操做包括基於滑動窗口的轉化操做和追蹤狀態變化的轉化操做
無狀態轉化操做就是把簡單的RDD轉化操做應用到每一個批次上,也就是轉化DStream中的每個RDD。部分無狀態轉化操做列在了下表中。注意,針對鍵值對的DStream轉化操做(好比reduceByKey())要添加import StreamingContext._才能在Scala中使用
Transformation |
Meaning |
map(func) |
將源DStream中的每一個元素經過一個函數func從而獲得新的DStreams |
flatMap(func) |
和map相似,可是每一個輸入的項能夠被映射爲0或更多項 |
filter(func) |
選擇源DStream中函數func判爲true的記錄做爲新DStreams |
repartition(numPartitions) |
經過建立更多或者更少的partition來改變此DStream的並行級別 |
union(otherStream) |
聯合源DStreams和其餘DStreams來獲得新DStream |
count() |
統計源DStreams中每一個RDD所含元素的個數獲得單元素RDD的新DStreams |
reduce(func) |
經過函數func(兩個參數一個輸出)來整合源DStreams中每一個RDD元素獲得單元素RDD的DStreams。這個函數須要關聯從而能夠被並行計算 |
countByValue() |
對於DStreams中元素類型爲K調用此函數,獲得包含(K,Long)對的新DStream,其中Long值代表相應的K在源DStream中每一個RDD出現的頻率 |
reduceByKey(func, [numTasks]) |
對(K,V)對的DStream調用此函數,返回一樣(K,V)對的新DStream,可是新DStream中的對應V爲使用reduce函數整合而來。Note:默認狀況下,這個操做使用Spark默認數量的並行任務(本地模式爲2,集羣模式中的數量取決於配置參數spark.default.parallelism)。也能夠傳入可選的參數numTaska來設置不一樣數量的任務 |
join(otherStream, [numTasks]) |
兩DStream分別爲(K,V)和(K,W)對,返回(K,(V,W))對的新DStream |
cogroup(otherStream, [numTasks]) |
兩DStream分別爲(K,V)和(K,W)對,返回(K,(Seq[V],Seq[W])對新DStreams |
transform(func)
|
將RDD到RDD映射的函數func做用於源 DStream中每一個RDD上獲得新DStream。 這個可用於在DStream的RDD上作任意操做 |
updateStateByKey(func) |
獲得」狀態」DStream,其中每一個key狀態的更新是經過將給定函數用於此key的上一個狀態和新值而獲得。這個可用於保存每一個key值的任意狀態數據 |
特殊的Transformations
UpdateStateByKey原語用於記錄歷史記錄,有時,咱們須要在DStream中跨批次維護狀態(例如流計算中累加 wordcount)。針對這種狀況,updateStateByKey()爲咱們提供了對一個狀態變量的訪問,用於鍵值對形式的DStream。給定一個由(鍵,事件)對構成的DStream,並傳遞一個指定如何根據新的事件更新每一個鍵對應狀態的函數,它能夠構建出一個新的DStream,其內部數據爲(鍵,狀態)對
updateStateByKey()的結果會是一個新的DStream,其內部的RDD序列是由每一個時間區間對應的(鍵,狀態)對組成的
updateStateByKey操做使得咱們能夠在用新信息進行更新時保持任意的狀態。爲使用這個功能,須要作下面兩步:
1. 定義狀態,狀態能夠是一個任意的數據類型
2. 定義狀態更新函數,用此函數闡明如何使用以前的狀態和來自輸入流的新值對狀態進行更新
使用updateStateByKey須要對檢查點目錄進行配置,會使用檢查點來保存狀態
更新版的 wordcount:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定義更新狀態方法,參數 values 爲當前批次單詞頻度,state 爲以往批次單詞頻度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("master01", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) // 使用 updateStateByKey 來更新狀態,統計從運行開始以來單詞總的次數 val stateDstream = pairs.updateStateByKey[Int](updateFunc) stateDstream.print() //val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console //wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
Window Operations有點相似於Storm中的State,能夠設置窗口的大小和滑動窗口的間隔來動態的獲取當前Steaming的容許狀態
基於窗口的操做會在一個比StreamingContext的批次間隔更長的時間範圍內,經過整合多個批次的結果,計算出整個窗口的結果
全部基於窗口的操做都須要兩個參數,分別爲窗口時長以及滑動步長,兩 者都必須是StreamContext的批次間隔的整數倍。窗口時長控制每次計算最近的多少個批次的數據,其實就是最近的windowDuration/batchInterval個批次。 若是有一個以10秒爲批次間隔的源DStream,要建立一個最近30秒的時間窗口(即最近3個批次),就應當把windowDuration設爲30秒。而滑動步長的默認值與批次間隔相等,用來控制對新的DStream進行計算的間隔。若是源DStream批次間隔爲10秒,而且咱們只但願每兩個批次計算一次窗口結果,就應該把滑動步長設置爲20秒
假設,你想拓展前例從而每隔十秒對持續30秒的數據生成word count。爲作到這個,咱們須要在持續30秒數據的(word,1)對DStream上應用reduceByKey。使用操做reduceByKeyAndWindow
# reduce last 30 seconds of data, every 10 second windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)
Transformation |
Meaning |
window(windowLength, slideInterval) |
基於對源DStream窗化的批次進行計算返回一個新的DStream |
countByWindow(windowLength, slideInterval) |
返回一個滑動窗口計數流中的元素 |
reduceByWindow(func, windowLength, slideInterval) |
經過使用自定義函數整合滑動區間流元素來建立一個新的單元素流 |
reduceByKeyAndWindow(func, windowLength, slideI nterval, [numTasks])
|
當在一個(K,V)對的DStream上調用此函數,會返回一個新(K,V)對的DStream,此處經過對滑動窗口中批次數據使用reduce 函數來整合每一個key的value值。Note:默認狀況下,這個操做使用Spark的默認數量並行任務(本地是2),在集羣模式中依據配置屬性(spark.default.parallelism)來作grouping。能夠經過設置可選參數numTasks來設置不一樣數量的tasks |
reduceByKeyAndWindow(func, invFunc, windowLeng th, slideInterval, [numTasks]) |
這個函數是上述函數的更高效版本,每一個窗口的reduce值都是經過用前一個窗的reduce值來遞增計算。經過reduce進入到滑動窗口數據並」反向reduce」離開窗口的舊數據來實現這個操做。一個例子是隨着窗口滑動對keys的「加」「減」計數。經過前邊介紹能夠想到,這個函數只適用於」可逆的 reduce函數」,也就是這些reduce函數有相應的」反 reduce」函數(以參數invFunc形式傳入)。如前述函數,reduce 任務的數量經過可選參數來配置。注意:爲了使用這個操做,檢查點必須可用 |
countByValueAndWindow(windowLength,slideInterv al, [numTasks])
|
對(K,V)對的DStream調用,返回(K,Long)對的新DStream,其中每一個key的值是其在滑動窗口中頻率。如上,可配置 reduce任務數量 |
reduceByWindow()和reduceByKeyAndWindow()讓咱們能夠對每一個窗口更高效地進行歸約操做。它們接收一個歸約函數,在整個窗口上執行,好比+。 除此之外,它們還有一種特殊形式,經過只考慮新進入窗口的數據和離開窗口的數據,讓Spark增量計算歸約結果。這種特殊形式須要提供歸約函數的一個逆函數,好比+對應的逆函數爲-。對於較大的窗口,提供逆函數能夠大大提升執行效率
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x, y) => x + y}, {(x, y) => x - y}, Seconds(30), Seconds(10)) // 加上新進入窗口的批次中的元素 // 移除離開窗口的老批次中的元素 // 窗口時長 // 滑動步長
countByWindow()和countByValueAndWindow()做爲對數據進行計數操做的簡寫。countByWindow()返回一個表示每一個窗口中元素個數的DStream,而countByValueAndWindow()返回的DStream則包含窗口中每一個值的個數
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
WordCount 第三版:3 秒一個批次,窗口 12 秒,滑步 6 秒
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定義更新狀態方法,參數 values 爲當前批次單詞頻度,state 爲以往批次單詞頻度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("master01", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6)) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
Transform原語容許DStream上執行任意的RDD-to-RDD函數。即便這些函數並無在DStream的API中暴露出來,經過該函數能夠方便的擴展Spark API
該函數每一批次調度一次
好比下面的例子,在進行單詞統計的時候,想要過濾掉spam的信息,其實也就是對DStream中的RDD應用轉換
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
鏈接操做(leftOuterJoin, rightOuterJoin, fullOuterJoin也能夠),能夠鏈接Stream-Stream,windows-stream to windows-stream、stream-dataset
Stream-Stream Joins
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
輸出操做指定了對流數據經轉化操做獲得的數據所要執行的操做(例如把結果推入外部數據庫或輸出到屏幕上)。與RDD中的惰性求值相似,若是一個DStream及其派生出的DStream都沒有被執行輸出操做,那麼這些DStream就都不會被求值。若是 StreamingContext中沒有設定輸出操做,整個context就都不會啓動
Output Operation |
Meaning |
print() |
在運行流程序的驅動結點上打印DStream 中每一批次數據的最開始 10個元素。這用於開發和調試。在 Python API 中,一樣的操做叫 pprint() |
saveAsTextFiles(prefix, [suffix]) |
以 text 文件形式存儲這個DStream 的內容。每一批次的存儲文 件名基於參數中的 prefix 和 suffix。」prefix-Time_IN_MS[.suffix]」. |
saveAsObjectFiles(prefix, [suffix]) |
以 Java 對象序列化的方式將Stream 中的數據保存爲SequenceFiles . 每一批次的存儲文件 名 基 於 參 數 中 的 爲 "prefix- TIME_IN_MS[.suffix]". Python 中 目 前不可用。 |
saveAsHadoopFiles(prefix, [suffix]) |
將 Stream 中的數據保存爲Hadoop files. 每一批次的存儲文件名 基 於 參 數 中 的 爲 "prefix- TIME_IN_MS[.suffix]". |
foreachRDD(func) |
這是最通用的輸出操做,即將函 數 func 用於產生於 stream 的每個RDD。其中參數傳入的函數 func 應該 實現將每個 RDD 中數據推送到外 部系統,如將 RDD 存入文件或者經過 網絡將其寫入數據庫。注意:函數 func在運行流應用的驅動中被執行,同時 其中通常函數 RDD 操做從而強制其 對於流 RDD 的運算 |
通用的輸出操做foreachRDD(),它用來對DStream中的RDD運行任意計算。這和transform()有些相似,均可以讓咱們訪問任意RDD。在foreachRDD()中,能夠重用咱們在Spark中實現的全部行動操做。好比,常見的用例之一是把數據寫到諸如MySQL的外部數據庫中
須要注意的是:
1) 鏈接不能寫在driver層面
2) 若是寫在foreach則每一個RDD都建立,得不償失
3) 增長foreachPartition,在分區建立
4) 能夠考慮使用鏈接池優化
dstream.foreachRDD { rdd => // error val connection = createNewConnection() // executed at the driver 序列化錯誤 rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record) // executed at the worker ) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
累加器(Accumulators)和廣播變量(Broadcast variables)不能從Spark Streaming的檢查點中恢復。若是你啓用檢查並也使用了累加器和廣播變量,那麼你必須建立累加器和廣播變量的延遲單實例從而在驅動因失效重啓後他們能夠被從新實例化。以下例述:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD{ (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts }
咱們能夠很容易地在流數據上使用DataFrames和SQL。可是必須使用SparkContext來建立StreamingContext要用的SQLContext。此外,這一過程能夠在驅動失效後重啓。咱們經過建立一個實例化的SQLContext單實例來實現這個工做。以下例所示。咱們對前例 word count進行修改從而使用DataFrames和SQL來產生word counts。每一個RDD被轉換爲DataFrame,以臨時表格配置並用SQL進行查詢
val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
咱們也能夠從不一樣的線程在定義於流數據的表上運行SQL查詢(也就是說,異步運行StreamingContext)。僅肯定你設置StreamingContext記住了足夠數量的流數據以使得查詢操做能夠運行。不然,StreamingContext不會意識到任何異步的SQL查詢操做,那麼其就會在查詢完成以後刪除舊的數據。例如,若是你要查詢最後一批次,可是你的查詢會運行5分鐘,那麼你須要調用streamingContext.remember(Minutes(5))(in Scala, 或者其餘語言的等價操做)
和RDDs相似,DStreams一樣容許開發者將流數據保存在內存中。也就是說,在DStream上使用persist()方法將會自動把DStreams中的每一個RDD保存在內存中。當DStream中的數據要被屢次計算時,這個很是有用(如在一樣數據上的屢次操做)。對於像 reduceByWindow和reduceByKeyAndWindow以及基於狀態的(updateStateByKey)這種操做,保存是隱含默認的。所以,即便開發者沒有調用persist(),由基於窗操做產生的DStreams會自動保存在內存中
檢查點機制是咱們在Spark Streaming中用來保障容錯性的主要機制。與應用程序邏輯無關的錯誤(即系統錯位,JVM崩潰等)有迅速恢復的能力
它可使Spark Streaming階段性地把應用數據存儲到諸如HDFS或Amazon S3這樣的可靠存儲系統中,以供恢復時使用。具體來講,檢查點機制主要爲如下兩個目的服務
1) 控制發生失敗時須要重算的狀態數。SparkStreaming能夠經過轉化圖的譜系圖來重算狀態,檢查點機制則能夠控制須要在轉化圖中回溯多遠
2) 提供驅動器程序容錯。若是流計算應用中的驅動器程序崩潰了,你能夠重啓驅動器程序並讓驅動器程序從檢查點恢復,這樣Spark Streaming就能夠讀取以前運行的程序處理數據的進度,並從那裏繼續
爲了實現這個,Spark Streaming須要爲容錯存儲系統checkpoint足夠的信息從而使得其能夠從失敗中恢復過來。有兩種類型的數據設置檢查點
Metadata checkpointing:將定義流計算的信息存入容錯的系統如HDFS
元數據包括:
配置-用於建立流應用的配置
DStreams操做-定義流應用的DStreams操做集合
不完整批次-批次的工做已進行排隊可是並未完成
Data checkpointing:將產生的RDDs存入可靠的存儲空間。對於在多批次間合併數據的狀態轉換,這個頗有必要。在這樣的轉換中,RDDs的產生基於以前批次的RDDs,這樣依賴鏈長度隨着時間遞增。爲了不在恢復期這種無限的時間增加(和鏈長度成比例),狀態轉換中間的RDDs週期性寫入可靠地存儲空間(如HDFS)從而切短依賴鏈
總而言之,元數據檢查點在由驅動失效中恢復是首要須要的。而數據或者RDD檢查點甚至在使用了狀態轉換的基礎函數中也是必要的
出於這些緣由,檢查點機制對於任何生產環境中的流計算應用都相當重要。咱們能夠經過向ssc.checkpoint()方法傳遞一個路徑參數(HDFS、S3或者本地路徑都可)來配置檢查點機制,同時咱們的應用應該可以使用檢查點的數據
1. 當程序首次啓動,其將建立一個新的StreamingContext,設置全部的流並調用start()
2. 當程序在失效後重啓,其將依據檢查點目錄的檢查點數據從新建立一個StreamingContext。經過使用StraemingContext.getOrCreate很容易得到這個性能
ssc.checkpoint("hdfs://...") # 建立和設置一個新的StreamingContext def functionToCreateContext(): sc = SparkContext(...) # new context ssc = new StreamingContext(...) lines = ssc.socketTextStream(...) # create DStreams ... ssc.checkpoint(checkpointDirectory) # 設置檢查點目錄 return ssc # 從檢查點數據中獲取 StreamingContext 或者從新建立一個 context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) # 在須要完成的 context 上作額外的配置 # 不管其有沒有啓動 context ... # 啓動context context.start() contaxt.awaitTermination()
若是檢查點目錄(checkpointDirectory)存在,那麼context將會由檢查點數據從新建立。若是目錄不存在(首次運行),那麼函數functionToCreateContext將會被調用來建立一個新的context並設置DStreams
注意RDDs的檢查點引發存入可靠內存的開銷。在RDDs須要檢查點的批次裏,處理的時間會所以而延長。因此,檢查點的間隔須要很仔細地設置。在小尺寸批次(1 秒鐘)。每一批次檢查點會顯著減小操做吞吐量。反之,檢查點設置的過於頻繁致使「血統」和任務尺寸增加,這會有很很差的影響對於須要 RDD檢查點設置的狀態轉換,默認間隔是批次間隔的乘數通常至少爲10秒鐘。能夠經過dstream.checkpoint(checkpointInterval)。一般,檢查點設置間隔是5-10個DStream的滑動間隔
WAL即write ahead log(預寫日誌),是在1.2 版本中就添加的特性。做用就是,將數據經過日誌的方式寫到可靠的存儲,好比HDFS、s3,在driver或worker failure時能夠從在可靠存儲上的日誌文件恢復數據。WAL在driver端和executor端都有應用
WAL在driver端的應用
用於寫日誌的對象writeAheadLogOption:WriteAheadLog。在StreamingContext中的JobScheduler中的ReceiverTracker的ReceivedBlockTracker構造函數中被建立,ReceivedBlockTracker用於管理已接收到的blocks信息。須要注意的是,這裏只須要啓用 checkpoint就能夠建立該driver端的WAL管理實例,而不須要將spark.streaming.receiver.writeAheadLog.enable設置爲true
寫什麼、什麼時候寫
首選須要明確的是,ReceivedBlockTracker經過WAL寫入log文件的內容是3種事件(固然,會進行序列化):
case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一個block及該block的具體信息,包括streamId、blockId、數據條數等
case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即爲某個batchTime分配了哪些blocks做爲該batch RDD的數據源
case class BatchCleanupEvent(times: Seq[Time]);即清理了哪些batchTime對應的block
知道了寫了什麼內容,結合源碼,也不難找出是何時寫了這些內容。須要再次注意的是,寫上面這三種事件,也不須要將spark.streaming.receiver.writeAheadLog.enable設置爲true
WAL在executor端的應用
Receiver接收到的數據會源源不斷的傳遞給ReceiverSupervisor,是否啓用WAL機制(便是否將spark.streaming.receiver.writeAheadLog.enable設置爲true)會影響ReceiverSupervisor在存儲block時的行爲:
不啓用 WAL:你設置的StorageLevel是什麼,就怎麼存儲。好比MEMORY_ONLY只會在內存中存一份,MEMORY_AND_DISK會在內存和磁盤上各存一份等
啓用 WAL:在StorageLevel指定的存儲的基礎上,寫一份到WAL中。存儲一份在WAL上,更不容易丟數據但性能損失也比較大
關因而否要啓用WAL,要視具體的業務而定:
若能夠接受必定的數據丟失,則不須要啓用 WAL,由於對性能影響較大
若徹底不能接受數據丟失,那就須要同時啓用checkpoint和WAL,checkpoint保存着執行進度(好比已生成但未完成的jobs),WAL中保存着blocks及blocks元數據(好比保存着未完成的jobs對應的blocks信息及block文件)。同時,這種狀況可能要在數據源和 Streaming Application中聯合來保證exactly once語義
預寫日誌功能的流程是:
1) 一個SparkStreaming應用開始時(也就是driver開始時),相關的StreamingContext使用SparkContext啓動接收器成爲長駐運行任務。這些接收器接收並保存流數據到Spark內存中以供處理
2) 接收器通知driver
3) 接收塊中的元數據(metadata)被髮送到driver的StreamingContext。
這個元數據包括:
(a) 定位其在executor內存中數據的塊referenceid
(b) 塊數據在日誌中的偏移信息(若是啓用了)
用戶傳送數據的生命週期以下圖所示:
默認狀況下,Spark Streaming經過Receiver以生產者生產數據的速率接收數據,計算過程當中會出現batch processing time > batch interval的狀況,其中batch processing time爲實際計算一個批次花費時間,batch interval爲Streaming應用設置的批處理間隔。這意味着Spark Streaming的數據接收速率高於Spark從隊列中移除數據的速率,也就是數據處理能力低,在設置間隔內不能徹底處理當前接收速率接收的數據。若是這種狀況持續過長的時間,會形成數據在內存中堆積,致使Receiver所在Executor內存溢出等問題(若是設置StorageLevel包含disk,則內存存放不下的數據會溢寫至disk,加大延遲)。Spark 1.5 之前版本,用戶若是要限制Receiver的數據接收速率,能夠經過設置靜態配製參數「spark.streaming.receiver.maxRate」的值來實現,此舉雖然能夠經過限制接收速率,來適配當前的處理能力,防止內存溢出,但也會引入其它問題。好比:producer數據生產高於maxRate,當前集羣處理能力也高於maxRate,這就會形成資源利用率降低等問題。爲了更好的協調數據接收速率與資源處理能力,Spark Streaming從 v1.5 開始引入反壓機制(back-pressure),經過動態控制數據接收速率來適配集羣數據處理能力
Spark Streaming Backpressure: 根據JobScheduler反饋做業的執行信息來動態調整Receiver數據接收率。經過屬性「spark.streaming.backpressure.enabled」來控制是否啓用backpressure機制,默認值false,即不啓用
Streaming架構以下圖所示
在原架構的基礎上加上一個新的組件RateController,這個組件負責監聽「OnBatchCompleted」事件,而後從中抽取processingDelay及schedulingDelay信息。Estimator依據這些信息估算出最大處理速度(rate),最後由基於Receiver的Input Stream將rate經過ReceiverTracker與 ReceiverSupervisorImpl轉發給 BlockGenerator(繼承自RateLimiter)
流量控制點
當Receiver開始接收數據時,會經過supervisor.pushSingle()方法將接收的數據存入currentBuffer等待BlockGenerator定時將數據取走,包裝成block。在將數據存放入currentBuffer之時,要獲取許可(令牌)。若是獲取到許可就能夠將數據存入buffer, 不然將被阻塞,進而阻塞Receiver從數據源拉取數據
其令牌投放採用令牌桶機制進行,原理以下圖所示:
令牌桶機制:大小固定的令牌桶可自行以恆定的速率源源不斷地產生令牌。若是令牌不被消耗,或者被消耗的速度小於產生的速度,令牌就會不斷地增多,直到把桶填滿。後面再產生的令牌就會從桶中溢出。最後桶中能夠保存的最大令牌數永遠不會超過桶的大小。當進行某操做時須要令牌時會從令牌桶中取出相應的令牌數,若是獲取到則繼續操做,不然阻塞。用完以後不用放回
驅動器程序的容錯要求咱們以特殊的方式建立StreamingContext。咱們須要把檢查點目錄提供給StreamingContext。與直接調用new StreamingContext不一樣,應該使用StreamingContext.getOrCreate()函數
配置過程以下:
一、 啓動 Driver 自動重啓功能
standalone: 提交任務時添加 --supervise 參數
yarn:設置yarn.resourcemanager.am.max-attempts 或者spark.yarn.maxAppAttempts
mesos: 提交任務時添加 --supervise 參數
二、 設置 checkpoint
StreamingContext.setCheckpoint(hdfsDirectory)
三、支持從 checkpoint 中重啓配置
def createContext(checkpointDirectory: String): StreamingContext = { val ssc = new StreamingContext ssc.checkpoint(checkpointDirectory) ssc } val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))
爲了應對工做節點失敗的問題,Spark Streaming使用與Spark的容錯機制相同的方法。全部從外部數據源中收到的數據都在多個工做節點上備份。全部從備份數據轉化操做的過程當中建立出來的RDD都能容忍一個工做節點的失敗,由於根據RDD譜系圖,系統能夠把丟失的數據從倖存的輸入數據備份中重算出來。對於reduceByKey等Stateful操做重作的lineage較長的,強制啓動checkpoint,減小重作概率
運行接收器的工做節點的容錯也是很重要的。若是這樣的節點發生錯誤,SparkStreaming會在集羣中別的節點上重啓失敗的接收器。然而,這種狀況會不會致使數據的丟失取決於數據源的行爲(數據源是否會重發數據)以及接收器的實現(接收器是否會向數據源確認收到數據)。舉個例子,使用Flume做 爲數據源時,兩種接收器的主要區別在於數據丟失時的保障。在「接收器從數 據池中拉取數據」的模型中,Spark只會在數據已經在集羣中備份時纔會從數據池中移除元素。而在「向接收器推數據」的模型中,若是接收器在數據備份以前失敗,一些數據可能就會丟失。總的來講,對於任意一個接收器,你必須同時考 慮上游數據源的容錯性(是否支持事務)來確保零數據丟失
通常主要是經過將接收到數據後先寫日誌(WAL)到可靠文件系統中,後才寫入實際的RDD。若是後續處理失敗則成功寫入WAL的數據經過WAL進行恢復,未成功寫入WAL的數據經過可回溯的Source進行重放
總的來講,接收器提供如下保證
1) 全部從可靠文件系統中讀取的數據(好比經過StreamingContext.hadoopFiles讀取的)都是可靠的,由於底層的文件系統是有備份的。Spark Streaming會記住哪些數據存放到了檢查點中,並在應用崩潰後從檢查點處繼續執行
2) 對於像Kafka、推式Flume、Twitter這樣的不可靠數據源,Spark會把輸入數據複製到其餘節點上,可是若是接收器任務崩潰,Spark仍是會丟失數據。在Spark 1.1以及更早的版本中,收到的數據只被備份到執行器進程的內存中,因此一旦驅動器程序崩潰(此時全部的執行器進程都會丟失鏈接), 數據也會丟失。在Spark 1.2 中,收到的數據被記錄到諸如HDFS這樣的可靠的文件系統中,這樣即便驅動器程序重啓也不會致使數據丟失
綜上所述,確保全部數據都被處理的最佳方式是使用可靠的數據源(例如HDFS、拉式 Flume 等)。若是咱們還要在批處理做業中處理這些數據,使用可靠數據源是最佳方式,由於這種方式確保了咱們的批處理做業和流計算做業能讀取到相同的數據,於是能夠獲得相同的結果
操做過程以下:
啓用checkpoint
-ssc.setCheckpoint(checkpointDir)
啓用 WAL
-sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
對Receiver使用可靠性存儲StoreageLevel.MEMORY_AND_DISK_SER or StoreageLevel.MEMORY_AND_DISK_SER2
因爲Spark Streaming工做節點的容錯保障,Spark Streaming能夠爲全部的轉化操做提供 「精確一次」執行的語義,即便一個工做節點在處理部分數據時發生失敗,最終的轉化結果(即轉化操做獲得的RDD)仍然與數據只被處理一次獲得的結果同樣
然而,當把轉化操做獲得的結果使用輸出操做推入外部系統中時,寫結果的任務可能因故障而執行屢次,一些數據可能也就被寫了屢次。因爲這引入了外部系統,所以咱們須要專門針對各系統的代碼來處理這樣的狀況。咱們可使用事務操做來寫入外部系統(即原子化地將一個RDD分區一次寫入), 或者設計冪等的更新操做(即屢次運行同一個更新操做仍生成相同的結果)。好比Spark Streaming的saveAs...File操做會在一個文件寫完時自動將其原子化地移動到最終位置上,以此確保每一個輸出文件只存在一份
最多見的問題是Spark Streaming可使用的最小批次間隔是多少。總的來講,500毫秒已經被證明爲對許多應用而言是比較好的最小批次大小。尋找最小批次大小的最佳實踐是從一個比較大的批次大小(10秒左右)開始,不斷使用更小的批次大小。若是Streaming用戶界面中顯示的處理時間保持不變,咱們就能夠進一步減少批次大小。若是處理時間開始增長,咱們可能已經達到了應用的極限
類似地,對於窗口操做,計算結果的間隔(也就是滑動步長)對於性能也有巨大的影響。當計算代價巨大併成爲系統瓶頸時,就應該考慮提升滑動步長了。減小批處理所消耗時間的常見方式還有提升並行度。有如下三種方式能夠提升並行度:
1) 增長接收器數目有時若是記錄太多致使單臺機器來不及讀入並分發的話,接收器會成爲系統瓶頸。這時咱們就須要經過建立多個輸入DStream(這樣會建立多個接收器)來增長接收器數目,而後使用union來把數據合併爲一個數據源
2) 將收到的數據顯式地從新分區若是接收器數目沒法再增長,咱們能夠經過使用DStream.repartition來顯式從新分區輸入流(或者合併多個流獲得的數據流)來從新分配收到的數據
3) 提升聚合計算的並行度 對於像reduceByKey()這樣的操做,咱們能夠在第二個參數中指定並行度,咱們在介紹RDD時提到過相似的手段