使用Task實現非阻塞式的I/O操做 基於任務的異步編程模式(TAP)

  在前面的《基於任務的異步編程模式(TAP)》文章中講述了.net 4.5框架下的異步操做自我實現方式,實際上,在.net 4.5中部分類已實現了異步封裝。如在.net 4.5中,Stream類加入了Async方法,因此基於流的通訊方式均可以實現異步操做。html

一、異步讀取文件數據

public static void TaskFromIOStreamAsync(string fileName)
{
    int chunkSize = 4096;
    byte[] buffer = new byte[chunkSize];

    FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);

    Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
    task.ContinueWith((readTask) =>
    {
        int amountRead = readTask.Result;
        //必須在ContinueWith中釋放文件流 
        fileStream.Dispose();
        Console.WriteLine($"Async(Simple) Read {amountRead} bytes");
    });
}

  上述代碼中,異步讀取數據只讀取了一次,完成讀取後就將執行權交還主線程了。但在真實場景中,須要從流中讀取屢次才能得到所有的數據(如文件數據大於給定緩衝區大小,或處理來自網絡流的數據(數據還沒所有到達機器))。所以,爲了完成異步讀取操做,須要連續從流中讀取數據,直到獲取所需所有數據。編程

  上述問題致使須要兩級Task來處理。外層的Task用於所有的讀取工做,供調用程序使用。內層的Task用於每次的讀取操做。緩存

  第一次異步讀取會返回一個Task。若是直接返回調用Wait或者ContinueWith的地方,會在第一次讀取結束後繼續向下執行。其實是但願調用者在完成所有讀取操做後才執行。所以,不能把第一個Task發佈會給調用者,須要一個「僞Task」在完成所有讀取操做後再返回。網絡

  上述問題須要使用到TaskCompletionSource<T>類解決,該類能夠生成一個用於返回的「僞Task」。當異步讀取操做所有完成後,調用其對象的TrySetResult,讓Wait或ContinueWith的調用者繼續執行。框架

public static Task<long> AsynchronousRead(string fileName)
{
    int chunkSize = 4096;
    byte[] buffer = new byte[chunkSize];
    //建立一個返回的僞Task對象
    TaskCompletionSource<long> tcs = new TaskCompletionSource<long>();

    MemoryStream fileContents = new MemoryStream();//用於保存讀取的內容
    FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);
    fileContents.Capacity += chunkSize;//指定緩衝區大小。好像Capacity會自動增加,設置與否不要緊,後續寫入多少數據,就增加多少

    Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
    task.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
    //在ContinueWith中循環讀取,讀取完成後,再返回tcs的Task
    return tcs.Task;
}

/// <summary>
/// 繼續讀取數據
/// </summary>
/// <param name="task">讀取數據的線程</param>
/// <param name="fileStream">文件流</param>
/// <param name="fileContents">文件存放位置</param>
/// <param name="buffer">讀取數據緩存</param>
/// <param name="tcs">僞Task對象</param>
private static void ContinueRead(Task<int> task, FileStream fileStream, MemoryStream fileContents, byte[] buffer, TaskCompletionSource<long> tcs)
{
    if (task.IsCompleted)
    {
        int bytesRead = task.Result;
        fileContents.Write(buffer, 0, bytesRead);//寫入內存區域。彷佛Capacity會自動增加
        if (bytesRead > 0)
        {
            //雖然看似是一個新的任務,可是使用了ContinueWith,因此使用的是同一個線程。
            //沒有讀取完,開啓另外一個異步繼續讀取
            Task<int> newTask = fileStream.ReadAsync(buffer, 0, buffer.Length);
            //此處作了一個循環
            newTask.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
        }
        else
        {
            //已經所有讀取完,因此須要返回數據
            tcs.TrySetResult(fileContents.Length);
            fileStream.Dispose();
            fileContents.Dispose();//應該是在使用了數據以後才釋放數據緩衝區的數據
        }
    }
}

