ZeroMQ(java)之I/O線程的實現與組件間的通訊

算是開始讀ZeroMQ(java)的代碼實現了吧,如今有了一個大致的瞭解,看起來實現是比較的乾淨的,抽象什麼的不算複雜。。。java

這裏先來看看它的I/O線程的實現吧,順帶看看是如何實現組件的通訊的。。。。api

首先要搞清楚I/O線程的實現,就先要弄懂一個類型,Poller(zmq.Poller.java),能夠將其當作是對selector的一個封裝,同時它還要管理定時事件,看了這麼多代碼,發現基本上都是在實現I/Oselect的地方完成了定時的實現。。。。微信

好了,不說太多閒話了,來看看它的繼承體系吧:多線程



這裏還將依賴關係也標出來了,首先繼承自PollerBase抽象類,而後實現了Runnable接口,本身還會建立一個Thread對象。。。看了這個圖,基本上就已經可以知道Poller的運行原理了吧。。。。socket

這裏先來看看PollerBase的實現吧,它其實主要是用來管理定時的,那麼先來看看他的一些重要的屬性和定義:函數

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. private final AtomicInteger load;   //這個load其實就是當前poller裏面註冊的channel的數量  
  2.   
  3. //這裏是要註冊的超時是事件  
  4. private final class TimerInfo {  
  5.     IPollEvents sink;  //事件回調  
  6.     int id;  
  7.       
  8.     public TimerInfo(IPollEvents sink_, int id_) {  
  9.         sink = sink_;  
  10.         id = id_;  
  11.     }  
  12. }  
  13. private final Map<Long, TimerInfo> timers;   //這裏記錄全部的超時對象,key是時間  
  14. private final Map<Long, TimerInfo> addingTimers;   //等待加入的超時事件  

前面的一個原子Integer是用於記錄負載的,用於記錄當前poller裏面一共註冊了多少I/O對象。。。而後是超時事件的定義,sink是超時的事件回調函數,裏面有相應的方法,timer就記錄了全部的超時事件,addingTimers是須要加入的超時事件。。這裏的key都是超時的時間,value就是超時對象了。。。ui

這裏就來看兩個主要的方法就行了吧,先來看看如何加入超時事件:this

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //添加一個超時事件  
  2. public void add_timer (long timeout_, IPollEvents sink_, int id_) {  
  3.     long expiration = Clock.now_ms () + timeout_;   //計算超時的時間  
  4.     TimerInfo info = new TimerInfo(sink_, id_);  //建立超時對象  
  5.     addingTimers.put(expiration, info);  //將其添加到adding裏面去  
  6.   
  7. }  

代碼應該很簡單可以看明白吧,第一個參數是超時時間,第二個參數是回調方法,第三個參數是ID,首先加上當前的時間就算出了超時的時間,而後建立超時對象,這裏先是將其放入了addingTimers裏面,而不是直接放到了timer裏面,。。。url

那麼接下來來看看如何執行全部的超時的方法吧:spa

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1.  //執行全部的超時事件,返回下一個超時還剩下的時間  
  2.     protected long execute_timers() {  
  3.         if (!addingTimers.isEmpty()) {  //若是當前還有須要添的超時時間,那麼須要將其添加進去  
  4.             timers.putAll(addingTimers);  
  5.             addingTimers.clear();  
  6.         }  
  7.         //沒有超時事件  
  8.         if (timers.isEmpty())  
  9.             return 0L;  
  10.   
  11.         //獲取當前的時間  
  12.         long current = Clock.now_ms ();  
  13.   
  14.         //遍歷全部的超時時間,這裏是從最小的開始的  
  15.         Iterator<Entry <Long, TimerInfo>> it = timers.entrySet().iterator();  
  16.         while (it.hasNext()) {  
  17.   
  18.             Entry <Long, TimerInfo> o = it.next();  
  19.             //  If we have to wait to execute the item, same will be true about  
  20.             //  all the following items (multimap is sorted). Thus we can stop  
  21.             //  checking the subsequent timers and return the time to wait for  
  22.             //  the next timer (at least 1ms).  
  23.   
  24.             //若是超時的時間大於當前的時間,那麼表示尚未超時,  
  25.             if (o.getKey() > current) {  
  26.                 return o.getKey() - current;  //返回下一個超時還剩下的時間  
  27.             }  
  28.   
  29.             //  Trigger the timer.  
  30.             //執行超時方法  
  31.             o.getValue().sink.timer_event (o.getValue().id);  
  32.             //  Remove it from the list of active timers.  
  33.             it.remove();  
  34.         }  
  35.   
  36.         if (!addingTimers.isEmpty())  
  37.             return execute_timers();  
  38.   
  39.         //  There are no more timers.  
  40.   
  41.         return 0L;  //若是是0 的話,表示沒有timer執行了  
  42.     }  
  43. }  

