封裝Socket.BeginReceive/EndReceive以支持Timeout

Socket

.NET中的Socket類提供了網絡通訊經常使用的方法,分別提供了同步和異步兩個版本,其中異步的實現是基於APM異步模式實現,即BeginXXX/EndXXX的方式。異步方法因爲其非阻塞的特性,在需考慮程序性能和伸縮性的狀況下,通常會選擇使用異步方法。但使用過Socket提供的異步方法的同窗,應該都會注意到了Socket的異步方法是沒法設置Timeout的。以Receive操做爲例,Socket提供了一個ReceiveTimeout屬性,但該屬性設置的是同步版本的Socket.Receive()方法的Timeout值,該設置對異步的Socket.BeginReceive()無效:若是對方沒有返回任何消息,則BeginReceive操做將沒法完成,其中提供的回調函數也將不會調用。以下示例代碼所示:html

private static void TestSocketBeginReceive()
{
    Socket socket = new Socket(AddressFamily.InterNetwork,
        SocketType.Dgram, ProtocolType.Udp);
    byte[] content = Encoding.ASCII.GetBytes("Hello world");

    IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];
    IPEndPoint receiver = new IPEndPoint(ip, 80);

    socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, 
        receiver, SendToCb, socket);
    Console.WriteLine("Sent bytes: " + content.Length);
}

private static void SendToCb(IAsyncResult ar)
{
    var socket = ar.AsyncState as Socket;
    socket.EndSendTo(ar);
    byte[] buffer = new byte[1024];

    IAsyncResult receiveAr = socket.BeginReceive(buffer, 0, buffer.Length,
        SocketFlags.None, null, null);
    int received = socket.EndReceive(receiveAr);
    Console.WriteLine("Received bytes: " + received);
} 

因爲接收方不會返回任何消息,Socket.BeginReceive將永遠不會完成,SentToCb方法中的socket.EndReceive()調用將永遠阻塞,應用程序也沒法得知操做的狀態。網絡

支持Timeout

在個別的應用場景下,咱們但願既能使用Socket的異步通訊方法,保證程序的性能,同時又但願能指定Timeout值,當操做沒有在指定的時間內完成時,應用程序能獲得通知,以進行下一步的操做,如retry等。如下介紹的就是一種支持Timeout的Socket異步Receive操做的實現,方式以下:異步

  1. 基於APM異步模式封裝Socket.BeginReceive/EndReceive方法。
  2. 使用ThreadPool提供的RegisterWaitForSingleObject()方法註冊一個WaitOrTimerCallback,若是指定時間內操做未完成,則結束操做,並設置狀態爲Timeout。
  3. 將上述封裝實現爲Socket的擴展方法方便調用。

如下代碼簡化了全部的參數檢查和異常處理,實際使用中需添加相關邏輯。socket

AsyncResultWithTimeout

首先看一下IAsyncResult接口的實現:async

public class AsyncResultWithTimeout : IAsyncResult
{
    private ManualResetEvent m_waitHandle = new ManualResetEvent(false);
    public AsyncResultWithTimeout(AsyncCallback cb, object state)
    {
        this.AsyncState = state;
        this.Callback = cb;
    }

    #region IAsyncResult

    public object AsyncState { get; private set; }
    public WaitHandle AsyncWaitHandle { get { return m_waitHandle; } }
    public bool CompletedSynchronously { get { return false; } }
    public bool IsCompleted { get; private set; }

    #endregion

    public AsyncCallback Callback { get; private set; }
    public int ReceivedCount { get; private set; }
    public bool TimedOut { get; private set; }
    public void SetResult(int count)
    {
        this.IsCompleted = true;
        this.ReceivedCount = count;
        this.m_waitHandle.Set();

        if (Callback != null) Callback(this);
    }

    public void SetTimeout()
    {
        this.TimedOut = true;
        this.IsCompleted = true;
        this.m_waitHandle.Set();
    }
}

AsyncResultWithTimeOut類中包含了IAsyncResult接口中4個屬性的實現、用戶傳入的AsyncCallback委託、接收到的字節數ReceivedCount以及兩個額外的方法:函數

  1. SetResult(): 用於正常接收到消息時設置結果,標記操做完成以及執行回調。
  2. SetTimeout():當超時時,標記操做完成以及設置超時狀態。

StateInfo

