Akka實戰:HTTP大文件斷點上傳、下載,秒傳

訪問:https://github.com/yangbajing/scala-applications/tree/master/file-upload 獲取本文所述完整源碼,包括Akka HTTP後端和HTML5實現的前端。html

在不少應用裏面都會有相似大文件上傳的需求,但不少時候咱們程序員都會以不支持或很差實現將其推脫掉^_^。前端

此次由於公司項目須要,另外一個組的同事使用Spring實現了一版。他們是採用在前端對大文件分塊上傳,後端在全部分塊上傳完成後合併的方式實現了。這個方案有一個弊端,若文件很大,後期對分塊作合併時會很是的耗磁盤資源。git

本文,我使用Akka HTTP和Akka Stream作爲後端服務,能夠很優雅的實現大文件的斷點續傳。原理其實很是的簡單,前端計算文件的hash(使用sha256),將hash傳到後端查詢是否有相同文件已上傳,如有將返回已上傳文件及文件長度(bytes)。這時候前端就能夠知道文件的上傳進度,進而判斷還須要斷點續傳的偏移量或者已上傳完成(這就是秒傳)。程序員

這裏有一個設計取捨:客戶端對單個文件不作分片,從文件頭開始上傳。這樣的一個好處是可簡化服務端的實現,同時也能夠優化服務端對文件的存儲 (同一個文件將一直使用APPEND的方式寫入文件,這樣能夠更高效的利用磁盤IO。同時,不須要分塊合併,若文件很大,生成的大量分塊在文件上傳完成後再次合併會是一個很是大的資源開銷)github

斷點下載

這個怎麼說呢?斷點下載Akka HTTP原生支持^_^。你只須要使用以下代碼:web

private def downloadRoute: Route = path("download" / Segment) { hash =>
    getFromFile(FileUtils.getLocalPath(hash).toFile)
  }

FileRoute#downloadRoute後端

FileUtils.getLocalPath(hash) 函數經過對hash值(sha256hex)進行計算和拼接,獲取實際文件的本地存儲路徑再交給Akka HTTP提供的getFromFile指令,剩下的工做就交給Akka。app

咱們能夠經過向Akka HTTP發起HEAD請求來查看支持的HTTP功能,看到在反回的header裏有Accept-Ranges: bytes,意思是服務端支持使用字節爲單位的範圍下載(斷點下載功能既基於此實現)。curl

$ curl --head http://localhost:33333/file/download/7d0559e2f7bf42f0c2becc7fbf91b20ca2e7ec373c941fca21314169de9c7ef4
HTTP/1.1 200 OK
Last-Modified: Fri, 28 Dec 2018 14:12:32 GMT
ETag: "132766a7f528d080"
Accept-Ranges: bytes
Server: akka-http/10.1.6
Date: Sat, 29 Dec 2018 02:17:41 GMT
Content-Type: application/octet-stream
Content-Length: 65463496

斷點上傳

很遺憾,Akka HTTP默認不支持斷點上傳,這須要自行實現。可是,Akka HTTP作爲一個toolkit,足夠靈活且強大,實現斷點上傳功能so easy。ide

斷點上傳實現

基於常規的代碼設計方式,咱們須要ControllerService,那就先從Controller開始:

FileRoute#uploadRoute

def uploadRoute: Route = path("upload") {
  post {
    withoutSizeLimit {
      entity(as[Multipart.FormData]) { formData =>
        onSuccess(fileService.handleUpload(formData)) { results =>
          import me.yangbajing.fileupload.util.JacksonSupport._
          complete(results)
        }
      }
    }
  }
}

FileRoute#uploadRoute

對於不熟悉Akka HTTP的朋友,推薦閱讀我寫的電子書(打個廣告):Scala Web 開發。這裏須要注意的一個指令是withoutSizeLimit,默認Akka HTTP對請求大小限制比較低,咱們能夠經過withoutSizeLimit指令取消對單個API的請求大小限制,同時又不影響整個Web服務的大小限制。另外,這裏經過entity(as[Multipart.FormData])Unmarshaller的方式獲取整個Multipart.FormData對象並傳入 FileService#handleUpload 函數進行處理。

FileService#handleUpload

override def handleUpload(formData: Multipart.FormData): Future[Seq[FileBO]] = {
    formData.parts
      .map { part =>
        val (hash, contentLength, startPosition) = part.name.split('.') match {
          case Array(a, b, c) => (a, b.toLong, c.toLong)
          case Array(a, b)    => (a, b.toLong, 0L)
          case Array(a)       => (a, 0L, 0L)
          case _              => throw new IllegalArgumentException(s"Multipart.FormData name格式不符合要求:${part.name}")
        }
        FileInfo(part, hash, contentLength, startPosition)
      }
      .log("fileInfo", info => logger.debug(s"fileInfo: $info"))
      .mapAsync(Constants.FILE_PART_MAX)(processFile)
      .runWith(Sink.seq)
  }

formData.parts是一個Akka Stream流,類型簽名爲Source[Multipart.FormData.BodyPart, Any]。有關Akka Stream更詳細的資料請參閱Akka Streams官方文檔。這裏,每一個part都表明FormData的一個字段(對應HTML 5的FormData類型,同時前端須要使用Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryrP4DAyu31ilqEWmz方式發起請求)。每一個part.name都是用英號逗號分隔的三部分來作爲請求的字段名,分別是:<hash>.<content length>.<start position>,這樣咱們就能夠在不加入任何其它字段的狀況下告知服務端當前上傳文件的sha256hex計算出的hash值、文件大小(bytes)和上傳起始偏移量。