應該代碼也還算比較好理解吧,這裏能夠看到將addingTimers裏面的都放到了timers裏面。。。而後遍歷全部的超時對象,並執行他們的超時回調,知道一個超時時間尚未到,最後返回的是下一個超時事件還剩下多長的時間。。。

好了,那麼接下來來看看Poller類型的實現吧,先來看看它的重要定義:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //在當前poller裏面註冊的封裝。。。  
  2.    private static class PollSet {  
  3.        protected IPollEvents handler;   //事件的回調  
  4.        protected SelectionKey key;   //註冊以後的key  
  5.        protected int ops;    //註冊的事件  
  6.        protected boolean cancelled;   //是否已經取消  
  7.          
  8.        protected PollSet(IPollEvents handler) {  
  9.            this.handler = handler;  
  10.            key = null;  
  11.            cancelled = false;  
  12.            ops = 0;  
  13.        }  
  14.    }  
  15.    final private Map<SelectableChannel, PollSet> fd_table;   //記錄全部的註冊,key是channel  
  16.   
  17.    //  If true, there's at least one retired event source.  
  18.    private boolean retired;    //當前註冊的對象是否有更新,若是有更新的話,在執行select以前須要先更新註冊  
  19.   
  20.    //  If true, thread is in the process of shutting down.  
  21.    volatile private boolean stopping;    //若是是true的話,那麼執行線程將會中止  
  22.    volatile private boolean stopped;   //是否已經中止  
  23.      
  24.    private Thread worker;   //worker線程  
  25.    private Selector selector;   //selector  
  26.    final private String name;   //名字  

這裏顯示定義了一個嵌套類,全部須要註冊到selector上的channel都會先構建這個對象,將其當作附件註冊到selector上。。。。其中handler是事件回調,key是selector註冊後取得的key,ops是註冊的事件類型

而後是fd_table,這個應該知道是幹嗎用的吧,用於關聯註冊的channel對象與其的PollSet對象。。。

這裏的retired用於標識當前的註冊的channel什麼的是否有更新。。。接下來的重要屬性還有thread,這個是幹嗎應該很清楚吧,還有一個selector就很少說了。。。

接下來來看看如何在poller對象上面註冊channel吧,有幾個比較重要的方法:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //用於在當前的集合裏面添加須要註冊的channel,第一個參數是channel,第二個參數是事件回調  
  2. public final void add_fd (SelectableChannel fd_, IPollEvents events_) {  
  3.     fd_table.put(fd_, new PollSet(events_));  //直接把放到map裏面就行了  
  4.     adjust_load (1);  //增長load值,這裏所謂的負載其實就是在當前poller裏面註冊的channel的數量  
  5. }  
  6. //在key上面註冊事件,若是negate爲true的話,那麼表示是取消事件  
  7. private final void register (SelectableChannel handle_, int ops, boolean negate) {  
  8.     PollSet pollset = fd_table.get(handle_);  //獲取pollset對象  
  9.       
  10.     if (negate)  {  
  11.         pollset.ops = pollset.ops &~ ops;  //取反,至關於取消事件  
  12.     } else {  
  13.         pollset.ops = pollset.ops | ops;  //註冊事件  
  14.     }  
  15.       
  16.     if (pollset.key != null) {  //若是有key了,那麼表示已經註冊到selector上面了,那麼只須要更新key就行了  
  17.         pollset.key.interestOps(pollset.ops);    
  18.     } else {  
  19.         retired = true;  
  20.   
  21.     }  
  22. }  

這裏首先須要調用add_fd方法,channel加入進去,而後再調用register方法註冊相應的事件,不知道爲啥要這麼弄。。直接一個方法實現不就行了麼。。可能有一些細節的東西我還不太清楚吧,很少說這個了。。

