我原本覺得文件斷點續傳功能很簡單,不就是提供2個方法:java
一個返回已經上傳的文件的長度;另一個負責上傳文件唄(請求帶上content-range 指明本次上傳的內容在整個文件中的位置),而後根據請求提供的位置寫唄,太簡單了。數據庫
可是實際狀況仍是比較複雜的,關鍵問題是,上面的描述如今想一想只能稱做爲文件分段上傳,而不是斷點續傳。apache
斷點意味着網絡會斷,而後斷了以後,服務端根本獲取不到本次上傳的內容,因而下次又只能從頭開始傳文件。一種解決辦法是客戶端將文件分紅很小的片斷(單個片斷丟了就整個片斷重傳),這個方案要求客戶端作不少工做,服務端還得根據片斷的編號組織文件,總之客戶端和服務端都挺麻煩。網絡
因而就想到用netty在寫一個服務filestoreApdapterServer,文件上傳提交給這個代理服務。這個作法有個前提就是,客戶端上傳的文件名稱保證惟一,而且在請求頭裏面帶着這個名字,以便服務端定位文件。利用的原理是通常長度比較大的消息體,netty會使用chunk傳輸,咱們取得chunk寫入臨時文件,這樣即便網絡斷了,服務端已經獲取的文件內容仍是保留在臨時文件裏面。session
流程以下:ide
1. filestoreApdapterServer將請求的消息體寫到臨時文件(網絡斷了也沒關係,讀到多少寫多少)。ui
2. 客戶端下次傳以前先調用getSize獲取上傳傳遞的文件長度,咱們就在這個getSize方法裏面偷偷的將第一步保存的臨時文件追加到正式文件裏面,而後返回文件長度。this
3. 客戶端根據獲取的服務端文件長度,定位未傳的文件位置,讀取上傳。重複1,2步驟。直到文件上傳完成。spa
看代碼:FilestoreAdaptorServerInitializerdebug
public class FilestoreAdaptorServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new StreamChunkAggregator(-1)); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("handler", new FileUploadAdaptorHandler()); } }
StreamChunkAggregator就是獲取上傳文件,寫臨時文件的:
public class StreamChunkAggregator extends MessageToMessageDecoder<HttpObject> { private static final Logger log = LoggerFactory.getLogger(StreamChunkAggregator.class); private volatile FullHttpMessage currentMessage; private volatile OutputStream out; private final int maxContentLength; private volatile File file; private ChannelHandlerContext ctx; public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; /** * Creates a new instance. */ public StreamChunkAggregator(int maxContentLength) { this.maxContentLength = maxContentLength; } @Override protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception { FullHttpMessage currentMessage = this.currentMessage; if (msg instanceof HttpMessage) { HttpMessage m = (HttpMessage) msg; if (msg instanceof HttpRequest) { HttpRequest header = (HttpRequest) msg; this.currentMessage = currentMessage = new DefaultFullHttpRequest(header.getProtocolVersion(), header.getMethod(), header.getUri(), Unpooled.compositeBuffer(maxCumulationBufferComponents)); final String localName = m.headers().get("file"); // 取上傳文件名 log.debug("upload file name is {}", localName); if(null == localName || "".equals(localName.trim())) { ctx.fireChannelRead(m); } File dir = new File(ServerHelper.getDestDir().getAbsolutePath() + File.separator + ServerHelper.getStorePath(localName)); if(!dir.exists()) dir.mkdirs(); log.debug("upload file path is {}", dir.getAbsolutePath()); File tempFile = new File(dir, localName + ".utmp"); if(tempFile.exists()) { // 文件已經存在多是上次上傳遺留的 tempFile.delete(); } this.file = tempFile; this.out = new FileOutputStream(file, true); } else { throw new Error(); } currentMessage.headers().set(m.headers()); } else if (msg instanceof HttpContent) { assert currentMessage != null; HttpContent chunk = (HttpContent) msg; if (chunk.content().isReadable()) { chunk.retain(); IOUtils.copyLarge(new ByteBufInputStream(chunk.content()), this.out); } final boolean last; if (!chunk.getDecoderResult().isSuccess()) { currentMessage.setDecoderResult( DecoderResult.failure(chunk.getDecoderResult().cause())); last = true; } else { last = chunk instanceof LastHttpContent; } if (last) { this.out.flush(); this.out.close(); this.out = null; this.currentMessage = null; this.file = null; out.add(currentMessage); } } else { throw new Error(); } }
FileUploadAdaptorHandler 這個是最後傳成功後通知真正的服務端,而且獲取服務的返回,給客戶端:
public class FileUploadAdaptorHandler extends SimpleChannelInboundHandler<DefaultFullHttpRequest> { private static final Logger log = LoggerFactory.getLogger(FileUploadAdaptorHandler.class); @Override protected void channelRead0(final ChannelHandlerContext ctx, DefaultFullHttpRequest msg) throws Exception { if(log.isDebugEnabled()) { log.debug("message received: begin"); } final String filename = msg.headers().get("file"); if(filename == null || "".equals(filename.trim())) { //沒有文件名 直接返回4001 參數錯誤 String responseBody = "{\"result_code\": 4001,\"result_msg\": \"請求參數錯誤\"}"; response(responseBody.getBytes(), HttpResponseStatus.BAD_REQUEST, ctx); } else { // 轉發給play服務處理 final CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); httpclient.start(); try { HttpGet request1 = new HttpGet(ServerHelper.getPlayServer()); request1.setHeader("Client-Session", msg.headers().get("client-session")); request1.setHeader("Content-Range", msg.headers().get("content-range")); request1.setHeader("file", msg.headers().get("file")); httpclient.start(); httpclient.execute(request1, new FutureCallback<org.apache.http.HttpResponse>() { @Override public void failed(Exception e) { try { httpclient.close(); } catch (IOException e1) { log.error(e1.getMessage(), e1); } serve500(ctx, filename); } @Override public void completed(org.apache.http.HttpResponse playResonse) { log.debug("HttpAsyncClient callback"); int status = playResonse.getStatusLine().getStatusCode(); log.debug("HttpAsyncClient callback playResonse status is {}", status); if(status != 200) { ServerHelper.deleteTmpFile(filename); } HttpEntity entity = playResonse.getEntity(); byte[] bytes = new byte[(int) entity.getContentLength()]; try { IOUtils.read(entity.getContent(), bytes); response(bytes, new HttpResponseStatus(status, ""), ctx); } catch (Exception e) { log.error(e.getMessage(), e); serve500(ctx, filename); } finally { try { httpclient.close(); } catch (IOException e1) { log.error(e1.getMessage(), e1); } } } @Override public void cancelled() { try { httpclient.close(); } catch (IOException e1) { log.error(e1.getMessage(), e1); } serve500(ctx, filename); } }); } catch (Exception e) { httpclient.close(); log.error(e.getMessage(), e); serve500(ctx, filename); } } if(log.isDebugEnabled()) { log.debug("message received: end"); } }
真正服務提供2個方法,一個是獲取長度,一個是接收filestoreAapterServer請求的方法:
public static void getFileLength(String name) { Logger.debug("getFileLength path is " + FileHelper.getStorgePath(name)); File file = new File(FileHelper.getStorgePath(name)); long length = file.length(); response.status = StatusCode.OK; response.setHeader("Content-Size", String.valueOf(length)); LocalFile file = LocalFile .find(。。。).first(); if(file != null){ // 若是數據中有記錄則認爲文件已經保存完整 Logger.debug("getFileLength file has been in database"); FileResult result = new FileResult(); 。。。 throw new CustomJsonResult(result); } File fileTmp = new File(FileHelper.getStorgePath(name) + FileHelper.TMP_SUFFIX); if(Logger.isDebugEnabled()) Logger.debug("getFileLength temp path is " + fileTmp.getAbsolutePath() + ", existed is: " + fileTmp.exists()); if(fileTmp.exists()) { // 臨時文件存在,則保存臨時文件 Logger.debug("getFileLength save tmp file"); try { FileHelper.saveFileFromTmp(fileTmp, file); } catch (IOException ingore) { Logger.error(ingore.getMessage(), ingore); } length = file.length(); } response.setHeader("Content-Size", String.valueOf(length)); }
public static void saveUploadFile() { String filename = getFileName(); Logger.debug("saveUploadFile name is %s", filename); long total = getFileTotal(); // 整個文件的大小 File tempFile = new File(FileHelper.getStorgePath(filename) + FileHelper.TMP_SUFFIX); if(Logger.isDebugEnabled()) { Logger.debug("saveUploadFile upload tmp file is: " + tempFile.getAbsolutePath()); } if(!tempFile.exists()) { ApiResult result = new ApiResult(); result.resultCode = ApiResultCode.UPLOAD_FILE_FAIL; response.status = Http.StatusCode.INTERNAL_ERROR; throw new CustomJsonResult(result); } File destFile = new File(FileHelper.getStorgePath(filename)); if(destFile.length() >= total) { // 已經上傳成功了 須要刪除臨時文件 FileUtils.deleteQuietly(tempFile); if(Logger.isDebugEnabled()) { Logger.debug("saveUploadFile video has upload completely"); } // 已經完整了,若是數據庫不存在保存數據庫 .... FileResult result = new FileResult(); result.resultCode = ApiResultCode.SUCCESS; result.videoUrl = video.videoUrl; result.shortUrl = video.shortUrl; throw new CustomJsonResult(result); } try { FileHelper.saveFileFromTmp(tempFile, destFile); } catch (IOException e) { Logger.error("saveUploadFile " + e.getMessage(), e); ApiResult result = new ApiResult(); result.resultCode = ApiResultCode.UPLOAD_FILE_FAIL; response.status = Http.StatusCode.INTERNAL_ERROR; throw new CustomJsonResult(result); } afterWrite(filename, destFile, total); //一些後續工做,若是文件保存完整,保存數據庫返回成功結果給客戶端 }
這個解決方法,和咱們的服務綁定的比較緊,不能解決較爲通用的問題 只是提出一種思路。