Linux下電騾aMule Kademlia網絡構建分析2

讀代碼讀到如今,補充一點關於Kademlia網絡的理論知識。
node

Kademlia網絡的基本原理

Kademlia 是一種結構化的覆蓋網絡(Structured Overlay Network)。所謂覆蓋網絡,就是一種在物理的Internet之上再次構建的虛擬網絡。全部參與的節點都知道一部分其它節點的IP地址,這些節點稱爲它的鄰居。若是須要查找什麼資源,它先向本身的鄰居詢問,鄰居則在本地尋找,若是找不到,就把這個查詢轉發到它的鄰居處,但願可以查找到相應的結果。 算法

覆蓋網絡分爲結構化和非結構化的兩種,它們的區別在於一個節點所知道的其它節點是否有特定的規律。 express

在非結構化的覆蓋網中,節點之間鄰居關係的創建沒有特定的規則,節點隨機地與其它節點創建鄰居關係。在非結構化網絡中,要進行查詢,會採起一種叫作泛洪(flooding)的方法,一個節點若是在本地沒有查找到想要的結果,會把查找請求轉發到它的鄰居中,而後再經過鄰居的鄰居這種方式來進行一步步的查找。可是這種方法若是處理很差,會形成整個網絡的消息負載過大。已經有很多文章對於優化非結構化覆蓋網絡中的查詢進行了很深刻的探討。 網絡

對於結構化的覆蓋網絡,它的特色是,每一個節點會選擇性地與其它節點創建鄰居關係。搜索資源的過程當中,節點轉發搜索請求時,可以依照必定的規律選擇合適的節點來進行。這樣能大大減小搜索的代價,提高搜索的效率。 socket

結構化的覆蓋網絡一般要求每個節點隨機生成一個ID,用以判斷各個節點之間的關係。這個ID和它所在的物理網絡必須是沒有關係的。對於Kademlia網絡來講,這個ID是一個128位的數值,全部的節點都用這個ID來衡量本身與其它節點的邏輯距離。而邏輯距離的計算方法就是將兩個節點進行異或(XOR)操做。在Kademlia網絡的造成過程當中,每一個節點選擇鄰居的原則是:離本身邏輯距離越近的節點越有可能被加入到本身的鄰居節點列表中,具體來講就是在每次新獲得一個節點的信息的時候,是否把它加入到本身的鄰居節點列表是根據邏輯距離的遠近來處理的。後面分析具體程序的代碼時會有說明。 ide

結構化網絡的好處就是若是咱們要尋找一個距離某ID邏輯距離足夠近的節點,咱們能夠保證在O(logn)級別的 跳數找到。只要先尋找本身已知的離目標ID邏輯距離足夠近的節點,而後再問它知不知道更近的,而後就這樣持續下去。 函數

當須要發佈資源的時候,就對文件或關鍵字進行hash,這樣可以計算出一個128位的ID而後尋找到離這個結果邏輯距離最近的節點,把文件或者關鍵 字的信息發送給它,讓它存起來。當有人要搜索一樣的東西時,使用同一個hash算法,計算出一樣的ID,而後去搜索那些和這個ID邏輯距離相近的節點。由於它知道,若是網絡中真有這些資源的話,這些節點是最有可能知道這些信息的。 oop

由此咱們能夠看出,結構化網絡的資源查找效率是很高的,但和非結構化覆蓋網絡相比,缺點是不能進行復雜查詢,即只能經過簡單的關鍵字或者文件的hash值進行查找。非結構化網絡的查找自己就是隨機轉發的,每一個收到查詢請求的節點都對本地所具備的資源掌握的很清楚,所以天然能夠支持複雜查詢,可是顯然非結構化的網絡支持的複雜查詢不太可能動員全部的節點都來作這一動做。 學習

上面這幾段內容轉自:eMule源代碼學習心得,致敬。 優化

Kademlia網絡中依關鍵字進行資源搜索的發起

從UI操做開始。


上面的輸入框中輸入要查找的關鍵字。輸入框右邊的「類型」下拉框,能夠選擇搜索的網絡類型,咱們能夠選擇「Kad」,來從Kad網絡搜索資源。咱們在輸入完了查找關鍵字以後,鍵入回車或點擊按鈕「開始」能夠啓動搜索。

啓動Kad網絡的那個界面,如咱們前面所瞭解,其構造是在amule-2.3.1/src/ServerWnd.cpp文件的CServerWnd類中進行的。根據aMule主UI的結構,有理由猜想,建立CServerWnd對象的地方也同時爲「搜索」、「下載」、「共享文件」等界面建立了UI組件。CServerWnd對象建立的地方在amule-2.3.1/src/amuleDlg.cpp文件,CamuleDlg::CamuleDlg()函數中,在這個函數中還能看到其它一些有意思的東西呢:

m_serverwnd = new CServerWnd(p_cnt, m_srv_split_pos);
	AddLogLineN(wxEmptyString);
	AddLogLineN(wxT(" - ") +
		CFormat(_("This is aMule %s based on eMule.")) % GetMuleVersion());
	AddLogLineN(wxT("   ") +
		CFormat(_("Running on %s")) % wxGetOsDescription());
	AddLogLineN(wxT(" - ") +
		wxString(_("Visit http://www.amule.org to check if a new version is available.")));
	AddLogLineN(wxEmptyString);

#ifdef ENABLE_IP2COUNTRY
	m_GeoIPavailable = true;
	m_IP2Country = new CIP2Country(theApp->ConfigDir);
#else
	m_GeoIPavailable = false;
#endif
	m_searchwnd = new CSearchDlg(p_cnt);
	m_transferwnd = new CTransferWnd(p_cnt);
	m_sharedfileswnd = new CSharedFilesWnd(p_cnt);
	m_statisticswnd = new CStatisticsDlg(p_cnt, theApp->m_statistics);
	m_chatwnd = new CChatWnd(p_cnt);
	m_kademliawnd = CastChild(wxT("kadWnd"), CKadDlg);

	m_serverwnd->Show(false);
	m_searchwnd->Show(false);
	m_transferwnd->Show(false);
	m_sharedfileswnd->Show(false);
	m_statisticswnd->Show(false);
	m_chatwnd->Show(false);

瞅瞅那幾個變量的名字,不難猜到,「搜索」界面主要是在CSearchDlg類中建立的,而CSearchDlg類在amule-2.3.1/src/SearchDlg.cpp文件中定義。界面中最上面那個輸入查找關鍵字的文本框,應該是一個wxTextCtrl。在amule-2.3.1/src/SearchDlg.cpp文件搜索中搜索wxTextCtrl,能夠發現,那個文本框的ID應當爲IDC_SEARCHNAME。

搜索過程能夠經過兩個事件觸發,分別是文本輸入框的回車事件,和「開始」按鈕的點擊事件:

EVT_BUTTON(		IDC_STARTS,		CSearchDlg::OnBnClickedStart)
	EVT_TEXT_ENTER(	IDC_SEARCHNAME,	CSearchDlg::OnBnClickedStart)