好了,接下來來看看它的run方法吧:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //poller的執行流程  
  2. public void run () {  
  3.     int returnsImmediately = 0;  
  4.   
  5.     while (!stopping) {  
  6.         long timeout = execute_timers ();  //執行全部的超時,而且獲取下一個超時的時間  
  7.           
  8.         if (retired) {  //這裏表示註冊的東西有更新  
  9.               
  10.             Iterator <Map.Entry <SelectableChannel,PollSet>> it = fd_table.entrySet ().iterator ();  
  11.             while (it.hasNext ()) {  //遍歷全部須要註冊的  
  12.                 Map.Entry <SelectableChannel,PollSet> entry = it.next ();  
  13.                 SelectableChannel ch = entry.getKey ();  //獲取channel  
  14.                 PollSet pollset = entry.getValue ();   //獲取pollset  
  15.                 if (pollset.key == null) {  //這裏沒有key的話,表示當前channel並無註冊到selector上面去  
  16.                     try {  
  17.                         pollset.key = ch.register(selector, pollset.ops, pollset.handler);   //註冊,這裏註冊的附件竟然是事件的回調函數  
  18.                     } catch (ClosedChannelException e) {  
  19.                     }  
  20.                 }   
  21.                   
  22.                   
  23.                 if (pollset.cancelled || !ch.isOpen()) {  //若是是取消註冊,那麼直接取消掉就能夠了  
  24.                     if(pollset.key != null) {  
  25.                         pollset.key.cancel();  
  26.                     }  
  27.                     it.remove ();  
  28.                 }  
  29.             }  
  30.             retired = false;  
  31.               
  32.         }  
  33.   
  34.         //  Wait for events.  
  35.         int rc;  
  36.         long start = System.currentTimeMillis ();  //select以前的時間  
  37.         try {  
  38.             rc = selector.select (timeout);  
  39.         } catch (IOException e) {  
  40.             throw new ZError.IOException (e);  
  41.         }  
  42.           
  43.         if (rc == 0) {   //出錯啦,好像  
  44.             //  Guess JDK epoll bug  
  45.             if (timeout == 0 ||  
  46.                     System.currentTimeMillis () - start < timeout / 2)  
  47.                 returnsImmediately ++;  
  48.             else  
  49.                 returnsImmediately = 0;  
  50.   
  51.             if (returnsImmediately > 10) {  
  52.                 rebuildSelector ();   //重建selector  
  53.                 returnsImmediately = 0;  
  54.             }  
  55.             continue;  
  56.         }  
  57.   
  58.   
  59.         Iterator<SelectionKey> it = selector.selectedKeys().iterator();  //全部select出來的key  
  60.         while (it.hasNext()) {  //遍歷  
  61.             SelectionKey key = it.next();  
  62.             IPollEvents evt = (IPollEvents) key.attachment();  
  63.             it.remove();  
  64.   
  65.             try {  //接下來就是判斷事件的類型執行相應的方法就行了  
  66.                 if (key.isReadable() ) {  //有數據能夠讀取了   
  67.                     evt.in_event();  
  68.                 } else if (key.isAcceptable()) {  //有新的鏈接進來了  
  69.                     evt.accept_event();  
  70.                 } else if (key.isConnectable()) {  //鏈接創建  
  71.                     evt.connect_event();  
  72.                 }   
  73.                 if (key.isWritable()) {  //可寫  
  74.                     evt.out_event();  
  75.                 }   
  76.             } catch (CancelledKeyException e) {  
  77.                 // channel might have been closed  
  78.             }  
  79.               
  80.         }  
  81.   
  82.     }  
  83.       
  84.     stopped = true;  
  85.       
  86. }  

這個應該很容易看懂吧,首先執行了全部超時的事件,而後若是有註冊的channel更新的話,須要從新更新這些註冊,而後就能夠執行select方法了,接着遍歷出全部select的key,而後判斷事件的類型,執行相應的回調方法就行了。。。

最後來看看它的start方法:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //啓動,這裏主要是建立一個線程,而後開始運行  
  2. public void start() {  
  3.     worker = new Thread(this, name);  //建立thread,  
  4.     worker.start();  //啓動這個執行線程  
  5. }  

好吧,簡單吧,建立一個線程,而後啓動就行了,這裏執行的就是run方法。。。。

