從Play2.5.x開始,Play使用Akka Streams實現流處理,廢棄了以前的Enumerator/Iteratee Api。根據官方文檔描述,遷移至Akka Streams以後,Play2.5.x的總體性能提高了20%,性能提高至關可觀。雖然官方已經更新了JavaStream的文檔,可是ScalaStream的文檔仍然沒有更新,已經提了issue,但願能儘快獲得反饋。 ReactiveMongo是一個基於Scala開發的徹底異步非阻塞、而且提供流處理功能的MongoDB驅動。該項目目前的流處理功能基於Enumerator/Iteratee實現,Akka Stream的實現放在一個單獨的項目開發(RM-AkkaStreams)。 結合Play和ReactiveMongo兩者的流處理功能,咱們能夠很方便地實現徹底異步非阻塞的報表導出功能。react
因爲ReactiveMongo暫時尚未提供Akka Streams的流處理實現,因此沒法直接經過map/flatMap直接返回一個Stream寫回響應:git
@Singleton class TestStreamController @Inject()(val reactiveMongoApi: ReactiveMongoApi, implicit val mat: Materializer) extends Controller { def qaCol: JSONCollection = reactiveMongoApi.db.collection[JSONCollection]("qa") def exportDataStream = Action { implicit request => val source = Source.actorRef[ByteString](10000, OverflowStrategy.fail).mapMaterializedValue { sourceActor => qaCol .find(Json.obj()) .options(QueryOpts(batchSizeN = 1000)) .cursor[QA]() .foldBulks[Int](0){ (index, list) => sourceActor ! ByteString(list.map(qa => qa.question).mkString("\n") + "\n") Cursor.Cont(index + 1) } .map{ _ => sourceActor ! akka.actor.Status.Success(()) } } Ok.chunked(source) .withHeaders( "Content-Type" -> "application/octet-stream", "Content-Disposition" -> (s"attachment;filename=export-" + new DateTime().toString("yyyy-MM-dd") + ".txt")) } }
代碼第5行Source.actorRef
函數啓動一個actor實例sourceActor
負責收集報表數據,Source.actorRef
的第1個參數bufferSize
用於指定緩衝區大小,即Play來不及寫回響應的數據暫時放在緩衝區,第2個參數overflowStrategy
指定緩衝區溢出後的處理策略。 第10行foldBulks
方法負責批量從MongoDB數據庫讀取查詢結果,而後以消息形式將數據發送給sourceActor
,最後發送一個Status.Success
消息代表數據已經發送完畢。 數據傳遞過程以下:github
foldBulks(讀取查詢結果) -> sourceActor(收集查詢結果) -> source(生產者) -> Ok.chunked(消費者)
下面是瀏覽器中看到的效果:數據庫
圖中499KB/s
表示當前的下載速度,997KB
表示當前累計的下載大小。瀏覽器