事件被觸發以後,會執行CSearchDlg::OnBnClickedStart()函數。隨後的執行流程爲CSearchDlg::OnBnClickedStart() -> CSearchDlg::StartNewSearch() -> CSearchList::StartNewSearch()(amule-2.3.1/src/SearchList.cpp文件) -> Kademlia::CSearchManager::PrepareFindKeywords()。Kademlia::CSearchManager::PrepareFindKeywords()的執行過程以下所示(在文件amule-2.3.1/src/kademlia/kademlia/SearchManager.cpp中):

CSearch* CSearchManager::PrepareFindKeywords(const wxString& keyword, uint32_t searchTermsDataSize, const uint8_t *searchTermsData, uint32_t searchid)
{
    AddLogLineNS(wxT("") + wxString(wxT("CSearchManager::PrepareFindKeywords, keyword is ")) + keyword);
	// Create a keyword search object.
	CSearch *s = new CSearch;
	try {
		// Set search to a keyword type.
		s->SetSearchTypes(CSearch::KEYWORD);

		// Make sure we have a keyword list
		GetWords(keyword, &s->m_words, true);
		if (s->m_words.size() == 0) {
			throw wxString(_("Kademlia: search keyword too short"));
		}

		wxString wstrKeyword = s->m_words.front();

		AddLogLineNS(CFormat(_("Keyword for search: %s")) % wstrKeyword);

		// Kry - I just decided to assume everyone is unicoded
		// GonoszTopi - seconded
		KadGetKeywordHash(wstrKeyword, &s->m_target);

		// Verify that we are not already searching for this target.
		if (AlreadySearchingFor(s->m_target)) {
			throw _("Kademlia: Search keyword is already on search list: ") + wstrKeyword;
		}

		s->SetSearchTermData(searchTermsDataSize, searchTermsData);
		// Inc our searchID
		// If called from external client use predefined search id
		s->SetSearchID((searchid & 0xffffff00) == 0xffffff00 ? searchid : ++m_nextID);
		// Insert search into map
		m_searches[s->GetTarget()] = s;
		// Start search
		s->Go();
	} catch (const CEOFException& err) {
		delete s;
		wxString strError = wxT("CEOFException in ") + wxString::FromAscii(__FUNCTION__) + wxT(": ") + err.what();
		throw strError;
	} catch (const CInvalidPacket& err) {
		delete s;
		wxString strError = wxT("CInvalidPacket exception in ") + wxString::FromAscii(__FUNCTION__) + wxT(": ") + err.what();
		throw strError;
	} catch (...) {
		delete s;
		throw;
	}
	return s;
}

這個函數中主要作了這樣的一些事情:

1. 建立了一個CSearch對象s,設置其SearchType爲CSearch::KEYWORD。

2. 解析傳進來的關鍵字,將關鍵字適當地拆分爲幾個單詞。(GetWords(keyword, &s->m_words, true))

3. 取出第一個單詞做爲後面搜索的關鍵字。

4. 計算關鍵字的Hash值。(KadGetKeywordHash(wstrKeyword, &s->m_target))

5. 經過計算出的Hash值,檢查這個搜索是否已經在進行了,若是已經在進行了,則返回。不然繼續執行。

6. 設置CSearch s的SearchTermData和SearchID。

7. 將CSearch s放進searchs列表。

8. 執行CSearch s的GO()啓動搜索。(s->Go())

這裏能夠簡單看一下,搜索關鍵字的Hash值是如何計算的:

// Global function.

#include "../../CryptoPP_Inc.h"
void KadGetKeywordHash(const wxString& rstrKeyword, Kademlia::CUInt128* pKadID)
{
	byte Output[16];

	#ifdef __WEAK_CRYPTO__
		CryptoPP::Weak::MD4 md4_hasher;
	#else
		CryptoPP::MD4 md4_hasher;
	#endif

	// This should be safe - we assume rstrKeyword is ANSI anyway.
	char* ansi_buffer = strdup(unicode2UTF8(rstrKeyword));
	
	//printf("Kad keyword hash: UTF8 %s\n",ansi_buffer);
	md4_hasher.CalculateDigest(Output,(const unsigned char*)ansi_buffer,strlen(ansi_buffer));
	//DumpMem(Output,16);
	free(ansi_buffer);
	
	pKadID->SetValueBE(Output);
}

搜索過程最重要的仍是CSearch::Go()函數,在文件amule-2.3.1/src/kademlia/kademlia/Search.cpp中:

void CSearch::Go()
{
	// Start with a lot of possible contacts, this is a fallback in case search stalls due to dead contacts
	if (m_possible.empty()) {
		CUInt128 distance(CKademlia::GetPrefs()->GetKadID() ^ m_target);
		CKademlia::GetRoutingZone()->GetClosestTo(3, m_target, distance, 50, &m_possible, true, true);
	}

	if (!m_possible.empty()) {
		//Lets keep our contact list entries in mind to dec the inUse flag.
		for (ContactMap::iterator it = m_possible.begin(); it != m_possible.end(); ++it) {
			m_inUse[it->first] = it->second;
		}

		wxASSERT(m_possible.size() == m_inUse.size());

		// Take top ALPHA_QUERY to start search with.
		int count = m_type == NODE ? 1 : min(ALPHA_QUERY, (int)m_possible.size());

		// Send initial packets to start the search.
		ContactMap::iterator it = m_possible.begin();
		for (int i = 0; i < count; i++) {
			CContact *c = it->second;
			// Move to tried
			m_tried[it->first] = c;
			// Send the KadID so other side can check if I think it has the right KadID.
			// Send request
			SendFindValue(c);
			++it;
		}
	}
}

這個函數比較清晰。整體而言,它首先計算出本節點與目標資源之間的距離,而後找到與目標資源邏輯距離更進的節點並存放在m_possible map中,最後向這些節點發送查找請求。

在這裏,會根據查找的類型來肯定向其發送查找請求的節點的個數。若是要查找的是一個節點,則只向一個最近節點發送請求。若是是其餘的查找類型,則向最近的多個節點發送查找請求,但請求的個數很少於ALPHA_QUERY個,這一版本也就是3個,同時很少於查找到的全部聯繫人的個數。發送請求的過程,則經過 CSearch::SendFindValue()函數完成:

void CSearch::SendFindValue(CContact *contact, bool reaskMore)
{
	// Found a node that we think has contacts closer to our target.
	try {
		if (m_stopping) {
			return;
		}

		CMemFile packetdata(33);
		// The number of returned contacts is based on the type of search.
		uint8_t contactCount = GetRequestContactCount();

		if (reaskMore) {
			if (m_requestedMoreNodesContact == NULL) {
				m_requestedMoreNodesContact = contact;
				wxASSERT(contactCount == KADEMLIA_FIND_VALUE);
				contactCount = KADEMLIA_FIND_VALUE_MORE;
			} else {
				wxFAIL;
			}
		}

		if (contactCount > 0) {
			packetdata.WriteUInt8(contactCount);
		} else {
			return;
		}

		// Put the target we want into the packet.
		packetdata.WriteUInt128(m_target);
		// Add the ID of the contact we're contacting for sanity checks on the other end.
		packetdata.WriteUInt128(contact->GetClientID());
		if (contact->GetVersion() >= 2) {
			if (contact->GetVersion() >= 6) {
				CUInt128 clientID = contact->GetClientID();
				CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_REQ, contact->GetIPAddress(), contact->GetUDPPort(), contact->GetUDPKey(), &clientID);
			} else {
				CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_REQ, contact->GetIPAddress(), contact->GetUDPPort(), 0, NULL);
				wxASSERT(contact->GetUDPKey() == CKadUDPKey(0));
			}
。。。。。。
		} else {
			wxFAIL;
		}
	} catch (const CEOFException& err) {
		AddDebugLogLineC(logKadSearch, wxT("CEOFException in CSearch::SendFindValue: ") + err.what());
	} catch (const CInvalidPacket& err) {
		AddDebugLogLineC(logKadSearch, wxT("CInvalidPacket Exception in CSearch::SendFindValue: ") + err.what());
	} catch (const wxString& e) {
		AddDebugLogLineC(logKadSearch, wxT("Exception in CSearch::SendFindValue: ") + e);
	}
}