好了,到這裏整個poller的實現和其運行基本上就算是搞清楚了。。。並且能夠知道poller對象纔是真的I/O線程的持有者。。。。


接下來來介紹另一個類型:Mailbox,每個I/O線程都會有本身的mailbox,並且鏈接也會有本身的mailbox,能夠向mailbox裏面發送命令,而後讓其執行。。。這裏能夠理解爲mailbox是命令的接收器,ZeroMQ就是用這個來實現組件之間的通訊的。。。。

先來看看他的一些重要的屬性定義吧:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. private final YPipe<Command> cpipe;   //這名字太唬人了,其實就是一個保存command的隊列而已  
  2.   
  3. //其實能夠將其理解爲一個socketpair,若是有命令寫入了隊列,那麼經過在這裏寫入一個數據,能夠用於提醒有命令發送到了mialbox  
  4. private final Signaler signaler;   //用於通訊的signal,使用pipe實現的。。。,其實這裏只不過是一個噱頭,這裏寫入數據是爲了提醒執行線程command隊列裏面有命令寫入了  
  5.   
  6. private final Lock sync;  //只有一個線程從mailbox裏面收命令,可是會有不少線程向mialbox裏面發送命令,用這個鎖來保護  
  7.   
  8. private boolean active;   //用於判斷底層的pipe是否仍是活躍的,若是是true的話,表示底層的pipe活躍,能夠讀取命令  
  9.   
  10. // mailbox name, for better debugging  
  11. private final String name;   //當前mailbox的名字  

這裏cpipe這個名字比較唬人,其實能夠就將其理解爲一個command的隊列,全部的命令都會放到這個裏面去,而後是signaler,這個是底層通訊的實現,它裏面建立了pipe,相似於socketpair,經過在在這個裏面寫數據,用於提醒cpipe裏面有命令寫進去了。。須要處理。。。

來看看幾個比較重要的方法吧:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. public SelectableChannel get_fd () {  
  2.     return signaler.get_fd ();   //這裏其實獲取的是signal用到的pipe的讀channel  
  3. }  
  4.   
  5. //向當前的mailbox發送命令,其實就是寫到command隊列裏面去而已  
  6. public void send (final Command cmd_) {     
  7.     boolean ok = false;  
  8.     sync.lock ();  
  9.     try {  
  10.         cpipe.write (cmd_, false);  
  11.         ok = cpipe.flush ();  //pipeflush,這裏將會被selector感應到,從而能夠執行相應的處理,在執行線程裏面執行命令  
  12.     } finally {  
  13.         sync.unlock ();  
  14.     }  
  15.       
  16.     if (!ok) {  
  17.         signaler.send (); //經過寫端寫數據,這樣子的話會被讀端收到  
  18.     }  
  19. }  
  20.   
  21. //收取命令,若是這裏沒法馬上獲取命令的話,還能夠有一個超時時間  
  22. public Command recv (long timeout_)  {  
  23.     Command cmd_ = null;  
  24.     //  Try to get the command straight away.  
  25.     if (active) {  
  26.         cmd_ = cpipe.read ();  //從隊列裏面獲取命令  
  27.         if (cmd_ != null) {  
  28.               
  29.             return cmd_;  
  30.         }  
  31.         //  If there are no more commands available, switch into passive state.  
  32.         active = false;  
  33.         signaler.recv ();  //這裏會從讀端不斷的讀數據  
  34.     }  
  35.   
  36.   
  37.     //  Wait for signal from the command sender.  
  38.     boolean rc = signaler.wait_event (timeout_);  
  39.     if (!rc)  
  40.         return null;  
  41.   
  42.     //  We've got the signal. Now we can switch into active state.  
  43.     active = true;  
  44.   
  45.     //  Get a command.  
  46.     cmd_ = cpipe.read ();  
  47.     assert (cmd_ != null);  
  48.       
  49.     return cmd_;  
  50. }  

這裏獲取底層的fd,其實就是獲取用於通訊的signal的讀端的channel,而後向這個mailbox發送命令其實就是直接向command的隊列裏面放入命令就行了,而且這裏須要經過signaler來提醒一下。。。。

而後recv方法,用於獲取命令,其實最終仍是在命令隊列裏去拿。。。。

好了,到這裏mailbox差很少了,一些細節並無貼出來,由於其實這東西若是沒有搞懂具體是怎麼用的話也不可能搞得明白。。。。


