消息轉發過程(下)

轉載自 呂博的博客html

網址 http://blog.sina.com.cn/s/blog_48ded66a01018shx.htmljava


3.消息的轉發流程node

    鏈接的創建是爲消息傳輸服務的,當一個鏈接處於UP狀態,鏈接兩端的節點就能夠傳輸消息。消息的傳輸過程一樣是經過update函數來驅動的,在DTNHost中更新當前節點的路由的update()函數。       緩存

    MessageRouter類是全部路由的父類,但在ONE中幾乎是個空類,其最關鍵的子類是ActiveRouter,幾乎全部的路由都是ActiveRouter的子類。ide

    ActiveRouter()之因此稱之爲主動的路由,就體如今對鏈接的處理上,它維護着正在發送消息的鏈接的列表,一旦在update()函數中發現鏈接已經中斷,則採用積極的措施來善後。這些善後措施包括:終止在該鏈接上的數據傳輸、清理該消息所佔用的額外存儲空間。函數

protectedArrayList sendingConnections;

    可是ActiveRouter仍然不能實現消息的轉發,由於在update函數中沒有涉及到消息的處理,這個功能其實是經過其各類子類來實現的,因爲ONE中繼承ActiveRouter的路由衆多,這裏只以最簡單的EpidemicRouter爲例來講明消息轉發的流程。this

    在EpidemicRouter類的update函數中真正實現了節點經過鏈接進行消息交互。若是本節點正在發送數據或者不能發送數據,則不進行消息轉發。經過調用exchangeDeliverableMessages()函數查看是否有到達目的節點的消息能夠發送,如今這個事件優先級是最高的。最後在全部的鏈接上嘗試發送全部的消息。spa

 
 @Override
    public void update() {
       super.update();
       if (isTransferring() || !canStartTransfer()) {
           return; // transferring, don't try other connections yet
       }
       // Try first the messages that can be delivered to final recipient
       if (exchangeDeliverableMessages() != null) {
           return; // started a transfer, don't try others (yet)
       }
       // then try any/all message to any/all connection
       this.tryAllMessagesToAllConnections();
    }

(1)若是消息可以直接發送到目的節點htm

    exchangeDeliverableMessages()的做用是發送可以到達目的節點的消息,即當前的節點遍歷全部的消息和全部UP狀態的鏈接,一旦發現某個消息可以到達目的節點,當即發送這個消息。blog

    exchangeDeliverableMessages的返回值是一個Connection,若是沒有能夠發送的消息,返回值爲空,若是能找到一個(注意只要找到一個就能夠)能夠到達目的節點的消息,則返回發送這個消息所對應的鏈接。

protected Connection exchangeDeliverableMessages() {
       List connections = getConnections();     //獲得全部鏈接
       if (connections.size() == 0) {     //沒有鏈接,返回爲空
           return null;
       }
       @SuppressWarnings(value = "unchecked")
       Tuple t =tryMessagesForConnected(sortByQueueMode(getMessagesForConnected()));
       if (t != null) {
           return t.getValue(); // started transfer
       }
       // didn't start transfer to any node -> ask messages from connected
       for (Connection con : connections) {
           if (con.getOtherNode(getHost()).requestDeliverableMessages(con)) {
              return con;
           }
       }
       return null;
    }

    函數中調用了getMessagesForConnected()函數,其返回值是一個List,組成它的元組格式爲,這些元組中都知足條件:(1)Message是當前節點緩存中的消息;(2)Connection是當前節點維護的UP鏈接;(3)Message的目的節點就是Connection的另外一端。

protected List> getMessagesForConnected() {
       if (getNrofMessages() == 0 || getConnections().size() == 0) {
          
           return new ArrayList>(0);
       } //若是當先節點根本沒有消息或者根本沒有可用的鏈接,返回的是一個空List,而不是NULL
 
       List> forTuples =     //用於儲存臨時的元組
           new ArrayList>();
       for (Message m : getMessageCollection()) {       //遍歷全部的Message
           for (Connection con : getConnections()) {     //遍歷全部的Connection
              DTNHost to = con.getOtherNode(getHost()); //提取Connection的另外一端
              if (m.getTo() == to) {                    //消息m是發往con的另外一端的
                  forTuples.add(new Tuple(m,con));
              }
           }
       }
       return forTuples;
    }

    將getMessagesForConnected()的返回值傳遞給sortByQueueMode函數,這個函數根據消息的優先級對消息進行排序,ONE中定義了兩種優先級策略,一是隨機排序(默認爲0),二是先入先出策略(值爲1),這個值能夠在配置腳本中進行設置,語句爲Group.sendQueue = 1。若是須要對緩存機制進行擴展,顯然須要修改這個函數。因爲在這裏並無涉及緩存策略,所以不贅述。sortByQueueMode()函數返回的是排序以後的List,保持元組格式不變。

    獲得了排序以後的,傳遞給tryMessagesForConnected函數,這個函數的做用是將元組中優先級最高的那個(最上面的那個消息)在對應的connection上發送出去。要強調的是,這裏只發送一個消息,返回的一個元組,而不是元組的List。固然優先級最高的那個消息可能不會發送成功,那麼則會嘗試發送優先級次之的消息,若是List中全部的消息都嘗試過了,那隻好返回NULL了。

