Flink 1.1 and Storm 0.10: an API comparison by example

I originally wanted to call this post "a Flink vs. Storm API comparison by example", but both APIs have been changing rapidly lately, so I added the version numbers. Storm gained the IWindowedBolt interface as of 1.0; I have not tried it yet and may cover it later.

Requirements

A Kafka topic logs users' visits on different platforms, with an access time and a uuid per record. One message looks like this:

{
    "platform": 1,
    "time": 1470267515,
    "uuid": "ad751bb3-d0ee-c9b9-be26-2ba4570bb3fe"
}
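
Both implementations below parse these records with play-json. Purely as a reference, here is a minimal sketch of how the record could be modeled as a case class; the AccessLog name and the derived Reads are my own illustration and are not used in either pipeline:

import play.api.libs.json.{Json, Reads}

// Hypothetical model of one log record; the field names mirror the JSON above.
case class AccessLog(platform: Int, time: Long, uuid: String)

object AccessLog {
  // play-json derives a Reads[AccessLog] from the case class fields
  implicit val reads: Reads[AccessLog] = Json.reads[AccessLog]
}

// Parsing one raw Kafka message into the model
val log = Json.parse(
  """{"platform": 1, "time": 1470267515, "uuid": "ad751bb3-d0ee-c9b9-be26-2ba4570bb3fe"}"""
).as[AccessLog]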

We need to compute the daily UV (unique visitors) of each platform, with the following requirements:

  1. The latest statistics must be output every second.

  2. The program runs forever, so stale data must be cleaned out of memory periodically.

  3. The time field of incoming messages is not strictly increasing, so some tolerance for out-of-order data is needed.

  4. The UV does not necessarily change every second, and re-emitting identical results is a huge waste of IO, so a result should be output within one second of a UV change and suppressed otherwise.

The Storm solution

Input

The input is read with a KafkaSpout. Storm ships one out of the box; add the configuration and it is ready to use.

val hosts = new ZkHosts("zkhosts")
val spoutConfig = new SpoutConfig(hosts, "sometopic", "zkroot", "xxx")
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
spoutConfig.startOffsetTime = OffsetRequest.LatestTime
val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))

Computing the results

Create a KeeperBolt that holds the current results in a Map of Sets; the Set a log line belongs to is looked up by the date derived from the log's time field and by its platform.

class KeeperBolt extends BaseBasicBolt {
    val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
    var lastSize = Map.empty[LocalDate, Map[Int, Int]]

    override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
      input.getSourceStreamId match {
        case "default" =>
          val jsObj = Json.parse(input.getString(0))
          val time = (jsObj \ "time").as[Int]
          val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
          val platform = (jsObj \ "platform").as[Int]
          val uuid = (jsObj \ "uuid").as[String]
          val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
          val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
          set.add(uuid.hashCode)
        // the "command" stream is handled in the next section
      }
    }
  }

Periodically clearing old data & emitting the current results

This part is fairly awkward in Storm. To avoid breaking the single-threaded nature of each Storm task, we should not start a Timer directly inside the Bolt; that would require otherwise unnecessary locking, which hurts performance and adds complexity, and threads we create ourselves also interfere with Storm's view of the topology's health and performance. So we need a separate Spout whose job is to send command messages telling the Bolt when to emit and when to clean up.

class CommandSpout extends BaseRichSpout {
    var collector: SpoutOutputCollector = _

    // commands queued by the timers below and drained by nextTuple on the spout thread
    var commands = Vector.empty[String]

    override def nextTuple(): Unit = {
      commands.synchronized {
        while (commands.nonEmpty) {
          println("emit: " + commands.head)
          collector.emit("command", new Values(commands.head), Random.nextInt())
          commands = commands.tail
        }
      }
    }

    override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
      this.collector = collector
      val timer = new Timer
      // ask the bolt to print the latest counts every second
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = {
          commands.synchronized {
            commands +:= "output"
          }
        }
      }, 1000, 1000)
      // ask the bolt to drop expired data once a day (scheduleAtFixedRate takes milliseconds)
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = {
          commands.synchronized {
            commands +:= "retain"
          }
        }
      }, 86400L * 1000, 86400L * 1000)
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
      declarer.declareStream("command", new Fields("command"))
    }
  }

When the KeeperBolt receives a command it performs the corresponding action: emit output or clean up old data.
Output: a lastSize snapshot keeps the result of the previous emit; the size of every UV set currently in the map is compared against the previous output, and only entries that changed are printed.
Cleaning up old data: KeeperBolt gains a beginDate field. Each day, when beginDate advances, data in the map older than beginDate is dropped, and subsequently received messages are filtered against it.

class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
    val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
    var lastSize = Map.empty[LocalDate, Map[Int, Int]]

    override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
      input.getSourceStreamId match {
        case "default" =>
          val jsObj = Json.parse(input.getString(0))
          val time = (jsObj \ "time").as[Int]
          val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
          if (date.compareTo(beginDate) > 0) {
            val platform = (jsObj \ "platform").as[Int]
            val uuid = (jsObj \ "uuid").as[String]
            val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
            val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
            set.add(uuid.hashCode)
          }
        case "command" =>
          input.getString(0) match {
            case "output" =>
              val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
                case (key, submap) => (key, submap.map {
                  case (platform, set) => (platform, set.size)
                }.toMap)
              }.toMap
              println("currSize = " + currSize)
              println("lastSize = " + lastSize)
              for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
                lastSize.get(date).flatMap(_.get(platform)) match {
                  case Some(lastuv) if lastuv == uv => // unchanged, print nothing
                  case _ =>
                    println("updated date,platform,uv = " + (date, platform, uv))
                }
              }
              lastSize = currSize
            case "retain" =>
              beginDate = LocalDate.now().minusDays(2)
              // drop in-memory data older than the new beginDate
              map.retain { case (date, _) => date.compareTo(beginDate) > 0 }
          }
      }
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
  }

Building the topology

With each node implemented, we still have to write the code that wires them together. There are two Spouts and one Bolt in total: one Spout emits the log records, the other sends the commands, and the Bolt stores the data and subscribes to both Spouts.

val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
builder.setSpout("command-spout", new CommandSpout)
builder.setBolt("keeper", new KeeperBolt)
  .allGrouping("command-spout", "command")
  .localOrShuffleGrouping("kafka-spout")
val conf = new Config
conf.setNumWorkers(1)
val topo = builder.createTopology()
// LocalCluster is unused here; for a local test one would call cluster.submitTopology instead
val cluster = new LocalCluster()
StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)

With that, we have an always-running application that computes the daily UV of each platform, emits updates, and periodically clears out old data.

Complete code

import java.time.{Instant, LocalDate, ZoneId}
import java.util
import java.util.{Timer, TimerTask}

import kafka.api.OffsetRequest
import org.apache.storm.kafka.{KafkaSpout, SpoutConfig, StringScheme, ZkHosts}
import org.apache.storm.spout.{SchemeAsMultiScheme, SpoutOutputCollector}
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.base.{BaseBasicBolt, BaseRichSpout}
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import org.apache.storm.tuple.{Fields, Tuple, Values}
import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import play.api.libs.json.Json

import scala.collection.mutable
import scala.util.Random

object StormMain {

  class CommandSpout extends BaseRichSpout {
    var collector: SpoutOutputCollector = _

    var commands = Vector.empty[String]

    override def nextTuple(): Unit = {
      commands.synchronized {
        while (commands.nonEmpty) {
          println("emit: " + commands.head)
          collector.emit("command", new Values(commands.head), Random.nextInt())
          commands = commands.tail
        }
      }
    }

    override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
      this.collector = collector
      val timer = new Timer
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = {
          commands.synchronized {
            commands +:= "output"
          }
        }
      }, 1000, 1000)
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = {
          commands.synchronized {
            commands +:= "retain"
          }
        }
      }, 86400L * 1000, 86400L * 1000) // daily; scheduleAtFixedRate takes milliseconds
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
      declarer.declareStream("command", new Fields("command"))
    }
  }

  class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
    val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
    var lastSize = Map.empty[LocalDate, Map[Int, Int]]

    override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
      input.getSourceStreamId match {
        case "default" =>
          val jsObj = Json.parse(input.getString(0))
          val time = (jsObj \ "time").as[Int]
          val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
          if (date.compareTo(beginDate) > 0) {
            val platform = (jsObj \ "platform").as[Int]
            val uuid = (jsObj \ "uuid").as[String]
            val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
            val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
            set.add(uuid.hashCode)
          }
        case "command" =>
          input.getString(0) match {
            case "output" =>
              val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
                case (key, submap) => (key, submap.map {
                  case (platform, set) => (platform, set.size)
                }.toMap)
              }.toMap
              println("currSize = " + currSize)
              println("lastSize = " + lastSize)
              for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
                lastSize.get(date).flatMap(_.get(platform)) match {
                  case Some(lastuv) if lastuv == uv => // unchanged, print nothing
                  case _ =>
                    println("updated date,platform,uv = " + (date, platform, uv))
                }
              }
              lastSize = currSize
            case "retain" =>
              beginDate = LocalDate.now().minusDays(2)
              // drop in-memory data older than the new beginDate
              map.retain { case (date, _) => date.compareTo(beginDate) > 0 }
          }
      }
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {

    }
  }

  def main(args: Array[String]): Unit = {
    val hosts = new ZkHosts("xxx")
    val spoutConfig = new SpoutConfig(hosts, "xxx", "/xxx", "xxx")
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
    spoutConfig.startOffsetTime = OffsetRequest.LatestTime
    val builder = new TopologyBuilder
    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
    builder.setSpout("command-spout", new CommandSpout)
    builder.setBolt("keeper", new KeeperBolt)
      .allGrouping("command-spout", "command")
      .localOrShuffleGrouping("kafka-spout")
    val conf = new Config
    conf.setNumWorkers(1)
    val topo = builder.createTopology()
    // LocalCluster is unused here; for a local test one would call cluster.submitTopology instead
    val cluster = new LocalCluster()
    StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)
  }
}

Potential problems

  1. The Storm API is very low-level. Something as mundane as a periodic task needs its own Spout, and that code is already about the same size as the core business logic. Very tedious.

  2. Segregating data by time (in other words, windowing) is also interwoven with the business logic, so whenever the program has to change, extra effort goes into just locating the piece of code to modify.

  3. What each node does and how the nodes are connected must all be spelled out by hand. Someone who has just taken over the code can hardly get the picture from a small piece of core code; they have to read the whole context and jump between classes many times to understand it.

The Flink solution

Flink improves on all of the above: the API is pitched at a higher level of abstraction and comes with many utilities for common scenarios. Below is a Flink implementation of the same requirements, best read alongside the official documentation.

Input

Like Storm, Flink ships with a Kafka client; set the connection parameters and it is ready to use.

val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx")
properties.setProperty("zookeeper.connect", "xxx")
properties.setProperty("group.id", "xxx")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setBufferTimeout(100)
val consumer = new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
  override def isEndOfStream(t: JsObject): Boolean = false

  override def deserialize(bytes: Array[Byte]): JsObject = {
    Json.parse(new String(bytes)).as[JsObject]
  }

  override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
}, properties)

Periodic cleanup of old data and timed output

Flink provides two tools, Windows and Watermarks, that keep the timing requirements separate from the business logic, so let's cover them first.

Watermark:
A watermark is Flink's way of tracking how far the data has progressed. Once the watermark passes the end of a time window, Flink considers that window expired: it should not change any more, so its contents are evicted from memory, giving us the turnover of state we need.
In the code below, because memory is plentiful and the logs can be badly out of order, I do not derive the watermark from the time in the logs but from the system clock, keeping the last three days of data in memory. The timestamp itself still comes from the log, because the timestamp decides which window a record falls into; if timestamps were taken from the system clock, the results would depend on when the program happened to be started.

consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {

  // the watermark trails the wall clock by three days, so windows stay in memory for three days
  override def getCurrentWatermark: Watermark = {
    new Watermark(System.currentTimeMillis() - 86400L * 1000 * 3)
  }

  // the timestamp that assigns a record to its window still comes from the log itself
  override def extractTimestamp(t: JsObject, l: Long): Long = {
    (t \ "time").as[Long] * 1000
  }
})

Window:
With the watermark in place, using windows is trivial; two more lines are all it takes.

env.addSource(consumer)
  .keyBy(_.\("platform").as[Int])
  .timeWindow(Time.days(1))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))

timeWindow(Time.days(1)) splits the data into daily windows. The trailing trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) is there because, although what we care about is the daily total, we do not want to see it only when the day is over; we want the latest figure emitted every second.
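
One caveat worth noting: in Flink's 1.x DataStream API the environment's time characteristic defaults to processing time, so for timeWindow to split records by the timestamps extracted from the logs rather than by arrival time, it would normally also be set explicitly. This line is my addition and not part of the original job:

import org.apache.flink.streaming.api.TimeCharacteristic

// make timeWindow(Time.days(1)) an event-time window driven by the extracted timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)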

Business logic

With windowing out of the way, we can finally get to the business logic. In Flink, a stream that has gone through windowing becomes a WindowedStream. WindowedStream has a fold method that aggregates the records of a window into a new value, but on top of that we also want the start and end time of the window a record belongs to. That calls for the apply method, essentially an extension of fold that gives access to window metadata while aggregating.

env.addSource(consumer)
        .keyBy(_.\("platform").as[Int])
        .timeWindow(Time.days(1))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .applyWith((0, Set.empty[Int], 0l, 0l))(
          foldFunction = {
            case ((_, set, _, 0), jsObj) =>
              val platform = (jsObj \ "platform").as[Int]
              val uuid = (jsObj \ "uuid").as[String]
              (platform, set + uuid.hashCode, 0, 0)
          },
          windowFunction = {
            case (key, window, results) =>
              results.map {
                case (platform, set, _, _) =>
                  (platform, set, window.getStart, window.getEnd)
              }
          }
        )

We now have the latest (platform, set, window start, window end). However, we do not need the contents of the set, only its latest size, and that size does not necessarily change every second. To cut the unnecessary output we want to drop one of any two consecutive identical records. A plain map/flatMap cannot do that, but Flink additionally offers flatMapWithState, which lets the logic depend on what has come through the stream before. Below, it is used to deduplicate consecutive records.

env.addSource(consumer)
        .keyBy(_.\("platform").as[Int])
        .timeWindow(Time.days(1))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .applyWith((0, Set.empty[Int], 0l, 0l))(
          foldFunction = {
            case ((_, set, _, 0), jsObj) =>
              val platform = (jsObj \ "platform").as[Int]
              val uuid = (jsObj \ "uuid").as[String]
              (platform, set + uuid.hashCode, 0, 0)
          },
          windowFunction = {
            case (key, window, results) =>
              results.map {
                case (platform, set, _, _) =>
                  (platform, set, window.getStart, window.getEnd)
              }
          }
        )
        .mapWith {
          case (key, set, windowStart, windowEnd) =>
            (key, set.size, windowStart, windowEnd)
        }
        .keyBy(0)
        .flatMapWithState[(Int, Int, Long, Long), Int] {
        case ((key, num, begin, end), curr) =>
          curr match {
            case Some(numCurr) if numCurr == num =>
              (Seq.empty, Some(num)) // the previous value is identical, so emit nothing
            case _ =>
              (Seq((key, num, begin, end)), Some(num))
          }
      }

This gives us exactly what we wanted: nothing is emitted while the data is unchanged, and the latest result is emitted as quickly as possible once it changes.

Complete code

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import play.api.libs.json.{JsObject, Json}

object FlinkMain {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "xxx")
    properties.setProperty("zookeeper.connect", "xxx")
    properties.setProperty("group.id", "xxx")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setBufferTimeout(100)
    val consumer = new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
      override def isEndOfStream(t: JsObject): Boolean = false

      override def deserialize(bytes: Array[Byte]): JsObject = {
        Json.parse(new String(bytes)).as[JsObject]
      }

      override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
    }, properties)

    consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {

      override def getCurrentWatermark: Watermark = {
        new Watermark(System.currentTimeMillis() - 86400L * 1000 * 3)
      }

      override def extractTimestamp(t: JsObject, l: Long): Long = {
        (t \ "time").as[Long] * 1000
      }
    })
    val stream =
      env.addSource(consumer)
        .keyBy(_.\("platform").as[Int])
        .timeWindow(Time.days(1))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .applyWith((0, Set.empty[Int], 0l, 0l))(
          foldFunction = {
            case ((_, set, _, 0), jsObj) =>
              val platform = (jsObj \ "platform").as[Int]
              val uuid = (jsObj \ "uuid").as[String]
              (platform, set + uuid.hashCode, 0, 0)
          },
          windowFunction = {
            case (key, window, results) =>
              results.map {
                case (platform, set, _, _) =>
                  (platform, set, window.getStart, window.getEnd)
              }
          }
        )
        .mapWith {
          case (key, set, windowStart, windowEnd) =>
            (key, set.size, windowStart, windowEnd)
        }
        .keyBy(0)
        .flatMapWithState[(Int, Int, Long, Long), Int] {
        case ((key, num, begin, end), curr) =>
          curr match {
            case Some(numCurr) if numCurr == num =>
              (Seq.empty, Some(num)) // the previous value is identical, so emit nothing
            case _ =>
              (Seq((key, num, begin, end)), Some(num))
          }
      }
    stream.print()
    env.execute("Boom")
  }
}

Comparison with the Storm API

從"讓人類更輕鬆"這個角度而言,Flink API 很好地優化了我上面提到的幾個問題,固然與此同時
你也失去了更精細化控制一些東西的能力,不過我認爲就大部分的平常工做而言,讓人類更輕鬆纔是
咱們應該追求的目標,利用精細控制來特別優化程序,極可能只在少數很是重要的業務上

The gap may not actually be that big

Recent Storm releases have added WindowedBolt and watermark support. Combined with the Trident API that Storm has had all along, it should be possible to express the same logic in a way quite close to the Flink API, even though, for reasons I don't quite understand, Trident never seemed to catch on.
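
For the curious, here is a rough sketch of what that windowed style looks like with Storm 1.x's BaseWindowedBolt, as far as I understand the org.apache.storm API; the DailyUvBolt name and the wiring are my own illustration, not a drop-in replacement for the topology above:

import java.util
import org.apache.storm.task.{OutputCollector, TopologyContext}
import org.apache.storm.topology.base.BaseWindowedBolt
import org.apache.storm.windowing.TupleWindow

// A sketch: a bolt that is handed one day's worth of tuples at a time.
class DailyUvBolt extends BaseWindowedBolt {
  private var collector: OutputCollector = _

  override def prepare(conf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
    this.collector = collector // kept for emitting results in a real implementation
  }

  override def execute(window: TupleWindow): Unit = {
    // window.get() returns every tuple currently in the window
    println("tuples in current window: " + window.get().size())
  }
}

// Wiring sketch (Duration is BaseWindowedBolt.Duration, TimeUnit is java.util.concurrent.TimeUnit):
// builder.setBolt("daily-uv",
//   new DailyUvBolt()
//     .withTumblingWindow(new Duration(1, TimeUnit.DAYS))
//     .withTimestampField("time")                  // event time taken from a tuple field
//     .withLag(new Duration(1, TimeUnit.MINUTES))) // tolerance for out-of-order tuples
//   .localOrShuffleGrouping("kafka-spout")

The withTimestampField/withLag pair plays roughly the same role as the timestamp extractor and watermark in the Flink job above.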

In fact, I expect the APIs of the mainstream stream-processing frameworks to keep converging; most frameworks may even end up supporting the same API. Apache Beam, for instance, already has both Spark and Flink runners. In the end I think it comes down to how well each framework's funding and community building go.