Kademlia網絡數據包的總體結構與其它許多網絡協議數據包的總體結構沒有太大的區別,數據包的開頭是一個Packet Header,後面是Packet數據。在這個函數中,構造的主要是數據包的Packet數據部分。咱們能夠看一下,關鍵字查詢請求數據包數據部分的結構:

請求返回的聯繫人的個數,根據請求的類型不一樣而不一樣(8位1字節) -> 要查找的目標節點的節點ID(128位16字節)-> 用於接收端進行消息校驗的目標聯繫人節點的節點ID(128位16字節)

數據包數據部分總共33個字節。

Packet數據構造好了以後,會調用CKademlia::GetUDPListener()->SendPacket()發送數據,在文件amule-2.3.1/src/kademlia/net/KademliaUDPListener.cpp中:

void CKademliaUDPListener::SendPacket(const CMemFile &data, uint8_t opcode, uint32_t destinationHost, uint16_t destinationPort, const CKadUDPKey& targetKey, const CUInt128* cryptTargetID)
{
	AddTrackedOutPacket(destinationHost, opcode);
	CPacket* packet = new CPacket(data, OP_KADEMLIAHEADER, opcode);
	if (packet->GetPacketSize() > 200) {
		packet->PackPacket();
	}
	uint8_t cryptData[16];
	uint8_t *cryptKey;
	if (cryptTargetID != NULL) {
		cryptKey = (uint8_t *)&cryptData;
		cryptTargetID->StoreCryptValue(cryptKey);
	} else {
		cryptKey = NULL;
	}
	theStats::AddUpOverheadKad(packet->GetPacketSize());
	theApp->clientudp->SendPacket(packet, wxUINT32_SWAP_ALWAYS(destinationHost), destinationPort, true, cryptKey, true, targetKey.GetKeyValue(theApp->GetPublicIP(false)));
}

在這裏,Packet數據會被再次打包,構造一個CPacket對象。能夠看一下上面的代碼中所用到的CPacket構造函數(amule-2.3.1/src/Packet.cpp中):

CPacket::CPacket(const CMemFile& datafile, uint8 protocol, uint8 ucOpcode)
{
	size		= datafile.GetLength();
	opcode		= ucOpcode;
	prot		= protocol;
	m_bSplitted 	= false;
	m_bLastSplitted = false;
	m_bPacked 	= false;
	m_bFromPF 	= false;
	memset(head, 0, sizeof head);
	tempbuffer = NULL;
	completebuffer = new byte[size + sizeof(Header_Struct)/*Why this 4?*/];
	pBuffer = completebuffer + sizeof(Header_Struct);
	
	// Write contents of MemFile to buffer (while keeping original position in file)
	off_t position = datafile.GetPosition();
	datafile.Seek(0, wxFromStart);
	datafile.Read(pBuffer, size);
	datafile.Seek(position, wxFromStart);
}


byte* CPacket::DetachPacket() {
    if (completebuffer) {
        if (!m_bSplitted) {
            memcpy(completebuffer, GetHeader(), sizeof(Header_Struct));
        }
        byte* result = completebuffer;
        completebuffer = pBuffer = NULL;
        return result;
    } else{
        if (tempbuffer){
            delete[] tempbuffer;
            tempbuffer = NULL;
        }
        tempbuffer = new byte[size+sizeof(Header_Struct)+4 /* Why this 4?*/];
        memcpy(tempbuffer,GetHeader(),sizeof(Header_Struct));
        memcpy(tempbuffer+sizeof(Header_Struct),pBuffer,size);
        byte* result = tempbuffer;
        tempbuffer = 0;
        return result;
    }
}

byte* CPacket::GetHeader() {
    wxASSERT( !m_bSplitted );

    Header_Struct* header = (Header_Struct*) head;
    header->command = opcode;
    header->eDonkeyID =  prot;
    header->packetlength = ENDIAN_SWAP_32(size + 1);

    return head;
}

amule-2.3.1/src/OtherStructs.h中:

struct Header_Struct{
	int8	eDonkeyID;
	int32	packetlength;
	int8	command;
}

CPacket主要是在Packet數據的前面又加了一個Packet Header,來描述協議等信息。在一個CPacket真的要被髮送時,它的DetachPacket()函數會被調用,以最終將CPacket構造爲一個字節流。

若是Packet數據量過大,也就是CPacket總體大小超過200字節,則在CKademlia::GetUDPListener()::SendPacket()中,Packet數據將會被壓縮。CPacket是UDP socket真正要發送到網絡上的數據包,經過CMuleUDPSocket::SendPacket()函數。

對於咱們的關鍵字查詢請求而言,它的opcode爲KADEMLIA2_REQ,它的protocol則爲OP_KADEMLIAHEADER。

Kademlia網絡中搜索請求消息的處理

看了關鍵字搜索的發起過程以後,咱們心中不免就有一個疑問,這些消息是如何被接收的,接收到這個請求的節點又會如何處理這個消息?

這還要從aMule的啓動提及。在CamuleApp::ReinitializeNetwork()函數中,會建立一個CClientUDPSocket對象,監聽一個UDP端口,默認是4672,以此來接收從其它節點發送的與Kademlia網絡相關的消息。具體代碼以下(amule-2.3.1/src/amule.cpp):

