restapi(1)- 文件上傳下載服務

  上次對restapi開了個頭,設計了一個包括了身份驗證和使用權限的restful服務開發框架。這是一個通用框架,開發人員只要直接往裏面加新功能就好了。雖然此次的restapi是圍繞着數據庫表的CRUD操做設計的,但文件類數據在服務端與客戶端之間的交換其實也很經常使用,特別是多媒體類如圖片等文件類型。那咱們就試着設計一個文件交換服務功能而後看看能不能很方便的加入到restapi框架內。java

akka-http是以akka-stream爲核心的,使用了大量的akka-stream功能。akka-stream中有一個FileIO組件庫,裏面提供了一系列有關文件讀寫功能,以數據流Source,Sink方式呈現:數據庫

... def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = fromPath(f, chunkSize, startPosition = 0) ... def toPath( f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] = toPath(f, options, startPosition = 0)

能夠發現,這些Source,Sink都是以ByteString爲流元素進行操做的,akka-http自帶了ByteString的Marshaller,能夠實現數據格式自動轉換,在網絡傳輸中不須要增長什麼數據格式轉換動做。先用FileIO來產生一個Source[ByteString,_]:json

package com.datatech.restapi import akka.stream._ import akka.stream.scaladsl._ import java.nio.file._ import akka.util._ object FileStreaming { def fileStreamSource(filePath: String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString,Any] = { def loadFile = { // implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath) if (dispatcherName != "") FileIO.fromPath(file, chunkSize) .withAttributes(ActorAttributes.dispatcher(dispatcherName)) else FileIO.fromPath(file, chunkSize) } loadFile } }

注意,咱們能夠用akka系統以外的線程池來進行FileIO操做,能夠避免影響akka系統的運行效率。dispatcherName標註了在application.conf裏自定義的線程池:api

akka { http { blocking-ops-dispatcher { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { // or in Akka 2.4.2+
        fixed-pool-size = 16 } throughput = 100 } } }

下面是File功能架構FileRoute的設計:restful

package com.datatech.restapi import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.model._ import akka.http.scaladsl.coding.Gzip import java.nio.file._ import FileStreaming._ import AuthBase._ case class FileRoute(jwt: String)(implicit auth: AuthBase, sys: ActorSystem) { val destPath = "/users/tiger-macpro/cert4/meme.jpg"
  implicit val mat = ActorMaterializer() val route = pathPrefix("file") { val privatePath = auth.tempDirFromJwt(jwt) if (privatePath.length == 0) complete(StatusCodes.NotFound) (get & path(Remaining)) { filename => withoutSizeLimit { encodeResponseWith(Gzip) { complete( HttpEntity( ContentTypes.`application/octet-stream`, fileStreamSource(privatePath+"/download/"+filename, 1024)) ) } } } ~ (post &  parameters('filename)) { filename =>
 withoutSizeLimit { decodeRequest { extractDataBytes { bytes => val fut = bytes.runWith(FileIO.toPath(Paths.get(privatePath+"/upload/"+filename))) onComplete(fut) { _ => complete(StatusCodes.OK)} } } } } } }

每一個用戶在服務端都應該有個獨立的文件目錄,這個恰好能夠放在jwt裏:網絡

package com.datatech.restapi import akka.http.scaladsl.server.directives.Credentials import AuthBase._ object MockUserAuthService { case class User(username: String, password: String, userInfo: UserInfo) val validUsers = Seq(User("johnny", "p4ssw0rd", Map("shopid" -> "1101", "userid" -> "101", "tmpdir" ->"/users/tiger-macpro/1101101")) ,User("tiger", "secret", Map("shopid" -> "1101" , "userid" -> "102", "tmpdir" ->"/users/tiger-macpro/1101102"))) def getValidUser(credentials: Credentials): Option[UserInfo] = credentials match { case p @ Credentials.Provided(_) => validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match { case Some(user) => Some(user.userInfo) case _ => None } case _ => None } }

我的目錄tmpdir是放在UserInfo裏的,咱們只須要從jwt裏解析分離出來:架構

   def tempDirFromJwt(jwt: String): String = { val optUserInfo = getUserInfo(jwt) val dir: String = optUserInfo match { case Some(m) =>
          try { m("tmpdir").toString } catch {case err: Throwable => ""} case None => "" } dir }

文件交換服務是須要使用權限的,因此FileRoute要放在authenticateOAuth2下面:app

 val route = path("auth") { authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo => post { complete(authenticator.issueJwt(userinfo))} } } ~ pathPrefix("api") { authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken => (path("hello") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } ~ (path("how are you") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } ~ FileRoute(validToken) .route // ~ ...
 } }

寫一個客戶端來測試文件交換服務:框架

import akka.stream._ import java.nio.file._ import java.io._ import akka.http.scaladsl.model.headers._ import scala.concurrent._ import com.datatech.restapi.FileStreaming._ import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.Http import akka.stream.scaladsl.{FileIO, Source} import scala.util._ case class FileUtil(implicit sys: ActorSystem) { import sys.dispatcher implicit val mat = ActorMaterializer() def createEntity(file: File): RequestEntity = { require(file.exists()) val formData = Multipart.FormData( Source.single( Multipart.FormData.BodyPart( "test", HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName)))) Await.result(Marshal(formData).to[RequestEntity], 3 seconds) } def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = { implicit val mat = ActorMaterializer() import sys.dispatcher val futResp = Http(sys).singleRequest( // Gzip.encodeMessage(
        request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip)) // )
 ) futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => entity.dataBytes.map(_.utf8String).runForeach(println) case Success(r@HttpResponse(code, _, _, _)) => println(s"Upload request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to Upload file!") case Failure(err) => println(s"Upload failed: ${err.getMessage}") } } def downloadFileTo(request: HttpRequest, destPath: String) = { // val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
    val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))
 futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath))) .onComplete { case _ => println(s"Download file saved to: $destPath") } case Success(r@HttpResponse(code, _, _, _)) => println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to download file!") case Failure(err) => println(s"Download failed: ${err.getMessage}") } } } object TestFileClient { type UserInfo = Map[String,Any] def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/") val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd")) val authRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.11.189:50081/auth", headers = List(authorization) ) val futToken: Future[HttpResponse] = Http().singleRequest(authRequest) val respToken = for { resp <- futToken jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String} } yield jstr val jstr =  Await.result[String](respToken,2 seconds) println(jstr) scala.io.StdIn.readLine() val authentication = headers.Authorization(OAuth2BearerToken(jstr)) val entity = HttpEntity( ContentTypes.`application/octet-stream`, fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024) ) //     val chunked = HttpEntity.Chunked.fromData( ContentTypes.`application/octet-stream`, fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024) ) val multipart = FileUtil().createEntity(new File("/Users/tiger-macpro/cert3/ctiger.jpg")) val uploadRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.11.189:50081/api/file?filename=mypic.jpg", ).addHeader(authentication) //upload file //Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds) //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
    Await.ready(FileUtil().uploadFile(uploadRequest,multipart),2 seconds) val dlRequest = HttpRequest( HttpMethods.GET, uri = "http://192.168.11.189:50081/api/file/mypic.jpg", ).addHeader(authentication) FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg") scala.io.StdIn.readLine() system.terminate() } }

