消息隊列NetMQ 原理分析2-IO線程和完成端口


前言

介紹

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標準socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版爲3.3.4,版本4最新版本爲4.0.0-rc5。本文檔是對4.0.0-rc5分支代碼進行分析。

zeromq的英文文檔
NetMQ的英文文檔git

目的

對NetMQ的源碼進行學習並分析理解,所以寫下該系列文章,本系列文章暫定編寫計劃以下:github

  1. 消息隊列NetMQ 原理分析1-Context和ZObject
  2. 消息隊列NetMQ 原理分析2-IO線程和完成端口
  3. 消息隊列NetMQ 原理分析3-命令產生/處理和回收線程
  4. 消息隊列NetMQ 原理分析4-Session和Pipe
  5. 消息隊列NetMQ 原理分析5-Engine
  6. 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
  7. 消息隊列NetMQ 原理分析7-Device
  8. 消息隊列NetMQ 原理分析8-不一樣類型的Socket
  9. 消息隊列NetMQ 原理分析9-實戰

友情提示: 看本系列文章時最好獲取源碼,更有助於理解。編程


IO線程

NetMQ 4.0.0底層使用的是IOCP(即完成端口)模式進行通訊的(3.3.4使用的是select模型),經過異步IO綁定到完成端口,來最大限度的提升性能。這裏不對同步/異步socket進行詳細介紹。稍微解釋下完成端口,爲了解決每一個socket客戶端使用一個線程進行通訊的性能問題,完成端口它充分利用內核對象的調度,只使用少許的幾個線程來處理和客戶端的全部通訊,消除了無謂的線程上下文切換,最大限度的提升了網絡通訊的性能。
想詳細瞭解完成端口的請看完成端口(Completion Port)詳解,講解的比較詳細,同時對各類網絡編程模型作了簡單的介紹。
所以NetMQ經過幾個(默認1個)IO線程處理通訊,上一片文章介紹了ZObejct對象,在該對象中存在許多命令的處理,實際對命令的發送,分配都是IO線程的工做。數組

初始化IO線程

IO線程初始化時會初始化ProactorIOThreadMailbox網絡

var name = "iothread-" + threadId;
m_proactor = new Proactor(name);
m_mailbox = new IOThreadMailbox(name, m_proactor, this);

Proactor對象就是用來綁定或處理完成端口用的,後面再作做詳細介紹。
IOThreadMailbox是IO線程處理的信箱,每當有命令須要處理時,都會向當前Socket對象所在的IO線程信箱發送命令。
讓咱們看一眼IOThread對象和IOThreadMailbox的定義負載均衡

internal sealed class IOThread : ZObject, IMailboxEvent
{
}

IOThread對象繼承自ZObject對象,記得上一節想到ZObject對象知道如何處理各類命令嗎?所以IOThread對象也繼承了他父親的技能。同時IOThread對象實現了IMailboxEvent接口,這個接口之定義了一個方法。異步

internal interface IMailboxEvent
{
    void Ready();
}

當IO信箱接受到命令時表示當前有命令準備好了,能夠進行 處理,IO信箱則會調用IO線程的Ready方法處理命令,那麼IO信息如何調用IO線程的Ready方法呢,來看下IOThreadMailbox的構造函數。socket

internal class IOThreadMailbox : IMailbox
{
    ...
    public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent)
    {
        m_proactor = proactor;
        m_mailboxEvent = mailboxEvent;
        Command cmd;
        bool ok = m_commandPipe.TryRead(out cmd);
    }
    ...
}

在IOThreadMailbox初始化時,傳入了IMailboxEvent。async

m_commandPipe是NetMQ的管道(Pipe),後面咱們會對其作介紹,這裏只要知道該管道用於存放命令便可,能夠__暫時__理解爲管道隊列。

Proactor

每一個IOThread會有一個Proactor,Proactor的工做就是將Socket對象綁定到完成端口,而後定時去掃描完成端口是否有須要處理的Socket對象。

internal class Proactor : PollerBase
{
    ...
    public Proactor([NotNull] string name)
    {
        m_name = name;
        m_stopping = false;
        m_stopped = false;
        m_completionPort = CompletionPort.Create();
        m_sockets = new Dictionary<AsyncSocket, Item>();
    }
    ...
}