// Create the UDP socket.
	// Used for extended eMule protocol, Queue Rating, File Reask Ping.
	// Also used for Kademlia.
	// Default is port 4672.
	myaddr[3] = myaddr[1];
	myaddr[3].Service(thePrefs::GetUDPPort());
	clientudp = new CClientUDPSocket(myaddr[3], thePrefs::GetProxyData());
	if (!thePrefs::IsUDPDisabled()) {

來看CClientUDPSocket構造函數的具體實現(amule-2.3.1/src/ClientUDPSocket.cpp):

CClientUDPSocket::CClientUDPSocket(const amuleIPV4Address& address, const CProxyData* ProxyData)
	: CMuleUDPSocket(wxT("Client UDP-Socket"), ID_CLIENTUDPSOCKET_EVENT, address, ProxyData)
{
	if (!thePrefs::IsUDPDisabled()) {
		Open();
	}
}

CClientUDPSocket繼承自CMuleUDPSocket,這裏有調用到CMuleUDPSocket的構造函數來構造CMuleUDPSocket的部分,並在UDP沒有被禁用的狀況下,調用Open()函數,Open()函數是接口和實現同時繼承CMuleUDPSocket。這裏咱們看一下這幾個函數的定義(amule-2.3.1/src/MuleUDPSocket.cpp):

CMuleUDPSocket::CMuleUDPSocket(const wxString& name, int id, const amuleIPV4Address& address, const CProxyData* ProxyData)
:
m_busy(false),
m_name(name),
m_id(id),
m_addr(address),
m_proxy(ProxyData),
m_socket(NULL)
{
}


void CMuleUDPSocket::Open()
{
	wxMutexLocker lock(m_mutex);

	CreateSocket();
}


void CMuleUDPSocket::CreateSocket()
{
	wxCHECK_RET(!m_socket, wxT("Socket already opened."));
	
	m_socket = new CEncryptedDatagramSocket(m_addr, wxSOCKET_NOWAIT, m_proxy);
	m_socket->SetClientData(this);
	m_socket->SetEventHandler(*theApp, m_id);
	m_socket->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_OUTPUT_FLAG | wxSOCKET_LOST_FLAG);
	m_socket->Notify(true);

	if (!m_socket->Ok()) {
		AddDebugLogLineC(logMuleUDP, wxT("Failed to create valid ") + m_name);
		DestroySocket();
	} else {
		AddLogLineN(wxString(wxT("Created ")) << m_name << wxT(" at port ") << m_addr.Service());
	}
}

在Open()函數中會調用CreateSocket()建立一個CEncryptedDatagramSocket來執行底層的socket監聽的任務。能夠看到,在CreateSocket()函數中會給新建立的soket設置一個EventHandler。也就是說,在socket監聽到有數據傳進來時會產生一個事件,事件的ID如CClientUDPSocket的構造函數所指明的,爲ID_CLIENTUDPSOCKET_EVENT,而處理事件的對象將爲theApp對象。對於咱們正在分析的GUI版的aMule而言,theApp的類型爲CamuleGuiApp,咱們能夠看到這個class的Event註冊表中有這樣的幾行(amule-2.3.1/src/amule-gui.cpp):

BEGIN_EVENT_TABLE(CamuleGuiApp, wxApp)

	// Socket handlers
	// Listen Socket
	EVT_SOCKET(ID_LISTENSOCKET_EVENT, CamuleGuiApp::ListenSocketHandler)

	// UDP Socket (servers)
	EVT_SOCKET(ID_SERVERUDPSOCKET_EVENT, CamuleGuiApp::UDPSocketHandler)
	// UDP Socket (clients)
	EVT_SOCKET(ID_CLIENTUDPSOCKET_EVENT, CamuleGuiApp::UDPSocketHandler)

也就意味着,當CClientUDPSocket/CEncryptedDatagramSocket監聽到有消息進來而產生事件,該事件將會由CamuleGuiApp::UDPSocketHandler()來處理。這個函數的實現以下:

void CamuleApp::UDPSocketHandler(wxSocketEvent& event)
{
	CMuleUDPSocket* socket = (CMuleUDPSocket*)(event.GetClientData());
	wxCHECK_RET(socket, wxT("No socket owner specified."));

	if (IsOnShutDown() || thePrefs::IsUDPDisabled()) return;

	if (!IsRunning()) {
		if (event.GetSocketEvent() == wxSOCKET_INPUT) {
			// Back to the queue!
			theApp->AddPendingEvent(event);
			return;
		}
	}

	switch (event.GetSocketEvent()) {
		case wxSOCKET_INPUT:
			socket->OnReceive(0);
			break;

		case wxSOCKET_OUTPUT:
			socket->OnSend(0);
			break;
		
		case wxSOCKET_LOST:
			socket->OnDisconnected(0);
			break;

		default:
			wxFAIL;
			break;
	}
}

對於咱們分析的case而言,會調用socket的OnReceive()函數,也就是CMuleUDPSocket::OnReceive()函數(amule-2.3.1/src/MuleUDPSocket.cpp):

void CMuleUDPSocket::OnReceive(int errorCode)
{
	AddDebugLogLineN(logMuleUDP, CFormat(wxT("Got UDP callback for read: Error %i Socket state %i"))
		% errorCode % Ok());
	
	char buffer[UDP_BUFFER_SIZE];
	wxIPV4address addr;
	unsigned length = 0;
	bool error = false;
	int lastError = 0;
	
	{
		wxMutexLocker lock(m_mutex);

		if (errorCode || (m_socket == NULL) || !m_socket->Ok()) {
			DestroySocket();
			CreateSocket();

			return;
		}

		
		length = m_socket->RecvFrom(addr, buffer, UDP_BUFFER_SIZE).LastCount();
		error = m_socket->Error();
		lastError = m_socket->LastError();
	}
	
	const uint32 ip = StringIPtoUint32(addr.IPAddress());
	const uint16 port = addr.Service();
	if (error) {
		OnReceiveError(lastError, ip, port);
	} else if (length < 2) {
		// 2 bytes (protocol and opcode) is the smallets possible packet.
		AddDebugLogLineN(logMuleUDP, m_name + wxT(": Invalid Packet received"));
	} else if (!ip) {
		// wxFAIL;
		AddLogLineNS(wxT("Unknown ip receiving a UDP packet! Ignoring: '") + addr.IPAddress() + wxT("'"));
	} else if (!port) {
		// wxFAIL;
		AddLogLineNS(wxT("Unknown port receiving a UDP packet! Ignoring"));
	} else if (theApp->clientlist->IsBannedClient(ip)) {
		AddDebugLogLineN(logMuleUDP, m_name + wxT(": Dropped packet from banned IP ") + addr.IPAddress());
	} else {
		AddDebugLogLineN(logMuleUDP, (m_name + wxT(": Packet received ("))
			<< addr.IPAddress() << wxT(":") << port << wxT("): ")
			<< length << wxT("b"));
		OnPacketReceived(ip, port, (byte*)buffer, length);
	}
}

這裏會作一些檢查,而後調用OnPacketReceived(ip, port, (byte*)buffer, length),這是一個虛函數,實際調用到的將會是CClientUDPSocket::OnPacketReceived()函數,在這個函數中能夠看到以下的幾行(amule-2.3.1/src/ClientUDPSocket.cpp):

