Linux下電騾aMule Kademlia網絡構建分析5 —— 資源的發佈

資源發佈請求消息的發送

在aMule中,主要用CSharedFileList class來管理共享給其它節點的文件如咱們前面在 Linux下電騾aMule Kademlia網絡構建分析3 一文中分析的那樣,aMule在啓動的時候,會起一些定時器,以便於按期的執行一些任務。CamuleApp::OnCoreTimer()是其中之一,在這個函數中,咱們能夠看到這樣的幾行: node

// Publish files to server if needed.
		sharedfiles->Process();

由註釋可見,在aMule中,是週期性地執行CSharedFileList::Process(),向網絡發佈本身的資源。那咱們就來看一下這個CSharedFileList::Process()(amule-2.3.1/src/SharedFileList.cpp): c++

void CSharedFileList::Process()
{
	Publish();
	if( !m_lastPublishED2KFlag || ( ::GetTickCount() - m_lastPublishED2K < ED2KREPUBLISHTIME ) ) {
		return;
	}
	SendListToServer();
	m_lastPublishED2K = ::GetTickCount();
}

在這個函數中,既會向Kademlia網絡發佈資源,也會向ED2K網絡的中心服務器發佈資源,這兩種發佈動做分別由Publish()和SendListToServer()函數完成。這裏主要來看向Kademlia網絡發佈資源的過程,也就是CSharedFileList::Publish(): 安全

void CSharedFileList::Publish()
{
	// Variables to save cpu.
	unsigned int tNow = time(NULL);
	bool IsFirewalled = theApp->IsFirewalled();

	if( Kademlia::CKademlia::IsConnected() && ( !IsFirewalled || ( IsFirewalled && theApp->clientlist->GetBuddyStatus() == Connected)) && GetCount() && Kademlia::CKademlia::GetPublish()) { 
		//We are connected to Kad. We are either open or have a buddy. And Kad is ready to start publishing.

		if( Kademlia::CKademlia::GetTotalStoreKey() < KADEMLIATOTALSTOREKEY) {

			//We are not at the max simultaneous keyword publishes 
			if (tNow >= m_keywords->GetNextPublishTime()) {

				//Enough time has passed since last keyword publish

				//Get the next keyword which has to be (re)-published
				CPublishKeyword* pPubKw = m_keywords->GetNextKeyword();
				if (pPubKw) {

					//We have the next keyword to check if it can be published

					//Debug check to make sure things are going well.
					wxASSERT( pPubKw->GetRefCount() != 0 );

					if (tNow >= pPubKw->GetNextPublishTime()) {
						//This keyword can be published.
						Kademlia::CSearch* pSearch = Kademlia::CSearchManager::PrepareLookup(Kademlia::CSearch::STOREKEYWORD, false, pPubKw->GetKadID());
						if (pSearch) {
							//pSearch was created. Which means no search was already being done with this HashID.
							//This also means that it was checked to see if network load wasn't a factor.

							//This sets the filename into the search object so we can show it in the gui.
							pSearch->SetFileName(pPubKw->GetKeyword());

							//Add all file IDs which relate to the current keyword to be published
							const KnownFileArray& aFiles = pPubKw->GetReferences();
							uint32 count = 0;
							for (unsigned int f = 0; f < aFiles.size(); ++f) {

								//Only publish complete files as someone else should have the full file to publish these keywords.
								//As a side effect, this may help reduce people finding incomplete files in the network.
								if( !aFiles[f]->IsPartFile() ) {
									count++;
									pSearch->AddFileID(Kademlia::CUInt128(aFiles[f]->GetFileHash().GetHash()));
									if( count > 150 ) {
										//We only publish up to 150 files per keyword publish then rotate the list.
										pPubKw->RotateReferences(f);
										break;
									}
								}
							}

							if( count ) {
								//Start our keyword publish
								pPubKw->SetNextPublishTime(tNow+(KADEMLIAREPUBLISHTIMEK));
								pPubKw->IncPublishedCount();
								Kademlia::CSearchManager::StartSearch(pSearch);
							} else {
								//There were no valid files to publish with this keyword.
								delete pSearch;
							}
						}
					}
				}
				m_keywords->SetNextPublishTime(KADEMLIAPUBLISHTIME+tNow);
			}
		}
		
		if( Kademlia::CKademlia::GetTotalStoreSrc() < KADEMLIATOTALSTORESRC) {
			if(tNow >= m_lastPublishKadSrc) {
				if(m_currFileSrc > GetCount()) {
					m_currFileSrc = 0;
				}
				CKnownFile* pCurKnownFile = const_cast<CKnownFile*>(GetFileByIndex(m_currFileSrc));
				if(pCurKnownFile) {
					if(pCurKnownFile->PublishSrc()) {
						Kademlia::CUInt128 kadFileID;
						kadFileID.SetValueBE(pCurKnownFile->GetFileHash().GetHash());
						if(Kademlia::CSearchManager::PrepareLookup(Kademlia::CSearch::STOREFILE, true, kadFileID )==NULL) {
							pCurKnownFile->SetLastPublishTimeKadSrc(0,0);
						}
					}	
				}
				m_currFileSrc++;

				// even if we did not publish a source, reset the timer so that this list is processed
				// only every KADEMLIAPUBLISHTIME seconds.
				m_lastPublishKadSrc = KADEMLIAPUBLISHTIME+tNow;
			}
		}

		if( Kademlia::CKademlia::GetTotalStoreNotes() < KADEMLIATOTALSTORENOTES) {
			if(tNow >= m_lastPublishKadNotes) {
				if(m_currFileNotes > GetCount()) {
					m_currFileNotes = 0;
				}
				CKnownFile* pCurKnownFile = const_cast<CKnownFile*>(GetFileByIndex(m_currFileNotes));
				if(pCurKnownFile) {
					if(pCurKnownFile->PublishNotes()) {
						Kademlia::CUInt128 kadFileID;
						kadFileID.SetValueBE(pCurKnownFile->GetFileHash().GetHash());
						if(Kademlia::CSearchManager::PrepareLookup(Kademlia::CSearch::STORENOTES, true, kadFileID )==NULL)
							pCurKnownFile->SetLastPublishTimeKadNotes(0);
					}	
				}
				m_currFileNotes++;

				// even if we did not publish a source, reset the timer so that this list is processed
				// only every KADEMLIAPUBLISHTIME seconds.
				m_lastPublishKadNotes = KADEMLIAPUBLISHTIME+tNow;
			}
		}
	}
}

CSharedFileList::Publish()首先確保(1)、Kademlia正在運行;(2)、本節點不處於防火牆後,或處於防火牆後但BuddyStatus是Connected的;(3)、可供分享的文件數大於0;(4)、同時Kademlia網絡的設置容許進行發佈。 服務器

而後,嘗試向Kademlia網絡中分別發佈3種不一樣類型的信息,分別爲keyword,file,和filenotes。這三種類型發佈的實際執行,都要知足必定的約束條件,即正在進行的同種類型發佈的個數不能太多,發佈的頻率不能過高。具體點說,也就是這三種類型的發佈,同一時間正在進行的發佈個數分別不能多於KADEMLIATOTALSTOREKEY,KADEMLIATOTALSTORESRC和KADEMLIATOTALSTORENOTES個。同時這三種類型的發佈有它們本身的週期,也就是KADEMLIAPUBLISHTIME。在amule-2.3.1/src/include/protocol/kad/Constants.h文件中能夠看到這幾個常量值的定義: 網絡

#define	KADEMLIAPUBLISHTIME		SEC(2)		//2 second
#define	KADEMLIATOTALSTORENOTES		1		//Total hashes to store.
#define	KADEMLIATOTALSTORESRC		3		//Total hashes to store.
#define	KADEMLIATOTALSTOREKEY		2		//Total hashes to store.
#define    KADEMLIAREPUBLISHTIMES        HR2S(5)        //5 hours
#define    KADEMLIAREPUBLISHTIMEN        HR2S(24)    //24 hours
#define    KADEMLIAREPUBLISHTIMEK        HR2S(24)    //24 hours