StateInfo類用於保存相關的狀態信息,該對象會做爲Socket.BeginReceive()的最後一個參數傳入。當接收到消息時,接收到的字節數會保存到AsyncResult屬性中,並設置操做完成。當超時時,WatchTimeOut方法會將AsyncResult設置爲TimeOut狀態,並經過RegisteredWaitHandle屬性取消註冊的WaitOrTimerCallback.post

public class StateInfo
{
    public StateInfo(AsyncResultWithTimeout result, Socket socket)
    {
        this.AsycResult = result;
        this.Socket = socket;
    }

    public Socket Socket { get; private set; }
    public AsyncResultWithTimeout AsycResult { get; private set; }
    public RegisteredWaitHandle RegisteredWaitHandle { get; set; }
}

封裝Socket.BeginReceive性能

與Socket.BeginReceive方法相比,BeginReceive2添加了一個參數timeout,能夠設置該操做的超時時間,單位爲毫秒。BeginReceive2中調用Socket.BeginReceive()方法,其中指定的ReceiveCb回調將在正常接收到消息後將結果保存在stateInfo對象的AsyncResult屬性中,該屬性中的值就是BeginReceive2()方法返回的IAsyncResult。BeginReceive2調用Socket.BeginReceive後,在ThreadPool中註冊了一個WaitOrTimerCallback委託。ThreadPool將在Receive操做完成或者Timeout時調用該委託。this

public static class SocketExtension
{

    public static int EndReceive2(IAsyncResult ar)
    {
        var result = ar as AsyncResultWithTimeout;
        result.AsyncWaitHandle.WaitOne();

        return result.ReceivedCount;
    }

    public static AsyncResultWithTimeout BeginReceive2
    (
        this Socket socket,
        int timeout,
        byte[] buffer,
        int offset,
        int size,
        SocketFlags flags,
        AsyncCallback callback,
        object state
    )
    {
        var result = new AsyncResultWithTimeout(callback, state);

        var stateInfo = new StateInfo(result, socket);

        socket.BeginReceive(buffer, offset, size, flags, ReceiveCb, state);

        var registeredWaitHandle =
            ThreadPool.RegisterWaitForSingleObject(
                result.AsyncWaitHandle,
                WatchTimeOut,
                stateInfo, // 做爲state傳遞給WatchTimeOut
                timeout,
                true);

        // stateInfo中保存RegisteredWaitHandle,以方便在úWatchTimeOut
        // 中unregister.
        stateInfo.RegisteredWaitHandle = registeredWaitHandle;

        return result;
    }

    private static void WatchTimeOut(object state, bool timeout)
    {
        var stateInfo = state as StateInfo;
        // 設置的timeout前,操做未完成,則設置爲操做Timeout
        if (timeout)
        {
            stateInfo.AsycResult.SetTimeout();
        }

        // 取消以前註冊的WaitOrTimerCallback
        stateInfo.RegisteredWaitHandle.Unregister(
            stateInfo.AsycResult.AsyncWaitHandle);
    }

    private static void ReceiveCb(IAsyncResult result)
    {
        var state = result.AsyncState as StateInfo;
        var asyncResultWithTimeOut = state.AsycResult;
        var count = state.Socket.EndReceive(result);
        state.AsycResult.SetResult(count);
    }
}

 

試一下

如下代碼演示瞭如何使用BeginReceive2:google

 

private static void TestSocketBeginReceive2()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
    byte[] content = Encoding.ASCII.GetBytes("Hello world");

    IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];
    IPEndPoint receiver = new IPEndPoint(ip, 80);

    socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, receiver, SendToCb2, socket);
    Console.WriteLine("Sent bytes: " + content.Length);
}

private static void SendToCb2(IAsyncResult ar)
{
    var socket = ar.AsyncState as Socket;
    socket.EndSendTo(ar);
    byte[] buffer = new byte[1024];

    AsyncResultWithTimeout receiveAr = socket.BeginReceive2(2000, buffer, 0, buffer.Length, SocketFlags.None, null, null);
    receiveAr.AsyncWaitHandle.WaitOne();
    if (receiveAr.TimedOut)
    {
        Console.WriteLine("Operation timed out.");
    }
    else
    {
        int received = socket.EndReceive(ar);
        Console.WriteLine("Received bytes: " + received);
    }
}

輸出結果以下:

image_thumb4

 

上述實現是針對BeginReceive的封裝,還能夠以相同的方式將Send/Receive封裝以支持Timeout, 或者更進一步支持retry操做。

附示例代碼:下載

 

出處:http://www.cnblogs.com/dytes/archive/2012/08/13/SocketAsyncOpWithTimeout.html

相關文章
相關標籤/搜索