if (packetLen >= 1) {
		try {
			switch (protocol) {
				case OP_EMULEPROT:
				    AddLogLineNS(wxT("") + wxString(wxT("CClientUDPSocket::OnPacketReceived protocol OP_EMULEPROT ")));
					ProcessPacket(decryptedBuffer + 2, packetLen - 2, opcode, ip, port);
					break;

				case OP_KADEMLIAHEADER:
				    AddLogLineNS(wxT("") + wxString(wxT("CClientUDPSocket::OnPacketReceived protocol OP_KADEMLIAHEADER ")));
					theStats::AddDownOverheadKad(length);
					if (packetLen >= 2) {
						Kademlia::CKademlia::ProcessPacket(decryptedBuffer, packetLen, wxUINT32_SWAP_ALWAYS(ip), port, (Kademlia::CPrefs::GetUDPVerifyKey(ip) == receiverVerifyKey), Kademlia::CKadUDPKey(senderVerifyKey, theApp->GetPublicIP(false)));
					} else {
						throw wxString(wxT("Kad packet too short"));
					}
					break;

可見,protocol爲OP_KADEMLIAHEADER,消息將最終被轉給Kademlia::CKademlia::ProcessPacket()函數處理,來看這個函數的實現(amule-2.3.1/src/kademlia/kademlia/Kademlia.cpp):

void CKademlia::ProcessPacket(const uint8_t *data, uint32_t lenData, uint32_t ip, uint16_t port, bool validReceiverKey, const CKadUDPKey& senderKey)
{
    AddLogLineNS(wxT("") + wxString(wxT("CKademlia::ProcessPacket from ")) + CFormat(_("ip: 0x%x, port %u.")) % ip % port);
	try {
		if( instance && instance->m_udpListener ) {
			instance->m_udpListener->ProcessPacket(data, lenData, ip, port, validReceiverKey, senderKey);
		}
	} catch (const wxString& DEBUG_ONLY(error)) {
		AddDebugLogLineN(logKadMain, CFormat(wxT("Exception on Kad ProcessPacket while processing packet (length = %u) from %s:"))
			% lenData % KadIPPortToString(ip, port));
		AddDebugLogLineN(logKadMain, error);
		throw;
	} catch (...) {
		AddDebugLogLineN(logKadMain, wxT("Unhandled exception on Kad ProcessPacket"));
		throw;
	}
}

因而可知,處理消息的任務又被委託給了CKademliaUDPListener::ProcessPacket(),在這個函數裏,一樣也是一長串的swich-case,並將消息處理的任務繼續委託。能夠看到這樣的一個case(amule-2.3.1/src/kademlia/net/KademliaUDPListener.cpp):

case KADEMLIA2_REQ:
			DebugRecv(Kad2Req, ip, port);
			ProcessKademlia2Request(packetData, lenPacket, ip, port, senderKey);
			break;

KADEMLIA2_REQ正是咱們的關鍵字查詢消息的opcode,這也就是說消息處理的任務,會最終交給CKademliaUDPListener::ProcessKademlia2Request()函數來處理。

這裏再來總結一下,關鍵字查詢消息處理的任務是如何被一步步委託的:

首先是CClientUDPSocket/CEncryptedDatagramSocket接收到消息,產生一個ID_CLIENTUDPSOCKET_EVENT事件。消息被CamuleGuiApp::UDPSocketHandler()函數接收。

隨後消息被委託給CMuleUDPSocket::OnReceive()函數處理。

隨後消息再被委託給CClientUDPSocket::OnPacketReceived()函數處理。

隨後消息又一次被委託給Kademlia::CKademlia::ProcessPacket()函數處理

而後消息再次被委託,給CKademliaUDPListener::ProcessPacket()函數處理。

還沒完,消息又最終被委託給CKademliaUDPListener::ProcessKademlia2Request()處理。

好艱難。

這裏再來仔細地看一下CKademliaUDPListener::ProcessKademlia2Request()函數:

// KADEMLIA2_REQ
// Used in Kad2.0 only
void CKademliaUDPListener::ProcessKademlia2Request(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	// Get target and type
	CMemFile bio(packetData, lenPacket);
	uint8_t type = bio.ReadUInt8();
	type &= 0x1F;
	if (type == 0) {
		throw wxString(CFormat(wxT("***NOTE: Received wrong type (0x%02x) in %s")) % type % wxString::FromAscii(__FUNCTION__));
	}

	// This is the target node trying to be found.
	CUInt128 target = bio.ReadUInt128();
	// Convert Target to Distance as this is how we store contacts.
	CUInt128 distance(CKademlia::GetPrefs()->GetKadID());
	distance.XOR(target);

	// This makes sure we are not mistaken identify. Some client may have fresh installed and have a new KadID.
	CUInt128 check = bio.ReadUInt128();
	if (CKademlia::GetPrefs()->GetKadID() == check) {
		// Get required number closest to target
		ContactMap results;
		CKademlia::GetRoutingZone()->GetClosestTo(2, target, distance, type, &results);
		uint8_t count = (uint8_t)results.size();

		// Write response
		// Max count is 32. size 817..
		// 16 + 1 + 25(32)
		CMemFile packetdata(817);
		packetdata.WriteUInt128(target);
		packetdata.WriteUInt8(count);
		CContact *c;
		for (ContactMap::const_iterator it = results.begin(); it != results.end(); ++it) {
			c = it->second;
			packetdata.WriteUInt128(c->GetClientID());
			packetdata.WriteUInt32(c->GetIPAddress());
			packetdata.WriteUInt16(c->GetUDPPort());
			packetdata.WriteUInt16(c->GetTCPPort());
			packetdata.WriteUInt8(c->GetVersion()); //<- Kad Version inserted to allow backward compatibility.
		}

		DebugSendF(CFormat(wxT("Kad2Res (count=%u)")) % count, ip, port);
		SendPacket(packetdata, KADEMLIA2_RES, ip, port, senderKey, NULL);
	}
}

發送請求的過程就是各類數據打包,而這個函數,則執行徹底相反的操做,也就是解開數據包並處理。能夠看到,此處解出了type,解出了target contact ID,解出了數據包發送的目的contact ID。而後對這些數據進行檢驗,看看數據包是否是發送給本身的。若是是,則從本身的聯繫人列表中找出距離target contact邏輯距離更近的聯繫人,並把他們的信息經過一個KADEMLIA2_RES消息發送給請求者。

KADEMLIA2_RES消息的具體處理過程能夠參考Linux下電騾aMule Kademlia網絡構建分析5 —— 資源的發佈 一文。

如在Linux下電騾aMule Kademlia網絡構建分析5 —— 資源的發佈 一文所述,aMule會經過一個定時器來週期性的推進search的持續進行,也就是Kademlia::CKademlia::Process()->CSearchManager::JumpStart(),在amule-2.3.1/src/kademlia/kademlia/SearchManager.cpp中,能夠看到這樣的幾個case:

while (next_it != m_searches.end()) {
		SearchMap::iterator current_it = next_it++; /* don't change this to a ++next_it! */
		switch(current_it->second->GetSearchTypes()){
			case CSearch::FILE: {
				if (current_it->second->m_created + SEARCHFILE_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else if (current_it->second->GetAnswers() > SEARCHFILE_TOTAL ||
					   current_it->second->m_created + SEARCHFILE_LIFETIME - SEC(20) < now) {
					current_it->second->PrepareToStop();
				} else {
					current_it->second->JumpStart();
				}					
				break;
			}
			case CSearch::KEYWORD: {
				if (current_it->second->m_created + SEARCHKEYWORD_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else if (current_it->second->GetAnswers() > SEARCHKEYWORD_TOTAL ||
					   current_it->second->m_created + SEARCHKEYWORD_LIFETIME - SEC(20) < now) {
					current_it->second->PrepareToStop();
				} else {
					current_it->second->JumpStart();
				}
				break;
			}
			case CSearch::NOTES: {
				if (current_it->second->m_created + SEARCHNOTES_LIFETIME < now) {
					delete current_it->second;
					m_searches.erase(current_it);
				} else if (current_it->second->GetAnswers() > SEARCHNOTES_TOTAL ||
					   current_it->second->m_created + SEARCHNOTES_LIFETIME - SEC(20) < now) {
					current_it->second->PrepareToStop();
				} else {
					current_it->second->JumpStart();
				}
				break;
			}

它們的處理與資源發佈時CSearchManager對於資源發佈的Search的處理很類似,此處再也不贅述,具體能夠參考Linux下電騾aMule Kademlia網絡構建分析5 —— 資源的發佈 一文。最終要的執行路徑都是會執行CSearch::JumpStart()。CSearch::JumpStart()最重要的執行路徑會最終執行CSearch::StorePacket(),相關的更詳細的分析也能夠參考同一篇文章

這裏再來看一下CSearch::StorePacket()中與資源搜索相關的幾個case(amule-2.3.1/src/kademlia/kademlia/Search.cpp):

// What kind of search are we doing?
	switch (m_type) {
		case FILE: {
			AddDebugLogLineN(logKadSearch, wxT("Search request type: File"));
			CMemFile searchTerms;
			searchTerms.WriteUInt128(m_target);
			if (from->GetVersion() >= 3) {
				// Find file we are storing info about.
				uint8_t fileid[16];
				m_target.ToByteArray(fileid);
				CKnownFile *file = theApp->downloadqueue->GetFileByID(CMD4Hash(fileid));
				if (file) {
					// Start position range (0x0 to 0x7FFF)
					searchTerms.WriteUInt16(0);
					searchTerms.WriteUInt64(file->GetFileSize());
					DebugSend(Kad2SearchSourceReq, from->GetIPAddress(), from->GetUDPPort());
					if (from->GetVersion() >= 6) {
						CUInt128 clientID = from->GetClientID();
						CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA2_SEARCH_SOURCE_REQ, from->GetIPAddress(), from->GetUDPPort(), from->GetUDPKey(), &clientID);
					} else {
						CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA2_SEARCH_SOURCE_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
						wxASSERT(from->GetUDPKey() == CKadUDPKey(0));
					}
				} else {
					PrepareToStop();
					break;
				}
			} else {
				searchTerms.WriteUInt8(1);
				DebugSendF(wxT("KadSearchReq(File)"), from->GetIPAddress(), from->GetUDPPort());
				CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA_SEARCH_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
			}
			m_totalRequestAnswers++;
			break;
		}
		case KEYWORD: {
			AddDebugLogLineN(logKadSearch, wxT("Search request type: Keyword"));
			CMemFile searchTerms;
			searchTerms.WriteUInt128(m_target);
			if (from->GetVersion() >= 3) {
				if (m_searchTermsDataSize == 0) {
					// Start position range (0x0 to 0x7FFF)
					searchTerms.WriteUInt16(0);
				} else {
					// Start position range (0x8000 to 0xFFFF)
					searchTerms.WriteUInt16(0x8000);
					searchTerms.Write(m_searchTermsData, m_searchTermsDataSize);
				}
				DebugSend(Kad2SearchKeyReq, from->GetIPAddress(), from->GetUDPPort());
			} else {
				if (m_searchTermsDataSize == 0) {
					searchTerms.WriteUInt8(0);
					// We send this extra byte to flag we handle large files.
					searchTerms.WriteUInt8(0);
				} else {
					// Set to 2 to flag we handle large files.
					searchTerms.WriteUInt8(2);
					searchTerms.Write(m_searchTermsData, m_searchTermsDataSize);
				}
				DebugSendF(wxT("KadSearchReq(Keyword)"), from->GetIPAddress(), from->GetUDPPort());
			}
			if (from->GetVersion() >= 6) {
				CUInt128 clientID = from->GetClientID();
				CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA2_SEARCH_KEY_REQ, from->GetIPAddress(), from->GetUDPPort(), from->GetUDPKey(), &clientID);
			} else if (from->GetVersion() >= 3) {
				CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA2_SEARCH_KEY_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
				wxASSERT(from->GetUDPKey() == CKadUDPKey(0));
			} else {
				CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA_SEARCH_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
			}
			m_totalRequestAnswers++;
			break;
		}
		case NOTES: {
			AddDebugLogLineN(logKadSearch, wxT("Search request type: Notes"));
			// Write complete packet.
			CMemFile searchTerms;
			searchTerms.WriteUInt128(m_target);
			if (from->GetVersion() >= 3) {
				// Find file we are storing info about.
				uint8_t fileid[16];
				m_target.ToByteArray(fileid);
				CKnownFile *file = theApp->sharedfiles->GetFileByID(CMD4Hash(fileid));
				if (file) {
					// Start position range (0x0 to 0x7FFF)
					searchTerms.WriteUInt64(file->GetFileSize());
					DebugSend(Kad2SearchNotesReq, from->GetIPAddress(), from->GetUDPPort());
					if (from->GetVersion() >= 6) {
						CUInt128 clientID = from->GetClientID();
						CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA2_SEARCH_NOTES_REQ, from->GetIPAddress(), from->GetUDPPort(), from->GetUDPKey(), &clientID);
					} else {
						CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA2_SEARCH_NOTES_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
						wxASSERT(from->GetUDPKey() == CKadUDPKey(0));
					}
				} else {
					PrepareToStop();
					break;
				}
			} else {
				searchTerms.WriteUInt128(CKademlia::GetPrefs()->GetKadID());
				DebugSend(KadSearchNotesReq, from->GetIPAddress(), from->GetUDPPort());
				CKademlia::GetUDPListener()->SendPacket(searchTerms, KADEMLIA_SEARCH_NOTES_REQ, from->GetIPAddress(), from->GetUDPPort(), 0, NULL);
			}
			m_totalRequestAnswers++;
			break;
		}
這裏咱們逐個分析這三個case,

Case 1,搜索文件。

主要是將要搜索的文件相關的信息,好比target,文件的大小,請求的文件數據的偏移量等,打包進CMemFile searchTerms,並將searchTerms封裝爲一個KADEMLIA2_SEARCH_SOURCE_REQ消息,經過CKademliaUDPListener::SendPacket()函數發送出去。SendPacket()函數的具體執行過程,能夠參考本篇前述內容。

Case 2,搜索關鍵字Keyword。

主要是將要搜索的文件相關的信息,好比關鍵字的hash值target等,打包進CMemFile searchTerms,並將searchTerms封裝爲一個KADEMLIA2_SEARCH_KEY_REQ消息,經過CKademliaUDPListener::SendPacket()函數發送出去。SendPacket()函數的具體執行過程,能夠參考本篇前述內容。

Case 3,搜索FileNotes。

搜索FileNotes則主要是將打包好的數據封裝爲一個KADEMLIA2_SEARCH_NOTES_REQ消息經過CKademliaUDPListener::SendPacket()函數發送出去。其它則與前兩種case類似,固然信息的格式會有一些差別。

Kademlia網絡中具體搜索請求的處理

咱們稱KADEMLIA2_SEARCH_SOURCE_REQKADEMLIA2_SEARCH_KEY_REQKADEMLIA2_SEARCH_NOTES_REQ等消息爲具體的搜索請求。接收端在收到這些具體的搜索請求時又會如何處理呢?

消息的詳細dispatch過程,如前文,此處再也不贅述。咱們直接來看CKademliaUDPListener::ProcessPacket()中對於這些消息的處理:

case KADEMLIA2_SEARCH_NOTES_REQ:
			DebugRecv(Kad2SearchNotesReq, ip, port);
			Process2SearchNotesRequest(packetData, lenPacket, ip, port, senderKey);
			break;
		case KADEMLIA2_SEARCH_KEY_REQ:
			DebugRecv(Kad2SearchKeyReq, ip, port);
			Process2SearchKeyRequest(packetData, lenPacket, ip, port, senderKey);
			break;
		case KADEMLIA2_SEARCH_SOURCE_REQ:
			DebugRecv(Kad2SearchSourceReq, ip, port);
			Process2SearchSourceRequest(packetData, lenPacket, ip, port, senderKey);
			break;

能夠看到這裏是把這些消息的處理分別委託給了Process2SearchNotesRequest(),Process2SearchKeyRequest()和Process2SearchSourceRequest()等幾個函數。

// KADEMLIA2_SEARCH_KEY_REQ
// Used in Kad2.0 only
void CKademliaUDPListener::Process2SearchKeyRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	CMemFile bio(packetData, lenPacket);
	CUInt128 target = bio.ReadUInt128();
	uint16_t startPosition = bio.ReadUInt16();
	bool restrictive = ((startPosition & 0x8000) == 0x8000);
	startPosition &= 0x7FFF;
	SSearchTerm* pSearchTerms = NULL;
	if (restrictive) {
		pSearchTerms = CreateSearchExpressionTree(bio, 0);
		if (pSearchTerms == NULL) {
			throw wxString(wxT("Invalid search expression"));
		}
	}
	CKademlia::GetIndexed()->SendValidKeywordResult(target, pSearchTerms, ip, port, false, startPosition, senderKey);
	if (pSearchTerms) {
		Free(pSearchTerms);
	}
}

// KADEMLIA2_SEARCH_SOURCE_REQ
// Used in Kad2.0 only
void CKademliaUDPListener::Process2SearchSourceRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	CMemFile bio(packetData, lenPacket);
	CUInt128 target = bio.ReadUInt128();
	uint16_t startPosition = (bio.ReadUInt16() & 0x7FFF);
	uint64_t fileSize = bio.ReadUInt64();
	CKademlia::GetIndexed()->SendValidSourceResult(target, ip, port, startPosition, fileSize, senderKey);
}

。。。。。。

// KADEMLIA2_SEARCH_NOTES_REQ
// Used only by Kad2.0
void CKademliaUDPListener::Process2SearchNotesRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey)
{
	CMemFile bio(packetData, lenPacket);
	CUInt128 target = bio.ReadUInt128();
	uint64_t fileSize = bio.ReadUInt64();
	CKademlia::GetIndexed()->SendValidNoteResult(target, ip, port, fileSize, senderKey);
}

在這幾個函數中,能夠看到,都是從收到的數據包中解出一些信息,而後經過CIndexed的一些方法將請求端想要的數據發回去。在aMule中,CIndexed用來管理能夠共享的文件的相關信息。具體來看CIndexed中這幾個函數的處理過程(amule-2.3.1/src/kademlia/kademlia/Indexed.cpp):

void CIndexed::SendValidKeywordResult(const CUInt128& keyID, const SSearchTerm* pSearchTerms, uint32_t ip, uint16_t port, bool oldClient, uint16_t startPosition, const CKadUDPKey& senderKey)
{
	KeyHash* currKeyHash = NULL;
	KeyHashMap::iterator itKeyHash = m_Keyword_map.find(keyID);
	if (itKeyHash != m_Keyword_map.end()) {
		currKeyHash = itKeyHash->second;
		CMemFile packetdata(1024 * 50);
		packetdata.WriteUInt128(Kademlia::CKademlia::GetPrefs()->GetKadID());
		packetdata.WriteUInt128(keyID);
		packetdata.WriteUInt16(50);
		const uint16_t maxResults = 300;
		int count = 0 - startPosition;

		// we do 2 loops: In the first one we ignore all results which have a trustvalue below 1
		// in the second one we then also consider those. That way we make sure our 300 max results are not full
		// of spam entries. We could also sort by trustvalue, but we would risk to only send popular files this way
		// on very hot keywords
		bool onlyTrusted = true;
		DEBUG_ONLY( uint32_t dbgResultsTrusted = 0; )
		DEBUG_ONLY( uint32_t dbgResultsUntrusted = 0; )

		do {
			for (CSourceKeyMap::iterator itSource = currKeyHash->m_Source_map.begin(); itSource != currKeyHash->m_Source_map.end(); ++itSource) {
				Source* currSource =  itSource->second;

				for (CKadEntryPtrList::iterator itEntry = currSource->entryList.begin(); itEntry != currSource->entryList.end(); ++itEntry) {
					Kademlia::CKeyEntry* currName = static_cast<Kademlia::CKeyEntry*>(*itEntry);
					wxASSERT(currName->IsKeyEntry());
					if ((onlyTrusted ^ (currName->GetTrustValue() < 1.0)) && (!pSearchTerms || currName->SearchTermsMatch(pSearchTerms))) {
						if (count < 0) {
							count++;
						} else if ((uint16_t)count < maxResults) {
							if (!oldClient || currName->m_uSize <= OLD_MAX_FILE_SIZE) {
								count++;
#ifdef __DEBUG__
								if (onlyTrusted) {
									dbgResultsTrusted++;
								} else {
									dbgResultsUntrusted++;
								}
#endif
								packetdata.WriteUInt128(currName->m_uSourceID);
								currName->WriteTagListWithPublishInfo(&packetdata);
								if (count % 50 == 0) {
									DebugSend(Kad2SearchRes, ip, port);
									CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_SEARCH_RES, ip, port, senderKey, NULL);
									// Reset the packet, keeping the header (Kad id, key id, number of entries)
									packetdata.SetLength(16 + 16 + 2);
								}
							}
						} else {
							itSource = currKeyHash->m_Source_map.end();
							--itSource;
							break;
						}
					}
				}
			}

			if (onlyTrusted && count < (int)maxResults) {
				onlyTrusted = false;
			} else {
				break;
			}
		} while (!onlyTrusted);

		AddDebugLogLineN(logKadIndex, CFormat(wxT("Kad keyword search result request: Sent %u trusted and %u untrusted results")) % dbgResultsTrusted % dbgResultsUntrusted);

		if (count > 0) {
			uint16_t countLeft = (uint16_t)count % 50;
			if (countLeft) {
				packetdata.Seek(16 + 16);
				packetdata.WriteUInt16(countLeft);
				DebugSend(Kad2SearchRes, ip, port);
				CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_SEARCH_RES, ip, port, senderKey, NULL);
			}
		}
	}
	Clean();
}