咱們具體分析這三種類型發佈的執行過程。 dom

1. Keyword的發佈。 ide

CSharedFileList用CPublishKeywordList m_keywords管理可供發佈的PublishKeyword,用CPublishKeyword來表示一個要發佈的keyword。CSharedFileList::Publish()在發佈keyword時,會先從m_keywords中取出下一個要發佈的CPublishKeyword pPubKw,而後檢查這個pPubKw的發佈時間是否到了。對於一個具體的PublishKeyword,能夠看到,它的發佈週期爲KADEMLIAREPUBLISHTIMEK,也就是24個小時。 函數

若發佈時間到了,則執行Kademlia::CSearchManager::PrepareLookup()爲發佈建立一個CSearch,設置search的FileName爲Keyword,獲取與關鍵字關聯的全部的文件。隨後逐個地將每一個文件的Hash值添加到search。但每一個PublishKeyword一次最多隻發佈150個文件。若是與關鍵字關聯的文件數量超過了150個,則在向search添加了150個文件的hash值以後,還會rotate PublishKeyword的關聯文件列表,以便於在下一次執行相同的關鍵字的發佈時,那些沒有被髮布過的文件可以被優先發布,並跳出添加文件Hash值的循環。 oop

最後更新PublishKeyword的下次發佈時間,增長髮布的次數值,並經過Kademlia::CSearchManager::StartSearch()來執行發佈過程。 ui

咱們能夠再來看一下與CSearch建立相關的一些函數,CSearchManager::PrepareLookup()以下(amule-2.3.1/src/kademlia/kademlia/SearchManager.cpp):

CSearch* CSearchManager::PrepareLookup(uint32_t type, bool start, const CUInt128& id)
{
	// Prepare a kad lookup.
	// Make sure this target is not already in progress.
	if (AlreadySearchingFor(id)) {
		return NULL;
	}

	// Create a new search.
	CSearch *s = new CSearch;

	// Set type and target.
	s->SetSearchTypes(type);
	s->SetTargetID(id);

	try {
		switch(type) {
			case CSearch::STOREKEYWORD:
				if (!Kademlia::CKademlia::GetIndexed()->SendStoreRequest(id)) {
					delete s;
					return NULL;
				}
				break;
		}

		s->SetSearchID(++m_nextID);
		if (start) {
			m_searches[id] = s;
			s->Go();
		}
	} catch (const CEOFException& DEBUG_ONLY(err)) {
		delete s;
		AddDebugLogLineN(logKadSearch, wxT("CEOFException in CSearchManager::PrepareLookup: ") + err.what());
		return NULL;
	} catch (...) {
		AddDebugLogLineN(logKadSearch, wxT("Exception in CSearchManager::PrepareLookup"));
		delete s;
		throw;
	}

	return s;
}

AlreadySearchingFor()的檢查仍是蠻重要的。這個檢查確保,同一時間發送到同一個節點的搜索請求不會超過1個。這樣在收到目標節點的響應以後,就能夠根據target clientID正確地找到對應的CSearch了。

此函數會new一個CSearch對象,並適當地設置一些值。就關鍵字發佈而言,此時這個search還不會被真正執行。

CSearch::AddFileID()以下(amule-2.3.1/src/kademlia/kademlia/Search.h):

void	 AddFileID(const CUInt128& id)		{ m_fileIDs.push_back(id); }

CSharedFileList::Publish()經過Kademlia::CSearchManager::StartSearch()執行發佈(搜索):

bool CSearchManager::StartSearch(CSearch* search)
{
	// A search object was created, now try to start the search.
	if (AlreadySearchingFor(search->GetTarget())) {
		// There was already a search in progress with this target.
		delete search;
		return false;
	}
	// Add to the search map
	m_searches[search->GetTarget()] = search;
	// Start the search.
	search->Go();
	return true;
}

確保沒有發向同一個目標節點的多個發佈/搜索請求。而後執行search。CSearch::Go()前面咱們也屢次見到了,此處再也不贅述。

2. 文件的發佈。

(1)、首先會確保欲發佈文件的index m_currFileSrc不會超出可發佈文件的範圍,若超出則重置爲0。

(2)、取出要發佈的文件。

const CKnownFile *CSharedFileList::GetFileByIndex(unsigned int index) const
{
	wxMutexLocker lock(list_mut);
	if ( index >= m_Files_map.size() ) {
		return NULL;
	}
	CKnownFileMap::const_iterator pos = m_Files_map.begin();
	std::advance(pos, index);
	return pos->second;
}

(3)、執行CSearchManager::PrepareLookup()直接發佈文件。

(4)、更新下一次要發佈的文件的index m_currFileSrc

3. FileNotes的發佈。

FileNotes的發佈文件的發佈的過程很是類似,甚至發送出去的消息的內容都基本同樣。只是在此處,經過CKnownFile::PublishNotes()來判斷是否要發佈一個文件,而文件的發佈則經過CKnownFile::PublishSrc()來判斷。至於這兩種判斷的具體差異,這裏暫且不作太詳細的分析。

資源發佈消息的發送過程大致如上所示。

資源發佈請求消息的處理

咱們瞭解了資源發佈消息的發送,那其它節點在接收到這個消息以後,又會作什麼樣的處理呢?

資源發佈消息與關鍵字的搜索消息區別不大,主要在RequestContactCount,也就是請求的聯繫的個數上面。這裏能夠具體來看一下這個參數在不一樣狀況下的具體值的不一樣,也就是CSearch::GetRequestContactCount()函數(amule-2.3.1/src/kademlia/kademlia/Search.cpp):

uint8_t CSearch::GetRequestContactCount() const
{
	// Returns the amount of contacts we request on routing queries based on the search type
	switch (m_type) {
		case NODE:
		case NODECOMPLETE:
		case NODESPECIAL:
		case NODEFWCHECKUDP:
			return KADEMLIA_FIND_NODE;
		case FILE:
		case KEYWORD:
		case FINDSOURCE:
		case NOTES:
			return KADEMLIA_FIND_VALUE;
		case FINDBUDDY:
		case STOREFILE:
		case STOREKEYWORD:
		case STORENOTES:
			return KADEMLIA_STORE;
		default:
			AddDebugLogLineN(logKadSearch, wxT("Invalid search type. (CSearch::GetRequestContactCount())"));
			wxFAIL;
			return 0;
	}
}

Search能夠分爲3個大類,12個小類,3個大類分別爲節點查找,資源查找和資源發佈。每一個大類中全部的小類請求的聯繫人個數都相同。

再來看一下幾個常量值的定義(amule-2.3.1/src/include/protocol/kad/Constants.h):

// Kad parameters
#define	KADEMLIA_FIND_VALUE		0x02
#define	KADEMLIA_STORE			0x04
#define	KADEMLIA_FIND_NODE		0x0B
#define	KADEMLIA_FIND_VALUE_MORE	KADEMLIA_FIND_NODE

接收端對這個消息的處理與對搜索請求的消息的處理也沒有太大的區別,都是查找一些節點信息,包在一個KADEMLIA2_RES消息裏,返回給發送端。具體能夠參考 Linux下電騾aMule Kademlia網絡構建分析2 一文。

KADEMLIA2_RES消息的處理

球又被踢回了消息發送節點,對於資源發佈的case而言,也就是資源發佈節點。來看KADEMLIA2_RES消息的處理。KADEMLIA2_RES消息最終會被dispatch給CKademliaUDPListener::ProcessPacket(),具體dispatch的過程,與KADEMLIA2_REQ消息的dispatch過程沒有區別,具體可參考 Linux下電騾aMule Kademlia網絡構建分析2 一文。

這裏主要關注CKademliaUDPListener::ProcessPacket()中對於KADEMLIA2_RES消息的處理

case KADEMLIA2_RES:
			DebugRecv(Kad2Res, ip, port);
			ProcessKademlia2Response(packetData, lenPacket, ip, port, senderKey);
			break;

能夠看到消息的處理被委託給了CKademliaUDPListener::ProcessKademlia2Response(),這個函數定義以下:

// KADEMLIA2_RES
// Used in Kad2.0 only
void CKademliaUDPListener::ProcessKademlia2Response(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& WXUNUSED(senderKey))
{
	CHECK_TRACKED_PACKET(KADEMLIA2_REQ);

	// Used Pointers
	CRoutingZone *routingZone = CKademlia::GetRoutingZone();

	// What search does this relate to
	CMemFile bio(packetData, lenPacket);
	CUInt128 target = bio.ReadUInt128();
	uint8_t numContacts = bio.ReadUInt8();

	// Is this one of our legacy challenge packets?
	CUInt128 contactID;
	if (IsLegacyChallenge(target, ip, KADEMLIA2_REQ, contactID)) {
		// yup it is, set the contact as verified
		if (!routingZone->VerifyContact(contactID, ip)) {
			AddDebugLogLineN(logKadRouting, wxT("Unable to find valid sender in routing table (sender: ") + KadIPToString(ip) + wxT(")"));
		} else {
			AddDebugLogLineN(logKadRouting, wxT("Verified contact with legacy challenge (Kad2Req) - ") + KadIPToString(ip));
		}
		return;	// we do not actually care for its other content
	}
	// Verify packet is expected size
	CHECK_PACKET_EXACT_SIZE(16+1 + (16+4+2+2+1)*numContacts);

	// is this a search for firewallcheck ips?
	bool isFirewallUDPCheckSearch = false;
	if (CUDPFirewallTester::IsFWCheckUDPRunning() && CSearchManager::IsFWCheckUDPSearch(target)) {
		isFirewallUDPCheckSearch = true;
	}

	DEBUG_ONLY( uint32_t ignoredCount = 0; )
	DEBUG_ONLY( uint32_t kad1Count = 0; )
	CScopedPtr<ContactList> results;
	for (uint8_t i = 0; i < numContacts; i++) {
		CUInt128 id = bio.ReadUInt128();
		uint32_t contactIP = bio.ReadUInt32();
		uint16_t contactPort = bio.ReadUInt16();
		uint16_t tport = bio.ReadUInt16();
		uint8_t version = bio.ReadUInt8();
		uint32_t hostIP = wxUINT32_SWAP_ALWAYS(contactIP);
		if (version > 1) {	// Kad1 nodes are no longer accepted and ignored
			if (::IsGoodIPPort(hostIP, contactPort)) {
				if (!theApp->ipfilter->IsFiltered(hostIP) && !(contactPort == 53 && version <= 5) /*No DNS Port without encryption*/) {
					if (isFirewallUDPCheckSearch) {
						// UDP FirewallCheck searches are special. The point is we need an IP which we didn't sent a UDP message yet
						// (or in the near future), so we do not try to add those contacts to our routingzone and we also don't
						// deliver them back to the searchmanager (because he would UDP-ask them for further results), but only report
						// them to FirewallChecker - this will of course cripple the search but thats not the point, since we only
						// care for IPs and not the random set target
						CUDPFirewallTester::AddPossibleTestContact(id, contactIP, contactPort, tport, target, version, 0, false);
					} else {
						bool verified = false;
						bool wasAdded = routingZone->AddUnfiltered(id, contactIP, contactPort, tport, version, 0, verified, false, false);
						CContact *temp = new CContact(id, contactIP, contactPort, tport, version, 0, false, target);
						if (wasAdded || routingZone->IsAcceptableContact(temp)) {
							results->push_back(temp);
						} else {
							DEBUG_ONLY( ignoredCount++; )
							delete temp;
						}
					}
				}
			}
		} else {
			DEBUG_ONLY( kad1Count++; )
		}
	}

#ifdef __DEBUG__
	if (ignoredCount > 0) {
		AddDebugLogLineN(logKadRouting, CFormat(wxT("Ignored %u bad %s in routing answer from %s")) % ignoredCount % (ignoredCount > 1 ? wxT("contacts") : wxT("contact")) % KadIPToString(ip));
	}
	if (kad1Count > 0) {
		AddDebugLogLineN(logKadRouting, CFormat(wxT("Ignored %u kad1 %s in routing answer from %s")) % kad1Count % (kad1Count > 1 ? wxT("contacts") : wxT("contact")) % KadIPToString(ip));
	}
#endif

	CSearchManager::ProcessResponse(target, ip, port, results.release());
}

這個地方主要作了兩件事情,

1. 是從消息中解出聯繫人的信息,並保存至RoutingZone中。

2. 是調用CSearchManager::ProcessResponse(target, ip, port, results.release()),執行一些與所發起的搜索/資源發佈等自己有關的動做。具體CSearchManager會如何處理,則須要來看CSearchManager::ProcessResponse()的定義了,代碼以下(amule-2.3.1/src/kademlia/kademlia/SearchManager.cpp):

void CSearchManager::ProcessResponse(const CUInt128& target, uint32_t fromIP, uint16_t fromPort, ContactList *results)
{
	// We got a response to a kad lookup.
	CSearch *s = NULL;
	SearchMap::const_iterator it = m_searches.find(target);
	if (it != m_searches.end()) {
		s = it->second;
	}

	// If this search was deleted before this response, delete contacts and abort, otherwise process them.
	if (s == NULL) {
		AddDebugLogLineN(logKadSearch,
			wxT("Search either never existed or receiving late results (CSearchManager::ProcessResponse)"));
		DeleteContents(*results);
	} else {
		s->ProcessResponse(fromIP, fromPort, results);
	}
	delete results;
}

這個函數中,所作的事情主要是找到相對應的CSearch對象(AlreadySearchingFor()的檢查的成果在這裏派上用場),而後執行它的ProcessResponse()函數。來看CSearch::ProcessResponse()(amule-2.3.1/src/kademlia/kademlia/Search.cpp):