二、適應Task的異步編程模式

  .NET Framework中的舊版異步方法都帶有「Begin-」和「End-」前綴。這些方法仍然有效,爲了接口的一致性,它們能夠被封裝到Task中。異步

  FromAsyn方法把流的BeginRead和EndRead方法做爲參數,再加上存放數據的緩衝區。BeginRead和EndRead方法會執行,並在EndRead完成後調用Continuation Task,把控制權交回主代碼。上述例子會關閉流並返回轉換的數據async

const int ReadSize = 256;//16k

/// <summary>
/// 從文件中獲取字符串
/// </summary>
/// <param name="path">文件路徑</param>
/// <returns>字符串</returns>
public static Task<string> GetStringFromFile(string path)
{
    FileInfo file = new FileInfo(path);
    byte[] buffer = new byte[file.Length];//存放數據的緩衝區

    FileStream fileStream = new FileStream(
        path, FileMode.Open, FileAccess.Read, FileShare.None, buffer.Length,
        FileOptions.DeleteOnClose | FileOptions.Asynchronous);

    Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
        buffer, 0, ReadSize, null);//此參數爲BeginRead須要的參數

    TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();

    task.ContinueWith(taskRead => OnReadBuffer(taskRead, fileStream, buffer, 0, tcs));

    return tcs.Task;
}

/// <summary>
/// 讀取數據
/// </summary>
/// <param name="taskRead">讀取任務</param>
/// <param name="fileStream">文件流</param>
/// <param name="buffer">讀取數據存放位置</param>
/// <param name="offset">讀取偏移量</param>
/// <param name="tcs">僞Task</param>
private static void OnReadBuffer(Task<int> taskRead, FileStream fileStream, byte[] buffer, int offset, TaskCompletionSource<string> tcs)
{
    int readLength = taskRead.Result;
    if (readLength > 0)
    {
        int newOffset = offset + readLength;
        Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
            buffer, newOffset, Math.Min(buffer.Length - newOffset, ReadSize), null);

        task.ContinueWith(callBackTask => OnReadBuffer(callBackTask, fileStream, buffer, newOffset, tcs));
    }
    else
    {
        tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer, 0, buffer.Length));
        fileStream.Dispose();
    }
}

三、使用async 和 await方式讀取數據

  下面的示例中,使用了async和await關鍵字實現異步讀取一個文件的同時進行壓縮並寫入另外一個文件。全部位於await關鍵字以前的操做都運行於調用者線程,從await開始的操做都是在Continuation Task中運行。但有沒法使用這兩個關鍵字的場合:①Task的結束時機不明確時;②必須用到多級Task和TaskCompletionSource時異步編程

/// <summary>
/// 同步方法的壓縮
/// </summary>
/// <param name="lstFiles">文件清單</param>
public static void SyncCompress(IEnumerable<string> lstFiles)
{
    byte[] buffer = new byte[16384];
    foreach(string file in lstFiles)
    {
        using (FileStream inputStream = File.OpenRead(file))
        {
            using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
            {
                using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
                {
                    int read = 0;
                    while((read=inputStream.Read(buffer,0,buffer.Length))>0)
                    {
                        compressStream.Write(buffer, 0,read);
                    }
                }
            }
        }
    }
}

/// <summary>
/// 異步方法的文件壓縮
/// </summary>
/// <param name="lstFiles">須要壓縮的文件</param>
/// <returns></returns>
public static async Task AsyncCompress(IEnumerable<string> lstFiles)
{
    byte[] buffer = new byte[16384];
    foreach(string file in lstFiles)
    {
        using (FileStream inputStream = File.OpenRead(file))
        {
            using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
            {
                using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
                {
                    int read = 0;
                    while ((read = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
                    {
                        await compressStream.WriteAsync(buffer, 0, read);
                    }
                }
            }
        }
    }
}
相關文章
相關標籤/搜索