在文件上傳upload時試過用entity,chunked,multipart方式構建的request-entity,服務端都能處理。好像看過不少java的httpclient圖片上傳,都是用multipart entity。如今這個服務端能正確處理。固然,在服務端一樣能夠用multipart方式提供文件下載服務,就不在這裏實現了。不過能夠提供一段示範代碼:ide

import akka.actor._ import akka.stream._ import java.nio.file._ import java.io._ import akka.http.scaladsl.unmarshalling.Unmarshal import akka.util.ByteString import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.http.scaladsl.Http.ServerBinding import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Route import akka.http.scaladsl.Http import akka.stream.scaladsl.{FileIO, Source} import com.typesafe.config.{Config, ConfigFactory} import scala.concurrent.Future object TestMultipartFileUpload extends App { val testConf: Config = ConfigFactory.parseString("""     akka.loglevel = INFO akka.log-dead-letters = off""")
  implicit val system = ActorSystem("ServerTest", testConf) import system.dispatcher implicit val materializer = ActorMaterializer() val testFile: File = new File("/users/tiger-macpro/downloads/uploadFileDemo.scala")  //args(0))
 def startTestServer(): Future[ServerBinding] = { import akka.http.scaladsl.server.Directives._ val route: Route = path("upload") { entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) ⇒ val fileNamesFuture = formdata.parts.mapAsync(1) { p ⇒ println(s"Got part. name: ${p.name} filename: ${p.filename}") println("Counting size...") @volatile var lastReport = System.currentTimeMillis() @volatile var lastSize = 0L def receiveChunk(counter: (Long, Long), chunk: ByteString): (Long, Long) = { val (oldSize, oldChunks) = counter val newSize = oldSize + chunk.size val newChunks = oldChunks + 1 val now = System.currentTimeMillis() if (now > lastReport + 1000) { val lastedTotal = now - lastReport val bytesSinceLast = newSize - lastSize val speedMBPS = bytesSinceLast.toDouble / 1000000 /* bytes per MB */ / lastedTotal * 1000 /* millis per second */ println(f"Already got $newChunks%7d chunks with total size $newSize%11d bytes avg chunksize ${newSize / newChunks}%7d bytes/chunk speed: $speedMBPS%6.2f MB/s") lastReport = now lastSize = newSize } (newSize, newChunks) } p.entity.dataBytes.runFold((0L, 0L))(receiveChunk).map { case (size, numChunks) ⇒ println(s"Size is $size") (p.name, p.filename, size) } }.runFold(Seq.empty[(String, Option[String], Long)])(_ :+ _).map(_.mkString(", ")) complete { fileNamesFuture } } } Http().bindAndHandle(route, interface = "localhost", port = 0) } def createEntity(file: File): Future[RequestEntity] = { require(file.exists()) val formData = Multipart.FormData( Source.single( Multipart.FormData.BodyPart( "test", HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName)))) Marshal(formData).to[RequestEntity] } def createRequest(target: Uri, file: File): Future[HttpRequest] =
    for { e ← createEntity(file) } yield HttpRequest(HttpMethods.POST, uri = target, entity = e) try { val result =
      for { ServerBinding(address) ← startTestServer() _ = println(s"Server up at $address") port = address.getPort target = Uri(scheme = "http", authority = Uri.Authority(Uri.Host("localhost"), port = port), path = Uri.Path("/upload")) req ← createRequest(target, testFile) _ = println(s"Running request, uploading test file of size ${testFile.length} bytes") response ← Http().singleRequest(req) responseBodyAsString ← Unmarshal(response).to[String] } yield responseBodyAsString result.onComplete { res ⇒ println(s"The result was $res") system.terminate() } system.scheduler.scheduleOnce(60.seconds) { println("Shutting down after timeout...") system.terminate() } } catch { case _: Throwable ⇒ system.terminate() } }