void CSearch::ProcessResponse(uint32_t fromIP, uint16_t fromPort, ContactList *results)
{
	AddDebugLogLineN(logKadSearch, wxT("Processing search response from ") + KadIPPortToString(fromIP, fromPort));

	ContactList::iterator response;
	// Remember the contacts to be deleted when finished
	for (response = results->begin(); response != results->end(); ++response) {
		m_delete.push_back(*response);
	}

	m_lastResponse = time(NULL);

	// Find contact that is responding.
	CUInt128 fromDistance(0u);
	CContact *fromContact = NULL;
	for (ContactMap::const_iterator it = m_tried.begin(); it != m_tried.end(); ++it) {
		CContact *tmpContact = it->second;
		if ((tmpContact->GetIPAddress() == fromIP) && (tmpContact->GetUDPPort() == fromPort)) {
			fromDistance = it->first;
			fromContact = tmpContact;
			break;
		}
	}

	// Make sure the node is not sending more results than we requested, which is not only a protocol violation
	// but most likely a malicious answer
	if (results->size() > GetRequestContactCount() && !(m_requestedMoreNodesContact == fromContact && results->size() <= KADEMLIA_FIND_VALUE_MORE)) {
		AddDebugLogLineN(logKadSearch, wxT("Node ") + KadIPToString(fromIP) + wxT(" sent more contacts than requested on a routing query, ignoring response"));
		return;
	}

	if (m_type == NODEFWCHECKUDP) {
		m_answers++;
		return;
	}

	// Not interested in responses for FIND_NODE, will be added to contacts by udp listener
	if (m_type == NODE) {
		AddDebugLogLineN(logKadSearch, wxT("Node type search result, discarding."));
		// Note that we got an answer.
		m_answers++;
		// We clear the possible list to force the search to stop.
		m_possible.clear();
		return;
	}

	if (fromContact != NULL) {
		bool providedCloserContacts = false;
		std::map<uint32_t, unsigned> receivedIPs;
		std::map<uint32_t, unsigned> receivedSubnets;
		// A node is not allowed to answer with contacts to itself
		receivedIPs[fromIP] = 1;
		receivedSubnets[fromIP & 0xFFFFFF00] = 1;
		// Loop through their responses
		for (ContactList::iterator it = results->begin(); it != results->end(); ++it) {
			// Get next result
			CContact *c = *it;
			// calc distance this result is to the target
			CUInt128 distance(c->GetClientID() ^ m_target);

			if (distance < fromDistance) {
				providedCloserContacts = true;
			}

			// Ignore this contact if already known or tried it.
			if (m_possible.count(distance) > 0) {
				AddDebugLogLineN(logKadSearch, wxT("Search result from already known client: ignore"));
				continue;
			}
			if (m_tried.count(distance) > 0) {
				AddDebugLogLineN(logKadSearch, wxT("Search result from already tried client: ignore"));
				continue;
			}

			// We only accept unique IPs in the answer, having multiple IDs pointing to one IP in the routing tables
			// is no longer allowed since eMule0.49a, aMule-2.2.1 anyway
			if (receivedIPs.count(c->GetIPAddress()) > 0) {
				AddDebugLogLineN(logKadSearch, wxT("Multiple KadIDs pointing to same IP (") + KadIPToString(c->GetIPAddress()) + wxT(") in Kad2Res answer - ignored, sent by ") + KadIPToString(fromContact->GetIPAddress()));
				continue;
			} else {
				receivedIPs[c->GetIPAddress()] = 1;
			}
				// and no more than 2 IPs from the same /24 subnet
			if (receivedSubnets.count(c->GetIPAddress() & 0xFFFFFF00) > 0 && !::IsLanIP(wxUINT32_SWAP_ALWAYS(c->GetIPAddress()))) {
				wxASSERT(receivedSubnets.find(c->GetIPAddress() & 0xFFFFFF00) != receivedSubnets.end());
				int subnetCount = receivedSubnets.find(c->GetIPAddress() & 0xFFFFFF00)->second;
				if (subnetCount >= 2) {
					AddDebugLogLineN(logKadSearch, wxT("More than 2 KadIDs pointing to same subnet (") + KadIPToString(c->GetIPAddress() & 0xFFFFFF00) + wxT("/24) in Kad2Res answer - ignored, sent by ") + KadIPToString(fromContact->GetIPAddress()));
					continue;
				} else {
					receivedSubnets[c->GetIPAddress() & 0xFFFFFF00] = subnetCount + 1;
				}
			} else {
				receivedSubnets[c->GetIPAddress() & 0xFFFFFF00] = 1;
			}

			// Add to possible
			m_possible[distance] = c;

			// Verify if the result is closer to the target than the one we just checked.
			if (distance < fromDistance) {
				// The top ALPHA_QUERY of results are used to determine if we send a request.
				bool top = false;
				if (m_best.size() < ALPHA_QUERY) {
					top = true;
					m_best[distance] = c;
				} else {
					ContactMap::iterator worst = m_best.end();
					--worst;
					if (distance < worst->first) {
						// Prevent having more than ALPHA_QUERY within the Best list.
						m_best.erase(worst);
						m_best[distance] = c;
						top = true;
					}
				}

				if (top) {
					// We determined this contact is a candidate for a request.
					// Add to tried
					m_tried[distance] = c;
					// Send the KadID so other side can check if I think it has the right KadID.
					// Send request
					SendFindValue(c);
				}
			}
		}

		// Add to list of people who responded.
		m_responded[fromDistance] = providedCloserContacts;

		// Complete node search, just increment the counter.
		if (m_type == NODECOMPLETE || m_type == NODESPECIAL) {
			AddDebugLogLineN(logKadSearch, wxString(wxT("Search result type: Node")) + (m_type == NODECOMPLETE ? wxT("Complete") : wxT("Special")));
			m_answers++;
		}
	}
}

這個函數,

(1)、首先會在m_delet中記錄下結果裏面全部的聯繫人信息,以便在結束以後能夠將它們都移除掉。

(2)、而後更新m_lastResponse爲當前時間。

(3)、在發送消息的時候,會在m_tried中記錄都向那些聯繫人發送了消息。這裏則將在m_tried中查找這個響應具體是哪一個目標聯繫人發送回來的,也就是fromContact。

(4)、基於安全考慮,確保響應中的聯繫人不會過多。返回的聯繫人個數超出預期的話,可能說明,這是一個惡意的響應。

(5)、遍歷返回回來的全部聯繫人,找到distance與target clientID更近的聯繫人,向其發送相同的搜索/發佈資源的消息。

總結一下KADEMLIA2_RES消息的處理過程,主要是解析消息中發回來的節點信息,並向那些與target clientID 距離更近的節點重複發送相同的搜索/發佈資源的消息,持續地進行這樣的節點查找的過程。

發送資源信息至其餘節點

如咱們前面看到的,Kademlia網絡中,不論是搜索關鍵字的過程,仍是發佈資源的過程,最早要作的事情都是先發送KADEMLIA2_REQ消息,找到一些合適的節點回來。可是搜索到的這麼多節點,又會如何被使用呢?

如咱們前面在 Linux下電騾aMule Kademlia網絡構建分析3 一文中所看到的。Kademlia::CKademlia::Process()會經過一個定時器週期性地被執行,在這個函數中,咱們能夠看到這樣幾行:

// This is a convenient place to add this, although not related to routing
	if (m_nextSearchJumpStart <= now) {
		CSearchManager::JumpStart();
		m_nextSearchJumpStart = SEARCH_JUMPSTART + now;
	}

因此,CSearchManager::JumpStart()也會被週期性地執行。在這個函數中,CSearchManager會遍歷全部的CSearch,找到全部那些須要作進一步動做的searches,並jumpstart它們,即推進search的進一步執行。來看一下這個函數的定義:

void CSearchManager::JumpStart()
{
	// Find any searches that has stalled and jumpstart them.
	// This will also prune all searches.
	time_t now = time(NULL);
	SearchMap::iterator next_it = m_searches.begin();
	while (next_it != m_searches.end()) {
		SearchMap::iterator current_it = next_it++; /* don't change this to a ++next_it! */
		switch(current_it->second->GetSearchTypes()){
			case CSearch::FILE: {
				if (current_it->second->m_created + SEARCHFILE_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else if (current_it->second->GetAnswers() > SEARCHFILE_TOTAL ||
					   current_it->second->m_created + SEARCHFILE_LIFETIME - SEC(20) < now) {
					current_it->second->PrepareToStop();
				} else {
					current_it->second->JumpStart();
				}					
				break;
			}
			case CSearch::KEYWORD: {

。。。。。。

			case CSearch::STOREFILE: {
				if (current_it->second->m_created + SEARCHSTOREFILE_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else if (current_it->second->GetAnswers() > SEARCHSTOREFILE_TOTAL ||
					   current_it->second->m_created + SEARCHSTOREFILE_LIFETIME - SEC(20) < now) {
					current_it->second->PrepareToStop();
				} else {
					current_it->second->JumpStart();
				}
				break;
			}
			case CSearch::STOREKEYWORD: {
				if (current_it->second->m_created + SEARCHSTOREKEYWORD_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else if (current_it->second->GetAnswers() > SEARCHSTOREKEYWORD_TOTAL ||
					   current_it->second->m_created + SEARCHSTOREKEYWORD_LIFETIME - SEC(20)< now) {
					current_it->second->PrepareToStop();
				} else {
					current_it->second->JumpStart();
				}
				break;
			}
			case CSearch::STORENOTES: {
				if (current_it->second->m_created + SEARCHSTORENOTES_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else if (current_it->second->GetAnswers() > SEARCHSTORENOTES_TOTAL ||
					   current_it->second->m_created + SEARCHSTORENOTES_LIFETIME - SEC(20)< now) {
					current_it->second->PrepareToStop();
				} else {
					current_it->second->JumpStart();
				}
				break;
			}
			default: {
				if (current_it->second->m_created + SEARCH_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else {
					current_it->second->JumpStart();
				}
				break;
			}

在這裏咱們主要關注與資源發佈相關的幾個case,STOREFILE,STOREKEYWORD,和STORENOTES。這幾個case內的人處理也比較類似,都是根據search的不一樣狀態來執行不一樣的動做:

1. Search建立的時間已經很長了,具體點說,便是建立的時間已經超出了其生命週期SEARCHSTOREFILE_LIFETIME,SEARCHSTOREKEYWORD_LIFETIME或SEARCHSTORENOTES_LIFETIME了。直接刪除search。

2. Search建立的時間已經比較長了,具體點說,便是建立的時間已經超出了各自生命週期減去20S了;或者已經獲得的足夠多的響應了,由CSearch::GetAnswers()的定義,可滿足夠多的含義是指平均每50個要發佈的文件,獲得了超過SEARCHSTOREFILE_TOTAL,SEARCHSTOREKEYWORD_TOTAL,或SEARCHSTORENOTES_TOTAL個相應了。停掉搜索過程。

3. 其它狀況。執行search的JumpStart()推進search的進一步執行

這裏主要來看CSearch::JumpStart()的執行過程:

void CSearch::JumpStart()
{
    AddLogLineNS(CFormat(_("CSearch::JumpStart, search type is %s, search id is %u, answers is %u, fileID size is %d"))
            % SearchTypeToString() % m_searchID % m_answers % m_fileIDs.size());
	// If we had a response within the last 3 seconds, no need to jumpstart the search.
	if ((time_t)(m_lastResponse + SEC(3)) > time(NULL)) {
		return;
	}

	// If we ran out of contacts, stop search.
	if (m_possible.empty()) {
		PrepareToStop();
		return;
	}

	// Is this a find lookup and are the best two (=KADEMLIA_FIND_VALUE) nodes dead/unreachable?
	// In this case try to discover more close nodes before using our other results
	// The reason for this is that we may not have found the closest node alive due to results being limited to 2 contacts,
	// which could very well have been the duplicates of our dead closest nodes
	bool lookupCloserNodes = false;
	if (m_requestedMoreNodesContact == NULL && GetRequestContactCount() == KADEMLIA_FIND_VALUE && m_tried.size() >= 3 * KADEMLIA_FIND_VALUE) {
		ContactMap::const_iterator it = m_tried.begin();
		lookupCloserNodes = true;
		for (unsigned i = 0; i < KADEMLIA_FIND_VALUE; i++) {
			if (m_responded.count(it->first) > 0) {
				lookupCloserNodes = false;
				break;
			}
			++it;
		}
		if (lookupCloserNodes) {
			while (it != m_tried.end()) {
				if (m_responded.count(it->first) > 0) {
					AddDebugLogLineN(logKadSearch, CFormat(wxT("Best %d nodes for lookup (id=%x) were unreachable or dead, reasking closest for more")) % KADEMLIA_FIND_VALUE % GetSearchID());
					SendFindValue(it->second, true);
					return;
				}
				++it;
			}
		}
	}

	// Search for contacts that can be used to jumpstart a stalled search.
	while (!m_possible.empty()) {
		// Get a contact closest to our target.
		CContact *c = m_possible.begin()->second;
	
		// Have we already tried to contact this node.
		if (m_tried.count(m_possible.begin()->first) > 0) {
			// Did we get a response from this node, if so, try to store or get info.
			if (m_responded.count(m_possible.begin()->first) > 0) {
				StorePacket();
			}
			// Remove from possible list.
			m_possible.erase(m_possible.begin());
		} else {
			// Add to tried list.
			m_tried[m_possible.begin()->first] = c;
			// Send the KadID so other side can check if I think it has the right KadID.
			// Send request
			SendFindValue(c);
			break;
		}
	}
	
}

這個函數主要作2件事情,

1是,在須要的時候,繼續發送相同的發佈資源/搜索請求出去,以便於能找到更多的節點。

2是,執行CSearch::StorePacket()來推進資源發佈/搜索的更進一步執行。

至於這兩件是的具體執行條件,如上面代碼所示,此處再也不贅述。這裏主要來看一下CSearch::StorePacket()的執行:

void CSearch::StorePacket()
{
	wxASSERT(!m_possible.empty());

	// This method is currently only called by jumpstart so only use best possible.
	ContactMap::const_iterator possible = m_possible.begin();
	CUInt128 fromDistance(possible->first);
	CContact *from = possible->second;

	if (fromDistance < m_closestDistantFound || m_closestDistantFound == 0) {
		m_closestDistantFound = fromDistance;
	}

	// Make sure this is a valid node to store.
	if (fromDistance.Get32BitChunk(0) > SEARCHTOLERANCE && !::IsLanIP(wxUINT32_SWAP_ALWAYS(from->GetIPAddress()))) {
		return;
	}

	// What kind of search are we doing?
	switch (m_type) {
		case FILE: {

。。。。。。

		case STOREFILE: {
			AddDebugLogLineN(logKadSearch, wxT("Search request type: StoreFile"));
			// Try to store ourselves as a source to a Node.
			// As a safeguard, check to see if we already stored to the max nodes.
			if (m_answers > SEARCHSTOREFILE_TOTAL) {
				PrepareToStop();
				break;
			}

			// Find the file we are trying to store as a source to.
			uint8_t fileid[16];
			m_target.ToByteArray(fileid);
			CKnownFile* file = theApp->sharedfiles->GetFileByID(CMD4Hash(fileid));
			if (file) {
				// We store this mostly for GUI reasons.
				m_fileName = file->GetFileName().GetPrintable();

				// Get our clientID for the packet.
				CUInt128 id(CKademlia::GetPrefs()->GetClientHash());
				TagPtrList taglist;

				//We can use type for different types of sources. 
				//1 HighID sources..
				//2 cannot be used as older clients will not work.
				//3 Firewalled Kad Source.
				//4 >4GB file HighID Source.
				//5 >4GB file Firewalled Kad source.
				//6 Firewalled source with Direct Callback (supports >4GB)

				bool directCallback = false;
				if (theApp->IsFirewalled()) {
					directCallback = (Kademlia::CKademlia::IsRunning() && !Kademlia::CUDPFirewallTester::IsFirewalledUDP(true) && Kademlia::CUDPFirewallTester::IsVerified());
					if (directCallback) {
						// firewalled, but direct udp callback is possible so no need for buddies
						// We are not firewalled..
						taglist.push_back(new CTagVarInt(TAG_SOURCETYPE, 6));
						taglist.push_back(new CTagVarInt(TAG_SOURCEPORT, thePrefs::GetPort()));
						if (!CKademlia::GetPrefs()->GetUseExternKadPort()) {
							taglist.push_back(new CTagInt16(TAG_SOURCEUPORT, CKademlia::GetPrefs()->GetInternKadPort()));
						}
						if (from->GetVersion() >= 2) {
							taglist.push_back(new CTagVarInt(TAG_FILESIZE, file->GetFileSize()));
						}
					} else if (theApp->clientlist->GetBuddy()) {	// We are firewalled, make sure we have a buddy.
						// We send the ID to our buddy so they can do a callback.
						CUInt128 buddyID(true);
						buddyID ^= CKademlia::GetPrefs()->GetKadID();
						taglist.push_back(new CTagInt8(TAG_SOURCETYPE, file->IsLargeFile() ? 5 : 3));
						taglist.push_back(new CTagVarInt(TAG_SERVERIP, theApp->clientlist->GetBuddy()->GetIP()));
						taglist.push_back(new CTagVarInt(TAG_SERVERPORT, theApp->clientlist->GetBuddy()->GetUDPPort()));
						uint8_t hashBytes[16];
						buddyID.ToByteArray(hashBytes);
						taglist.push_back(new CTagString(TAG_BUDDYHASH, CMD4Hash(hashBytes).Encode()));
						taglist.push_back(new CTagVarInt(TAG_SOURCEPORT, thePrefs::GetPort()));
						if (!CKademlia::GetPrefs()->GetUseExternKadPort()) {
							taglist.push_back(new CTagInt16(TAG_SOURCEUPORT, CKademlia::GetPrefs()->GetInternKadPort()));
						}
						if (from->GetVersion() >= 2) {
							taglist.push_back(new CTagVarInt(TAG_FILESIZE, file->GetFileSize()));
						}
					} else {
						// We are firewalled, but lost our buddy.. Stop everything.
						PrepareToStop();
						break;
					}
				} else {
					// We're not firewalled..
					taglist.push_back(new CTagInt8(TAG_SOURCETYPE, file->IsLargeFile() ? 4 : 1));
					taglist.push_back(new CTagVarInt(TAG_SOURCEPORT, thePrefs::GetPort()));
					if (!CKademlia::GetPrefs()->GetUseExternKadPort()) {
						taglist.push_back(new CTagInt16(TAG_SOURCEUPORT, CKademlia::GetPrefs()->GetInternKadPort()));
					}
					if (from->GetVersion() >= 2) {
						taglist.push_back(new CTagVarInt(TAG_FILESIZE, file->GetFileSize()));
					}
				}

				taglist.push_back(new CTagInt8(TAG_ENCRYPTION, CPrefs::GetMyConnectOptions(true, true)));

				// Send packet
				CKademlia::GetUDPListener()->SendPublishSourcePacket(*from, m_target, id, taglist);
				m_totalRequestAnswers++;
				// Delete all tags.
				deleteTagPtrListEntries(&taglist);
			} else {
				PrepareToStop();
			}
			break;
		}
		case STOREKEYWORD: {
			AddDebugLogLineN(logKadSearch, wxT("Search request type: StoreKeyword"));
			// Try to store keywords to a Node.
			// As a safeguard, check to see if we already stored to the max nodes.
			if (m_answers > SEARCHSTOREKEYWORD_TOTAL) {
				PrepareToStop();
				break;
			}

			uint16_t count = m_fileIDs.size();
			if (count == 0) {
				PrepareToStop();
				break;
			} else if (count > 150) {
				count = 150;
			}

			UIntList::const_iterator itListFileID = m_fileIDs.begin();
			uint8_t fileid[16];

			while (count && (itListFileID != m_fileIDs.end())) {
				uint16_t packetCount = 0;
				CMemFile packetdata(1024*50); // Allocate a good amount of space.			
				packetdata.WriteUInt128(m_target);
				packetdata.WriteUInt16(0); // Will be updated before sending.
				while ((packetCount < 50) && (itListFileID != m_fileIDs.end())) {
					CUInt128 id(*itListFileID);
					id.ToByteArray(fileid);
					CKnownFile *pFile = theApp->sharedfiles->GetFileByID(CMD4Hash(fileid));
					if (pFile) {
						count--;
						packetCount++;
						packetdata.WriteUInt128(id);
						PreparePacketForTags(&packetdata, pFile);
					}
					++itListFileID;
				}

				// Correct file count.
				uint64_t current_pos = packetdata.GetPosition();
				packetdata.Seek(16);
				packetdata.WriteUInt16(packetCount);
				packetdata.Seek(current_pos);

				// Send packet
				if (from->GetVersion() >= 6) {
					DebugSend(Kad2PublishKeyReq, from->GetIPAddress(), from->GetUDPPort());
					CUInt128 clientID = from->GetClientID();
					CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_PUBLISH_KEY_REQ, from->GetIPAddress(), from->GetUDPPort(), from->GetUDPKey(), &clientID);
				} else if (from->GetVersion() >= 2) {
					DebugSend(Kad2PublishKeyReq, from->GetIPAddress(), from->GetUDPPort());
					CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_PUBLISH_KEY_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
					wxASSERT(from->GetUDPKey() == CKadUDPKey(0));
				} else {
					wxFAIL;
				}
			}
			m_totalRequestAnswers++;
			break;
		}
		case STORENOTES: {
			AddDebugLogLineN(logKadSearch, wxT("Search request type: StoreNotes"));
			// Find file we are storing info about.
			uint8_t fileid[16];
			m_target.ToByteArray(fileid);
			CKnownFile* file = theApp->sharedfiles->GetFileByID(CMD4Hash(fileid));

			if (file) {
				CMemFile packetdata(1024*2);
				// Send the hash of the file we're storing info about.
				packetdata.WriteUInt128(m_target);
				// Send our ID with the info.
				packetdata.WriteUInt128(CKademlia::GetPrefs()->GetKadID());

				// Create our taglist.
				TagPtrList taglist;
				taglist.push_back(new CTagString(TAG_FILENAME, file->GetFileName().GetPrintable()));
				if (file->GetFileRating() != 0) {
					taglist.push_back(new CTagVarInt(TAG_FILERATING, file->GetFileRating()));
				}
				if (!file->GetFileComment().IsEmpty()) {
					taglist.push_back(new CTagString(TAG_DESCRIPTION, file->GetFileComment()));
				}
				if (from->GetVersion() >= 2) {
					taglist.push_back(new CTagVarInt(TAG_FILESIZE, file->GetFileSize()));
				}
				packetdata.WriteTagPtrList(taglist);

				// Send packet
				if (from->GetVersion() >= 6) {
					DebugSend(Kad2PublishNotesReq, from->GetIPAddress(), from->GetUDPPort());
					CUInt128 clientID = from->GetClientID();
					CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_PUBLISH_NOTES_REQ, from->GetIPAddress(), from->GetUDPPort(), from->GetUDPKey(), &clientID);
				} else if (from->GetVersion() >= 2) {
					DebugSend(Kad2PublishNotesReq, from->GetIPAddress(), from->GetUDPPort());
					CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_PUBLISH_NOTES_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
					wxASSERT(from->GetUDPKey() == CKadUDPKey(0));
				} else {
					wxFAIL;
				}
				m_totalRequestAnswers++;
				// Delete all tags.
				deleteTagPtrListEntries(&taglist);
			} else {
				PrepareToStop();
			}
			break;
		}

這個函數有點長,但結構不復雜。先是取出m_possible中的首個contact from,作一些檢查。而後就根據具體的search type,執行一些操做。對於咱們的發佈資源而言,主要關注STOREFILE,STOREKEYWORD和STORENOTES三個case。咱們來分別分析三種不一樣類型的發佈具體的執行過程。

1. 文件的發佈,也就是case STOREFILE。

(1)、根據fileid找到相應的文件的表示file CKnownFile。fileid也是搜索的target,同時fileid通過MD4Hash以後也是CSharedFileList在管理文件時的index。

(2)、從文件中讀出信息,包裝成一個個的CTag,並放在一塊兒構造一個TagPtrList。

(3)、經過CKademliaUDPListener::SendPublishSourcePacket()函數將TagPtrList發送給from contact。

(4)、刪除TagPtrList的內容。

這裏能夠再來看一下CKademliaUDPListener的SendPublishSourcePacket()函數:

void CKademliaUDPListener::SendPublishSourcePacket(const CContact& contact, const CUInt128 &targetID, const CUInt128 &contactID, const TagPtrList& tags)
{
	uint8_t opcode;
	CMemFile packetdata;
	packetdata.WriteUInt128(targetID);
	if (contact.GetVersion() >= 4/*47c*/) {
		opcode = KADEMLIA2_PUBLISH_SOURCE_REQ;
		packetdata.WriteUInt128(contactID);
		packetdata.WriteTagPtrList(tags);
		DebugSend(Kad2PublishSrcReq, contact.GetIPAddress(), contact.GetUDPPort());
	} else {
		opcode = KADEMLIA_PUBLISH_REQ;
		//We only use this for publishing sources now.. So we always send one here..
		packetdata.WriteUInt16(1);
		packetdata.WriteUInt128(contactID);
		packetdata.WriteTagPtrList(tags);
		DebugSend(KadPublishReq, contact.GetIPAddress(), contact.GetUDPPort());
	}
	if (contact.GetVersion() >= 6) {	// obfuscated ?
		CUInt128 clientID = contact.GetClientID();
		SendPacket(packetdata, opcode, contact.GetIPAddress(), contact.GetUDPPort(), contact.GetUDPKey(), &clientID);
	} else {
		SendPacket(packetdata, opcode, contact.GetIPAddress(), contact.GetUDPPort(), 0, NULL);
	}
}

這個函數主要是將全部的數據寫入一個CMemFile packetdata中,根據contact版本的不一樣,將packetdata包裝爲一個KADEMLIA2_PUBLISH_SOURCE_REQ或KADEMLIA_PUBLISH_REQ消息,經過CKademliaUDPListener::SendPacket()函數發送出去。

packetdata的格式也會根據contact的版本不一樣而略有差別。

CKademliaUDPListener::SendPacket()函數的具體執行過程能夠參考 Linux下電騾aMule Kademlia網絡構建分析2 一文。

2. 關鍵字Keyword的發佈,case STOREKEYWORD。

關鍵字Keyword發佈的search在建立時,會將與關鍵字Keyword相關的全部文件的fileID都添加進m_fileIDs。在這裏則是將全部的文件信息打包進CMemFile packetdata,並將packetdata包裝爲KADEMLIA2_PUBLISH_KEY_REQ消息,而後經過CKademliaUDPListener::SendPacket()函數發送出去。

能夠看到,每一個packetdata最多包50個文件的信息,若是文件過多,則經過多個packetdata發送出去。

這裏會使用CSearch::PreparePacketForTags()將一個文件的信息組裝爲一個TagPtrList,而後將TagPtrList寫入packetdata

CKademliaUDPListener::SendPacket()函數的具體執行過程能夠參考 Linux下電騾aMule Kademlia網絡構建分析2 一文。

3. FileNotes的發佈,case STORENOTES

將文件的信息包裝爲一個個的CTag,並放在一塊兒構造一個TagPtrList。

TagPtrList包裝爲一個CMemFile packetdata。

packetdata包裝爲一個KADEMLIA2_PUBLISH_NOTES_REQ經過CKademliaUDPListener::SendPacket()函數發送出去。

目標節點對所發佈資源的相關信息的保存

一個節點能夠向Kademlia網絡發佈三種不一樣類型的資源信息,File,Keyword,和FileNotes,具體的信息則會經過KADEMLIA2_PUBLISH_SOURCE_REQ、KADEMLIA2_PUBLISH_KEY_REQ或KADEMLIA2_PUBLISH_NOTES_REQ發送出去。

Kademlia網絡中的目標節點在接收到這些消息以後,又是如何處理的呢?CKademliaUDPListener::ProcessPacket()能夠看到這樣的幾個case:

case KADEMLIA2_PUBLISH_NOTES_REQ:
			DebugRecv(Kad2PublishNotesReq, ip, port);
			Process2PublishNotesRequest(packetData, lenPacket, ip, port, senderKey);
			break;
		case KADEMLIA2_PUBLISH_KEY_REQ:
			DebugRecv(Kad2PublishKeyReq, ip, port);
			Process2PublishKeyRequest(packetData, lenPacket, ip, port, senderKey);
			break;
		case KADEMLIA2_PUBLISH_SOURCE_REQ:
			DebugRecv(Kad2PublishSourceReq, ip, port);
			Process2PublishSourceRequest(packetData, lenPacket, ip, port, senderKey);
			break;

這些消息又會被委託給CKademliaUDPListener的Process2PublishNotesRequest(),Process2PublishKeyRequest()和Process2PublishSourceRequest()處理,來看這幾個函數的定義,首先是Process2PublishNotesRequest()

// KADEMLIA2_PUBLISH_NOTES_REQ
// Used only by Kad2.0
void CKademliaUDPListener::Process2PublishNotesRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	// check if we are UDP firewalled
	if (CUDPFirewallTester::IsFirewalledUDP(true)) {
		//We are firewalled. We should not index this entry and give publisher a false report.
		return;
	}

	CMemFile bio(packetData, lenPacket);
	CUInt128 target = bio.ReadUInt128();

	CUInt128 distance(CKademlia::GetPrefs()->GetKadID());
	distance.XOR(target);

	if (distance.Get32BitChunk(0) > SEARCHTOLERANCE && !::IsLanIP(wxUINT32_SWAP_ALWAYS(ip))) {
		return;
	}

	CUInt128 source = bio.ReadUInt128();

	Kademlia::CEntry* entry = new Kademlia::CEntry();
	try {
		entry->m_uIP = ip;
		entry->m_uUDPport = port;
		entry->m_uKeyID.SetValue(target);
		entry->m_uSourceID.SetValue(source);
		entry->m_bSource = false;
		uint32_t tags = bio.ReadUInt8();
		while (tags > 0) {
			CTag* tag = bio.ReadTag();
			if(tag) {
				if (!tag->GetName().Cmp(TAG_FILENAME)) {
					if (entry->GetCommonFileName().IsEmpty()) {
						entry->SetFileName(tag->GetStr());
					}
					delete tag;
				} else if (!tag->GetName().Cmp(TAG_FILESIZE)) {
					if (entry->m_uSize == 0) {
						entry->m_uSize = tag->GetInt();
					}
					delete tag;
				} else {
					//TODO: Filter tags
					entry->AddTag(tag);
				}
			}
			tags--;
		}
	} catch(...) {
		//DebugClientOutput(wxT("CKademliaUDPListener::Process2PublishNotesRequest"),ip,port,packetData,lenPacket);
		delete entry;
		entry = NULL;
		throw;
	}

	uint8_t load = 0;
	if (CKademlia::GetIndexed()->AddNotes(target, source, entry, load)) {
		CMemFile packetdata(17);
		packetdata.WriteUInt128(target);
		packetdata.WriteUInt8(load);
		DebugSend(Kad2PublishRes, ip, port);
		SendPacket(packetdata, KADEMLIA2_PUBLISH_RES, ip, port, senderKey, NULL);
	} else {
		delete entry;
	}
}

從消息裏面解出FileNotes信息,而後調用CIndexed::AddNotes()將相關信息保存起來,最後發回一個KADEMLIA2_PUBLISH_RES消息。

而後是Process2PublishKeyRequest()

// KADEMLIA2_PUBLISH_KEY_REQ
// Used in Kad2.0 only
void CKademliaUDPListener::Process2PublishKeyRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	//Used Pointers
	CIndexed *indexed = CKademlia::GetIndexed();

	// check if we are UDP firewalled
	if (CUDPFirewallTester::IsFirewalledUDP(true)) {
		//We are firewalled. We should not index this entry and give publisher a false report.
		return;
	}

	CMemFile bio(packetData, lenPacket);
	CUInt128 file = bio.ReadUInt128();

	CUInt128 distance(CKademlia::GetPrefs()->GetKadID());
	distance.XOR(file);

	if (distance.Get32BitChunk(0) > SEARCHTOLERANCE && !::IsLanIP(wxUINT32_SWAP_ALWAYS(ip))) {
		return;
	}

	DEBUG_ONLY( wxString strInfo; )
	uint16_t count = bio.ReadUInt16();
	uint8_t load = 0;
	while (count > 0) {
		DEBUG_ONLY( strInfo.Clear(); )

		CUInt128 target = bio.ReadUInt128();

		Kademlia::CKeyEntry* entry = new Kademlia::CKeyEntry();
		try
		{
			entry->m_uIP = ip;
			entry->m_uUDPport = port;
			entry->m_uKeyID.SetValue(file);
			entry->m_uSourceID.SetValue(target);
			entry->m_tLifeTime = (uint32_t)time(NULL) + KADEMLIAREPUBLISHTIMEK;
			entry->m_bSource = false;
			uint32_t tags = bio.ReadUInt8();
			while (tags > 0) {
				CTag* tag = bio.ReadTag();
				if (tag) {
					if (!tag->GetName().Cmp(TAG_FILENAME)) {
						if (entry->GetCommonFileName().IsEmpty()) {
							entry->SetFileName(tag->GetStr());
							DEBUG_ONLY( strInfo += CFormat(wxT("  Name=\"%s\"")) % entry->GetCommonFileName(); )
						}
						delete tag; // tag is no longer stored, but membervar is used
					} else if (!tag->GetName().Cmp(TAG_FILESIZE)) {
						if (entry->m_uSize == 0) {
							if (tag->IsBsob() && tag->GetBsobSize() == 8) {
								entry->m_uSize = PeekUInt64(tag->GetBsob());
							} else {
								entry->m_uSize = tag->GetInt();
							}
							DEBUG_ONLY( strInfo += CFormat(wxT("  Size=%u")) % entry->m_uSize; )
						}
						delete tag; // tag is no longer stored, but membervar is used
					} else {
						//TODO: Filter tags
						entry->AddTag(tag);
					}
				}
				tags--;
			}
#ifdef __DEBUG__
			if (!strInfo.IsEmpty()) {
				AddDebugLogLineN(logClientKadUDP, strInfo);
			}
#endif
		} catch(...) {
			//DebugClientOutput(wxT("CKademliaUDPListener::Process2PublishKeyRequest"),ip,port,packetData,lenPacket);
			delete entry;
			throw;
		}

		if (!indexed->AddKeyword(file, target, entry, load)) {
			//We already indexed the maximum number of keywords.
			//We do not index anymore but we still send a success..
			//Reason: Because if a VERY busy node tells the publisher it failed,
			//this busy node will spread to all the surrounding nodes causing popular
			//keywords to be stored on MANY nodes..
			//So, once we are full, we will periodically clean our list until we can
			//begin storing again..
			delete entry;
			entry = NULL;
		}
		count--;
	}
	CMemFile packetdata(17);
	packetdata.WriteUInt128(file);
	packetdata.WriteUInt8(load);
	DebugSend(Kad2PublishRes, ip, port);
	SendPacket(packetdata, KADEMLIA2_PUBLISH_RES, ip, port, senderKey, NULL);
}

從消息裏面解出Keyword信息,而後調用CIndexed::AddKeyword()將相關信息保存起來,最後發回一個KADEMLIA2_PUBLISH_RES消息。

最後是Process2PublishSourceRequest()

// KADEMLIA2_PUBLISH_SOURCE_REQ
// Used in Kad2.0 only
void CKademliaUDPListener::Process2PublishSourceRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	//Used Pointers
	CIndexed *indexed = CKademlia::GetIndexed();

	// check if we are UDP firewalled
	if (CUDPFirewallTester::IsFirewalledUDP(true)) {
		//We are firewalled. We should not index this entry and give publisher a false report.
		return;
	}

	CMemFile bio(packetData, lenPacket);
	CUInt128 file = bio.ReadUInt128();

	CUInt128 distance(CKademlia::GetPrefs()->GetKadID());
	distance.XOR(file);

	if (distance.Get32BitChunk(0) > SEARCHTOLERANCE && !::IsLanIP(wxUINT32_SWAP_ALWAYS(ip))) {
		return;
	}

	DEBUG_ONLY( wxString strInfo; )
	uint8_t load = 0;
	bool flag = false;
	CUInt128 target = bio.ReadUInt128();
	Kademlia::CEntry* entry = new Kademlia::CEntry();
	try {
		entry->m_uIP = ip;
		entry->m_uUDPport = port;
		entry->m_uKeyID.SetValue(file);
		entry->m_uSourceID.SetValue(target);
		entry->m_bSource = false;
		entry->m_tLifeTime = (uint32_t)time(NULL) + KADEMLIAREPUBLISHTIMES;
		bool addUDPPortTag = true;
		uint32_t tags = bio.ReadUInt8();
		while (tags > 0) {
			CTag* tag = bio.ReadTag();
			if (tag) {
				if (!tag->GetName().Cmp(TAG_SOURCETYPE)) {
					if (entry->m_bSource == false) {
						entry->AddTag(new CTagVarInt(TAG_SOURCEIP, entry->m_uIP));
						entry->AddTag(tag);
						entry->m_bSource = true;
					} else {
						//More than one sourcetype tag found.
						delete tag;
					}
				} else if (!tag->GetName().Cmp(TAG_FILESIZE)) {
					if (entry->m_uSize == 0) {
						if (tag->IsBsob() && tag->GetBsobSize() == 8) {
							entry->m_uSize = PeekUInt64(tag->GetBsob());
						} else {
							entry->m_uSize = tag->GetInt();
						}
						DEBUG_ONLY( strInfo += CFormat(wxT("  Size=%u")) % entry->m_uSize; )
					}
					delete tag;
				} else if (!tag->GetName().Cmp(TAG_SOURCEPORT)) {
					if (entry->m_uTCPport == 0) {
						entry->m_uTCPport = (uint16_t)tag->GetInt();
						entry->AddTag(tag);
					} else {
						//More than one port tag found
						delete tag;
					}
				} else if (!tag->GetName().Cmp(TAG_SOURCEUPORT)) {
					if (addUDPPortTag && tag->IsInt() && tag->GetInt() != 0) {
						entry->m_uUDPport = (uint16_t)tag->GetInt();
						entry->AddTag(tag);
						addUDPPortTag = false;
					} else {
						//More than one udp port tag found
						delete tag;
					}
				} else {
					//TODO: Filter tags
					entry->AddTag(tag);
				}
			}
			tags--;
		}
		if (addUDPPortTag) {
			entry->AddTag(new CTagVarInt(TAG_SOURCEUPORT, entry->m_uUDPport));
		}
#ifdef __DEBUG__
		if (!strInfo.IsEmpty()) {
			AddDebugLogLineN(logClientKadUDP, strInfo);
		}
#endif
	} catch(...) {
		//DebugClientOutput(wxT("CKademliaUDPListener::Process2PublishSourceRequest"),ip,port,packetData,lenPacket);
		delete entry;
		throw;
	}

	if (entry->m_bSource == true) {
		if (indexed->AddSources(file, target, entry, load)) {
			flag = true;
		} else {
			delete entry;
			entry = NULL;
		}
	} else {
		delete entry;
		entry = NULL;
	}
	if (flag) {
		CMemFile packetdata(17);
		packetdata.WriteUInt128(file);
		packetdata.WriteUInt8(load);
		DebugSend(Kad2PublishRes, ip, port);
		SendPacket(packetdata, KADEMLIA2_PUBLISH_RES, ip, port, senderKey, NULL);
	}
}

從消息裏面解出File信息,而後調用CIndexed::AddSources()將相關信息保存起來,最後發回一個KADEMLIA2_PUBLISH_RES消息。

CIndexed在aMule的Kademlia協議實現中,用來管理髮布的資源信息。

球再一次被提給了發送資源的節點。來看KADEMLIA2_PUBLISH_RES消息的處理。在CKademliaUDPListener::ProcessPacket()中能夠看到這樣的幾個case:

case KADEMLIA2_PUBLISH_RES:
			DebugRecv(Kad2PublishRes, ip, port);
			Process2PublishResponse(packetData, lenPacket, ip, port, senderKey);
			break;

消息的處理被委託給了CKademliaUDPListener::Process2PublishResponse():

void CKademliaUDPListener::Process2PublishResponse(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	if (!IsOnOutTrackList(ip, KADEMLIA2_PUBLISH_KEY_REQ) && !IsOnOutTrackList(ip, KADEMLIA2_PUBLISH_SOURCE_REQ) && !IsOnOutTrackList(ip, KADEMLIA2_PUBLISH_NOTES_REQ)) {
		throw wxString(CFormat(wxT("***NOTE: Received unrequested response packet, size (%u) in %s")) % lenPacket % wxString::FromAscii(__FUNCTION__));
	}
	CMemFile bio(packetData, lenPacket);
	CUInt128 file = bio.ReadUInt128();
	uint8_t load = bio.ReadUInt8();
	CSearchManager::ProcessPublishResult(file, load, true);
	if (bio.GetLength() > bio.GetPosition()) {
		// for future use
		uint8_t options = bio.ReadUInt8();
		bool requestACK = (options & 0x01) > 0;
		if (requestACK && !senderKey.IsEmpty()) {
			DebugSend(Kad2PublishResAck, ip, port);
			SendNullPacket(KADEMLIA2_PUBLISH_RES_ACK, ip, port, senderKey, NULL);
		}
	}
}

執行CSearchManager::ProcessPublishResult()作最後的處理,而後根據須要再發送一個KADEMLIA2_PUBLISH_RES_ACK消息回去。至此,整個資源的發佈過程徹底結束。

再來經過幾張圖,來總結一下,aMule資源發佈過程的消息傳遞:


文件發佈過程的消息傳遞


關鍵字發佈的消息傳遞

FileNotes發佈過程的消息傳遞

Done。

相關文章
相關標籤/搜索