Akka實戰:分散、聚合模式

分散與聚合:簡單說就是一個任務須要拆分紅多個小任務,每一個小任務執行完後再把結果聚合在一塊兒返回。git

代碼 http://git.oschina.net/yangbajing/akka-actiongithub

分散與聚合圖

實例背景

本實例來自一個真實的線上產品,現將其需求簡化以下:dom

  1. 傳入一個關鍵詞:key,根據key從網上抓取相關新聞
  2. 可選傳入一個超時參數:duration,設置任務到期時必須反回數據(返回實際已抓取數據)
  3. 若超時到返回實際已爬取數據,則任務應繼續運行直到因此數據抓取完成,並存庫

設計

根據需求,一個簡化的分散、聚合模式能夠使用兩個actor來實現。ide

  • NewsTask:接收請求,並設置超時時間
  • SearchPageTask:執行實際的新聞抓取操做(本實例將使用TimeUnit模擬抓取耗時)

實現

NewsTask測試

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/NewsTask.scala.net

override def metricPreStart(): Unit = {
    context.system.scheduler.scheduleOnce(doneDuration, self, TaskDelay)
  }

  override def metricReceive: Receive = {
    case StartFetchNews =>
      _receipt = sender()
      (0 until NewsTask.TASK_SIZE).foreach { i =>
        context.actorOf(SearchPageTask.props(self), "scatter-" + i) ! SearchPage(key)
      }

    case GetNewsItem(newsItem) =>
      _newses ::= newsItem
      if (_newses.size == NewsTask.TASK_SIZE) {
        logger.debug(s"分散任務,${NewsTask.TASK_SIZE}個已所有完成")

        if (_receipt != null) {
          _receipt ! NewsResult(key, _newses)
          _receipt = null
        }
        self ! PoisonPill
      }

    case TaskDelay =>
      if (_receipt != null) {
        _receipt ! NewsResult(key, _newses)
        _receipt = null
      }
  }

metricPreStart方法中設置定時方法,調用時間爲從代碼運行開始到doneDuration時間爲止。定時被觸發時將向當前Actor發送一個TaskDelay消息。scala

metricReceive方法中,分別對StartFetchNewsGetNewsItemTaskDelay三個消息進行操做。debug

在收到StartFetchNews消息時,actor首先保存發送者actor的引用(結果將返回到此actor)。再根據TASK_SIZE生成相應子任務設計

GetNewsItem消息的處理中,每收到一個消息就將其添加到_newses列表中。並判斷當_newses個數等於TASK_SIZE時(全部子任務已完成)將結果發送給_receiptcode

self ! PoisonPill,這句代碼中止actor自身。它將把「毒藥」發送到NewsTask Actor的接收郵箱隊列中。

TaskDelay消息被觸發時,將直接返回已完成的新聞_newses。返回數據後並不終止當前還未運行完任務。

SearchPageTask

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/SearchPageTask.scala

override def metricReceive: Receive = {
    case SearchPage(key) =>
      // XXX 模擬抓取新聞時間
      TimeUtils.sleep(Random.nextInt(20).seconds)

      val item = NewsItem(
        "http://newssite/news/" + self.path.name,
        "測試新聞" + self.path.name,
        self.path.name,
        TimeUtils.now().toString,
        "內容簡介", "新聞正文")

      taskRef ! GetNewsItem(item)
      context.stop(self)
  }

SearchPageTask的代碼邏輯就比較易懂了,這裏使用sleep來模擬實際抓取新聞時的耗時。生成結果後返回數據給`taskRef`,並終止本身。

執行測試

./sbt
akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest

總結

這是一個簡單的Akka實例,實現了任務分發與結果聚合。提供了一種在指定時間內返回部份有效數據,同時任務繼續執行的方式。這種分散、聚合的模式在實際生產中很經常使用,好比對多種數據源的整合,或某些須要長時間運行同時對返回數據完整性無強制要求的狀況等。

MetricActor演示了怎麼自定義Actor,併爲其提供一些偵測點的方式。之後有時間會寫篇詳文介紹。

相關文章
相關標籤/搜索