上面這個示範裏包括了服務端,客戶端對multipart的數據處理。

在上面這個例子裏咱們先設計了一個獨立的包括文件交換服務功能的FileRoute類,而後直接把FileRoute.route貼在主菜單後面就完成了文件交換服務功能的添加。比較接近實現restapi設計初衷。

下面是本次示範源代碼:

 build.sbt

name := "restapi"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.pauldijou" %% "jwt-core" % "3.0.1",
  "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

auth/AuthBase.scala

package com.datatech.restapi

import akka.http.scaladsl.server.directives.Credentials
import pdi.jwt._
import org.json4s.native.Json
import org.json4s._
import org.json4s.jackson.JsonMethods._
import pdi.jwt.algorithms._
import scala.util._

object AuthBase {
  type UserInfo = Map[String, Any]
  case class AuthBase(
                       algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
                       secret: String = "OpenSesame",
                       getUserInfo: Credentials => Option[UserInfo] = null) {
    ctx =>
    def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm=algo)
    def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)
    def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)

    def authenticateToken(credentials: Credentials): Option[String] =
      credentials match {
        case Credentials.Provided(token) =>
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
            case _ =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
          }
        case _ => None
      }

    def getUserInfo(token: String): Option[UserInfo] = {
      algorithm match {
        case algo: JwtAsymmetricAlgorithm =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
        case _ =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
      }
    }

    def issueJwt(userinfo: UserInfo): String = {
      val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo))
      Jwt.encode(claims, secret, algorithm)
    }

    def tempDirFromJwt(jwt: String): String = {
      val optUserInfo = getUserInfo(jwt)
      val dir: String = optUserInfo match {
        case Some(m) =>
          try {
            m("tmpdir").toString
          } catch {case err: Throwable => ""}
        case None => ""
      }
      dir
    }

  }

}