protected TupletryMessagesForConnected(
           List> tuples) {
       if (tuples.size() == 0) {   //元組爲空,返回NULL
           return null;
       }
       for (Tuple t : tuples) {    //遍歷全部的元組
           Message m = t.getKey();                       //提取出消息m  
           Connection con = t.getValue();                //提取出鏈接con
           if (startTransfer(m, con) == RCV_OK) {        //將消息m在con上發送
              return t;         //若是消息發送成功,直接跳出tryMessagesForConnected
           }
       }
       return null;
    }

    怎樣判斷消息m可否在con上發送成功?須要調用startTransfer函數。這個函數很簡單,可是其返回值很關鍵。當函數返回TRY_LATER_BUSY=1時,說明該鏈接正忙,稍後再試,當返回RCV_OK=0時,說明發送成功,以上還很容易理解。但retVal還有其餘的取值來代表消息發送的狀況,DENIED_OLD = -1表示該消息發送失敗,緣由是已經處理過(已經接收過或者已經轉發過),DENIED_NO_SPACE = -2表示該消息發送失敗,緣由是緩存中沒有空位置,DENIED_TTL=-3表示消息發送失敗,緣由是該消息超時。

protected int startTransfer(Message m, Connection con) {
       int retVal;
      
       if (!con.isReadyForTransfer()) {       //鏈接正忙,返回1
           return TRY_LATER_BUSY;
       }
       retVal = con.startTransfer(getHost(), m);     //調用鏈接的startTrransfer函數
       if (retVal == RCV_OK) { // started transfer
           addToSendingConnections(con);
       }
       else if (deleteDelivered && retVal == DENIED_OLD &&
              m.getTo() == con.getOtherNode(this.getHost())) {    
           this.deleteMessage(m.getId(), false);     //清理因DENIED_OLD被拒絕的消息
       }
       return retVal;
    }

    在這裏涉及到緩存的清理機制,若是在配置腳本中設置Group.deleteDelivered = 1,則開啓了已發送消息的清理機制,這樣,一旦節點發現消息m因DENIED_OLD被拒絕,則會清除本地緩存中的m。

    在路由的startTransfer函數中調用了鏈接的startTransfer,這個函數在CBRConnection類中被重載。在CBRConnection的startTransfer中,首先調用對方節點的receiveMessage函數,看對方節點可否接收這個消息。若是能接收,則進行如下操做:(1)設置正在發送的消息msgOnFly;(2)計算消息預計完成時間;(3)返回值RCV_OK=0。

public int startTransfer(DTNHost from, Message m) {
 
       assert this.msgOnFly == null : "Already transferring " +
       this.msgOnFly + " from " + this.msgFromNode + " to " +
       this.getOtherNode(this.msgFromNode) + ". Can't "+
       "start transfer of " + m + " from " + from;
 
       this.msgFromNode = from;
       Message newMessage = m.replicate();
       int retVal = getOtherNode(from).receiveMessage(newMessage, from);
 
       if (retVal == MessageRouter.RCV_OK) {
           this.msgOnFly = newMessage;
           this.transferDoneTime = SimClock.getTime() +
           (1.0*m.getSize()) / this.speed;
       }
       return retVal;
    }

    在上面的函數中,調用的是節點的receiveMessage函數(在DTNHost.java定義),進而進入路由的receiveMessage函數,在MessageRouter.java中的receiveMessage函數進行了下面的幾個工做:

    (1)將消息m放進IncomingBuffer;

    (2)當前節點存入消息m的Path中,Path是一系列DTNHost組成的列表,記錄着這個消息所走過的路徑;

    (3)通知全部的Listener發生了messageTransferStarted事件;

    (4)返回RCV_OK=0。

public int receiveMessage(Message m, DTNHost from) {
       Message newMessage = m.replicate();
             
       this.putToIncomingBuffer(newMessage, from);     
       newMessage.addNodeOnPath(this.host);
      
       for (MessageListener ml : this.mListeners) {
           ml.messageTransferStarted(newMessage, from, getHost());
       }
      
       return RCV_OK; // superclass always accepts messages
    }

    至此消息m被成功的發送到了目的節點,實際上工做才完成了一半,由於還有一些消息不能被髮送到目的節點,須要一些中間節點進行轉發。

