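Analysis of Kademlia Network Construction in aMule on Linux, Part 3

Joining the Local Node to the Kademlia Network

Initiating the Connection Request

When aMule starts, it creates several timers so that certain tasks run periodically. One of the more important ones is core_timer. The relevant code is shown below (amule-2.3.1/src/amule-gui.cpp):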

// Create the Core timer
	core_timer = new CTimer(this,ID_CORE_TIMER_EVENT);
	if (!core_timer) {
		AddLogLineCS(_("Fatal Error: Failed to create Core Timer"));
		OnExit();
	}

	// Start the Core and Gui timers

	// Note: wxTimer can be off by more than 10% !!!
	// In addition to the systematic error introduced by wxTimer, we are losing
	// timer cycles due to high CPU load.  I've observed about 0.5% random loss of cycles under
	// low load, and more than 6% lost cycles with heavy download traffic and/or other tasks
	// in the system, such as a video player or a VMware virtual machine.
	// The upload queue process loop has now been rewritten to compensate for timer errors.
	// When adding functionality, assume that the timer is only approximately correct;
	// for measurements, always use the system clock [::GetTickCount()].
	core_timer->Start(CORE_TIMER_PERIOD);
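A wxWidgets timer fires an event periodically. The event ID is passed in when the timer is created, and the period is passed to Start(). CORE_TIMER_PERIOD is defined as 100 in amule-2.3.1/src/amule.h, so the timer period is 100 ms.

The event table in amule-2.3.1/src/amule-gui.cpp shows that the event is handled by CamuleGuiApp::OnCoreTimer():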

// Core timer
	EVT_MULE_TIMER(ID_CORE_TIMER_EVENT, CamuleGuiApp::OnCoreTimer)

The function CamuleApp::OnCoreTimer() (amule-2.3.1/src/amule.cpp) calls Kademlia::CKademlia::Process():

if (msCur-msPrev1 > 1000) {  // approximately every second
		msPrev1 = msCur;
		clientcredits->Process();
		clientlist->Process();
		
		// Publish files to server if needed.
		sharedfiles->Process();
		
		if( Kademlia::CKademlia::IsRunning() ) {
			Kademlia::CKademlia::Process();
			if(Kademlia::CKademlia::GetPrefs()->HasLostConnection()) {
				StopKad();
				clientudp->Close();
				clientudp->Open();
				if (thePrefs::Reconnect()) {
					StartKad();
				}
			}
		}
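
The `msCur-msPrev1 > 1000` test above is what keeps the once-per-second work correct even though the 100 ms timer is imprecise and can lose cycles. A minimal sketch of that idea follows. `SecondGate` and `Due` are hypothetical names introduced for illustration and are not part of aMule; the sketch assumes the caller supplies a millisecond clock reading.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of aMule): decides whether the
// once-per-second work is due, based on a millisecond clock reading
// rather than on the number of timer ticks seen so far.
class SecondGate {
public:
    explicit SecondGate(uint32_t startMs) : m_prevMs(startMs) {}

    // Mirrors the `msCur - msPrev1 > 1000` test: fire only when more than
    // one second of clock time has passed since the last firing.
    bool Due(uint32_t nowMs) {
        if (nowMs - m_prevMs > 1000) {
            m_prevMs = nowMs;
            return true;
        }
        return false;
    }

private:
    uint32_t m_prevMs;  // clock reading at the last firing
};
```

Because the decision depends on the clock and not on a tick counter, a late or lost tick only delays the work until the next tick that does arrive.
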
In Kademlia::CKademlia::Process() (amule-2.3.1/src/kademlia/kademlia/Kademlia.cpp), the lines to focus on are:

if (m_nextSelfLookup <= now) {
		CSearchManager::FindNode(instance->m_prefs->GetKadID(), true);
		m_nextSelfLookup = HR2S(4) + now;
	}
Recall that CKademlia::Start(), which runs when the Kademlia network starts up, contains these lines:
// Force a FindNodeComplete within the first 3 minutes.
	m_nextSelfLookup = time(NULL) + MIN2S(3);
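Taken together, these two snippets mean that CSearchManager::FindNode(instance->m_prefs->GetKadID(), true) first runs 3 minutes after startup and then runs again every 4 hours.

In other words, 3 minutes after the Kademlia module starts, the node first searches for the nodes whose KadID is closest to its own, so that it can then connect to them and treat them as neighbors. The same process repeats every 4 hours afterwards.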
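
The schedule described above can be sketched as follows. `SelfLookupSchedule` and `Poll` are hypothetical names used only for this illustration; the constants 3 minutes and 4 hours correspond to MIN2S(3) and HR2S(4) in the excerpts.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical stand-in for the m_nextSelfLookup bookkeeping.
struct SelfLookupSchedule {
    time_t next;  // earliest time at which the next self-lookup may start

    // Counterpart of CKademlia::Start(): first lookup is due 3 minutes
    // after startup.
    explicit SelfLookupSchedule(time_t startedAt) : next(startedAt + 3 * 60) {}

    // Counterpart of the check in CKademlia::Process(): returns true when a
    // self-lookup should be started, and re-arms the timer 4 hours ahead.
    bool Poll(time_t now) {
        if (next <= now) {
            next = now + 4 * 60 * 60;
            return true;
        }
        return false;
    }
};
```

Since Process() is polled roughly once per second, the lookup starts on the first poll at or after the due time, and the next one is measured from that moment.
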

Let us look at how CSearchManager::FindNode() runs (amule-2.3.1/src/kademlia/kademlia/SearchManager.cpp):

void CSearchManager::FindNode(const CUInt128& id, bool complete)
{
	// Do a node lookup.
	CSearch *s = new CSearch;
	if (complete) {
		s->SetSearchTypes(CSearch::NODECOMPLETE);
	} else {
		s->SetSearchTypes(CSearch::NODE);
	}
	s->SetTargetID(id);
	StartSearch(s);
}

......

bool CSearchManager::StartSearch(CSearch* search)
{
	// A search object was created, now try to start the search.
	if (AlreadySearchingFor(search->GetTarget())) {
		// There was already a search in progress with this target.
		delete search;
		return false;
	}
	// Add to the search map
	m_searches[search->GetTarget()] = search;
	// Start the search.
	search->Go();
	return true;
}
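
StartSearch() keeps at most one running search per target ID. The sketch below shows that deduplication in isolation. It is a simplification under stated assumptions: a 64-bit integer stands in for CUInt128, `Search` and `SearchRegistry` are hypothetical types, and AlreadySearchingFor() is modeled as a plain map lookup.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>

// Simplified stand-in for CSearch (assumption made for this sketch).
struct Search {
    uint64_t target;
    bool started;
    explicit Search(uint64_t t) : target(t), started(false) {}
    void Go() { started = true; }
};

class SearchRegistry {
public:
    // Takes ownership of the search. Returns false, and drops the search,
    // when a lookup for the same target is already registered.
    bool StartSearch(std::unique_ptr<Search> search) {
        const uint64_t target = search->target;
        if (m_searches.count(target) != 0) {
            return false;  // unique_ptr frees the rejected search
        }
        Search* raw = search.get();
        m_searches[target] = std::move(search);  // add to the search map
        raw->Go();                               // start the search
        return true;
    }

    std::size_t Size() const { return m_searches.size(); }

private:
    std::map<uint64_t, std::unique_ptr<Search>> m_searches;
};
```

Using std::unique_ptr here replaces the explicit `delete search` in the original; that is a design choice of the sketch, not how aMule manages the object.
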

As we saw in Part 2 of this series, the response to this request only contains information about some nodes.

So how does one node in the Kademlia network connect to another node? Recall from Part 1 of this series that the initialization function CRoutingZone::Init() calls CRoutingZone::StartTimer(), which in turn calls CKademlia::AddEvent(), as shown below (amule-2.3.1/src/kademlia/routing/RoutingZone.cpp):

void CRoutingZone::StartTimer()
{
	// Start filling the tree, closest bins first.
	m_nextBigTimer = time(NULL) + SEC(10);
	CKademlia::AddEvent(this);
}
Now look at CKademlia::AddEvent(), in amule-2.3.1/src/kademlia/kademlia/Kademlia.h:

static void AddEvent(CRoutingZone *zone) throw()		{ m_events[zone] = zone; }

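That is, the pointer to the current CRoutingZone object is stored in a map owned by CKademlia, with the pointer serving as both key and value.

In CKademlia::Process(), which runs periodically, we also find the following code: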

for (EventMap::const_iterator it = m_events.begin(); it != m_events.end(); ++it) {
		CRoutingZone *zone = it->first;
		if (updateUserFile) {
			// The EstimateCount function is not made for really small networks, if we are in LAN mode, it is actually
			// better to assume that all users of the network are in our routing table and use the real count function
			if (IsRunningInLANMode()) {
				tempUsers = zone->GetNumContacts();
			} else {
				tempUsers = zone->EstimateCount();
			}
			if (maxUsers < tempUsers) {
				maxUsers = tempUsers;
			}
		}

		if (m_bigTimer <= now) {
			if (zone->m_nextBigTimer <= now) {
				if(zone->OnBigTimer()) {
					zone->m_nextBigTimer = HR2S(1) + now;
					m_bigTimer = SEC(10) + now;
				}
			} else {
				if (lastContact && (now - lastContact > KADEMLIADISCONNECTDELAY - MIN2S(5))) {
					if(zone->OnBigTimer()) {
						zone->m_nextBigTimer = HR2S(1) + now;
						m_bigTimer = SEC(10) + now;
					}
				} 
			}
		}

		if (zone->m_nextSmallTimer <= now) {
			zone->OnSmallTimer();
			zone->m_nextSmallTimer = MIN2S(1) + now;
		}
	}
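This loop walks over all registered CRoutingZone objects and, when the time comes, calls the methods on each of them that need to run periodically. One of these is CRoutingZone::OnSmallTimer(), which is called about once a minute. Here is its implementation (amule-2.3.1/src/kademlia/routing/RoutingZone.cpp):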
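
The two timers in the loop above behave differently: the small timer is purely per zone, while the big timer is also throttled by the global m_bigTimer, so at most one zone gets a successful OnBigTimer() every 10 seconds. The following sketch isolates that scheduling. `Zone` and `ZoneScheduler` are hypothetical simplifications; the `lastContact` branch and the user-count estimation are left out, and OnBigTimer() is assumed to always report success.

```cpp
#include <cassert>
#include <ctime>
#include <vector>

// Hypothetical, simplified zone: only the two timestamps and call counters.
struct Zone {
    time_t nextBigTimer = 0;
    time_t nextSmallTimer = 0;
    int bigCalls = 0;
    int smallCalls = 0;
    bool OnBigTimer() { ++bigCalls; return true; }  // assumed to always do work
    void OnSmallTimer() { ++smallCalls; }
};

struct ZoneScheduler {
    time_t bigTimer = 0;  // global throttle shared by all zones

    void Process(std::vector<Zone>& zones, time_t now) {
        for (Zone& zone : zones) {
            // Big timer: hourly per zone, and globally at most once per 10 s.
            if (bigTimer <= now && zone.nextBigTimer <= now) {
                if (zone.OnBigTimer()) {
                    zone.nextBigTimer = now + 3600;  // HR2S(1)
                    bigTimer = now + 10;             // SEC(10)
                }
            }
            // Small timer: once per minute, independently for every zone.
            if (zone.nextSmallTimer <= now) {
                zone.OnSmallTimer();
                zone.nextSmallTimer = now + 60;      // MIN2S(1)
            }
        }
    }
};
```

With two zones that are both due, the first call to Process() runs the big timer for only one of them; the other has to wait until the 10-second throttle expires.
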

void CRoutingZone::OnSmallTimer()
{
	if (!IsLeaf()) {
		return;
	}
	
	CContact *c = NULL;
	time_t now = time(NULL);
	ContactList entries;

	// Remove dead entries
	m_bin->GetEntries(&entries);
	for (ContactList::iterator it = entries.begin(); it != entries.end(); ++it) {
		c = *it;
		if (c->GetType() == 4) {
			if ((c->GetExpireTime() > 0) && (c->GetExpireTime() <= now)) {
				if (!c->InUse()) {
					m_bin->RemoveContact(c);
					delete c;
				}
				continue;
			}
		}
		if(c->GetExpireTime() == 0) {
			c->SetExpireTime(now);
		}
	}

	c = m_bin->GetOldest();
	if (c != NULL) {
		if (c->GetExpireTime() >= now || c->GetType() == 4) {
			m_bin->PushToBottom(c);
			c = NULL;
		}
	}

	if (c != NULL) {
		c->CheckingType();
		if (c->GetVersion() >= 6) {
			DebugSend(Kad2HelloReq, c->GetIPAddress(), c->GetUDPPort());
			CUInt128 clientID = c->GetClientID();
			CKademlia::GetUDPListener()->SendMyDetails(KADEMLIA2_HELLO_REQ, c->GetIPAddress(), c->GetUDPPort(), c->GetVersion(), c->GetUDPKey(), &clientID, false);
			if (c->GetVersion() >= 8) {
				// FIXME:
				// This is a bit of a work around for statistic values. Normally we only count values from incoming HELLO_REQs for
				// the firewalled statistics in order to get numbers from nodes which have us on their routing table,
				// however if we send a HELLO due to the timer, the remote node won't send a HELLO_REQ itself anymore (but
				// a HELLO_RES which we don't count), so count those statistics here. This isn't really accurate, but it should
				// do fair enough. Maybe improve it later for example by putting a flag into the contact and make the answer count
				CKademlia::GetPrefs()->StatsIncUDPFirewalledNodes(false);
				CKademlia::GetPrefs()->StatsIncTCPFirewalledNodes(false);
			}
		} else if (c->GetVersion() >= 2) {
			DebugSend(Kad2HelloReq, c->GetIPAddress(), c->GetUDPPort());
			CKademlia::GetUDPListener()->SendMyDetails(KADEMLIA2_HELLO_REQ, c->GetIPAddress(), c->GetUDPPort(), c->GetVersion(), 0, NULL, false);
			wxASSERT(c->GetUDPKey() == CKadUDPKey(0));
		} else {
			wxFAIL;
		}
	}
}

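1. The function first makes sure that the current CRoutingZone is a leaf. (Only leaf zones store other nodes, that is, contacts. This follows from the data structure aMule uses to manage contacts.)

2. It then walks over all contacts. Contacts of type 4 whose expiry time has already passed and that are not in use are removed. For contacts whose expiry time is 0, the expiry time is set to the current time.

3. It takes the oldest contact in the bin. If that contact's expiry time has not yet passed, or its type is 4, the contact is pushed to the bottom of the bin and nothing is sent in this round. Only an oldest contact whose expiry time has already passed, and whose type is not 4, is selected.

4. It calls CKademlia::GetUDPListener()->SendMyDetails() to send a KADEMLIA2_HELLO_REQ request, carrying the details of the local node, to the selected contact. The KADEMLIA2_HELLO_REQ request is the connection request of the aMule Kademlia network.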
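
The selection logic in steps 2 and 3 can be sketched as a standalone function. This is a simplification under stated assumptions: `Contact` and `PickContactForHello` are hypothetical, the bin is modeled as a list ordered oldest first, and "push to bottom" is modeled as moving the element to the back of that list.

```cpp
#include <cassert>
#include <ctime>
#include <list>

// Hypothetical, simplified contact; the real CContact carries far more state.
struct Contact {
    int id;
    int type;        // 4 is the type tested by OnSmallTimer() before removal
    time_t expires;  // 0 means "not set yet"
    bool inUse;
};

// Returns the id of the contact that should receive a hello request, or -1
// if none qualifies. `bin` is assumed to be ordered oldest first.
int PickContactForHello(std::list<Contact>& bin, time_t now) {
    // Step 2: drop expired, unused type-4 contacts; stamp unset expiry times.
    for (std::list<Contact>::iterator it = bin.begin(); it != bin.end();) {
        if (it->type == 4 && it->expires > 0 && it->expires <= now) {
            if (!it->inUse) {
                it = bin.erase(it);
            } else {
                ++it;
            }
            continue;
        }
        if (it->expires == 0) {
            it->expires = now;
        }
        ++it;
    }
    if (bin.empty()) {
        return -1;
    }
    // Step 3: only the oldest contact is considered.
    Contact oldest = bin.front();
    if (oldest.expires >= now || oldest.type == 4) {
        // Not expired yet, or of type 4: rotate it to the back, pick nobody.
        bin.pop_front();
        bin.push_back(oldest);
        return -1;
    }
    return oldest.id;  // step 4 would send KADEMLIA2_HELLO_REQ to this contact
}
```

Note that a contact whose expiry time was just stamped with `now` is not contacted in the same round, because `expires >= now` holds for it.
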

Here we can also look at CKademliaUDPListener::SendMyDetails() to see exactly which details of the local node are sent (amule-2.3.1/src/kademlia/net/KademliaUDPListener.cpp):

// Used by Kad1.0 and Kad2.0
void CKademliaUDPListener::SendMyDetails(uint8_t opcode, uint32_t ip, uint16_t port, uint8_t kadVersion, const CKadUDPKey& targetKey, const CUInt128* cryptTargetID, bool requestAckPacket)
{
	CMemFile packetdata;
	packetdata.WriteUInt128(CKademlia::GetPrefs()->GetKadID());
	
	if (kadVersion > 1) {
		packetdata.WriteUInt16(thePrefs::GetPort());
		packetdata.WriteUInt8(KADEMLIA_VERSION);
		// Tag Count.
		uint8_t tagCount = 0;
		if (!CKademlia::GetPrefs()->GetUseExternKadPort()) {
			tagCount++;
		}
		if (kadVersion >= 8 && (requestAckPacket || CKademlia::GetPrefs()->GetFirewalled() || CUDPFirewallTester::IsFirewalledUDP(true))) {
			tagCount++;
		}
		packetdata.WriteUInt8(tagCount);
		if (!CKademlia::GetPrefs()->GetUseExternKadPort()) {
			packetdata.WriteTag(CTagVarInt(TAG_SOURCEUPORT, CKademlia::GetPrefs()->GetInternKadPort()));
		}
		if (kadVersion >= 8 && (requestAckPacket || CKademlia::GetPrefs()->GetFirewalled() || CUDPFirewallTester::IsFirewalledUDP(true))) {
			// if we're firewalled we send this tag, so the other client doesn't add us to his routing table (if UDP firewalled) and for statistics reasons (TCP firewalled)
			// 5 - reserved (!)
			// 1 - requesting HELLO_RES_ACK
			// 1 - TCP firewalled
			// 1 - UDP firewalled
			packetdata.WriteTag(CTagVarInt(TAG_KADMISCOPTIONS, (uint8_t)(
				(requestAckPacket ? 1 : 0) << 2 |
				(CKademlia::GetPrefs()->GetFirewalled() ? 1 : 0) << 1 |
				(CUDPFirewallTester::IsFirewalledUDP(true) ? 1 : 0)
			)));
		}
		if (kadVersion >= 6) {
			if (cryptTargetID == NULL || *cryptTargetID == 0) {
				AddDebugLogLineN(logClientKadUDP, CFormat(wxT("Sending hello response to crypt enabled Kad Node which provided an empty NodeID: %s (%u)")) % KadIPToString(ip) % kadVersion);
				SendPacket(packetdata, opcode, ip, port, targetKey, NULL);
			} else {
				SendPacket(packetdata, opcode, ip, port, targetKey, cryptTargetID);
			}
		} else {
			SendPacket(packetdata, opcode, ip, port, 0, NULL);
			wxASSERT(targetKey.IsEmpty());
		}
	} else {
		wxFAIL;
	}
}

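As the code shows, the main pieces of information are the local KadID, the port number, the Kad version, and some tag-related data. CKademliaUDPListener::SendPacket() works as described in Part 2 of this series and is not repeated here.

Handling the KADEMLIA2_HELLO_REQ Message

The connection request has been sent. How does the node that receives it handle the request?

CKademliaUDPListener::ProcessPacket() contains the following case (amule-2.3.1/src/kademlia/net/KademliaUDPListener.cpp; for the full event delivery path, see Part 2 of this series):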
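
The tag-related part of SendMyDetails() boils down to two decisions: how many tags to announce, and which bits to set in the TAG_KADMISCOPTIONS value. A minimal sketch of just that logic follows. `HelloInputs` and the three functions are hypothetical names; the conditions and bit positions are taken from the excerpt above, and the actual wire encoding of tags is not modeled.

```cpp
#include <cassert>
#include <cstdint>

// Inputs that SendMyDetails() consults; the field names are made up.
struct HelloInputs {
    uint8_t peerKadVersion;  // Kad version of the receiving contact
    bool useExternKadPort;
    bool requestAck;
    bool tcpFirewalled;
    bool udpFirewalled;
};

// True when the TAG_KADMISCOPTIONS tag is written.
bool NeedsMiscOptions(const HelloInputs& in) {
    return in.peerKadVersion >= 8 &&
           (in.requestAck || in.tcpFirewalled || in.udpFirewalled);
}

// Number of tags announced in the tag-count byte.
uint8_t TagCount(const HelloInputs& in) {
    uint8_t count = 0;
    if (!in.useExternKadPort) ++count;  // TAG_SOURCEUPORT
    if (NeedsMiscOptions(in)) ++count;  // TAG_KADMISCOPTIONS
    return count;
}

// Bit 2: HELLO_RES_ACK requested, bit 1: TCP firewalled, bit 0: UDP firewalled.
uint8_t MiscOptions(const HelloInputs& in) {
    return static_cast<uint8_t>((in.requestAck ? 1 : 0) << 2 |
                                (in.tcpFirewalled ? 1 : 0) << 1 |
                                (in.udpFirewalled ? 1 : 0));
}
```

For example, a message to a version 8 contact that requests an ACK while the sender is UDP firewalled carries both tags, and the options value is 5 (binary 101).
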

case KADEMLIA2_HELLO_REQ:
			DebugRecv(Kad2HelloReq, ip, port);
			Process2HelloRequest(packetData, lenPacket, ip, port, senderKey, validReceiverKey);
			break;

So the message is delegated to CKademliaUDPListener::Process2HelloRequest(). Its definition is shown below, preceded by AddContact2(), which it calls:

// Used only for Kad2.0
bool CKademliaUDPListener::AddContact2(const uint8_t *data, uint32_t lenData, uint32_t ip, uint16_t& port, uint8_t *outVersion, const CKadUDPKey& udpKey, bool& ipVerified, bool update, bool fromHelloReq, bool* outRequestsACK, CUInt128* outContactID)
{
    if (outRequestsACK != 0) {
        *outRequestsACK = false;
    }

    CMemFile bio(data, lenData);
    CUInt128 id = bio.ReadUInt128();
    if (outContactID != NULL) {
        *outContactID = id;
    }
    uint16_t tport = bio.ReadUInt16();
    uint8_t version = bio.ReadUInt8();
    if (version == 0) {
        throw wxString(CFormat(wxT("***NOTE: Received invalid Kademlia2 version (%u) in %s")) % version % wxString::FromAscii(__FUNCTION__));
    }
    if (outVersion != NULL) {
        *outVersion = version;
    }
    bool udpFirewalled = false;
    bool tcpFirewalled = false;
    uint8_t tags = bio.ReadUInt8();
    while (tags) {
        CTag *tag = bio.ReadTag();
        if (!tag->GetName().Cmp(TAG_SOURCEUPORT)) {
            if (tag->IsInt() && (uint16_t)tag->GetInt() > 0) {
                port = tag->GetInt();
            }
        } else if (!tag->GetName().Cmp(TAG_KADMISCOPTIONS)) {
            if (tag->IsInt() && tag->GetInt() > 0) {
                udpFirewalled = (tag->GetInt() & 0x01) > 0;
                tcpFirewalled = (tag->GetInt() & 0x02) > 0;
                if ((tag->GetInt() & 0x04) > 0) {
                    if (outRequestsACK != NULL) {
                        if (version >= 8) {
                            *outRequestsACK = true;
                        }
                    } else {
                        wxFAIL;
                    }
                }
            }
        }
        delete tag;
        --tags;
    }

    // check if we are waiting for informations (nodeid) about this client and if so inform the requester
    for (FetchNodeIDList::iterator it = m_fetchNodeIDRequests.begin(); it != m_fetchNodeIDRequests.end(); ++it) {
        if (it->ip == ip && it->tcpPort == tport) {
            //AddDebugLogLineN(logKadMain, wxT("Result Addcontact: ") + id.ToHexString());
            uint8_t uchID[16];
            id.ToByteArray(uchID);
            it->requester->KadSearchNodeIDByIPResult(KCSR_SUCCEEDED, uchID);
            m_fetchNodeIDRequests.erase(it);
            break;
        }
    }

    if (fromHelloReq && version >= 8) {
        // this is just for statistic calculations. We try to determine the ratio of (UDP) firewalled users,
        // by counting how many of all nodes which have us in their routing table (our own routing table is supposed
        // to have no UDP firewalled nodes at all) and support the firewalled tag are firewalled themself.
        // Obviously this only works if we are not firewalled ourself
        CKademlia::GetPrefs()->StatsIncUDPFirewalledNodes(udpFirewalled);
        CKademlia::GetPrefs()->StatsIncTCPFirewalledNodes(tcpFirewalled);
    }

    if (!udpFirewalled) {    // do not add (or update) UDP firewalled sources to our routing table
        return CKademlia::GetRoutingZone()->Add(id, ip, port, tport, version, udpKey, ipVerified, update, true);
    } else {
        AddDebugLogLineN(logKadRouting, wxT("Not adding firewalled client to routing table (") + KadIPToString(ip) + wxT(")"));
        return false;
    }
}

......

// KADEMLIA2_HELLO_REQ
// Used in Kad2.0 only
void CKademliaUDPListener::Process2HelloRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey, bool validReceiverKey)
{
	DEBUG_ONLY( uint16_t dbgOldUDPPort = port; )
	uint8_t contactVersion = 0;
	CUInt128 contactID;
	bool addedOrUpdated = AddContact2(packetData, lenPacket, ip, port, &contactVersion, senderKey, validReceiverKey, true, true, NULL, &contactID); // might change (udp)port, validReceiverKey
	wxASSERT(contactVersion >= 2);
#ifdef __DEBUG__
	if (dbgOldUDPPort != port) {
		AddDebugLogLineN(logClientKadUDP, CFormat(wxT("KadContact %s uses his internal (%u) instead external (%u) UDP Port")) % KadIPToString(ip) % port % dbgOldUDPPort);
	}
#endif
	AddLogLineNS(wxT("") + CFormat(_("KadContact %s uses his UDP Port (%u) to send KADEMLIA2_HELLO_RES.")) % KadIPToString(ip) % port);
	DebugSend(Kad2HelloRes, ip, port);
	// if this contact was added or updated (so with other words not filtered or invalid) to our routing table and did not already send a valid
	// receiver key or is already verified in the routing table, we request an additional ACK package to complete a three-way-handshake and
	// verify the remote IP
	SendMyDetails(KADEMLIA2_HELLO_RES, ip, port, contactVersion, senderKey, &contactID, addedOrUpdated && !validReceiverKey);

	if (addedOrUpdated && !validReceiverKey && contactVersion == 7 && !HasActiveLegacyChallenge(ip)) {
		// Kad Version 7 doesn't support HELLO_RES_ACK but sender/receiver keys, so send a ping to validate
		DebugSend(Kad2Ping, ip, port);
		SendNullPacket(KADEMLIA2_PING, ip, port, senderKey, NULL);
#ifdef __DEBUG__
		CContact* contact = CKademlia::GetRoutingZone()->GetContact(contactID);
		if (contact != NULL) {
			if (contact->GetType() < 2) {
				AddDebugLogLineN(logKadRouting, wxT("Sending (ping) challenge to a long known contact (should be verified already) - ") + KadIPToString(ip));
			}
		} else {
			wxFAIL;
		}
#endif
	} else if (CKademlia::GetPrefs()->FindExternKadPort(false) && contactVersion > 5) {	// do we need to find out our extern port?
		DebugSend(Kad2Ping, ip, port);
		SendNullPacket(KADEMLIA2_PING, ip, port, senderKey, NULL);
	}

	if (addedOrUpdated && !validReceiverKey && contactVersion < 7 && !HasActiveLegacyChallenge(ip)) {
		// we need to verify this contact but it doesn't support HELLO_RES_ACK nor keys, do a little workaround
		SendLegacyChallenge(ip, port, contactID);
	}

	// Check if firewalled
	if (CKademlia::GetPrefs()->GetRecheckIP()) {
		FirewalledCheck(ip, port, senderKey, contactVersion);
	}
}

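Process2HelloRequest() mainly does two things:

1. It calls CKademliaUDPListener::AddContact2() to add the sender as a contact.

2. It calls CKademliaUDPListener::SendMyDetails() to send the local node's details, this time wrapped in a KADEMLIA2_HELLO_RES message. Otherwise the process is the same as for sending the KADEMLIA2_HELLO_REQ message.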
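
Besides these two steps, Process2HelloRequest() also picks a way to verify the new contact, depending on the contact's Kad version. The sketch below condenses that choice into one function. `Verification` and `ChooseVerification` are hypothetical names; the extern-port ping and the firewall check from the original function are not modeled.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical summary of the verification follow-ups seen in the excerpt.
enum class Verification { None, HelloResAck, Ping, LegacyChallenge };

Verification ChooseVerification(bool addedOrUpdated, bool validReceiverKey,
                                uint8_t contactVersion,
                                bool legacyChallengeActive) {
    // Nothing to verify if the contact was not stored, or if it already
    // proved itself with a valid receiver key.
    if (!addedOrUpdated || validReceiverKey) {
        return Verification::None;
    }
    // Version 8 and later: the ACK request bit travels in the HELLO_RES.
    if (contactVersion >= 8) {
        return Verification::HelloResAck;
    }
    // Older versions: skip if a legacy challenge is already pending.
    if (legacyChallengeActive) {
        return Verification::None;
    }
    // Version 7 has keys but no HELLO_RES_ACK, so a ping is used instead.
    if (contactVersion == 7) {
        return Verification::Ping;
    }
    // Below version 7 there are neither keys nor ACKs.
    return Verification::LegacyChallenge;
}
```

The ACK variant completes a three-way handshake (HELLO_REQ, HELLO_RES, HELLO_RES_ACK), which is how the receiver confirms that the sender's IP address was not spoofed.
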

Handling the KADEMLIA2_HELLO_RES Message

The target node has sent back a KADEMLIA2_HELLO_RES response, so let us look at how the initiating side handles this message.

CKademliaUDPListener::ProcessPacket() contains the following case (amule-2.3.1/src/kademlia/net/KademliaUDPListener.cpp; for the full event delivery path, see Part 2 of this series):

         case KADEMLIA2_HELLO_RES:
            DebugRecv(Kad2HelloRes, ip, port);
            Process2HelloResponse(packetData, lenPacket, ip, port, senderKey, validReceiverKey);
            break;

So the message is delegated to CKademliaUDPListener::Process2HelloResponse(), which is defined as follows:

// KADEMLIA2_HELLO_RES
// Used in Kad2.0 only
void CKademliaUDPListener::Process2HelloResponse(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey, bool validReceiverKey)
{
	CHECK_TRACKED_PACKET(KADEMLIA2_HELLO_REQ);

	// Add or Update contact.
	uint8_t contactVersion;
	CUInt128 contactID;
	bool sendACK = false;
	bool addedOrUpdated = AddContact2(packetData, lenPacket, ip, port, &contactVersion, senderKey, validReceiverKey, true, false, &sendACK, &contactID);

	if (sendACK) {
		// the client requested us to send an ACK packet, which proves that we're not a spoofed fake contact
		// fulfill his wish
		if (senderKey.IsEmpty()) {
			// but we don't have a valid sender key - there is no point to reply in this case
			// most likely a bug in the remote client
			AddDebugLogLineN(logClientKadUDP, wxT("Remote client demands ACK, but didn't send any sender key! (sender: ") + KadIPToString(ip) + wxT(")"));
		} else {
			CMemFile packet(17);
			packet.WriteUInt128(CKademlia::GetPrefs()->GetKadID());
			packet.WriteUInt8(0);	// no tags at this time
			DebugSend(Kad2HelloResAck, ip, port);
			SendPacket(packet, KADEMLIA2_HELLO_RES_ACK, ip, port, senderKey, NULL);
		}
	} else if (addedOrUpdated && !validReceiverKey && contactVersion < 7) {
		// even though this is supposably an answer to a request from us, there are still possibilities to spoof
		// it, as long as the attacker knows that we would send a HELLO_REQ (which in this case is quite often),
		// so for old Kad Version which doesn't support keys, we need
		SendLegacyChallenge(ip, port, contactID);
	}

	// do we need to find out our extern port?
	if (CKademlia::GetPrefs()->FindExternKadPort(false) && contactVersion > 5) {
		DebugSend(Kad2Ping, ip, port);
		SendNullPacket(KADEMLIA2_PING, ip, port, senderKey, NULL);
	}

	// Check if firewalled
	if (CKademlia::GetPrefs()->GetRecheckIP()) {
		FirewalledCheck(ip, port, senderKey, contactVersion);
	}
}

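The most important thing this function does is add the remote node's information to the local contact list, through AddContact2(). Depending on the situation, it then sends a follow-up message back, such as KADEMLIA2_HELLO_RES_ACK.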
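
The follow-up decision on the initiating side can be sketched in two parts: decoding the TAG_KADMISCOPTIONS value the way AddContact2() does, and then choosing the reply. `MiscFlags`, `DecodeMiscOptions`, `ResponseAction`, and `OnHelloResponse` are hypothetical names; the extern-port ping and the firewall check are not modeled.

```cpp
#include <cassert>
#include <cstdint>

// Flags carried in the TAG_KADMISCOPTIONS value.
struct MiscFlags {
    bool udpFirewalled;
    bool tcpFirewalled;
    bool requestsAck;
};

// Same bit masks as in AddContact2(); the ACK request only counts for
// senders of Kad version 8 or later.
MiscFlags DecodeMiscOptions(uint8_t value, uint8_t senderVersion) {
    MiscFlags flags;
    flags.udpFirewalled = (value & 0x01) != 0;
    flags.tcpFirewalled = (value & 0x02) != 0;
    flags.requestsAck = (value & 0x04) != 0 && senderVersion >= 8;
    return flags;
}

enum class ResponseAction { None, SendAck, LogMissingKey, LegacyChallenge };

ResponseAction OnHelloResponse(const MiscFlags& flags, bool senderKeyEmpty,
                               bool addedOrUpdated, bool validReceiverKey,
                               uint8_t contactVersion) {
    if (flags.requestsAck) {
        // Without a sender key there is no point in replying.
        return senderKeyEmpty ? ResponseAction::LogMissingKey
                              : ResponseAction::SendAck;
    }
    if (addedOrUpdated && !validReceiverKey && contactVersion < 7) {
        return ResponseAction::LegacyChallenge;
    }
    return ResponseAction::None;
}
```

A UDP-firewalled sender is never added to the routing table, so for such a sender `addedOrUpdated` would be false in the call above.
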

That is roughly the whole process.

Done.
