所謂文件交換指的是Http協議中服務端和客戶端之間文件的上傳和下載。Akka-http做爲一種系統集成工具應該具有高效率的數據交換方式包括文件交換和數據庫錶行的上傳下載。Akka-http的數據交換模式支持流式操做:表明交換數據能夠是一種無限長度流的元素。這種模式首先解決了純Http大數據經過Multipart傳輸所必須進行的數據分段操做和複雜的消息屬性設定等須要的技術門檻,再者用戶還能夠很方便的使用Akka-stream對數據進行深度處理,免去了數據轉換的麻煩。更重要的是:Akka-http還支持reactive-stream,能夠避免由傳輸速率所產生的種種問題。在本篇咱們討論利用Akka-http進行文件的雙向傳遞。java
任何文件的內容儲存格式不管在硬盤、內存或者數據線上都是一堆bytes。文件交換流程包括讀取文件裏的bytes,傳送這些bytes,最終把這些bytes寫入文件。咱們看到這裏每一個環節操做目標都是bytes,因此可能在程序裏是不須要任何數據轉換過程的。Akka提供了一組文件讀寫函數,以下:react
def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = fromPath(f, chunkSize, startPosition = 0) def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] = Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource"))) def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] = toPath(f, options, startPosition = 0) def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] = Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
咱們看到:fromPath類型是Source[ByteSgtring,_],toPath類型是Sink[ByteString,_],直接就是流型式,應該能夠直接放入Http消息的Entity中,以下: 數據庫
def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = { def loadFile = { // implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val file = Paths.get(filePath) FileIO.fromPath(file, chunkSize) .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")) } limitableByteSource(loadFile) }
fileStream是Source[ByteString,_]能夠直接放進Entity:app
val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text") val textData = HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/Users/tiger-macpro/downloads/A4.TIF",256) )
咱們把fileStream放入了HttpRequest中。對於HttpResponse能夠用下面的方式:函數
val route = pathPrefix("file") { (get & path("text" / Remaining)) { fp => withoutSizeLimit { complete( HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/users/tiger-macpro/" + fp, 256)) ) }
注意:complete進行了HttpResponse的構建。由於Entity.dataByes就是Source[ByteString,_],因此咱們能夠直接把它導入Sink:工具
entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))) .onComplete { case _ => println(s"Download file saved to: $destPath") }
上面咱們提過FileIO.toPath就是一個Sink。因爲咱們的目的是大型的文件交換,因此不管上傳下載都使用了withoutSizeLimit:post
val route = pathPrefix("file") { (get & path("exchange" / Remaining)) { fp => withoutSizeLimit { complete( HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/users/tiger-macpro/" + fp, 256)) ) } } ~ (post & path("exchange")) { withoutSizeLimit { extractDataBytes { bytes => val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath))) onComplete(fut) { _ => complete(s"Save upload file to: $destPath") } } } }
好了下面的示範代碼裏對字符型或二進制文件都進行了交換的示範操做:大數據
服務端:spa
import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.HttpEntity._ import java.nio.file._ object FileServer extends App { implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher def fileStream(filePath: String, chunkSize: Int) = { def loadFile = { // implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val file = Paths.get(filePath) FileIO.fromPath(file, chunkSize) .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")) } limitableByteSource(loadFile) } val destPath = "/users/tiger-macpro/downloads/A4-1.TIF" val route = pathPrefix("file") { (get & path("exchange" / Remaining)) { fp => withoutSizeLimit { complete( HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/users/tiger-macpro/" + fp, 256)) ) } } ~ (post & path("exchange")) { withoutSizeLimit { extractDataBytes { bytes => val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath))) onComplete(fut) { _ => complete(s"Save upload file to: $destPath") } } } } } val (port, host) = (8011,"localhost") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ => httpSys.terminate()) }
客戶端:scala
import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpEntity.limitableByteSource import akka.http.scaladsl.model._ import java.nio.file._ import akka.util.ByteString import scala.util._ object FileClient extends App { implicit val sys = ActorSystem("ClientSys") implicit val mat = ActorMaterializer() implicit val ec = sys.dispatcher def downloadFileTo(request: HttpRequest, destPath: String) = { val futResp = Http(sys).singleRequest(request) futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))) .onComplete { case _ => println(s"Download file saved to: $destPath") } case Success(r@HttpResponse(code, _, _, _)) => println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to download file!") case Failure(err) => println(s"Download failed: ${err.getMessage}") } } val dlFile = "Downloads/readme.txt" val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile) downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt") scala.io.StdIn.readLine() val dlFile2 = "Downloads/image.png" val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2) downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png") scala.io.StdIn.readLine() def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = { val futResp = Http(sys).singleRequest( request.copy(entity = dataEntity) ) futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => entity.dataBytes.map(_.utf8String).runForeach(println) case Success(r@HttpResponse(code, _, _, _)) => println(s"Upload request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to Upload file!") case Failure(err) => println(s"Upload failed: ${err.getMessage}") } } def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = { def loadFile = { // implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val file = Paths.get(filePath) FileIO.fromPath(file, chunkSize) .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")) } limitableByteSource(loadFile) } val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange") val textData = HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/Users/tiger-macpro/downloads/readme.txt",256) ) uploadFile(uploadText,textData) scala.io.StdIn.readLine() sys.terminate() }