void CIndexed::SendValidSourceResult(const CUInt128& keyID, uint32_t ip, uint16_t port, uint16_t startPosition, uint64_t fileSize, const CKadUDPKey& senderKey)
{
	SrcHash* currSrcHash = NULL;
	SrcHashMap::iterator itSrcHash = m_Sources_map.find(keyID);
	if (itSrcHash != m_Sources_map.end()) {
		currSrcHash = itSrcHash->second;
		CMemFile packetdata(1024*50);
		packetdata.WriteUInt128(Kademlia::CKademlia::GetPrefs()->GetKadID());
		packetdata.WriteUInt128(keyID);
		packetdata.WriteUInt16(50);
		uint16_t maxResults = 300;
		int count = 0 - startPosition;

		for (CKadSourcePtrList::iterator itSource = currSrcHash->m_Source_map.begin(); itSource != currSrcHash->m_Source_map.end(); ++itSource) {
			Source* currSource = *itSource;	
			if (currSource->entryList.size()) {
				Kademlia::CEntry* currName = currSource->entryList.front();
				if (count < 0) {
					count++;
				} else if (count < maxResults) {
					if (!fileSize || !currName->m_uSize || currName->m_uSize == fileSize) {
						packetdata.WriteUInt128(currName->m_uSourceID);
						currName->WriteTagList(&packetdata);
						count++;
						if (count % 50 == 0) {
							DebugSend(Kad2SearchRes, ip, port);
							CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_SEARCH_RES, ip, port, senderKey, NULL);
							// Reset the packet, keeping the header (Kad id, key id, number of entries)
							packetdata.SetLength(16 + 16 + 2);
						}
					}
				} else {
					break;
				}
			}
		}

		if (count > 0) {
			uint16_t countLeft = (uint16_t)count % 50;
			if (countLeft) {
				packetdata.Seek(16 + 16);
				packetdata.WriteUInt16(countLeft);
				DebugSend(Kad2SearchRes, ip, port);
				CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_SEARCH_RES, ip, port, senderKey, NULL);
			}
		}
	}
	Clean();
}