Proactor對象繼承自PollerBase,那麼PollerBase又是什麼呢?從命名能夠看這是一個輪詢基類,即該對象須要長時間不斷循環處理某件事情。
PollerBase對象是一個抽象類,它有2個功能:

  1. 負載均衡

    還記的Context中選擇IO線程時有這個一段代碼嗎?

    IO線程的負載均衡功能就是PollBase對象提供的

    每次選擇IO線程時會將m_load字段值+1
    protected void AdjustLoad(int amount) { Interlocked.Add(ref m_load, amount); }
    public int Load { get { #if NETSTANDARD1_3 return Volatile.Read(ref m_load); #else Thread.MemoryBarrier(); return m_load; #endif } }
    IOThread取PollBase對象(Proactor)的Load屬性時候會特殊處理,保證拿到的是最新的值。
  2. 定時任務
    PollBase第二個功能就是支持定時任務,即定時觸發某事件。

    private readonly SortedList<long, List<TimerInfo>> m_timers;

    PollBase內部有一個SortedList,key爲任務執行的時間,value爲TimeInfo
    TimeInfo對象包含2個信息,idITimerEvent接口,id用來辨別當前任務的類型,ITimerEvent接口就包含了TimerEvent方法,即如何執行。
    TcpConnection鏈接失敗會從新鏈接時會重連,下面時TcpConnection開始鏈接方法

    private void StartConnecting()
    {
        Debug.Assert(m_s == null);
    
        // Create the socket.
        try
        {
            m_s = AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        }
        catch (SocketException)
        {
            AddReconnectTimer();
            return;
        }
        ...
    }
    private void AddReconnectTimer()
    {
        //獲取重連時間間隔
        int rcIvl = GetNewReconnectIvl();
        //IO線程的Proactor中,TcpConnection的ReconnectTimerId = 1 
        m_ioObject.AddTimer(rcIvl, ReconnectTimerId);
        ...
    }

    IO線程會被封裝到IOObject中,調用IOObjectAddTimer方法實際就是調用IO線程中Proactor對象的AddTimer方法,其方法定義以下

    public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id)
    {
        long expiration = Clock.NowMs() + timeout;
        var info = new TimerInfo(sink, id);
    
        if (!m_timers.ContainsKey(expiration))
            m_timers.Add(expiration, new List<TimerInfo>());
    
        m_timers[expiration].Add(info);
    }

    第一行會獲取當前的毫秒時間加上時間間隔。而後加入到m_timers中。

m_completionPort = CompletionPort.Create();
m_sockets = new Dictionary<AsyncSocket, Item>();

初始化時會建立完成端口,當有socket須要處理時會和完成端口綁定。
初始化時還會初始化一個存放異步AsyncSocketitem的字典。
有關於AsyncSocketCompletionPort能夠去Git上看AsyncIO的源碼,這裏不作分析。
Item結構以下

private class Item
{
    public Item([NotNull] IProactorEvents proactorEvents)
    {
        ProactorEvents = proactorEvents;
        Cancelled = false;
    }

    [NotNull] 
    public IProactorEvents ProactorEvents { get; }
    public bool Cancelled { get; set; }
}

它包含了IProactorEvents接口的信息和當前Socket操做是否被取消標誌。

internal interface IProactorEvents : ITimerEvent
{
    void InCompleted(SocketError socketError, int bytesTransferred);
    void OutCompleted(SocketError socketError, int bytesTransferred);
}

IProactorEvents繼承自ITimerEvent。同時它還聲明瞭InCompletedOutCompleted方法,即發送或接收完成時如何處理,所以當須要處理Socket時,會將當前Socket處理方式保存到這個字典中。噹噹前對象發送消息完成,則會調用OutCompleted方法,接收完成時則會調用InCompleted方法。
當有Socket須要綁定時會調用ProactorAddSocket方法

public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents)
{
    var item = new Item(proactorEvents);
    m_sockets.Add(socket, item);
    m_completionPort.AssociateSocket(socket, item);
    AdjustLoad(1);
}

它包含2個參數,一個時異步Socket對象和IProactorEvents。而後加把他們加入到字段中並將他們綁定到完成端口上。第四段AdjustLoad方法即把當前IO線程處理數量+1,用於負載均衡用。

Socket操做完成時會調用ProactorRemoveSocket移除綁定

public void RemoveSocket(AsyncSocket socket)
{
    AdjustLoad(-1);
    var item = m_sockets[socket];
    m_sockets.Remove(socket);
    item.Cancelled = true;
}

移除時會將itemCancelled字段設置爲true。因此當Proactor輪詢處理Socket時發現該Socket操做被取消(移除),就會跳過處理。

啓動Procator線程輪詢

在IO線程啓動時實際就是啓動Procator的work線程

public void Start()
{
    m_proactor.Start();
}
public void Start()
{
    m_worker = new Thread(Loop) { IsBackground = true, Name = m_name };
    m_worker.Start();
}

處理socket

完整的Loop方法以下