file/FileStreaming.scala

package com.datatech.restapi
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file._
import akka.util._
object FileStreaming {

  def fileStreamSource(filePath: String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString,Any] = {
    def loadFile  = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      if (dispatcherName != "")
        FileIO.fromPath(file, chunkSize)
         .withAttributes(ActorAttributes.dispatcher(dispatcherName))
      else
        FileIO.fromPath(file, chunkSize)
    }
    loadFile
  }
}

file/FileRoute.scala

package com.datatech.restapi
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
import java.nio.file._
import FileStreaming._
import AuthBase._

case class FileRoute(jwt: String)(implicit auth: AuthBase, sys: ActorSystem) {

  val destPath = "/users/tiger-macpro/cert4/meme.jpg"
  implicit val mat = ActorMaterializer()
  val route = pathPrefix("file") {
    val privatePath = auth.tempDirFromJwt(jwt)
    if (privatePath.length == 0)
      complete(StatusCodes.NotFound)

    (get & path(Remaining)) { filename =>
      withoutSizeLimit {
        encodeResponseWith(Gzip) {
          complete(
            HttpEntity(
              ContentTypes.`application/octet-stream`,
              fileStreamSource(privatePath+"/download/"+filename, 1024))
          )
        }
      }
    } ~
      (post &  parameters('filename)) { filename =>
        withoutSizeLimit {
          decodeRequest {
            extractDataBytes { bytes =>
              val fut = bytes.runWith(FileIO.toPath(Paths.get(privatePath+"/upload/"+filename)))
              onComplete(fut) { _ => complete(StatusCodes.OK)}
            }
          }
        }

      }

  }
}

MockUserAuthService.scala

package com.datatech.restapi
import akka.http.scaladsl.server.directives.Credentials
import AuthBase._
object MockUserAuthService {

  case class User(username: String, password: String, userInfo: UserInfo)
  val validUsers = Seq(User("johnny", "p4ssw0rd",
       Map("shopid" -> "1101", "userid" -> "101", "tmpdir" ->"/users/tiger-macpro/1101101"))
    ,User("tiger", "secret",
      Map("shopid" -> "1101" , "userid" -> "102", "tmpdir" ->"/users/tiger-macpro/1101102")))

  def getValidUser(credentials: Credentials): Option[UserInfo] =
    credentials match {
      case p @ Credentials.Provided(_) =>
        validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {
          case Some(user) => Some(user.userInfo)
          case _ => None
        }
      case _ => None
    }

}

RestApiServer.scala

package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._

object RestApiServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher



  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
              FileRoute(validToken)
                .route
            // ~ ...
          }
     }


  val (port, host) = (50081,"192.168.11.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()


  bindingFuture.flatMap(_.unbind())
  .onComplete(_ => httpSys.terminate())


}

TestFileClient.scala

import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.restapi.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._


case class FileUtil(implicit sys: ActorSystem) {
  import sys.dispatcher
  implicit val mat = ActorMaterializer()
  def createEntity(file: File): RequestEntity = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName))))
    Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
  }

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    implicit val mat = ActorMaterializer()
    import sys.dispatcher
    val futResp = Http(sys).singleRequest(
   //   Gzip.encodeMessage(
        request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
   //   )
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")
      }
  }

  def downloadFileTo(request: HttpRequest, destPath: String) = {
  //  val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
    val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download file!")
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

}

object TestFileClient  {
  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")

    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/auth",
      headers = List(authorization)
    )


    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)

    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))

    val entity = HttpEntity(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )
    //
    val chunked = HttpEntity.Chunked.fromData(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )

    val multipart = FileUtil().createEntity(new File("/Users/tiger-macpro/cert3/ctiger.jpg"))

    val uploadRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/api/file?filename=mypic.jpg",
    ).addHeader(authentication)

    //upload file
    //Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
    //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
    Await.ready(FileUtil().uploadFile(uploadRequest,multipart),2 seconds)

    val dlRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.11.189:50081/api/file/mypic.jpg",
    ).addHeader(authentication)

    FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")

    scala.io.StdIn.readLine()
    system.terminate()
  }

}
相關文章
相關標籤/搜索