好了,在最後開始IOThread這個類型以前先來介紹另外兩個東西吧:

(1)IPollEvents,這個是一個接口,也就是事件的回調。。來看看它的定義就知道了。。。

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. public interface IPollEvents {  
  2.     void in_event () ;  //當有數據能夠讀取的時候須要執行的方法  
  3.     void out_event () ;  //當能夠寫的時候應該執行的方法  
  4.     void connect_event () ;  //當已經創建了鏈接以後,應該執行的  
  5.     void accept_event();  //當有accept的時候,應該執行這個  
  6.     void timer_event (int id_) ;  //當超時的時候應該執行的  
  7. }  

裏面定義了5個方法,具體這5個方法分別處理什麼事件應該看名字就可以很容易知道吧。。就不細說了。。

(2)ZObject,這個類型是幹嗎的呢,在前面已已經說過了,mailbox用於存取別的地方發送過來的命令,而ZObject就是用於執行命令的,若是須要組件能夠進行命令的交互,那麼就須要類型實現繼承ZObject,具體的類容就不說了,有興趣的本身看吧,很簡單的,,,,


好啦,終於到了最激動人心的時候了,來看看IOThread類型,看這個名字就知道它是幹嗎的吧,先來看看它的類型定義圖吧:



其實看到這裏也可以猜出來IOThread類型自己並無太多的內容,更多的時候都是有mailbox,poller來作了。。。

來看看它的一些重要屬性和構造函數吧:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. final private Mailbox mailbox;   //I/O線程將會從這個mailbox裏面獲取命令  
  2.   
  3. final private SelectableChannel mailbox_handle;    //mailbox會用到的chanel,其實也就是底層pipe的讀端  
  4.   
  5. final private Poller poller;  //poller對象  
  6.   
  7. final String name;  //這個IO線程的名字  
  8.   
  9. public IOThread(Ctx ctx_, int tid_) {  //所屬的ctx,以及這個是第幾個IO線程,也能夠把它理解爲ID吧  
  10.     super(ctx_, tid_);  
  11.     name = "iothread-" + tid_;  
  12.     poller = new Poller(name);  //建立poller  
  13.   
  14.     mailbox = new Mailbox(name);  //建立mailbox  
  15.     mailbox_handle = mailbox.get_fd();  //mailbox會用到的channel,pipe的讀端  
  16.     poller.add_fd (mailbox_handle, this);   //在poller裏面註冊,其實這裏只是將其放到fd列表裏面,這裏的事件回調就是當前對象  
  17.     poller.set_pollin (mailbox_handle);  //這裏註冊讀取事件   
  18. }  

這裏mailbox和poller是幹嗎用的就很少說了,另外這個mailbox_handle實際上是mailbox的signaler的讀端,並且能夠在構造函數中能夠看到將這個channel註冊到了poller上面去。。這樣若是有數據讀,那麼會被響應,也就意味着有命令發送到mailbox須要執行了。。。

咱們來看看這個回調函:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //當mailbox能夠讀取的時候,將會執行這個方法,這裏其實也就是收到了命令  
  2. public void in_event() {  
  3.     //  TODO: Do we want to limit number of commands I/O thread can  
  4.     //  process in a single go?  
  5.   
  6.     while (true) {  
  7.   
  8.         //  Get the next command. If there is none, exit.  
  9.         //獲取須要執行的命令  
  10.         Command cmd = mailbox.recv (0);  
  11.         if (cmd == null)  
  12.             break;  
  13.   
  14.         //  Process the command.  
  15.         //執行命令  
  16.         cmd.destination().process_command (cmd);  //其實對於IO線程對象,也就只有stop命令能夠執行  
  17.     }  
  18.   
  19. }  

簡單吧,從mailbox裏面獲取command,而後直接執行就行了。。。。這裏IOThread自己就繼承了ZOjbect,因此這裏說白了就是本身須要執行命令,而在IOThread中,只有stop命令須要執行:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //中止poller  
  2. protected void process_stop ()  
  3. {  
  4.     poller.rm_fd (mailbox_handle);  
  5.       
  6.     poller.stop ();  
  7.   
  8. }  

好啦,到這裏ZeroMQ中IO線程的實現應該就算是比較的清楚了。。並且如何實現組件間的通訊也算是比較的瞭解了。。。

相關文章
相關標籤/搜索