本文從編程模型、任務調度、時間機制、Kafka 動態分區的感知、容錯及處理語義、背壓等幾個方面對比 Spark Streaming 與 Flink,但願對有實時處理需求業務的企業端用戶在框架選型有所啓發。本文篇幅較長,建議先收藏~web
Spark Streaming 運行時的角色(standalone 模式)主要有:算法
Master:主要負責總體集羣資源的管理和應用程序調度;編程
Worker:負責單個節點的資源管理,driver 和 executor 的啓動等;後端
Driver:用戶入口程序執行的地方,即 SparkContext 執行的地方,主要是 DAG 生成、stage 劃分、task 生成及調度;api
Executor:負責執行 task,反饋執行狀態和執行結果。緩存
Flink 運行時的角色(standalone 模式)主要有:bash
Jobmanager: 協調分佈式執行,他們調度任務、協調 checkpoints、協調故障恢復等。至少有一個 JobManager。高可用狀況下能夠啓動多個 JobManager,其中一個選舉爲 leader,其他爲 standby;架構
Taskmanager: 負責執行具體的 tasks、緩存、交換數據流,至少有一個 TaskManager;併發
Slot: 每一個 task slot 表明 TaskManager 的一個固定部分資源,Slot 的個數表明着 taskmanager 可並行執行的 task 數。app
圖 1:Spark Streaming 生態,via Spark 官網
圖 2:Flink 生態,via Flink官網
Spark Streaming 是微批處理,運行的時候須要指定批處理的時間,每次運行 job 時處理一個批次的數據,流程如圖 3 所示:
圖 3,via Spark 官網
Flink 是基於事件驅動的,事件能夠理解爲消息。事件驅動的應用程序是一種狀態應用程序,它會從一個或者多個流中注入事件,經過觸發計算更新狀態,或外部動做對注入的事件做出反應。
圖 4,via Fink 官網
編程模型對比,主要是對比 flink 和 Spark Streaming 二者在代碼編寫上的區別。
Spark Streaming 與 kafka 的結合主要是兩種模型:
基於 receiver dstream;
基於 direct dstream。
以上兩種模型編程機構近似,只是在 api 和內部數據獲取有些區別,新版本的已經取消了基於 receiver 這種模式,企業中一般採用基於 direct Dstream 的模式。
val Array(brokers, topics) = args// 建立一個批處理時間是2s的context
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// 使用broker和topic建立DirectStream
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print() // 啓動流
ssc.start()
ssc.awaitTermination()
複製代碼
經過以上代碼咱們能夠 get 到:
設置批處理時間
建立數據流
編寫transform
編寫action
啓動執行
接下來看 flink 與 kafka 結合是如何編寫代碼的。Flink 與 kafka 結合是事件驅動,你們可能對此會有疑問,消費 kafka 的數據調用 poll 的時候是批量獲取數據的(能夠設置批處理大小和超時時間),這就不能叫作事件觸發了。而實際上,flink 內部對 poll 出來的數據進行了整理,而後逐條 emit,造成了事件觸發的機制。 下面的代碼是 flink 整合 kafka 做爲 data source 和 data sink:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// ExecutionConfig.GlobalJobParameters
env.getConfig().setGlobalJobParameters(null); DataStream<KafkaEvent> input = env
.addSource( new FlinkKafkaConsumer010<>(
parameterTool.getRequired("input-topic"), new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).setParallelism(1).rebalance()
.keyBy("word")
.map(new RollingAdditionMapper()).setParallelism(0);
input.addSink( new FlinkKafkaProducer010<>(
parameterTool.getRequired("output-topic"), new KafkaEventSchema(),
parameterTool.getProperties()));
env.execute("Kafka 0.10 Example");
複製代碼
從 Flink 與 kafka 結合的代碼能夠 get 到:
註冊數據 source
編寫運行邏輯
註冊數據 sink
調用 env.execute 相比於 Spark Streaming 少了設置批處理時間,還有一個顯著的區別是 flink 的全部算子都是 lazy 形式的,調用 env.execute 會構建 jobgraph。client 端負責 Jobgraph 生成並提交它到集羣運行;而 Spark Streaming的操做算子分 action 和 transform,其中僅有 transform 是 lazy 形式,並且 DAG 生成、stage 劃分、任務調度是在 driver 端進行的,在 client 模式下 driver 運行於客戶端處。
Spark Streaming 任務如上文提到的是基於微批處理的,實際上每一個批次都是一個 Spark Core 的任務。對於編碼完成的 Spark Core 任務在生成到最終執行結束主要包括如下幾個部分:
構建 DAG 圖;
劃分 stage;
生成 taskset;
調度 task。
具體可參考圖 5:
圖 5:Spark 任務調度
對於 job 的調度執行有 fifo 和 fair 兩種模式,Task 是根據數據本地性調度執行的。 假設每一個 Spark Streaming 任務消費的 kafka topic 有四個分區,中間有一個 transform操做(如 map)和一個 reduce 操做,如圖 6 所示:
圖 6
假設有兩個 executor,其中每一個 executor 三個核,那麼每一個批次相應的 task 運行位置是固定的嗎?是否能預測? 因爲數據本地性和調度不肯定性,每一個批次對應 kafka 分區生成的 task 運行位置並非固定的。
對於 flink 的流任務客戶端首先會生成 StreamGraph,接着生成 JobGraph,而後將 jobGraph 提交給 Jobmanager 由它完成 jobGraph 到 ExecutionGraph 的轉變,最後由 jobManager 調度執行。
如圖 7 所示有一個由 data source、MapFunction和 ReduceFunction 組成的程序,data source 和 MapFunction 的併發度都爲 4,而 ReduceFunction 的併發度爲 3。一個數據流由 Source-Map-Reduce 的順序組成,在具備 2 個TaskManager、每一個 TaskManager 都有 3 個 Task Slot 的集羣上運行。
能夠看出 flink 的拓撲生成提交執行以後,除非故障,不然拓撲部件執行位置不變,並行度由每個算子並行度決定,相似於 storm。而 spark Streaming 是每一個批次都會根據數據本地性和資源狀況進行調度,無固定的執行拓撲結構。 flink 是數據在拓撲結構裏流動執行,而 Spark Streaming 則是對數據緩存批次並行處理。
流處理程序在時間概念上總共有三個時間概念:
處理時間是指每臺機器的系統時間,當流程序採用處理時間時將使用運行各個運算符實例的機器時間。處理時間是最簡單的時間概念,不須要流和機器之間的協調,它能提供最好的性能和最低延遲。然而在分佈式和異步環境中,處理時間不能提供消息事件的時序性保證,由於它受到消息傳輸延遲,消息在算子之間流動的速度等方面制約。
事件時間是指事件在其設備上發生的時間,這個時間在事件進入 flink 以前已經嵌入事件,而後 flink 能夠提取該時間。基於事件時間進行處理的流程序能夠保證事件在處理的時候的順序性,可是基於事件時間的應用程序必需要結合 watermark 機制。基於事件時間的處理每每有必定的滯後性,由於它須要等待後續事件和處理無序事件,對於時間敏感的應用使用的時候要慎重考慮。
注入時間是事件注入到 flink 的時間。事件在 source 算子處獲取 source 的當前時間做爲事件注入時間,後續的基於時間的處理算子會使用該時間處理數據。
相比於事件時間,注入時間不可以處理無序事件或者滯後事件,可是應用程序無序指定如何生成 watermark。在內部注入時間程序的處理和事件時間相似,可是時間戳分配和 watermark 生成都是自動的。
圖 8 能夠清晰地看出三種時間的區別:
圖 8
Spark Streaming 只支持處理時間,Structured streaming 支持處理時間和事件時間,同時支持 watermark 機制處理滯後數據。
flink 支持三種時間機制:事件時間,注入時間,處理時間,同時支持 watermark 機制處理滯後數據。
對於有實時處理業務需求的企業,隨着業務增加數據量也會同步增加,將致使原有的 kafka 分區數不知足數據寫入所需的併發度,須要擴展 kafka 的分區或者增長 kafka 的 topic,這時就要求實時處理程序,如 SparkStreaming、flink 能檢測到 kafka 新增的 topic 、分區及消費新增分區的數據。
接下來結合源碼分析,Spark Streaming 和 flink 在 kafka 新增 topic 或 partition 時可否動態發現新增分區並消費處理新增分區的數據。 Spark Streaming 與 kafka 結合有兩個區別比較大的版本,如圖 9 所示是官網給出的對比數據:
圖 9
其中確認的是 Spark Streaming 與 kafka 0.8 版本結合不支持動態分區檢測,與 0.10 版本結合支持,接着經過源碼分析。
Spark Streaming 與 kafka 0.8 版本結合
*源碼分析只針對分區檢測
入口是 DirectKafkaInputDStream 的 compute:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {// 改行代碼會計算這個job,要消費的每一個kafka分區的最大偏移
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))// 構建KafkaRDD,用指定的分區數和要消費的offset範圍
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) // Report the record number and metadata of this batch interval to InputInfoTracker.
val offsetRanges = currentOffsets.map { case (tp, fo) =>
val uo = untilOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo.offset)
} val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges. offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange => s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" }.mkString("\n") // Copy offsetRanges to immutable.List to prevent from being modified by the user val metadata = Map( "offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) } 複製代碼
第一行就是計算獲得該批次生成 KafkaRDD 每一個分區要消費的最大 offset。 接着看 latestLeaderOffsets(maxRetries)
@tailrec protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {// 能夠看到的是用來指定獲取最大偏移分區的列表仍是隻有currentOffsets,沒有發現關於新增的分區的內容。
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manually
if (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err)
} else {
logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else {
o.right.get
}
}
複製代碼
其中 protected var currentOffsets = fromOffsets,這個僅僅是在構建 DirectKafkaInputDStream 的時候初始化,並在 compute 裏面更新:
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
複製代碼
中間沒有檢測 kafka 新增 topic 或者分區的代碼,因此能夠確認 Spark Streaming 與 kafka 0.8 的版本結合不支持動態分區檢測。
入口一樣是 DirectKafkaInputDStream 的 compute 方法,撿主要的部分說,Compute 裏第一行也是計算當前 job 生成 kafkardd 要消費的每一個分區的最大 offset:
// 獲取當前生成job,要用到的KafkaRDD每一個分區最大消費偏移值
val untilOffsets = clamp(latestOffsets())
複製代碼
具體檢測 kafka 新增 topic 或者分區的代碼在 latestOffsets()
/**
* Returns the latest (highest) available offsets, taking new partitions into account. */
protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer
paranoidPoll(c) // 獲取全部的分區信息
val parts = c.assignment().asScala // make sure new partitions are reflected in currentOffsets
// 作差獲取新增的分區信息
val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit
// 新分區消費位置,沒有記錄的化是由auto.offset.reset決定
currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause c.pause(newPartitions.asJava) // find latest available offsets c.seekToEnd(currentOffsets.keySet.asJava) parts.map(tp => tp -> c.position(tp)).toMap } 複製代碼
該方法內有獲取 kafka 新增分區,並將其更新到 currentOffsets 的過程,因此能夠驗證 Spark Streaming 與 kafka 0.10 版本結合支持動態分區檢測。
入口類是 FlinkKafkaConsumerBase,該類是全部 flink 的 kafka 消費者的父類。
在 FlinkKafkaConsumerBase 的 run 方法中,建立了 kafkaFetcher,實際上就是消費者:
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
複製代碼
接是建立了一個線程,該線程會按期檢測 kafka 新增分區,而後將其添加到 kafkaFetcher 裏。
if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); this.discoveryLoopThread = new Thread(new Runnable() { @Override
public void run() { try { // --------------------- partition discovery loop ---------------------
List<KafkaTopicPartition> discoveredPartitions; // throughout the loop, we always eagerly check if we are still running before
// performing the next operation, so that we can escape the loop as soon as possible
while (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
} try {
discoveredPartitions = partitionDiscoverer.discoverPartitions();
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery;
// this would only happen if the consumer was canceled; simply escape the loop
break;
} // no need to add the discovered partitions if we were closed during the meantime
if (running && !discoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
} // do not waste any time sleeping if we're not running anymore if (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis); } catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loop break; } } } } catch (Exception e) { discoveryLoopErrorRef.set(e); } finally { // calling cancel will also let the fetcher loop escape // (if not running, cancel() was already called) if (running) { cancel(); } } } }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks()); discoveryLoopThread.start(); kafkaFetcher.runFetchLoop(); 複製代碼
上面,就是 flink 動態發現 kafka 新增分區的過程。不過與 Spark 無需作任何配置不一樣的是,flink 動態發現 kafka 新增分區,這個功能須要被使能的。也很簡單,須要將 flink.partition-discovery.interval-millis 該屬性設置爲大於 0 便可。
本節內容主要是想對比二者在故障恢復及如何保證僅一次的處理語義。這個時候適合拋出一個問題:實時處理的時候,如何保證數據僅一次處理語義?
對於 Spark Streaming 任務,咱們能夠設置 checkpoint,而後假如發生故障並重啓,咱們能夠從上次 checkpoint 之處恢復,可是這個行爲只能使得數據不丟失,可能會重複處理,不能作到恰一次處理語義。
對於 Spark Streaming 與 kafka 結合的 direct Stream 能夠本身維護 offset 到 zookeeper、kafka 或任何其它外部系統,每次提交完結果以後再提交 offset,這樣故障恢復重啓能夠利用上次提交的 offset 恢復,保證數據不丟失。可是假如故障發生在提交結果以後、提交 offset 以前會致使數據屢次處理,這個時候咱們須要保證處理結果屢次輸出不影響正常的業務。
由此能夠分析,假設要保證數據恰一次處理語義,那麼結果輸出和 offset 提交必須在一個事務內完成。在這裏有如下兩種作法:
Dstream.foreachRDD(rdd=>{
rdd.repartition(1).foreachPartition(partition=>{ // 開啓事務
partition.foreach(each=>{// 提交數據
}) // 提交事務
})
})
複製代碼
也就是結果數據包含 offset。這樣提交結果和提交 offset 就是一個操做完成,不會數據丟失,也不會重複處理。故障恢復的時候能夠利用上次提交結果帶的 offset。
若要 sink 支持僅一次語義,必須以事務的方式寫數據到 Kafka,這樣當提交事務時兩次 checkpoint 間的全部寫入操做做爲一個事務被提交。這確保了出現故障或崩潰時這些寫入操做可以被回滾。
在一個分佈式且含有多個併發執行 sink 的應用中,僅僅執行單次提交或回滾是不夠的,由於全部組件都必須對這些提交或回滾達成共識,這樣才能保證獲得一致性的結果。Flink 使用兩階段提交協議以及預提交(pre-commit)階段來解決這個問題。
本例中的 Flink 應用如圖 11 所示包含如下組件:
一個source,從Kafka中讀取數據(即KafkaConsumer)
一個時間窗口化的聚會操做
一個sink,將結果寫回到Kafka(即KafkaProducer)
圖 11
下面詳細講解 flink 的兩段提交思路:
圖 12
如圖 12 所示,Flink checkpointing 開始時便進入到 pre-commit 階段。具體來講,一旦 checkpoint 開始,Flink 的 JobManager 向輸入流中寫入一個 checkpoint barrier ,將流中全部消息分割成屬於本次 checkpoint 的消息以及屬於下次 checkpoint 的,barrier 也會在操做算子間流轉。對於每一個 operator 來講,該 barrier 會觸發 operator 狀態後端爲該 operator 狀態打快照。data source 保存了 Kafka 的 offset,以後把 checkpoint barrier 傳遞到後續的 operator。
這種方式僅適用於 operator 僅有它的內部狀態。內部狀態是指 Flink state backends 保存和管理的內容(如第二個 operator 中 window 聚合算出來的 sum)。
當一個進程僅有它的內部狀態的時候,除了在 checkpoint 以前將須要將數據更改寫入到 state backend,不須要在預提交階段作其餘的動做。在 checkpoint 成功的時候,Flink 會正確的提交這些寫入,在 checkpoint 失敗的時候會終止提交,過程可見圖 13。
圖 13
當結合外部系統的時候,外部系統必需要支持可與兩階段提交協議捆綁使用的事務。顯然本例中的 sink 因爲引入了 kafka sink,所以在預提交階段 data sink 必須預提交外部事務。以下圖:
當 barrier 在全部的算子中傳遞一遍,而且觸發的快照寫入完成,預提交階段完成。全部的觸發狀態快照都被視爲 checkpoint 的一部分,也能夠說 checkpoint 是整個應用程序的狀態快照,包括預提交外部狀態。出現故障能夠從 checkpoint 恢復。下一步就是通知全部的操做算子 checkpoint 成功。該階段 jobmanager 會爲每一個 operator 發起 checkpoint 已完成的回調邏輯。
本例中 data source 和窗口操做無外部狀態,所以該階段,這兩個算子無需執行任何邏輯,可是 data sink 是有外部狀態的,所以,此時咱們必須提交外部事務,以下圖:
以上就是 flink 實現恰一次處理的基本邏輯。
消費者消費的速度低於生產者生產的速度,爲了使應用正常,消費者會反饋給生產者來調節生產者生產的速度,以使得消費者須要多少,生產者生產多少。
*back pressure 後面一概稱爲背壓。
Spark Streaming 跟 kafka 結合是存在背壓機制的,目標是根據當前 job 的處理狀況來調節後續批次的獲取 kafka 消息的條數。爲了達到這個目的,Spark Streaming 在原有的架構上加入了一個 RateController,利用的算法是 PID,須要的反饋數據是任務處理的結束時間、調度時間、處理時間、消息條數,這些數據是經過 SparkListener 體系得到,而後經過 PIDRateEsimator 的 compute 計算獲得一個速率,進而能夠計算獲得一個 offset,而後跟限速設置最大消費條數比較獲得一個最終要消費的消息最大 offset。
PIDRateEsimator 的 compute 方法以下:
def compute( time: Long, // in milliseconds
numElements: Long, processingDelay: Long, // in milliseconds
schedulingDelay: Long // in milliseconds
): Option[Double] = {
logTrace(s"\ntime = $time, # records = $numElements, " +
s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin)
latestTime = time if (firstRun) {
latestRate = processingRate
latestError = 0D
firstRun = false
logTrace("First run, rate estimation skipped") None
} else {
latestRate = newRate
latestError = error
logTrace(s"New rate = $newRate") Some(newRate)
}
} else {
logTrace("Rate estimation skipped") None
}
}
}
複製代碼
與 Spark Streaming 的背壓不一樣的是,Flink 背壓是 jobmanager 針對每個 task 每 50ms 觸發 100 次 Thread.getStackTrace() 調用,求出阻塞的佔比。過程如圖 16 所示:
阻塞佔比在 web 上劃分了三個等級:
OK: 0 <= Ratio <= 0.10,表示狀態良好;
LOW: 0.10 < Ratio <= 0.5,表示有待觀察;
HIGH: 0.5 < Ratio <= 1,表示要處理了。