轉載自 呂博的博客html
網址 http://blog.sina.com.cn/s/blog_48ded66a01018shx.htmljava
3.消息的轉發流程node
鏈接的創建是爲消息傳輸服務的,當一個鏈接處於UP狀態,鏈接兩端的節點就能夠傳輸消息。消息的傳輸過程一樣是經過update函數來驅動的,在DTNHost中更新當前節點的路由的update()函數。 緩存
MessageRouter類是全部路由的父類,但在ONE中幾乎是個空類,其最關鍵的子類是ActiveRouter,幾乎全部的路由都是ActiveRouter的子類。ide
ActiveRouter()之因此稱之爲主動的路由,就體如今對鏈接的處理上,它維護着正在發送消息的鏈接的列表,一旦在update()函數中發現鏈接已經中斷,則採用積極的措施來善後。這些善後措施包括:終止在該鏈接上的數據傳輸、清理該消息所佔用的額外存儲空間。函數
protectedArrayList sendingConnections;
可是ActiveRouter仍然不能實現消息的轉發,由於在update函數中沒有涉及到消息的處理,這個功能其實是經過其各類子類來實現的,因爲ONE中繼承ActiveRouter的路由衆多,這裏只以最簡單的EpidemicRouter爲例來講明消息轉發的流程。this
在EpidemicRouter類的update函數中真正實現了節點經過鏈接進行消息交互。若是本節點正在發送數據或者不能發送數據,則不進行消息轉發。經過調用exchangeDeliverableMessages()函數查看是否有到達目的節點的消息能夠發送,如今這個事件優先級是最高的。最後在全部的鏈接上嘗試發送全部的消息。spa
@Override public void update() { super.update(); if (isTransferring() || !canStartTransfer()) { return; // transferring, don't try other connections yet } // Try first the messages that can be delivered to final recipient if (exchangeDeliverableMessages() != null) { return; // started a transfer, don't try others (yet) } // then try any/all message to any/all connection this.tryAllMessagesToAllConnections(); }
(1)若是消息可以直接發送到目的節點htm
exchangeDeliverableMessages()的做用是發送可以到達目的節點的消息,即當前的節點遍歷全部的消息和全部UP狀態的鏈接,一旦發現某個消息可以到達目的節點,當即發送這個消息。blog
exchangeDeliverableMessages的返回值是一個Connection,若是沒有能夠發送的消息,返回值爲空,若是能找到一個(注意只要找到一個就能夠)能夠到達目的節點的消息,則返回發送這個消息所對應的鏈接。
protected Connection exchangeDeliverableMessages() { List connections = getConnections(); //獲得全部鏈接 if (connections.size() == 0) { //沒有鏈接,返回爲空 return null; } @SuppressWarnings(value = "unchecked") Tuple t =tryMessagesForConnected(sortByQueueMode(getMessagesForConnected())); if (t != null) { return t.getValue(); // started transfer } // didn't start transfer to any node -> ask messages from connected for (Connection con : connections) { if (con.getOtherNode(getHost()).requestDeliverableMessages(con)) { return con; } } return null; }
函數中調用了getMessagesForConnected()函數,其返回值是一個List,組成它的元組格式爲,這些元組中都知足條件:(1)Message是當前節點緩存中的消息;(2)Connection是當前節點維護的UP鏈接;(3)Message的目的節點就是Connection的另外一端。
protected List> getMessagesForConnected() { if (getNrofMessages() == 0 || getConnections().size() == 0) { return new ArrayList>(0); } //若是當先節點根本沒有消息或者根本沒有可用的鏈接,返回的是一個空List,而不是NULL List> forTuples = //用於儲存臨時的元組 new ArrayList>(); for (Message m : getMessageCollection()) { //遍歷全部的Message for (Connection con : getConnections()) { //遍歷全部的Connection DTNHost to = con.getOtherNode(getHost()); //提取Connection的另外一端 if (m.getTo() == to) { //消息m是發往con的另外一端的 forTuples.add(new Tuple(m,con)); } } } return forTuples; }
將getMessagesForConnected()的返回值傳遞給sortByQueueMode函數,這個函數根據消息的優先級對消息進行排序,ONE中定義了兩種優先級策略,一是隨機排序(默認爲0),二是先入先出策略(值爲1),這個值能夠在配置腳本中進行設置,語句爲Group.sendQueue = 1。若是須要對緩存機制進行擴展,顯然須要修改這個函數。因爲在這裏並無涉及緩存策略,所以不贅述。sortByQueueMode()函數返回的是排序以後的List,保持元組格式不變。
獲得了排序以後的,傳遞給tryMessagesForConnected函數,這個函數的做用是將元組中優先級最高的那個(最上面的那個消息)在對應的connection上發送出去。要強調的是,這裏只發送一個消息,返回的一個元組,而不是元組的List。固然優先級最高的那個消息可能不會發送成功,那麼則會嘗試發送優先級次之的消息,若是List中全部的消息都嘗試過了,那隻好返回NULL了。
protected TupletryMessagesForConnected( List> tuples) { if (tuples.size() == 0) { //元組爲空,返回NULL return null; } for (Tuple t : tuples) { //遍歷全部的元組 Message m = t.getKey(); //提取出消息m Connection con = t.getValue(); //提取出鏈接con if (startTransfer(m, con) == RCV_OK) { //將消息m在con上發送 return t; //若是消息發送成功,直接跳出tryMessagesForConnected } } return null; }
怎樣判斷消息m可否在con上發送成功?須要調用startTransfer函數。這個函數很簡單,可是其返回值很關鍵。當函數返回TRY_LATER_BUSY=1時,說明該鏈接正忙,稍後再試,當返回RCV_OK=0時,說明發送成功,以上還很容易理解。但retVal還有其餘的取值來代表消息發送的狀況,DENIED_OLD = -1表示該消息發送失敗,緣由是已經處理過(已經接收過或者已經轉發過),DENIED_NO_SPACE = -2表示該消息發送失敗,緣由是緩存中沒有空位置,DENIED_TTL=-3表示消息發送失敗,緣由是該消息超時。
protected int startTransfer(Message m, Connection con) { int retVal; if (!con.isReadyForTransfer()) { //鏈接正忙,返回1 return TRY_LATER_BUSY; } retVal = con.startTransfer(getHost(), m); //調用鏈接的startTrransfer函數 if (retVal == RCV_OK) { // started transfer addToSendingConnections(con); } else if (deleteDelivered && retVal == DENIED_OLD && m.getTo() == con.getOtherNode(this.getHost())) { this.deleteMessage(m.getId(), false); //清理因DENIED_OLD被拒絕的消息 } return retVal; }
在這裏涉及到緩存的清理機制,若是在配置腳本中設置Group.deleteDelivered = 1,則開啓了已發送消息的清理機制,這樣,一旦節點發現消息m因DENIED_OLD被拒絕,則會清除本地緩存中的m。
在路由的startTransfer函數中調用了鏈接的startTransfer,這個函數在CBRConnection類中被重載。在CBRConnection的startTransfer中,首先調用對方節點的receiveMessage函數,看對方節點可否接收這個消息。若是能接收,則進行如下操做:(1)設置正在發送的消息msgOnFly;(2)計算消息預計完成時間;(3)返回值RCV_OK=0。
public int startTransfer(DTNHost from, Message m) { assert this.msgOnFly == null : "Already transferring " + this.msgOnFly + " from " + this.msgFromNode + " to " + this.getOtherNode(this.msgFromNode) + ". Can't "+ "start transfer of " + m + " from " + from; this.msgFromNode = from; Message newMessage = m.replicate(); int retVal = getOtherNode(from).receiveMessage(newMessage, from); if (retVal == MessageRouter.RCV_OK) { this.msgOnFly = newMessage; this.transferDoneTime = SimClock.getTime() + (1.0*m.getSize()) / this.speed; } return retVal; }
在上面的函數中,調用的是節點的receiveMessage函數(在DTNHost.java定義),進而進入路由的receiveMessage函數,在MessageRouter.java中的receiveMessage函數進行了下面的幾個工做:
(1)將消息m放進IncomingBuffer;
(2)當前節點存入消息m的Path中,Path是一系列DTNHost組成的列表,記錄着這個消息所走過的路徑;
(3)通知全部的Listener發生了messageTransferStarted事件;
(4)返回RCV_OK=0。
public int receiveMessage(Message m, DTNHost from) { Message newMessage = m.replicate(); this.putToIncomingBuffer(newMessage, from); newMessage.addNodeOnPath(this.host); for (MessageListener ml : this.mListeners) { ml.messageTransferStarted(newMessage, from, getHost()); } return RCV_OK; // superclass always accepts messages }
至此消息m被成功的發送到了目的節點,實際上工做才完成了一半,由於還有一些消息不能被髮送到目的節點,須要一些中間節點進行轉發。
(2) 當消息不能直接到達目的節點時
在update函數中還有一個函數tryAllMessagesToAllConnections()專門負責處理類事件,他的優先級是要低於exchangeDeliverableMessages()的,也就是說當前節點若是有消息可以直接發送到目的節點,就應該優先發送。不然纔會執行exchangeDeliverableMessages()函數,首先對全部的消息從新排序,排序的函數還是sortByQueueMode,在前面介紹過。最後,調用tryMessagesToConnections函數遍歷排序以後的全部消息和全部鏈接,尋找可以發送的消息。
protected Connection tryAllMessagesToAllConnections(){ List connections = getConnections(); if (connections.size() == 0 || this.getNrofMessages() == 0) { return null; } List messages = new ArrayList(this.getMessageCollection()); this.sortByQueueMode(messages); //將全部的消息從新排序 return tryMessagesToConnections(messages, connections); }
在tryMessagesToConnections中,遍歷全部的鏈接,而後調用tryAllMessages(con, messages)依次在每一個鏈接上發送全部的消息。注意區別:在tryMessagesToConnections(messages, connections)中,messages指的是不少消息,connections指的是不少鏈接;而在tryAllMessages(con, messages)中,messages指的是不少消息,con指的是某個鏈接;然而在startTransfer(m, con)函數中,m是指某個消息,con是指某個鏈接。
在tryAllMessages函數中,遍歷全部的消息,依次嘗試在con上可否發送成功,只要找到一個能發送的消息m,就終止該函數,返回消息m。不然嘗試全部的消息都不能在con上發送,返回NULL。
protected Message tryAllMessages(Connection con, List messages) { for (Message m : messages) { int retVal = startTransfer(m, con); if (retVal == RCV_OK) { return m; // accepted a message, don't try others } else if (retVal > 0) { return null; // should try later -> don't bother trying others } } return null; // no message was accepted }
在發送消息的函數中,仍然執行的是startTransfer函數,startTransfer函數在前面已經介紹過了。
到這裏,EpidemicRouter中發送消息的流程已經介紹清楚了,EpidemicRouter路由中鼓勵節點儘量將全部的消息都轉發給本身的鄰居,所以只須要在ActiveRouter父類上作簡單的擴展便可。接下的部分以其餘路由爲例研究一下路由協議是怎麼擴展ActiveRouter的。
4. 路由擴展
(1)DirectDeliveryRouter
這個路由的改法是最簡單的,只須要在EpidemicRouter的基礎上刪除this.tryAllMessagesToAllConnections()函數,保留exchangeDeliverableMessages()函數便可,那麼消息的轉發流程中只剩下了第一部分。
(2)FirstContactRouter
EpidemicRouter是最簡單的洪泛消息,其餘路由協議與它最大的區別就在於:並非全部的消息都照單全收。回憶上面的消息傳輸過程,不管是到目的節點仍是到中間節點,都須要執行startTransfer,它負責在某個鏈接con上發送某個消息m。在這個函數中,首先找到con的另外一端DTNHost,而後調用receiveMessage函數查看對方能不能接收消息m。receiveMessage是一個不斷被各類路由重載的函數,在MessageRouter中,實現了最基本的功能:
將消息m放進IncomingBuffer;
當前節點存入消息m的Path中;
通知全部的Listener發生了messageTransferStarted事件;
在ActiveRouter中增長了功能:
調用checkReceiving函數看對方節點能不能接收消息m,不少種可能都會致使消息不能被接收,在ActiveRouter中主要有:
(1)對方節點正在發送消息,返回TRY_LATER_BUSY=1;
(2)對方節點緩存中已經有該消息了,返回DENIED_OLD=-1;
(3)消息超時,返回DENIED_TTL=-2;
(4)對方節點緩存已滿,返回ENIED_NO_SPACE=-3。
從上面的分析發現,要想消息按某種條件被轉發,在checkReceiving函數中進行一些過濾是很合理的。
FirstContactRouter路由中,區別在於節點把數據轉發給第一次遇到的節點,而後就會將轉發過的消息從緩存中刪除。這裏重載了checkReceiving函數。每次節點轉發某個消息,都會將本身寫在消息的path中,當節點發現本身已經處理過某個消息了,則會用DENIED_OLD的理由將其拒絕。須要指出的是順序問題,將當前節點加入消息路徑Path的操做是在receiveMessage中,這個操做是在checkReceiving以後,因此在檢索path的時候當前節點尚未將本身寫進Path。
@Override protected int checkReceiving(Message m) { int recvCheck = super.checkReceiving(m); if (recvCheck == RCV_OK) { if (m.getHops().contains(getHost())) { recvCheck = DENIED_OLD; } } return recvCheck; }
除此以外,FirstContactRouter還重載了transferDone函數。節點將消息發送成功以後就會清理緩存。這個函數在ActiveRouter中爲空,在EpidemicRouter中也沒有將其重載,因此在這裏是第一次出現。
@Override protected void transferDone(Connection con) { this.deleteMessage(con.getMessage().getId(), false); }
在這裏只是給出了兩個路由協議最基本的例子,本文重點在於解釋TheONE中消息轉發的流程,所以其餘的路由協議將在其餘文章中進行分析。