(2) 當消息不能直接到達目的節點時 

    在update函數中還有一個函數tryAllMessagesToAllConnections()專門負責處理類事件,他的優先級是要低於exchangeDeliverableMessages()的,也就是說當前節點若是有消息可以直接發送到目的節點,就應該優先發送。不然纔會執行exchangeDeliverableMessages()函數,首先對全部的消息從新排序,排序的函數還是sortByQueueMode,在前面介紹過。最後,調用tryMessagesToConnections函數遍歷排序以後的全部消息和全部鏈接,尋找可以發送的消息。

 
 protected Connection tryAllMessagesToAllConnections(){
       List connections = getConnections();
 
       if (connections.size() == 0 || this.getNrofMessages() == 0) {
           return null;
       }
 
       List messages =  new ArrayList(this.getMessageCollection());
       this.sortByQueueMode(messages);    //將全部的消息從新排序
 
       return tryMessagesToConnections(messages, connections);
    }

    在tryMessagesToConnections中,遍歷全部的鏈接,而後調用tryAllMessages(con, messages)依次在每一個鏈接上發送全部的消息。注意區別:在tryMessagesToConnections(messages, connections)中,messages指的是不少消息,connections指的是不少鏈接;而在tryAllMessages(con, messages)中,messages指的是不少消息,con指的是某個鏈接;然而在startTransfer(m, con)函數中,m是指某個消息,con是指某個鏈接。

    在tryAllMessages函數中,遍歷全部的消息,依次嘗試在con上可否發送成功,只要找到一個能發送的消息m,就終止該函數,返回消息m。不然嘗試全部的消息都不能在con上發送,返回NULL。

protected Message tryAllMessages(Connection con, List messages) {
       for (Message m : messages) {
           int retVal = startTransfer(m, con);
           if (retVal == RCV_OK) {
              return m;  // accepted a message, don't try others
           }
           else if (retVal > 0) {
              return null; // should try later -> don't bother trying others
           }
       }
      
       return null; // no message was accepted      
    }

    在發送消息的函數中,仍然執行的是startTransfer函數,startTransfer函數在前面已經介紹過了。

    到這裏,EpidemicRouter中發送消息的流程已經介紹清楚了,EpidemicRouter路由中鼓勵節點儘量將全部的消息都轉發給本身的鄰居,所以只須要在ActiveRouter父類上作簡單的擴展便可。接下的部分以其餘路由爲例研究一下路由協議是怎麼擴展ActiveRouter的。


4.  路由擴展

 

(1)DirectDeliveryRouter

    這個路由的改法是最簡單的,只須要在EpidemicRouter的基礎上刪除this.tryAllMessagesToAllConnections()函數,保留exchangeDeliverableMessages()函數便可,那麼消息的轉發流程中只剩下了第一部分。

 (2)FirstContactRouter

    EpidemicRouter是最簡單的洪泛消息,其餘路由協議與它最大的區別就在於:並非全部的消息都照單全收。回憶上面的消息傳輸過程,不管是到目的節點仍是到中間節點,都須要執行startTransfer,它負責在某個鏈接con上發送某個消息m。在這個函數中,首先找到con的另外一端DTNHost,而後調用receiveMessage函數查看對方能不能接收消息m。receiveMessage是一個不斷被各類路由重載的函數,在MessageRouter中,實現了最基本的功能:

        將消息m放進IncomingBuffer;

        當前節點存入消息m的Path中;

        通知全部的Listener發生了messageTransferStarted事件;

    在ActiveRouter中增長了功能:

    調用checkReceiving函數看對方節點能不能接收消息m,不少種可能都會致使消息不能被接收,在ActiveRouter中主要有:

    (1)對方節點正在發送消息,返回TRY_LATER_BUSY=1;

    (2)對方節點緩存中已經有該消息了,返回DENIED_OLD=-1;

    (3)消息超時,返回DENIED_TTL=-2;

    (4)對方節點緩存已滿,返回ENIED_NO_SPACE=-3。

    從上面的分析發現,要想消息按某種條件被轉發,在checkReceiving函數中進行一些過濾是很合理的。

    FirstContactRouter路由中,區別在於節點把數據轉發給第一次遇到的節點,而後就會將轉發過的消息從緩存中刪除。這裏重載了checkReceiving函數。每次節點轉發某個消息,都會將本身寫在消息的path中,當節點發現本身已經處理過某個消息了,則會用DENIED_OLD的理由將其拒絕。須要指出的是順序問題,將當前節點加入消息路徑Path的操做是在receiveMessage中,這個操做是在checkReceiving以後,因此在檢索path的時候當前節點尚未將本身寫進Path。

@Override
    protected int checkReceiving(Message m) {
       int recvCheck = super.checkReceiving(m);
      
       if (recvCheck == RCV_OK) {
          
           if (m.getHops().contains(getHost())) {
              recvCheck = DENIED_OLD;
           }
       }
       return recvCheck;
    }

    除此以外,FirstContactRouter還重載了transferDone函數。節點將消息發送成功以後就會清理緩存。這個函數在ActiveRouter中爲空,在EpidemicRouter中也沒有將其重載,因此在這裏是第一次出現。

@Override
    protected void transferDone(Connection con) {
      
       this.deleteMessage(con.getMessage().getId(), false);
    }

    在這裏只是給出了兩個路由協議最基本的例子,本文重點在於解釋TheONE中消息轉發的流程,所以其餘的路由協議將在其餘文章中進行分析。

相關文章
相關標籤/搜索