算是開始讀ZeroMQ(java)的代碼實現了吧,如今有了一個大致的瞭解,看起來實現是比較的乾淨的,抽象什麼的不算複雜。。。java
這裏先來看看它的I/O線程的實現吧,順帶看看是如何實現組件的通訊的。。。。api
首先要搞清楚I/O線程的實現,就先要弄懂一個類型,Poller(zmq.Poller.java),能夠將其當作是對selector的一個封裝,同時它還要管理定時事件,看了這麼多代碼,發現基本上都是在實現I/Oselect的地方完成了定時的實現。。。。微信
好了,不說太多閒話了,來看看它的繼承體系吧:多線程
這裏還將依賴關係也標出來了,首先繼承自PollerBase抽象類,而後實現了Runnable接口,本身還會建立一個Thread對象。。。看了這個圖,基本上就已經可以知道Poller的運行原理了吧。。。。socket
這裏先來看看PollerBase的實現吧,它其實主要是用來管理定時的,那麼先來看看他的一些重要的屬性和定義:函數
- private final AtomicInteger load; //這個load其實就是當前poller裏面註冊的channel的數量
-
- //這裏是要註冊的超時是事件
- private final class TimerInfo {
- IPollEvents sink; //事件回調
- int id;
-
- public TimerInfo(IPollEvents sink_, int id_) {
- sink = sink_;
- id = id_;
- }
- }
- private final Map<Long, TimerInfo> timers; //這裏記錄全部的超時對象,key是時間
- private final Map<Long, TimerInfo> addingTimers; //等待加入的超時事件
前面的一個原子Integer是用於記錄負載的,用於記錄當前poller裏面一共註冊了多少I/O對象。。。而後是超時事件的定義,sink是超時的事件回調函數,裏面有相應的方法,timer就記錄了全部的超時事件,addingTimers是須要加入的超時事件。。這裏的key都是超時的時間,value就是超時對象了。。。ui
這裏就來看兩個主要的方法就行了吧,先來看看如何加入超時事件:this
- //添加一個超時事件
- public void add_timer (long timeout_, IPollEvents sink_, int id_) {
- long expiration = Clock.now_ms () + timeout_; //計算超時的時間
- TimerInfo info = new TimerInfo(sink_, id_); //建立超時對象
- addingTimers.put(expiration, info); //將其添加到adding裏面去
-
- }
代碼應該很簡單可以看明白吧,第一個參數是超時時間,第二個參數是回調方法,第三個參數是ID,首先加上當前的時間就算出了超時的時間,而後建立超時對象,這裏先是將其放入了addingTimers裏面,而不是直接放到了timer裏面,。。。url
那麼接下來來看看如何執行全部的超時的方法吧:spa
- //執行全部的超時事件,返回下一個超時還剩下的時間
- protected long execute_timers() {
- if (!addingTimers.isEmpty()) { //若是當前還有須要添的超時時間,那麼須要將其添加進去
- timers.putAll(addingTimers);
- addingTimers.clear();
- }
- //沒有超時事件
- if (timers.isEmpty())
- return 0L;
-
- //獲取當前的時間
- long current = Clock.now_ms ();
-
- //遍歷全部的超時時間,這裏是從最小的開始的
- Iterator<Entry <Long, TimerInfo>> it = timers.entrySet().iterator();
- while (it.hasNext()) {
-
- Entry <Long, TimerInfo> o = it.next();
- // If we have to wait to execute the item, same will be true about
- // all the following items (multimap is sorted). Thus we can stop
- // checking the subsequent timers and return the time to wait for
- // the next timer (at least 1ms).
-
- //若是超時的時間大於當前的時間,那麼表示尚未超時,
- if (o.getKey() > current) {
- return o.getKey() - current; //返回下一個超時還剩下的時間
- }
-
- // Trigger the timer.
- //執行超時方法
- o.getValue().sink.timer_event (o.getValue().id);
- // Remove it from the list of active timers.
- it.remove();
- }
-
- if (!addingTimers.isEmpty())
- return execute_timers();
-
- // There are no more timers.
-
- return 0L; //若是是0 的話,表示沒有timer執行了
- }
- }
應該代碼也還算比較好理解吧,這裏能夠看到將addingTimers裏面的都放到了timers裏面。。。而後遍歷全部的超時對象,並執行他們的超時回調,知道一個超時時間尚未到,最後返回的是下一個超時事件還剩下多長的時間。。。
好了,那麼接下來來看看Poller類型的實現吧,先來看看它的重要定義:
- //在當前poller裏面註冊的封裝。。。
- private static class PollSet {
- protected IPollEvents handler; //事件的回調
- protected SelectionKey key; //註冊以後的key
- protected int ops; //註冊的事件
- protected boolean cancelled; //是否已經取消
-
- protected PollSet(IPollEvents handler) {
- this.handler = handler;
- key = null;
- cancelled = false;
- ops = 0;
- }
- }
- final private Map<SelectableChannel, PollSet> fd_table; //記錄全部的註冊,key是channel
-
- // If true, there's at least one retired event source.
- private boolean retired; //當前註冊的對象是否有更新,若是有更新的話,在執行select以前須要先更新註冊
-
- // If true, thread is in the process of shutting down.
- volatile private boolean stopping; //若是是true的話,那麼執行線程將會中止
- volatile private boolean stopped; //是否已經中止
-
- private Thread worker; //worker線程
- private Selector selector; //selector
- final private String name; //名字
這裏顯示定義了一個嵌套類,全部須要註冊到selector上的channel都會先構建這個對象,將其當作附件註冊到selector上。。。。其中handler是事件回調,key是selector註冊後取得的key,ops是註冊的事件類型
而後是fd_table,這個應該知道是幹嗎用的吧,用於關聯註冊的channel對象與其的PollSet對象。。。
這裏的retired用於標識當前的註冊的channel什麼的是否有更新。。。接下來的重要屬性還有thread,這個是幹嗎應該很清楚吧,還有一個selector就很少說了。。。
接下來來看看如何在poller對象上面註冊channel吧,有幾個比較重要的方法:
- //用於在當前的集合裏面添加須要註冊的channel,第一個參數是channel,第二個參數是事件回調
- public final void add_fd (SelectableChannel fd_, IPollEvents events_) {
- fd_table.put(fd_, new PollSet(events_)); //直接把放到map裏面就行了
- adjust_load (1); //增長load值,這裏所謂的負載其實就是在當前poller裏面註冊的channel的數量
- }
- //在key上面註冊事件,若是negate爲true的話,那麼表示是取消事件
- private final void register (SelectableChannel handle_, int ops, boolean negate) {
- PollSet pollset = fd_table.get(handle_); //獲取pollset對象
-
- if (negate) {
- pollset.ops = pollset.ops &~ ops; //取反,至關於取消事件
- } else {
- pollset.ops = pollset.ops | ops; //註冊事件
- }
-
- if (pollset.key != null) { //若是有key了,那麼表示已經註冊到selector上面了,那麼只須要更新key就行了
- pollset.key.interestOps(pollset.ops);
- } else {
- retired = true;
-
- }
- }
這裏首先須要調用add_fd方法,channel加入進去,而後再調用register方法註冊相應的事件,不知道爲啥要這麼弄。。直接一個方法實現不就行了麼。。可能有一些細節的東西我還不太清楚吧,很少說這個了。。
好了,接下來來看看它的run方法吧:
- //poller的執行流程
- public void run () {
- int returnsImmediately = 0;
-
- while (!stopping) {
- long timeout = execute_timers (); //執行全部的超時,而且獲取下一個超時的時間
-
- if (retired) { //這裏表示註冊的東西有更新
-
- Iterator <Map.Entry <SelectableChannel,PollSet>> it = fd_table.entrySet ().iterator ();
- while (it.hasNext ()) { //遍歷全部須要註冊的
- Map.Entry <SelectableChannel,PollSet> entry = it.next ();
- SelectableChannel ch = entry.getKey (); //獲取channel
- PollSet pollset = entry.getValue (); //獲取pollset
- if (pollset.key == null) { //這裏沒有key的話,表示當前channel並無註冊到selector上面去
- try {
- pollset.key = ch.register(selector, pollset.ops, pollset.handler); //註冊,這裏註冊的附件竟然是事件的回調函數
- } catch (ClosedChannelException e) {
- }
- }
-
-
- if (pollset.cancelled || !ch.isOpen()) { //若是是取消註冊,那麼直接取消掉就能夠了
- if(pollset.key != null) {
- pollset.key.cancel();
- }
- it.remove ();
- }
- }
- retired = false;
-
- }
-
- // Wait for events.
- int rc;
- long start = System.currentTimeMillis (); //select以前的時間
- try {
- rc = selector.select (timeout);
- } catch (IOException e) {
- throw new ZError.IOException (e);
- }
-
- if (rc == 0) { //出錯啦,好像
- // Guess JDK epoll bug
- if (timeout == 0 ||
- System.currentTimeMillis () - start < timeout / 2)
- returnsImmediately ++;
- else
- returnsImmediately = 0;
-
- if (returnsImmediately > 10) {
- rebuildSelector (); //重建selector
- returnsImmediately = 0;
- }
- continue;
- }
-
-
- Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //全部select出來的key
- while (it.hasNext()) { //遍歷
- SelectionKey key = it.next();
- IPollEvents evt = (IPollEvents) key.attachment();
- it.remove();
-
- try { //接下來就是判斷事件的類型執行相應的方法就行了
- if (key.isReadable() ) { //有數據能夠讀取了
- evt.in_event();
- } else if (key.isAcceptable()) { //有新的鏈接進來了
- evt.accept_event();
- } else if (key.isConnectable()) { //鏈接創建
- evt.connect_event();
- }
- if (key.isWritable()) { //可寫
- evt.out_event();
- }
- } catch (CancelledKeyException e) {
- // channel might have been closed
- }
-
- }
-
- }
-
- stopped = true;
-
- }
這個應該很容易看懂吧,首先執行了全部超時的事件,而後若是有註冊的channel更新的話,須要從新更新這些註冊,而後就能夠執行select方法了,接着遍歷出全部select的key,而後判斷事件的類型,執行相應的回調方法就行了。。。
最後來看看它的start方法:
- //啓動,這裏主要是建立一個線程,而後開始運行
- public void start() {
- worker = new Thread(this, name); //建立thread,
- worker.start(); //啓動這個執行線程
- }
好吧,簡單吧,建立一個線程,而後啓動就行了,這裏執行的就是run方法。。。。
好了,到這裏整個poller的實現和其運行基本上就算是搞清楚了。。。並且能夠知道poller對象纔是真的I/O線程的持有者。。。。
接下來來介紹另一個類型:Mailbox,每個I/O線程都會有本身的mailbox,並且鏈接也會有本身的mailbox,能夠向mailbox裏面發送命令,而後讓其執行。。。這裏能夠理解爲mailbox是命令的接收器,ZeroMQ就是用這個來實現組件之間的通訊的。。。。
先來看看他的一些重要的屬性定義吧:
- private final YPipe<Command> cpipe; //這名字太唬人了,其實就是一個保存command的隊列而已
-
- //其實能夠將其理解爲一個socketpair,若是有命令寫入了隊列,那麼經過在這裏寫入一個數據,能夠用於提醒有命令發送到了mialbox
- private final Signaler signaler; //用於通訊的signal,使用pipe實現的。。。,其實這裏只不過是一個噱頭,這裏寫入數據是爲了提醒執行線程command隊列裏面有命令寫入了
-
- private final Lock sync; //只有一個線程從mailbox裏面收命令,可是會有不少線程向mialbox裏面發送命令,用這個鎖來保護
-
- private boolean active; //用於判斷底層的pipe是否仍是活躍的,若是是true的話,表示底層的pipe活躍,能夠讀取命令
-
- // mailbox name, for better debugging
- private final String name; //當前mailbox的名字
這裏cpipe這個名字比較唬人,其實能夠就將其理解爲一個command的隊列,全部的命令都會放到這個裏面去,而後是signaler,這個是底層通訊的實現,它裏面建立了pipe,相似於socketpair,經過在在這個裏面寫數據,用於提醒cpipe裏面有命令寫進去了。。須要處理。。。
來看看幾個比較重要的方法吧:
- public SelectableChannel get_fd () {
- return signaler.get_fd (); //這裏其實獲取的是signal用到的pipe的讀channel
- }
-
- //向當前的mailbox發送命令,其實就是寫到command隊列裏面去而已
- public void send (final Command cmd_) {
- boolean ok = false;
- sync.lock ();
- try {
- cpipe.write (cmd_, false);
- ok = cpipe.flush (); //pipeflush,這裏將會被selector感應到,從而能夠執行相應的處理,在執行線程裏面執行命令
- } finally {
- sync.unlock ();
- }
-
- if (!ok) {
- signaler.send (); //經過寫端寫數據,這樣子的話會被讀端收到
- }
- }
-
- //收取命令,若是這裏沒法馬上獲取命令的話,還能夠有一個超時時間
- public Command recv (long timeout_) {
- Command cmd_ = null;
- // Try to get the command straight away.
- if (active) {
- cmd_ = cpipe.read (); //從隊列裏面獲取命令
- if (cmd_ != null) {
-
- return cmd_;
- }
- // If there are no more commands available, switch into passive state.
- active = false;
- signaler.recv (); //這裏會從讀端不斷的讀數據
- }
-
-
- // Wait for signal from the command sender.
- boolean rc = signaler.wait_event (timeout_);
- if (!rc)
- return null;
-
- // We've got the signal. Now we can switch into active state.
- active = true;
-
- // Get a command.
- cmd_ = cpipe.read ();
- assert (cmd_ != null);
-
- return cmd_;
- }
這裏獲取底層的fd,其實就是獲取用於通訊的signal的讀端的channel,而後向這個mailbox發送命令其實就是直接向command的隊列裏面放入命令就行了,而且這裏須要經過signaler來提醒一下。。。。
而後recv方法,用於獲取命令,其實最終仍是在命令隊列裏去拿。。。。
好了,到這裏mailbox差很少了,一些細節並無貼出來,由於其實這東西若是沒有搞懂具體是怎麼用的話也不可能搞得明白。。。。
好了,在最後開始IOThread這個類型以前先來介紹另外兩個東西吧:
(1)IPollEvents,這個是一個接口,也就是事件的回調。。來看看它的定義就知道了。。。
- public interface IPollEvents {
- void in_event () ; //當有數據能夠讀取的時候須要執行的方法
- void out_event () ; //當能夠寫的時候應該執行的方法
- void connect_event () ; //當已經創建了鏈接以後,應該執行的
- void accept_event(); //當有accept的時候,應該執行這個
- void timer_event (int id_) ; //當超時的時候應該執行的
- }
裏面定義了5個方法,具體這5個方法分別處理什麼事件應該看名字就可以很容易知道吧。。就不細說了。。
(2)ZObject,這個類型是幹嗎的呢,在前面已已經說過了,mailbox用於存取別的地方發送過來的命令,而ZObject就是用於執行命令的,若是須要組件能夠進行命令的交互,那麼就須要類型實現繼承ZObject,具體的類容就不說了,有興趣的本身看吧,很簡單的,,,,
好啦,終於到了最激動人心的時候了,來看看IOThread類型,看這個名字就知道它是幹嗎的吧,先來看看它的類型定義圖吧:
其實看到這裏也可以猜出來IOThread類型自己並無太多的內容,更多的時候都是有mailbox,poller來作了。。。
來看看它的一些重要屬性和構造函數吧:
- final private Mailbox mailbox; //I/O線程將會從這個mailbox裏面獲取命令
-
- final private SelectableChannel mailbox_handle; //mailbox會用到的chanel,其實也就是底層pipe的讀端
-
- final private Poller poller; //poller對象
-
- final String name; //這個IO線程的名字
-
- public IOThread(Ctx ctx_, int tid_) { //所屬的ctx,以及這個是第幾個IO線程,也能夠把它理解爲ID吧
- super(ctx_, tid_);
- name = "iothread-" + tid_;
- poller = new Poller(name); //建立poller
-
- mailbox = new Mailbox(name); //建立mailbox
- mailbox_handle = mailbox.get_fd(); //mailbox會用到的channel,pipe的讀端
- poller.add_fd (mailbox_handle, this); //在poller裏面註冊,其實這裏只是將其放到fd列表裏面,這裏的事件回調就是當前對象
- poller.set_pollin (mailbox_handle); //這裏註冊讀取事件
- }
這裏mailbox和poller是幹嗎用的就很少說了,另外這個mailbox_handle實際上是mailbox的signaler的讀端,並且能夠在構造函數中能夠看到將這個channel註冊到了poller上面去。。這樣若是有數據讀,那麼會被響應,也就意味着有命令發送到mailbox須要執行了。。。
咱們來看看這個回調函:
- //當mailbox能夠讀取的時候,將會執行這個方法,這裏其實也就是收到了命令
- public void in_event() {
- // TODO: Do we want to limit number of commands I/O thread can
- // process in a single go?
-
- while (true) {
-
- // Get the next command. If there is none, exit.
- //獲取須要執行的命令
- Command cmd = mailbox.recv (0);
- if (cmd == null)
- break;
-
- // Process the command.
- //執行命令
- cmd.destination().process_command (cmd); //其實對於IO線程對象,也就只有stop命令能夠執行
- }
-
- }
簡單吧,從mailbox裏面獲取command,而後直接執行就行了。。。。這裏IOThread自己就繼承了ZOjbect,因此這裏說白了就是本身須要執行命令,而在IOThread中,只有stop命令須要執行:
- //中止poller
- protected void process_stop ()
- {
- poller.rm_fd (mailbox_handle);
-
- poller.stop ();
-
- }
好啦,到這裏ZeroMQ中IO線程的實現應該就算是比較的清楚了。。並且如何實現組件間的通訊也算是比較的瞭解了。。。