void CIndexed::SendValidNoteResult(const CUInt128& keyID, uint32_t ip, uint16_t port, uint64_t fileSize, const CKadUDPKey& senderKey)
{
	SrcHash* currNoteHash = NULL;
	SrcHashMap::iterator itNote = m_Notes_map.find(keyID);
	if (itNote != m_Notes_map.end()) {
		currNoteHash = itNote->second;		
		CMemFile packetdata(1024*50);
		packetdata.WriteUInt128(Kademlia::CKademlia::GetPrefs()->GetKadID());
		packetdata.WriteUInt128(keyID);
		packetdata.WriteUInt16(50);
		uint16_t maxResults = 150;
		uint16_t count = 0;

		for (CKadSourcePtrList::iterator itSource = currNoteHash->m_Source_map.begin(); itSource != currNoteHash->m_Source_map.end(); ++itSource ) {
			Source* currNote = *itSource;
			if (currNote->entryList.size()) {
				Kademlia::CEntry* currName = currNote->entryList.front();
				if (count < maxResults) {
					if (!fileSize || !currName->m_uSize || fileSize == currName->m_uSize) {
						packetdata.WriteUInt128(currName->m_uSourceID);
						currName->WriteTagList(&packetdata);
						count++;
						if (count % 50 == 0) {
							DebugSend(Kad2SearchRes, ip, port);
							CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_SEARCH_RES, ip, port, senderKey, NULL);
							// Reset the packet, keeping the header (Kad id, key id, number of entries)
							packetdata.SetLength(16 + 16 + 2);
						}
					}
				} else {
					break;
				}
			}
		}

		uint16_t countLeft = count % 50;
		if (countLeft) {
			packetdata.Seek(16 + 16);
			packetdata.WriteUInt16(countLeft);
			DebugSend(Kad2SearchRes, ip, port);
			CKademlia::GetUDPListener()->SendPacket(packetdata, KADEMLIA2_SEARCH_RES, ip, port, senderKey, NULL);
		}
	}
}