private void Loop()
{
    var completionStatuses = new CompletionStatus[CompletionStatusArraySize];
    while (!m_stopping)
    {
        // Execute any due timers.
        int timeout = ExecuteTimers();
        int removed;
        if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
            continue;
        for (int i = 0; i < removed; i++)
        {
            try
            {
                if (completionStatuses[i].OperationType == OperationType.Signal)
                {
                    var mailbox = (IOThreadMailbox)completionStatuses[i].State;
                    mailbox.RaiseEvent();
                }
                    // if the state is null we just ignore the completion status
                else if (completionStatuses[i].State != null)
                {
                    var item = (Item)completionStatuses[i].State;

                    if (!item.Cancelled)
                    {
                            switch (completionStatuses[i].OperationType)
                            {
                                case OperationType.Accept:
                                case OperationType.Receive:
                                    item.ProactorEvents.InCompleted(
                                        completionStatuses[i].SocketError,
                                        completionStatuses[i].BytesTransferred);
                                    break;
                                case OperationType.Connect:
                                case OperationType.Disconnect:
                                case OperationType.Send:
                                    item.ProactorEvents.OutCompleted(
                                        completionStatuses[i].SocketError,
                                        completionStatuses[i].BytesTransferred);
                                    break;
                                default:
                                    throw new ArgumentOutOfRangeException();
                            }
                        }
                    }
                }
            catch (TerminatingException)
            { }
        }
    }
}
var completionStatuses = new CompletionStatus[CompletionStatusArraySize];

第一行初始化了CompletionStatus數組,CompletionStatusArraySize值爲100。
CompletionStatus做用是用來保存socket的信息或狀態。

獲取超時時間

int timeout = ExecuteTimers();
protected int ExecuteTimers()
{
    if (m_timers.Count == 0)
        return 0;
    long current = Clock.NowMs();
    var keys = m_timers.Keys;
    for (int i = 0; i < keys.Count; i++)
    {
        var key = keys[i];
        if (key > current)
        {
            return (int)(key - current);
        }
        var timers = m_timers[key];
        foreach (var timer in timers)
        {
            timer.Sink.TimerEvent(timer.Id);
        }
        timers.Clear();
        m_timers.Remove(key);
        i--;
    }
    return 0;
}

ExecuteTimers會計算以前加入到m_timers須要等待的超時時間,若沒有對象則直接返回0,不然獲取若獲取到key時間在當前時間以前,則須要調用TimerEvent方法,調用完成後移除。
若獲取到的key時間比當前時間大,則返回他們的差即爲須要等待的超時時間。

從完成端口獲取處理完的狀態

int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
    continue;

GetMultipleQueuedCompletionStatus方法傳入一個超時時間,若前面獲取的超時時間爲0,則這邊會設置爲-1,表示阻斷直到有要處理的才返回。
CompletionPort內部維護了一個狀態隊列,removed即爲處理完成返回的狀態個數。
若獲取成功則會返回true,後面就開始遍歷completionStatuses數組處理完成Socket

開始處理待處理的狀態

public struct CompletionStatus
{
    internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : 
        this()
    {
        AsyncSocket = asyncSocket;
        State = state;
        OperationType = operationType;
        SocketError = socketError;
        BytesTransferred = bytesTransferred;
    }
    public AsyncSocket AsyncSocket { get; private set; }
    public object State { get; internal set; }
    public OperationType OperationType { get; internal set; }
    public SocketError SocketError { get; internal set; }
    public int BytesTransferred { get; internal set; }        
}

CompletionStatus是個結構體,它包含的信息如上。其中OperationType是當前Socket的處理方式。

public enum OperationType
{
    Send, Receive, Accept, Connect, Disconnect, Signal
}

for循環的一開始先會判斷當前狀態的OperationType,如果Signal,則說明當前是個信號狀態,說明有命令須要處理,則會調用IO信箱的RaiseEvent方法,實際爲IO線程的Ready方法。

public void Ready()
{
    Command command;
    while (m_mailbox.TryRecv(out command))
        command.Destination.ProcessCommand(command);
}

IOThread會將當前信箱的全部命令進行處理。
若不是Signal則會將CompletionStatus保存的狀態信息轉換爲Item對象,並判斷當前Socket是否移除(取消)。若沒有則對其進行處理。判斷OperationType,若爲AcceptReceive則表示須要接收,則調用InCompleted方法。若爲Connect,DisconnectSend則表示有消息向外發送,則調用OutCompleted方法。

至此IOThread代碼分析完畢。

IOObject

internal class IOObject : IProactorEvents
{
    public IOObject([CanBeNull] IOThread ioThread)
    {
        if (ioThread != null)
            Plug(ioThread);
    }
    public void Plug([NotNull] IOThread ioThread)
    {
        Debug.Assert(ioThread != null);
        m_ioThread = ioThread;
    }
}

IOObject實際就是保存了IOThread的信息和Socket處理完成時如何執行,以及向外暴露了一些接口。

再次說明,若是向簡單瞭解完成端口如何使用,則看《完成端口使用》,若是想詳細瞭解完成端口則看下《完成端口詳細介紹》,若是想直到NetMQ的AsyncIO和完成端口的源碼請看AsyncIO

總結

該篇介紹了IO線程和完成端口的處理方式,若哪裏分析的不到位或有誤但願支出。


本文地址:http://www.javashuo.com/article/p-ghjehjvc-z.html 做者博客:傑哥很忙 歡迎轉載,請在明顯位置給出出處及連接)

相關文章
相關標籤/搜索