I originally wanted to title this post "Flink vs. Storm, API by example", but since both APIs have been changing frequently of late, I added version numbers to the title. Storm gained the IWindowedBolt interface after version 1.0; I haven't tried it yet, so I may cover it later.
A Kafka log records the visit time and uuid of users on different platforms. A sample message looks like this:
{ "platform": 1, "time": 1470267515, "uuid": "ad751bb3-d0ee-c9b9-be26-2ba4570bb3fe", }
We need to compute daily UV per platform, with the following requirements:
Output the latest statistics every second.
The program runs forever, so stale data must be purged from memory periodically.
The time field in incoming messages is not strictly increasing, so some tolerance for out-of-order data is needed.
UV does not necessarily change every second, and emitting duplicates is a huge waste of IO, so a result should be emitted within one second of a UV change and nothing should be emitted when nothing has changed.
The input comes from Kafka. Storm ships with a ready-made KafkaSpout; add the configuration and it is ready to use.
val hosts = new ZkHosts("zkhosts")
val spoutConfig = new SpoutConfig(hosts, "sometopic", "zkroot", "xxx")
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
spoutConfig.startOffsetTime = OffsetRequest.LatestTime

val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
Create a KeeperBolt that keeps the current results in a Map of Sets; the Set for a given log line is looked up by the date derived from the log's timestamp plus the platform.
class KeeperBolt extends BaseBasicBolt {
  val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
  var lastSize = Map.empty[LocalDate, Map[Int, Int]]

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    input.getSourceStreamId match {
      case "default" =>
        val jsObj = Json.parse(input.getString(0))
        val time = (jsObj \ "time").as[Int]
        val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
        val platform = (jsObj \ "platform").as[Int]
        val uuid = (jsObj \ "uuid").as[String]
        // keep only the uuid's hashCode in the set for this (date, platform)
        val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
        val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
        set.add(uuid.hashCode)
    }
  }
}
This part is rather awkward in Storm. To avoid breaking the single-threaded nature of each Storm task, we should not start a Timer inside the Bolt itself: that would require unnecessary locking, which hurts performance and complicates the code, and spawning our own threads also interferes with Storm's view of the topology's health and performance. Instead we set up another Spout that sends command messages, so the Bolt receives periodic signals telling it to emit output and to purge old data.
class CommandSpout extends BaseRichSpout {
  var collector: SpoutOutputCollector = _
  var commands = Vector.empty[String]

  override def nextTuple(): Unit = {
    commands.synchronized {
      while (commands.nonEmpty) {
        println("emit: " + commands.head)
        collector.emit("command", new Values(commands.head), Random.nextInt())
        commands = commands.tail
      }
    }
  }

  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this.collector = collector
    val timer = new Timer
    // ask the bolt to emit its latest counts every second
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        commands.synchronized {
          commands +:= "output"
        }
      }
    }, 1000, 1000)
    // ask the bolt to advance beginDate and purge old data once a day
    // (Timer periods are in milliseconds)
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        commands.synchronized {
          commands +:= "retain"
        }
      }
    }, 86400L * 1000, 86400L * 1000)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declareStream("command", new Fields("command"))
  }
}
When the KeeperBolt receives a command message, it has to carry out the corresponding action: emitting output or purging old data.
For output:
a separate lastSize snapshot holds what was emitted last time; the current size of every UV set in the map is compared against it, and only entries whose value has changed are printed.
For purging old data:
KeeperBolt gains a beginDate field. Whenever beginDate is advanced (once a day), entries in the map dated earlier than beginDate are dropped, and messages older than beginDate are filtered out on arrival from then on.
class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
  val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
  var lastSize = Map.empty[LocalDate, Map[Int, Int]]

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    input.getSourceStreamId match {
      case "default" =>
        val jsObj = Json.parse(input.getString(0))
        val time = (jsObj \ "time").as[Int]
        val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
        if (date.compareTo(beginDate) > 0) {
          val platform = (jsObj \ "platform").as[Int]
          val uuid = (jsObj \ "uuid").as[String]
          val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
          val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
          set.add(uuid.hashCode)
        }
      case "command" =>
        input.getString(0) match {
          case "output" =>
            val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
              case (key, submap) =>
                (key, submap.map { case (platform, set) => (platform, set.size) }.toMap)
            }.toMap
            println("currSize = " + currSize)
            println("lastSize = " + lastSize)
            for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
              lastSize.get(date).flatMap(_.get(platform)) match {
                case Some(lastuv) if lastuv == uv => // unchanged: emit nothing
                case _ => println("updated date,platform,uv = " + (date, platform, uv))
              }
            }
            lastSize = currSize
          case "retain" =>
            beginDate = LocalDate.now().minusDays(2)
            // drop in-memory data older than beginDate
            map.retain { case (date, _) => date.compareTo(beginDate) > 0 }
        }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
}
With each node done, we need to wire them together. In total there are two Spouts and one Bolt: the Spouts emit the log lines and the commands respectively, and the Bolt stores the data and subscribes to both Spouts.
val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
builder.setSpout("command-spout", new CommandSpout)
builder.setBolt("keeper", new KeeperBolt)
  .allGrouping("command-spout", "command")
  .localOrShuffleGrouping("kafka-spout")

val conf = new Config
conf.setNumWorkers(1)
val topo = builder.createTopology()
val cluster = new LocalCluster()
StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)
And with that we have a forever-running application that computes daily UV per platform, outputs the results, and periodically cleans up old data.
Full code:
import java.time.{Instant, LocalDate, ZoneId}
import java.util
import java.util.{Timer, TimerTask}

import kafka.api.OffsetRequest
import org.apache.storm.kafka.{KafkaSpout, SpoutConfig, StringScheme, ZkHosts}
import org.apache.storm.spout.{SchemeAsMultiScheme, SpoutOutputCollector}
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.base.{BaseBasicBolt, BaseRichSpout}
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import org.apache.storm.tuple.{Fields, Tuple, Values}
import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import play.api.libs.json.Json

import scala.collection.mutable
import scala.util.Random

object StormMain {

  class CommandSpout extends BaseRichSpout {
    var collector: SpoutOutputCollector = _
    var commands = Vector.empty[String]

    override def nextTuple(): Unit = {
      commands.synchronized {
        while (commands.nonEmpty) {
          println("emit: " + commands.head)
          collector.emit("command", new Values(commands.head), Random.nextInt())
          commands = commands.tail
        }
      }
    }

    override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
      this.collector = collector
      val timer = new Timer
      // ask the bolt to emit its latest counts every second
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = {
          commands.synchronized {
            commands +:= "output"
          }
        }
      }, 1000, 1000)
      // ask the bolt to advance beginDate and purge old data once a day
      // (Timer periods are in milliseconds)
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = {
          commands.synchronized {
            commands +:= "retain"
          }
        }
      }, 86400L * 1000, 86400L * 1000)
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
      declarer.declareStream("command", new Fields("command"))
    }
  }

  class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
    val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
    var lastSize = Map.empty[LocalDate, Map[Int, Int]]

    override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
      input.getSourceStreamId match {
        case "default" =>
          val jsObj = Json.parse(input.getString(0))
          val time = (jsObj \ "time").as[Int]
          val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
          if (date.compareTo(beginDate) > 0) {
            val platform = (jsObj \ "platform").as[Int]
            val uuid = (jsObj \ "uuid").as[String]
            val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
            val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
            set.add(uuid.hashCode)
          }
        case "command" =>
          input.getString(0) match {
            case "output" =>
              val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
                case (key, submap) =>
                  (key, submap.map { case (platform, set) => (platform, set.size) }.toMap)
              }.toMap
              println("currSize = " + currSize)
              println("lastSize = " + lastSize)
              for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
                lastSize.get(date).flatMap(_.get(platform)) match {
                  case Some(lastuv) if lastuv == uv => // unchanged: emit nothing
                  case _ => println("updated date,platform,uv = " + (date, platform, uv))
                }
              }
              lastSize = currSize
            case "retain" =>
              beginDate = LocalDate.now().minusDays(2)
              // drop in-memory data older than beginDate
              map.retain { case (date, _) => date.compareTo(beginDate) > 0 }
          }
      }
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
  }

  def main(args: Array[String]): Unit = {
    val hosts = new ZkHosts("xxx")
    val spoutConfig = new SpoutConfig(hosts, "xxx", "/xxx", "xxx")
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
    spoutConfig.startOffsetTime = OffsetRequest.LatestTime

    val builder = new TopologyBuilder
    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
    builder.setSpout("command-spout", new CommandSpout)
    builder.setBolt("keeper", new KeeperBolt)
      .allGrouping("command-spout", "command")
      .localOrShuffleGrouping("kafka-spout")

    val conf = new Config
    conf.setNumWorkers(1)
    val topo = builder.createTopology()
    val cluster = new LocalCluster()
    StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)
  }
}
The Storm solution's API is very low-level. Even something as simple as a scheduled task requires a dedicated Spout, and that boilerplate is already comparable in size to the core business logic, which is tedious.
Partitioning data by time (in other words, windowing) also ends up interwoven with the business logic, so whenever the program needs changing you have to spend extra effort locating the piece of code you actually want to change.
What each node does and how the nodes are wired together must all be specified by hand. Someone new to the code can hardly grasp it by reading a small core snippet; they have to read the whole thing and jump back and forth between classes to understand it.
Flink improves on all of this: its API is more abstract, and it provides many practical utilities for common scenarios. Below is the same requirement implemented in Flink; it is best read alongside the official documentation.
Like Storm, Flink ships with a Kafka client; you only need to configure it.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx")
properties.setProperty("zookeeper.connect", "xxx")
properties.setProperty("group.id", "xxx")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setBufferTimeout(100)

val consumer = new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
  override def isEndOfStream(t: JsObject): Boolean = false

  override def deserialize(bytes: Array[Byte]): JsObject = {
    Json.parse(new String(bytes)).as[JsObject]
  }

  override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
}, properties)
Flink provides two tools, windows and watermarks, that keep the timing requirements separate from the business logic, so let's cover those first.
Watermark:
A watermark is Flink's way of tracking how far the data has progressed. Once the watermark passes the end of a time window, Flink considers that window expired and no longer subject to change, and evicts its contents from memory, giving us exactly the kind of turnover we need.
In the code below, since memory is plentiful and the logs may be heavily out of order, I don't derive the watermark from the log's time field but from the system clock, keeping the last three days of data in memory. The timestamp itself still comes from the log, of course, because the timestamp decides which window a record belongs to; if timestamps were generated from the system clock, the results would differ depending on when the program was started.
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {
  override def getCurrentWatermark: Watermark = {
    // keep roughly the last three days of windows in memory
    new Watermark(System.currentTimeMillis() - 86400L * 1000 * 3)
  }

  override def extractTimestamp(t: JsObject, l: Long): Long = {
    (t \ "time").as[Long] * 1000
  }
})
Window:
Once the watermark is set up, using windows is very simple; two more lines suffice.
env.addSource(consumer)
  .keyBy(_.\("platform").as[Int])
  .timeWindow(Time.days(1))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
timeWindow(Time.days(1)) splits the data into one-day windows. It is followed by trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) because, although what we care about is the daily total, we don't want to wait until the end of the day to see it; we want the latest result emitted every second.
With the windowing taken care of, we can finally get to the actual business logic. In Flink, a stream that has been windowed becomes a WindowedStream. WindowedStream has a fold method that aggregates the data in a window into a new value, but on top of that we also want the start and end time of the window each result belongs to. That calls for the apply method, essentially an extension of fold that exposes the window metadata while aggregating (the code below uses applyWith, the pattern-matching-friendly variant from Flink's Scala API extensions).
env.addSource(consumer)
  .keyBy(_.\("platform").as[Int])
  .timeWindow(Time.days(1))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
  .applyWith((0, Set.empty[Int], 0L, 0L))(
    foldFunction = {
      case ((_, set, _, 0), jsObj) =>
        val platform = (jsObj \ "platform").as[Int]
        val uuid = (jsObj \ "uuid").as[String]
        (platform, set + uuid.hashCode, 0, 0)
    },
    windowFunction = {
      case (key, window, results) =>
        results.map {
          case (platform, set, _, _) =>
            (platform, set, window.getStart, window.getEnd)
        }
    }
  )
We now have the latest (platform, set, window start, window end). However, we don't need the set itself, only its current size, and that size doesn't necessarily change every second. To cut down on unnecessary output we want to drop the second of any two consecutive identical records. A plain map/flatMap can't do that, but Flink additionally provides flatMapWithState, which lets the logic depend on what has been seen earlier in the stream. Below it is used to deduplicate consecutive identical results.
env.addSource(consumer)
  .keyBy(_.\("platform").as[Int])
  .timeWindow(Time.days(1))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
  .applyWith((0, Set.empty[Int], 0L, 0L))(
    foldFunction = {
      case ((_, set, _, 0), jsObj) =>
        val platform = (jsObj \ "platform").as[Int]
        val uuid = (jsObj \ "uuid").as[String]
        (platform, set + uuid.hashCode, 0, 0)
    },
    windowFunction = {
      case (key, window, results) =>
        results.map {
          case (platform, set, _, _) =>
            (platform, set, window.getStart, window.getEnd)
        }
    }
  )
  .mapWith {
    case (key, set, windowStart, windowEnd) =>
      (key, set.size, windowStart, windowEnd)
  }
  .keyBy(0)
  .flatMapWithState[(Int, Int, Long, Long), Int] {
    case ((key, num, begin, end), curr) =>
      curr match {
        case Some(numCurr) if numCurr == num =>
          (Seq.empty, Some(num)) // same count as last time: emit nothing
        case _ =>
          (Seq((key, num, begin, end)), Some(num))
      }
  }
With this in place we get exactly what we wanted: nothing is emitted while the numbers are unchanged, and the latest result is emitted as quickly as possible when they do change.
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import play.api.libs.json.{JsObject, Json}

object FlinkMain {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "xxx")
    properties.setProperty("zookeeper.connect", "xxx")
    properties.setProperty("group.id", "xxx")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setBufferTimeout(100)

    val consumer = new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
      override def isEndOfStream(t: JsObject): Boolean = false

      override def deserialize(bytes: Array[Byte]): JsObject = {
        Json.parse(new String(bytes)).as[JsObject]
      }

      override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
    }, properties)

    consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {
      override def getCurrentWatermark: Watermark = {
        new Watermark(System.currentTimeMillis() - 86400L * 1000 * 3)
      }

      override def extractTimestamp(t: JsObject, l: Long): Long = {
        (t \ "time").as[Long] * 1000
      }
    })

    val stream = env.addSource(consumer)
      .keyBy(_.\("platform").as[Int])
      .timeWindow(Time.days(1))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
      .applyWith((0, Set.empty[Int], 0L, 0L))(
        foldFunction = {
          case ((_, set, _, 0), jsObj) =>
            val platform = (jsObj \ "platform").as[Int]
            val uuid = (jsObj \ "uuid").as[String]
            (platform, set + uuid.hashCode, 0, 0)
        },
        windowFunction = {
          case (key, window, results) =>
            results.map {
              case (platform, set, _, _) =>
                (platform, set, window.getStart, window.getEnd)
            }
        }
      )
      .mapWith {
        case (key, set, windowStart, windowEnd) =>
          (key, set.size, windowStart, windowEnd)
      }
      .keyBy(0)
      .flatMapWithState[(Int, Int, Long, Long), Int] {
        case ((key, num, begin, end), curr) =>
          curr match {
            case Some(numCurr) if numCurr == num =>
              (Seq.empty, Some(num)) // same count as before: emit nothing
            case _ =>
              (Seq((key, num, begin, end)), Some(num))
          }
      }

    stream.print()
    env.execute("Boom")
  }
}
從"讓人類更輕鬆"這個角度而言,Flink API 很好地優化了我上面提到的幾個問題,固然與此同時
你也失去了更精細化控制一些東西的能力,不過我認爲就大部分的平常工做而言,讓人類更輕鬆纔是
咱們應該追求的目標,利用精細控制來特別優化程序,極可能只在少數很是重要的業務上
Recent Storm releases have added WindowedBolt and watermark support. Combined with the Trident API that Storm has had all along, it should be possible to express the same logic in a style quite close to Flink's, even though Trident has, for reasons unclear to me, never seemed to become popular.
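I haven't run it, but judging from the 1.x BaseWindowedBolt interface, a windowed-bolt version of just the counting step might look roughly like the untested sketch below. The bolt name and the println-based reporting are made up, the input is assumed to be the raw JSON line emitted by the kafka-spout above, per-platform keying is ignored, and a window this long would presumably also need topology settings such as the message timeout adjusted.

import java.util.concurrent.TimeUnit

import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseWindowedBolt
import org.apache.storm.windowing.TupleWindow
import play.api.libs.json.Json

import scala.collection.JavaConverters._

// Untested sketch of Storm 1.x's windowed-bolt API.
class UvWindowBolt extends BaseWindowedBolt {
  override def execute(window: TupleWindow): Unit = {
    // window.get() holds every tuple currently inside the window
    val uv = window.get().asScala
      .map(t => (Json.parse(t.getString(0)) \ "uuid").as[String].hashCode)
      .toSet
      .size
    println("uv in current window = " + uv)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
}

// Wiring: a one-day window evaluated every second, roughly the counterpart of
// timeWindow(Time.days(1)) + ContinuousProcessingTimeTrigger in the Flink code.
// builder.setBolt("uv-window",
//   new UvWindowBolt().withWindow(
//     new BaseWindowedBolt.Duration(1, TimeUnit.DAYS),
//     new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS)))
//   .localOrShuffleGrouping("kafka-spout")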
In fact, I suspect the APIs of the mainstream stream-processing frameworks will keep converging, and many different frameworks may even end up supporting the same API; Apache Beam, for example, already has both Spark and Flink runners. In the end, I think it will come down to how well each framework does at funding and growing its community.