他們都是將請求的信息打包爲CMemFile packetdata,而後包裝爲一個KADEMLIA2_SEARCH_RES消息,經過CKademliaUDPListener::SendPacket()函數發送出去。

球再次被踢會到搜索請求的發起端。來看CKademliaUDPListener::ProcessPacket()中對於KADEMLIA2_SEARCH_RES消息的處理:

case KADEMLIA2_SEARCH_RES:
			DebugRecv(Kad2SearchRes, ip, port);
			Process2SearchResponse(packetData, lenPacket, senderKey);
			break;


。。。。。。

void CKademliaUDPListener::ProcessSearchResponse(CMemFile& bio)
{
	// What search does this relate to
	CUInt128 target = bio.ReadUInt128();

	// How many results..
	uint16_t count = bio.ReadUInt16();
	while (count > 0) {
		// What is the answer
		CUInt128 answer = bio.ReadUInt128();

		// Get info about answer
		// NOTE: this is the one and only place in Kad where we allow string conversion to local code page in
		// case we did not receive an UTF8 string. this is for backward compatibility for search results which are 
		// supposed to be 'viewed' by user only and not feed into the Kad engine again!
		// If that tag list is once used for something else than for viewing, special care has to be taken for any
		// string conversion!
 		CScopedContainer<TagPtrList> tags;
		bio.ReadTagPtrList(tags.get(), true/*bOptACP*/);
		CSearchManager::ProcessResult(target, answer, tags.get());
		count--;
	}
}

。。。。。。

// KADEMLIA2_SEARCH_RES
// Used in Kad2.0 only
void CKademliaUDPListener::Process2SearchResponse(const uint8_t *packetData, uint32_t lenPacket, const CKadUDPKey& WXUNUSED(senderKey))
{
	CMemFile bio(packetData, lenPacket);

	// Who sent this packet.
	bio.ReadUInt128();

	ProcessSearchResponse(bio);
}

搜索返回的信息會再轉給CSearchManager::ProcessResult()處理(amule-2.3.1/src/kademlia/kademlia/SearchManager.cpp):

void CSearchManager::ProcessResult(const CUInt128& target, const CUInt128& answer, TagPtrList *info)
{
	// We have results for a request for info.
	CSearch *s = NULL;
	SearchMap::const_iterator it = m_searches.find(target);
	if (it != m_searches.end()) {
		s = it->second;
	}

	// If this search was deleted before these results, delete contacts and abort, otherwise process them.
	if (s == NULL) {
		AddDebugLogLineN(logKadSearch,
			wxT("Search either never existed or receiving late results (CSearchManager::ProcessResult)"));
	} else {
		s->ProcessResult(answer, info);
	}
}

搜索的結果最後仍是要由具體的search本身來處理的:

void CSearch::ProcessResult(const CUInt128& answer, TagPtrList *info)
{
	wxString type = wxT("Unknown");
	switch (m_type) {
		case FILE:
			type = wxT("File");
			ProcessResultFile(answer, info);
			break;
		case KEYWORD:
			type = wxT("Keyword");
			ProcessResultKeyword(answer, info);
			break;
		case NOTES:
			type = wxT("Notes");
			ProcessResultNotes(answer, info);
			break;
	}
	AddDebugLogLineN(logKadSearch, wxT("Got result (") + type + wxT(")"));
}

整個搜索的過程大致如此。

Done。

相關文章
相關標籤/搜索