PlayScala 2.5.x - 實現徹底異步非阻塞的流數據導出

介紹

從Play2.5.x開始,Play使用Akka Streams實現流處理,廢棄了以前的Enumerator/Iteratee Api。根據官方文檔描述,遷移至Akka Streams以後,Play2.5.x的總體性能提高了20%,性能提高至關可觀。雖然官方已經更新了JavaStream的文檔,可是ScalaStream的文檔仍然沒有更新,已經提了issue,但願能儘快獲得反饋。 ReactiveMongo是一個基於Scala開發的徹底異步非阻塞、而且提供流處理功能的MongoDB驅動。該項目目前的流處理功能基於Enumerator/Iteratee實現,Akka Stream的實現放在一個單獨的項目開發(RM-AkkaStreams)。 結合Play和ReactiveMongo兩者的流處理功能,咱們能夠很方便地實現徹底異步非阻塞的報表導出功能。react

實現

因爲ReactiveMongo暫時尚未提供Akka Streams的流處理實現,因此沒法直接經過map/flatMap直接返回一個Stream寫回響應:git

@Singleton
class TestStreamController @Inject()(val reactiveMongoApi: ReactiveMongoApi, implicit val mat: Materializer) extends Controller {
  def qaCol: JSONCollection = reactiveMongoApi.db.collection[JSONCollection]("qa")
  def exportDataStream = Action { implicit request =>
    val source = Source.actorRef[ByteString](10000, OverflowStrategy.fail).mapMaterializedValue { sourceActor =>
      qaCol
        .find(Json.obj())
        .options(QueryOpts(batchSizeN = 1000))
        .cursor[QA]()
        .foldBulks[Int](0){ (index, list) =>
          sourceActor ! ByteString(list.map(qa => qa.question).mkString("\n") + "\n")
          Cursor.Cont(index + 1)
        }
        .map{ _ =>
          sourceActor ! akka.actor.Status.Success(())
        }
    }

    Ok.chunked(source)
      .withHeaders(
      "Content-Type" -> "application/octet-stream",
      "Content-Disposition" -> (s"attachment;filename=export-" + new DateTime().toString("yyyy-MM-dd") + ".txt"))
  }
}

代碼第5行Source.actorRef函數啓動一個actor實例sourceActor負責收集報表數據,Source.actorRef的第1個參數bufferSize用於指定緩衝區大小,即Play來不及寫回響應的數據暫時放在緩衝區,第2個參數overflowStrategy指定緩衝區溢出後的處理策略。 第10行foldBulks方法負責批量從MongoDB數據庫讀取查詢結果,而後以消息形式將數據發送給sourceActor,最後發送一個Status.Success消息代表數據已經發送完畢。 數據傳遞過程以下:github

foldBulks(讀取查詢結果) -> sourceActor(收集查詢結果) -> source(生產者) -> Ok.chunked(消費者)

下面是瀏覽器中看到的效果:數據庫

輸入圖片說明

圖中499KB/s表示當前的下載速度,997KB表示當前累計的下載大小。瀏覽器

相關文章
相關標籤/搜索