消息服務框架使用案例之--大文件上傳(斷點續傳)功能html
在咱們的一個產品應用中,客戶須要上傳大量的文件到服務器,其中不乏很大的視頻文件。雖然可使用FTP這樣成熟穩定的工具,但客戶表示不會使用FTP工具,而且咱們產品也以爲客戶從咱們軟件在切換到FTP用戶體驗很差,若是作成後臺腳本調用FTP上傳那麼進度信息很難呈現到咱們軟件上。最終,決定咱們本身作文件上傳功能。服務器
大文件上傳受限於服務器每次處理數據的能力,不能一次傳輸完成,因此分塊上傳是必然的了,因爲上傳時間可能較長,中途可能由於網絡或者人爲緣由終止上傳,因此還須要斷點上傳功能。網絡
分塊上傳其實是在客戶端分塊讀取文件,而後在服務器分塊寫入文件,每次讀寫記錄下讀寫的起始位置,也就是文件的偏移量,和要讀寫的數據長度。在上傳過程當中,每完成一個文件數據塊的寫入,就向客戶端返回一次信息,客戶端據此進行下一文件數據塊的讀取。
斷點續傳功能也比較好實現,就是上傳過程當中將文件在服務器寫爲臨時文件,等所有寫完了(文件上傳完),將此臨時文件重命名爲正式文件便可,若是中途上傳中斷過,下次上傳的時候根據當前臨時文件大小,做爲在客戶端讀取文件的偏移量,今後位置繼續讀取文件數據塊,上傳到服務器今後偏移量繼續寫入文件便可。框架
假設咱們將每個文件數據塊看作一份「消息」,那麼文件上傳本質上就是客戶端和服務器兩端頻繁的消息交互而已。消息服務框架(MSF)是一個集成了服務容器和消息訪問的框架,正好能夠用來作文件上傳應用。具體作法就是在服務端,編寫一個「文件上傳服務」,在客戶端,編寫一個調用上傳服務的回調方法便可。異步
新建一個MSF服務類:tcp
public class FilesService : ServiceBase { }
而後添加一個處理上傳文件的方法:分佈式
/// <summary> /// 批量上傳文件(經過回調客戶端的方式,支持斷點續傳) /// </summary> /// <param name="list">文件列表</param> /// <returns></returns> public UploadResult UploadFiles(List<UploadFileInfos> list) { int uploadCount = 0; foreach (var uploadInfo in list) { string pathfile = string.Empty; try { pathfile = this.MapServerPath(uploadInfo.FilePath); if (!Directory.Exists(Path.GetDirectoryName(pathfile))) { Directory.CreateDirectory(Path.GetDirectoryName(pathfile)); } if (File.Exists(pathfile)) { FileInfo fi = new FileInfo(pathfile); if (fi.Length == uploadInfo.Size && fi.LastWriteTime == uploadInfo.FileModifyTime) { Console.WriteLine("文件 {0} {1}", pathfile, "已上傳,跳過"); continue;//文件已上傳,跳過 } else { fi.Delete(); } } //"斷點"上傳的文件 long offset = 0; //上傳的分部文件名稱增長一個文件長度數字,避免下次客戶端上傳的時候,修改了內容。 //若是文件上傳了一部分,的確修改了內容,那麼原來上傳的部分文件就丟棄了。 string partFile = pathfile + uploadInfo.Size + ".part"; if (File.Exists(partFile)) { FileInfo fi = new FileInfo(partFile); offset = fi.Length; } while (offset < uploadInfo.Size) { uploadInfo.Offset = offset; uploadInfo.Length = MaxReadSize; if (uploadInfo.Offset + uploadInfo.Length > uploadInfo.Size) uploadInfo.Length = (int)(uploadInfo.Size - uploadInfo.Offset); //回調客戶端,通知上傳文件塊 var data = GetUploadFileData(uploadInfo); if (data.Length == 0) { //若是有長度爲零的文件表示客戶讀取文件失敗,終止上傳操做 throw new Exception("讀取客戶端文件失敗(Length=0),終止上傳操做"); } if (data.Length != uploadInfo.Length) throw new Exception("網絡異常:上傳的文件流數據塊大小與預期的不一致"); //等待上次寫完 resetEvent.WaitOne(); //異步寫文件 System.Threading.Thread t = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(obj => { WriteFileInfo wfi = (WriteFileInfo)obj; CurWriteFile(wfi.FileName, wfi.WriteData, wfi.Offset); })); t.Start(new WriteFileInfo() { FileName = partFile, WriteData = data, Offset = offset }); offset += uploadInfo.Length; } resetEvent.WaitOne(); //重命名到正常文件名 File.Move(partFile, pathfile); System.IO.File.SetLastWriteTime(pathfile, uploadInfo.FileModifyTime); uploadCount++; resetEvent.Set(); } catch (Exception ex) { resetEvent.Set(); return new UploadResult() { Success = false, FilesCount = 0, Message = ex.Message }; }//end try } //end for return new UploadResult() { Success = true, FilesCount = list.Count }; }
在這個方法中,有一個重要方法,
//回調客戶端,通知上傳文件塊
var data = GetUploadFileData(uploadInfo);ide
它調用了MSF框架服務上下文的回調函數CallBackFunction,來讀取客戶端文件數據的,代碼以下:函數
private byte[] GetUploadFileData(UploadFileInfos fileinfo) { return base.CurrentContext.CallBackFunction<UploadFileInfos, byte[]>(fileinfo); }
另外,服務端寫文件的方法CurWriteFile 實現以下:工具
/// <summary> /// 將服務器端獲取到的字節流寫入文件 /// </summary> /// <param name="pReadByte">流</param> /// <param name="fileName">文件名</param> /// <param name="offset">要寫入文件的位置</param> public void CurWriteFile(string fileName, byte[] pReadByte, long offset) { FileStream pFileStream = null; try { pFileStream = new FileStream(fileName, FileMode.OpenOrCreate); pFileStream.Seek(offset, SeekOrigin.Begin); pFileStream.Write(pReadByte, 0, pReadByte.Length); } catch(Exception ex) { throw new Exception("寫文件塊失敗,寫入位置:"+offset+",文件名:"+fileName+",錯誤緣由:"+ex.Message); } finally { if (pFileStream != null) pFileStream.Close(); resetEvent.Set(); } }
如今看文件上傳客戶端代碼,如何提供服務端須要的文件讀取回調函數:
ServiceRequest request = new ServiceRequest(); request.ServiceName = "FilesService"; request.MethodName = "UploadFiles"; request.Parameters = new object[] { infos }; Proxy srvProxy = new Proxy(); srvProxy.ServiceBaseUri = string.Format("net.tcp://{0}", serverHost); srvProxy.ErrorMessage += srvProxy_ErrorMessage; Task<UploadResult> result= srvProxy.RequestServiceAsync<UploadResult, UploadFileInfos, byte[]>(request, uploadingInfo => { //action委託方法顯示進度給客戶端 action(new UploadStateArg() { State = uploadingInfo.Offset + uploadingInfo.Length >= uploadingInfo.Size ? UploadState.Success: UploadState.Uploading, ProgressFile = uploadingInfo.FilePath, ProcessValue = Convert.ToInt32(uploadingInfo.Offset * 100 / uploadingInfo.Size), TotalProcessValue = Convert.ToInt32((uploadingInfo.UploadIndex +1) * 100 / index) }); Console.WriteLine(">>>Debug:Path:{0},FilePath:{1}",folder, uploadingInfo.FilePath); var fullName = Path.IsPathRooted(folder)? folder + uploadingInfo.FilePath : uploadingInfo.FilePath; Console.WriteLine(">>>服務器讀取客戶端文件:{0},偏移量:{1} 長度:{2}", fullName, uploadingInfo.Offset, uploadingInfo.Length); return ReadFileData(fullName, uploadingInfo.Offset, uploadingInfo.Length); } );
在上面的方法中, srvProxy.RequestServiceAsync泛型方法須要3個參數,第一個參數是服務的結果類型,第二個參數是提供給服務端回調方法(前面的base.CurrentContext.CallBackFunction方法)的參數,第三個參數是服務回調方法的結果。srvProxy.RequestServiceAsync 的回調方法的參數 uploadingInfo 是服務器推送過來的消息,裏面包含了須要讀取的文件信息,包括文件名,偏移量,讀取長度等信息。
其中,客戶端讀取文件的方法 ReadFileData 實現以下:
/// <summary> /// 讀取文件返回字節流 /// </summary> /// <param name="fileName">文件路徑</param> /// <param name="offset">要讀取的文件流的位置</param> /// <param name="length">要讀取的文件塊大小</param> /// <returns></returns> private byte[] ReadFileData(string fileName, long offset, int length) { FileStream pFileStream = null; byte[] pReadByte = new byte[0]; try { pFileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read); BinaryReader r = new BinaryReader(pFileStream); r.BaseStream.Seek(offset, SeekOrigin.Begin); pReadByte = r.ReadBytes(length); return pReadByte; } catch { return pReadByte; } finally { if (pFileStream != null) pFileStream.Close(); } }
這樣,在一次文件上傳的「請求-響應」過程當中,MSF的服務端進行了屢次回調客戶端的操做,客戶端根據服務端推送過來的參數信息來精確的讀取服務端須要的文件數據。一個支持斷點續傳的大文件上傳服務,使用MSF框架就作好了。
本文使用到的其它相關服務端對象的代碼定義以下:
/// <summary> /// 上傳狀態枚舉 /// </summary> public enum UploadState { /// <summary> /// 上傳成功 /// </summary> Success, /// <summary> /// 上傳中 /// </summary> Uploading, /// <summary> /// 錯誤 /// </summary> Error } /// <summary> /// 上傳狀態參數 /// </summary> public class UploadStateArg { /// <summary> /// 上傳狀態 /// </summary> public UploadState State { get; set; } /// <summary> /// 上傳的文件名 /// </summary> public string ProgressFile { get; set; } /// <summary> /// 處理的消息,若是出錯,這裏是錯誤消息 /// </summary> public string Message { get; set; } /// <summary> /// 處理進度(百分比) /// </summary> public int ProcessValue { get; set; } /// <summary> /// 整體處理進度(百分比) /// </summary> public int TotalProcessValue { get; set; } }
若是你不清楚如何使用MSF來實現本文的功能,請先閱讀下面的文章:
建議你讀完相關的其它兩篇文章:
「一切都是消息」--MSF(消息服務框架)之【請求-響應】模式
「一切都是消息」--MSF(消息服務框架)之【發佈-訂閱】模式
讀完後,建議你再讀讀MSF的理論總結:
有關消息服務框架(MSF)更多的討論,請加咱們QQ羣討論,羣號:18215717 ,加羣口令:消息服務框架