[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標準socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。 當前有2個版本正在維護,版本3最新版爲3.3.4,版本4最新版本爲4.0.0-rc5。本文檔是對4.0.0-rc5分支代碼進行分析。
對NetMQ的源碼進行學習並分析理解,所以寫下該系列文章,本系列文章暫定編寫計劃以下:github
友情提示: 看本系列文章時最好獲取源碼,更有助於理解。編程
NetMQ 4.0.0底層使用的是IOCP(即完成端口)模式進行通訊的(3.3.4使用的是select模型),經過異步IO綁定到完成端口,來最大限度的提升性能。這裏不對同步/異步socket進行詳細介紹。稍微解釋下完成端口,爲了解決每一個socket客戶端使用一個線程進行通訊的性能問題,完成端口它充分利用內核對象的調度,只使用少許的幾個線程來處理和客戶端的全部通訊,消除了無謂的線程上下文切換,最大限度的提升了網絡通訊的性能。
想詳細瞭解完成端口的請看完成端口(Completion Port)詳解,講解的比較詳細,同時對各類網絡編程模型作了簡單的介紹。
所以NetMQ經過幾個(默認1個)IO線程處理通訊,上一片文章介紹了ZObejct對象,在該對象中存在許多命令的處理,實際對命令的發送,分配都是IO線程的工做。數組
IO線程初始化時會初始化Proactor
和IOThreadMailbox
網絡
var name = "iothread-" + threadId; m_proactor = new Proactor(name); m_mailbox = new IOThreadMailbox(name, m_proactor, this);
Proactor
對象就是用來綁定或處理完成端口用的,後面再作做詳細介紹。
IOThreadMailbox
是IO線程處理的信箱,每當有命令須要處理時,都會向當前Socket
對象所在的IO線程信箱發送命令。
讓咱們看一眼IOThread
對象和IOThreadMailbox
的定義負載均衡
internal sealed class IOThread : ZObject, IMailboxEvent { }
IOThread
對象繼承自ZObject
對象,記得上一節想到ZObject對象知道如何處理各類命令嗎?所以IOThread
對象也繼承了他父親的技能。同時IOThread
對象實現了IMailboxEvent
接口,這個接口之定義了一個方法。異步
internal interface IMailboxEvent { void Ready(); }
當IO信箱接受到命令時表示當前有命令準備好了,能夠進行 處理,IO信箱則會調用IO線程的Ready方法處理命令,那麼IO信息如何調用IO線程的Ready方法呢,來看下IOThreadMailbox
的構造函數。socket
internal class IOThreadMailbox : IMailbox { ... public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent) { m_proactor = proactor; m_mailboxEvent = mailboxEvent; Command cmd; bool ok = m_commandPipe.TryRead(out cmd); } ... }
在IOThreadMailbox初始化時,傳入了IMailboxEvent。async
m_commandPipe是NetMQ的管道(Pipe),後面咱們會對其作介紹,這裏只要知道該管道用於存放命令便可,能夠__暫時__理解爲管道隊列。
每一個IOThread
會有一個Proactor
,Proactor
的工做就是將Socket
對象綁定到完成端口,而後定時去掃描完成端口是否有須要處理的Socket
對象。
internal class Proactor : PollerBase { ... public Proactor([NotNull] string name) { m_name = name; m_stopping = false; m_stopped = false; m_completionPort = CompletionPort.Create(); m_sockets = new Dictionary<AsyncSocket, Item>(); } ... }
Proactor
對象繼承自PollerBase
,那麼PollerBase
又是什麼呢?從命名能夠看這是一個輪詢基類,即該對象須要長時間不斷循環處理某件事情。
PollerBase
對象是一個抽象類,它有2個功能:
負載均衡
還記的Context中選擇IO線程時有這個一段代碼嗎?
IO線程的負載均衡功能就是PollBase對象提供的
m_load
字段值+1protected void AdjustLoad(int amount) { Interlocked.Add(ref m_load, amount); }
public int Load { get { #if NETSTANDARD1_3 return Volatile.Read(ref m_load); #else Thread.MemoryBarrier(); return m_load; #endif } }
IOThread
取PollBase對象(Proactor)的Load屬性時候會特殊處理,保證拿到的是最新的值。定時任務
PollBase第二個功能就是支持定時任務,即定時觸發某事件。
private readonly SortedList<long, List<TimerInfo>> m_timers;
PollBase內部有一個SortedList
,key爲任務執行的時間,value爲TimeInfo
。
TimeInfo
對象包含2個信息,id
和ITimerEvent
接口,id
用來辨別當前任務的類型,ITimerEvent
接口就包含了TimerEvent
方法,即如何執行。
如TcpConnection
鏈接失敗會從新鏈接時會重連,下面時TcpConnection
開始鏈接方法
private void StartConnecting() { Debug.Assert(m_s == null); // Create the socket. try { m_s = AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); } catch (SocketException) { AddReconnectTimer(); return; } ... } private void AddReconnectTimer() { //獲取重連時間間隔 int rcIvl = GetNewReconnectIvl(); //IO線程的Proactor中,TcpConnection的ReconnectTimerId = 1 m_ioObject.AddTimer(rcIvl, ReconnectTimerId); ... }
IO線程會被封裝到IOObject
中,調用IOObject
的AddTimer
方法實際就是調用IO線程中Proactor
對象的AddTimer
方法,其方法定義以下
public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id) { long expiration = Clock.NowMs() + timeout; var info = new TimerInfo(sink, id); if (!m_timers.ContainsKey(expiration)) m_timers.Add(expiration, new List<TimerInfo>()); m_timers[expiration].Add(info); }
第一行會獲取當前的毫秒時間加上時間間隔。而後加入到m_timers
中。
m_completionPort = CompletionPort.Create(); m_sockets = new Dictionary<AsyncSocket, Item>();
初始化時會建立完成端口,當有socket須要處理時會和完成端口綁定。
初始化時還會初始化一個存放異步AsyncSocket
和item
的字典。
有關於AsyncSocket
和CompletionPort
能夠去Git上看AsyncIO的源碼,這裏不作分析。
Item
結構以下
private class Item { public Item([NotNull] IProactorEvents proactorEvents) { ProactorEvents = proactorEvents; Cancelled = false; } [NotNull] public IProactorEvents ProactorEvents { get; } public bool Cancelled { get; set; } }
它包含了IProactorEvents
接口的信息和當前Socket
操做是否被取消標誌。
internal interface IProactorEvents : ITimerEvent { void InCompleted(SocketError socketError, int bytesTransferred); void OutCompleted(SocketError socketError, int bytesTransferred); }
IProactorEvents
繼承自ITimerEvent
。同時它還聲明瞭InCompleted
和OutCompleted
方法,即發送或接收完成時如何處理,所以當須要處理Socket
時,會將當前Socket
處理方式保存到這個字典中。噹噹前對象發送消息完成,則會調用OutCompleted
方法,接收完成時則會調用InCompleted
方法。
當有Socket
須要綁定時會調用Proactor
的AddSocket
方法
public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents) { var item = new Item(proactorEvents); m_sockets.Add(socket, item); m_completionPort.AssociateSocket(socket, item); AdjustLoad(1); }
它包含2個參數,一個時異步Socket
對象和IProactorEvents
。而後加把他們加入到字段中並將他們綁定到完成端口上。第四段AdjustLoad
方法即把當前IO線程處理數量+1,用於負載均衡用。
當Socket
操做完成時會調用Proactor
的RemoveSocket
移除綁定
public void RemoveSocket(AsyncSocket socket) { AdjustLoad(-1); var item = m_sockets[socket]; m_sockets.Remove(socket); item.Cancelled = true; }
移除時會將item
的Cancelled
字段設置爲true
。因此當Proactor
輪詢處理Socket
時發現該Socket
操做被取消(移除),就會跳過處理。
在IO線程啓動時實際就是啓動Procator的work線程
public void Start() { m_proactor.Start(); }
public void Start() { m_worker = new Thread(Loop) { IsBackground = true, Name = m_name }; m_worker.Start(); }
完整的Loop
方法以下
private void Loop() { var completionStatuses = new CompletionStatus[CompletionStatusArraySize]; while (!m_stopping) { // Execute any due timers. int timeout = ExecuteTimers(); int removed; if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed)) continue; for (int i = 0; i < removed; i++) { try { if (completionStatuses[i].OperationType == OperationType.Signal) { var mailbox = (IOThreadMailbox)completionStatuses[i].State; mailbox.RaiseEvent(); } // if the state is null we just ignore the completion status else if (completionStatuses[i].State != null) { var item = (Item)completionStatuses[i].State; if (!item.Cancelled) { switch (completionStatuses[i].OperationType) { case OperationType.Accept: case OperationType.Receive: item.ProactorEvents.InCompleted( completionStatuses[i].SocketError, completionStatuses[i].BytesTransferred); break; case OperationType.Connect: case OperationType.Disconnect: case OperationType.Send: item.ProactorEvents.OutCompleted( completionStatuses[i].SocketError, completionStatuses[i].BytesTransferred); break; default: throw new ArgumentOutOfRangeException(); } } } } catch (TerminatingException) { } } } }
var completionStatuses = new CompletionStatus[CompletionStatusArraySize];
第一行初始化了CompletionStatus
數組,CompletionStatusArraySize
值爲100。
CompletionStatus
做用是用來保存socket的信息或狀態。
int timeout = ExecuteTimers();
protected int ExecuteTimers() { if (m_timers.Count == 0) return 0; long current = Clock.NowMs(); var keys = m_timers.Keys; for (int i = 0; i < keys.Count; i++) { var key = keys[i]; if (key > current) { return (int)(key - current); } var timers = m_timers[key]; foreach (var timer in timers) { timer.Sink.TimerEvent(timer.Id); } timers.Clear(); m_timers.Remove(key); i--; } return 0; }
ExecuteTimers
會計算以前加入到m_timers
須要等待的超時時間,若沒有對象則直接返回0,不然獲取若獲取到key時間在當前時間以前,則須要調用TimerEvent
方法,調用完成後移除。
若獲取到的key時間比當前時間大,則返回他們的差即爲須要等待的超時時間。
int removed; if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed)) continue;
GetMultipleQueuedCompletionStatus
方法傳入一個超時時間,若前面獲取的超時時間爲0,則這邊會設置爲-1,表示阻斷直到有要處理的才返回。
CompletionPort
內部維護了一個狀態隊列,removed
即爲處理完成返回的狀態個數。
若獲取成功則會返回true
,後面就開始遍歷completionStatuses
數組處理完成Socket
。
public struct CompletionStatus { internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : this() { AsyncSocket = asyncSocket; State = state; OperationType = operationType; SocketError = socketError; BytesTransferred = bytesTransferred; } public AsyncSocket AsyncSocket { get; private set; } public object State { get; internal set; } public OperationType OperationType { get; internal set; } public SocketError SocketError { get; internal set; } public int BytesTransferred { get; internal set; } }
CompletionStatus
是個結構體,它包含的信息如上。其中OperationType
是當前Socket
的處理方式。
public enum OperationType { Send, Receive, Accept, Connect, Disconnect, Signal }
在for
循環的一開始先會判斷當前狀態的OperationType
,如果Signal,則說明當前是個信號狀態,說明有命令須要處理,則會調用IO信箱的RaiseEvent
方法,實際爲IO線程的Ready
方法。
public void Ready() { Command command; while (m_mailbox.TryRecv(out command)) command.Destination.ProcessCommand(command); }
IOThread
會將當前信箱的全部命令進行處理。
若不是Signal
則會將CompletionStatus
保存的狀態信息轉換爲Item
對象,並判斷當前Socket
是否移除(取消)。若沒有則對其進行處理。判斷OperationType
,若爲Accept
或Receive
則表示須要接收,則調用InCompleted
方法。若爲Connect
,Disconnect
或Send
則表示有消息向外發送,則調用OutCompleted
方法。
至此IOThread
代碼分析完畢。
internal class IOObject : IProactorEvents { public IOObject([CanBeNull] IOThread ioThread) { if (ioThread != null) Plug(ioThread); } public void Plug([NotNull] IOThread ioThread) { Debug.Assert(ioThread != null); m_ioThread = ioThread; } }
IOObject
實際就是保存了IOThread
的信息和Socket
處理完成時如何執行,以及向外暴露了一些接口。
再次說明,若是向簡單瞭解完成端口如何使用,則看《完成端口使用》,若是想詳細瞭解完成端口則看下《完成端口詳細介紹》,若是想直到NetMQ的AsyncIO和完成端口的源碼請看AsyncIO。
該篇介紹了IO線程和完成端口的處理方式,若哪裏分析的不到位或有誤但願支出。
本文地址:http://www.javashuo.com/article/p-ghjehjvc-z.html 做者博客:傑哥很忙 歡迎轉載,請在明顯位置給出出處及連接)