FileUtils#uploadFile

文件上傳的核心邏輯在 FileUtils#uploadFile 函數:

def uploadFile(fileInfo: FileInfo)(implicit mat: Materializer, ec: ExecutionContext): Future[FileBO] = {
    val maybeMeta = FileUtils.getFileMeta(fileInfo.hash)
    val beContinue = maybeMeta.isDefined && fileInfo.startPosition > 0L
    if (beContinue) uploadContinue(fileInfo, maybeMeta.get) else uploadNewFile(fileInfo)
  }

uploadFile函數根據是否爲續傳來分別調用uploadContinueuploadNewFile函數。首先來看看新文件上傳時的代碼邏輯:

def uploadNewFile(fileInfo: FileInfo)(implicit mat: Materializer, ec: ExecutionContext) = {
    val bodyPart = fileInfo.bodyPart
    val tmpPath =
      if (fileInfo.validHash) FileUtils.getLocalPath(fileInfo.hash)
      else Files.createTempFile(FileUtils.TMP_DIR, bodyPart.filename.getOrElse(""), "") // (1)
    val sha = MessageDigest.getInstance("SHA-256")
    bodyPart.entity.dataBytes
      .map { byteString =>
        byteString.asByteBuffers.foreach(sha.update) // (2)
        byteString
      }
      .runWith(FileIO.toPath(tmpPath)) // (3)
      .map { ioResult =>
        val hash = Utils.bytesToHex(sha.digest()) // (4)
        if (fileInfo.validHash) {
          require(fileInfo.hash == hash, s"前端上傳hash與服務端計算hash值不匹配,${fileInfo.hash} != $hash")
        }
        val localPath = if (fileInfo.validHash) tmpPath else move(hash, tmpPath, ioResult.count) // (5)
        FileBO(hash, localPath, ioResult.count, bodyPart.filename, bodyPart.headers)
      }
  }
  1. 根據前端是否上傳了有效的hash值(sha256hex)來判斷是把文件先寫入臨時文件仍是直接寫入實際的本地存儲位置(根據hash值計算出本地實際的存儲位置)。
  2. Akka HTTP中,上傳的文件以流的方式進入,在此對每一個ByteString計算並更新sha256值。
  3. 在Akka Stream的Sink端,接收流傳入的元素並寫入本地文件。
  4. 文件寫入結束後調用sha.digest()方法獲取已上傳文件的sha256值。
  5. 根據是否臨時文件來判斷是否須要將臨時文件移動到實際的本地存儲路徑,經過文件的hash值來計算出實際的本地存儲路徑。
def uploadContinue(fileInfo: FileInfo, meta: FileMeta)(implicit mat: Materializer, ec: ExecutionContext) = {
    val bodyPart = fileInfo.bodyPart
    val localPath = FileUtils.getLocalPath(fileInfo.hash)
    logger.debug(s"斷點續傳,startPosition:${fileInfo.startPosition},路徑:$localPath")
    val hash = fileInfo.hash
    bodyPart.entity.dataBytes
      .runWith(FileIO.toPath(localPath, Set(APPEND), fileInfo.startPosition)) // (1)
      .map(ioResult => FileBO(hash, localPath, meta.size + ioResult.count, bodyPart.filename, bodyPart.headers))
  }

斷點上傳時的邏輯其實相對簡單,須要注意的是在(1)處調用FileIO.toPath將流定入本地時須要以APPEND模式追加寫入到已存在文件。

秒傳

在已實現斷點上傳功能之上,秒傳的實現邏輯就很是清晰了。客戶端在調用file/upload接口上傳文件以前先調用/file/progress/{hash}接口判斷相同hash值文件的上傳狀況,再決定下一步處理。

  1. 客戶端計算文件hash,並以文件hash和文件大小做爲參數調用/file/progress/{hash}接口
  2. 服務端根據上傳hash值判斷文件是否已上傳,若存在返回已上傳文檔大小(bytes)
  3. 客戶端收到服務端響應後根據文件是否存在及已存在文件大小判斷秒傳斷點續傳仍是新上傳
  4. 秒傳,返回文件長度與當前準備上傳文件長度大小一致
  5. 斷點續傳,返回文件大小比當前準備上傳文件長度小
  6. 新上傳,返回文件不存在
  7. 其它狀況,做爲新文件上傳

秒傳的代碼邏輯臺下:

def progressRoute: Route = path("progress" / Segment) { hash =>
    onSuccess(fileService.progressByHash(hash)) {
      case Some(v) =>
        import me.yangbajing.fileupload.util.JacksonSupport._
        complete(v)
      case None => complete(StatusCodes.NotFound)
    }
  }

文件上傳進度接口定義如上。

// FileServiceImpl.scala
  override def progressByHash(hash: String): Future[Option[FileMeta]] = {
    require(Objects.nonNull(hash) && hash.nonEmpty, "hash 不能爲空。")
    Future.successful(FileUtils.getFileMeta(hash))
  }

  // FileUtils.getFileMeta
  def getFileMeta(hash: String): Option[FileMeta] = {
    if (hash == null || Constants.HASH_LENGTH != hash.length) {
      None
    } else {
      val path = getLocalPath(hash)
      if (Files.exists(path) && Files.isReadable(path)) Some(FileMeta(hash, Files.size(path), path)) else None
    }
  }

文件上傳進度服務實現定義如上。

小結

本文以Akka HTTP和HTML 5講述了怎樣實現一個支持大文件斷點上傳、下載和秒傳的示例應用程序。

相關文章
相關標籤/搜索