aMule在啓動的時候,會起一些定時器,以便於按期的執行一些任務。其中比較重要的就是core_timer,相關code以下(amule-2.3.1/src/amule-gui.cpp): node
// Create the Core timer core_timer = new CTimer(this,ID_CORE_TIMER_EVENT); if (!core_timer) { AddLogLineCS(_("Fatal Error: Failed to create Core Timer")); OnExit(); } // Start the Core and Gui timers // Note: wxTimer can be off by more than 10% !!! // In addition to the systematic error introduced by wxTimer, we are losing // timer cycles due to high CPU load. I've observed about 0.5% random loss of cycles under // low load, and more than 6% lost cycles with heavy download traffic and/or other tasks // in the system, such as a video player or a VMware virtual machine. // The upload queue process loop has now been rewritten to compensate for timer errors. // When adding functionality, assume that the timer is only approximately correct; // for measurements, always use the system clock [::GetTickCount()]. core_timer->Start(CORE_TIMER_PERIOD);wxWidgets的定時器,按期的產生一些事件,具體的事件在Timer建立時傳入,而定時器的週期則在Start()時傳入。在amule-2.3.1/src/amule.h中能夠看到CORE_TIMER_PERIOD的定義爲100,也就是說定時器的週期是100ms。
在amule-2.3.1/src/amule-gui.cpp的EventTable中,能夠看到事件將由CamuleGuiApp::OnCoreTimer()處理: 網絡
// Core timer EVT_MULE_TIMER(ID_CORE_TIMER_EVENT, CamuleGuiApp::OnCoreTimer)
在CamuleApp::OnCoreTimer()函數(amule-2.3.1/src/amule.cpp)中,會執行Kademlia::CKademlia::Process(): 數據結構
if (msCur-msPrev1 > 1000) { // approximately every second msPrev1 = msCur; clientcredits->Process(); clientlist->Process(); // Publish files to server if needed. sharedfiles->Process(); if( Kademlia::CKademlia::IsRunning() ) { Kademlia::CKademlia::Process(); if(Kademlia::CKademlia::GetPrefs()->HasLostConnection()) { StopKad(); clientudp->Close(); clientudp->Open(); if (thePrefs::Reconnect()) { StartKad(); } } }在Kademlia::CKademlia::Process()(文件amule-2.3.1/src/kademlia/kademlia/Kademlia.cpp)中,主要來關注以下的幾行:
if (m_nextSelfLookup <= now) { CSearchManager::FindNode(instance->m_prefs->GetKadID(), true); m_nextSelfLookup = HR2S(4) + now; }回想 Kademlia 網絡在啓動的時候,會執行的CKademlia::Start(),其中有這麼幾行:
// Force a FindNodeComplete within the first 3 minutes. m_nextSelfLookup = time(NULL) + MIN2S(3);綜合來看這兩段code,也就是說,在啓動以後3分鐘,將首次執行 CSearchManager::FindNode(instance->m_prefs->GetKadID(), true) ,然後,則將每隔4個小時執行這個方法一次。
也就意味着,在Kademlia模塊啓動以後3分鐘,將首次搜尋KadID與本節點最接近的節點,而後與它們創建鏈接,並將它們做爲鄰居節點。以後則將每隔4個小時執行一次相同的過程。 app
那咱們來看CSearchManager::FindNode()的執行(amule-2.3.1/src/kademlia/kademlia/SearchManager.cpp): dom
void CSearchManager::FindNode(const CUInt128& id, bool complete) { // Do a node lookup. CSearch *s = new CSearch; if (complete) { s->SetSearchTypes(CSearch::NODECOMPLETE); } else { s->SetSearchTypes(CSearch::NODE); } s->SetTargetID(id); StartSearch(s); } 。。。。。。 bool CSearchManager::StartSearch(CSearch* search) { // A search object was created, now try to start the search. if (AlreadySearchingFor(search->GetTarget())) { // There was already a search in progress with this target. delete search; return false; } // Add to the search map m_searches[search->GetTarget()] = search; // Start the search. search->Go(); return true; }
如咱們前面在Linux下電騾aMule Kademlia網絡構建分析2中看到的,這個請求發出去,能獲得的響應也只是一些節點的信息。 tcp
那Kademlia網絡中一個節點是如何鏈接到網絡中的另外一個節點的呢?先回想一下 Linux下電騾aMule Kademlia網絡構建分析I 一文,CRoutingZone的初始化函數CRoutingZone::Init()會調用到CRoutingZone::StartTimer(),其中又調用了CKademlia::AddEvent(),以下所示(amule-2.3.1/src/kademlia/routing/RoutingZone.cpp): ide
void CRoutingZone::StartTimer() { // Start filling the tree, closest bins first. m_nextBigTimer = time(NULL) + SEC(10); CKademlia::AddEvent(this); }再來看CKademlia::AddEvent(),在amule-2.3.1/src/kademlia/kademlia/Kademlia.h中:
static void AddEvent(CRoutingZone *zone) throw() { m_events[zone] = zone; }
也就是把當前CRoutingZone對象的指針,保存在CKademlia的一個map中,key和value都是該指針。 函數
在按期會被執行的CKademlia::Process()函數中,咱們還能看到以下的這樣一段code: oop
for (EventMap::const_iterator it = m_events.begin(); it != m_events.end(); ++it) { CRoutingZone *zone = it->first; if (updateUserFile) { // The EstimateCount function is not made for really small networks, if we are in LAN mode, it is actually // better to assume that all users of the network are in our routing table and use the real count function if (IsRunningInLANMode()) { tempUsers = zone->GetNumContacts(); } else { tempUsers = zone->EstimateCount(); } if (maxUsers < tempUsers) { maxUsers = tempUsers; } } if (m_bigTimer <= now) { if (zone->m_nextBigTimer <= now) { if(zone->OnBigTimer()) { zone->m_nextBigTimer = HR2S(1) + now; m_bigTimer = SEC(10) + now; } } else { if (lastContact && (now - lastContact > KADEMLIADISCONNECTDELAY - MIN2S(5))) { if(zone->OnBigTimer()) { zone->m_nextBigTimer = HR2S(1) + now; m_bigTimer = SEC(10) + now; } } } } if (zone->m_nextSmallTimer <= now) { zone->OnSmallTimer(); zone->m_nextSmallTimer = MIN2S(1) + now; } }也就是遍歷全部的CRoutingZone對象,並適時地調用一些CRoutingZone對象的須要定時執行的一些方法。其中會調用到 CRoutingZone::OnSmallTimer() 函數,週期大約爲1分鐘。咱們能夠具體來看一下這個函數的實現(amule-2.3.1/src/kademlia/routing/RoutingZone.cpp):
void CRoutingZone::OnSmallTimer() { if (!IsLeaf()) { return; } CContact *c = NULL; time_t now = time(NULL); ContactList entries; // Remove dead entries m_bin->GetEntries(&entries); for (ContactList::iterator it = entries.begin(); it != entries.end(); ++it) { c = *it; if (c->GetType() == 4) { if ((c->GetExpireTime() > 0) && (c->GetExpireTime() <= now)) { if (!c->InUse()) { m_bin->RemoveContact(c); delete c; } continue; } } if(c->GetExpireTime() == 0) { c->SetExpireTime(now); } } c = m_bin->GetOldest(); if (c != NULL) { if (c->GetExpireTime() >= now || c->GetType() == 4) { m_bin->PushToBottom(c); c = NULL; } } if (c != NULL) { c->CheckingType(); if (c->GetVersion() >= 6) { DebugSend(Kad2HelloReq, c->GetIPAddress(), c->GetUDPPort()); CUInt128 clientID = c->GetClientID(); CKademlia::GetUDPListener()->SendMyDetails(KADEMLIA2_HELLO_REQ, c->GetIPAddress(), c->GetUDPPort(), c->GetVersion(), c->GetUDPKey(), &clientID, false); if (c->GetVersion() >= 8) { // FIXME: // This is a bit of a work around for statistic values. Normally we only count values from incoming HELLO_REQs for // the firewalled statistics in order to get numbers from nodes which have us on their routing table, // however if we send a HELLO due to the timer, the remote node won't send a HELLO_REQ itself anymore (but // a HELLO_RES which we don't count), so count those statistics here. This isn't really accurate, but it should // do fair enough. Maybe improve it later for example by putting a flag into the contact and make the answer count CKademlia::GetPrefs()->StatsIncUDPFirewalledNodes(false); CKademlia::GetPrefs()->StatsIncTCPFirewalledNodes(false); } } else if (c->GetVersion() >= 2) { DebugSend(Kad2HelloReq, c->GetIPAddress(), c->GetUDPPort()); CKademlia::GetUDPListener()->SendMyDetails(KADEMLIA2_HELLO_REQ, c->GetIPAddress(), c->GetUDPPort(), c->GetVersion(), 0, NULL, false); wxASSERT(c->GetUDPKey() == CKadUDPKey(0)); } else { wxFAIL; } } }
1. 這個函數會首先確保當前CRoutingZone是一個葉子節點。(只有在葉子節點中才會保存其餘節點,也就是聯繫人的信息,這與aMule管理聯繫人的數據結構設計有關。) fetch
2. 隨後會遍歷全部的聯繫人,移除那些當前時間已通過了有效時間,又沒在使用的聯繫人,而對於有效時間爲0的聯繫人,則將有效時間設置爲當前時間。
3. 找出最老,同時當前時間又沒有超出它的有效時間的一個節點。
4. 調用CKademlia::GetUDPListener()->SendMyDetails()函數,向找到的節點發送一個KADEMLIA2_HELLO_REQ請求,其中會攜帶有本節點的詳細信息。KADEMLIA2_HELLO_REQ請求也就是aMule Kademlia網絡的鏈接請求。
// Used by Kad1.0 and Kad2.0 void CKademliaUDPListener::SendMyDetails(uint8_t opcode, uint32_t ip, uint16_t port, uint8_t kadVersion, const CKadUDPKey& targetKey, const CUInt128* cryptTargetID, bool requestAckPacket) { CMemFile packetdata; packetdata.WriteUInt128(CKademlia::GetPrefs()->GetKadID()); if (kadVersion > 1) { packetdata.WriteUInt16(thePrefs::GetPort()); packetdata.WriteUInt8(KADEMLIA_VERSION); // Tag Count. uint8_t tagCount = 0; if (!CKademlia::GetPrefs()->GetUseExternKadPort()) { tagCount++; } if (kadVersion >= 8 && (requestAckPacket || CKademlia::GetPrefs()->GetFirewalled() || CUDPFirewallTester::IsFirewalledUDP(true))) { tagCount++; } packetdata.WriteUInt8(tagCount); if (!CKademlia::GetPrefs()->GetUseExternKadPort()) { packetdata.WriteTag(CTagVarInt(TAG_SOURCEUPORT, CKademlia::GetPrefs()->GetInternKadPort())); } if (kadVersion >= 8 && (requestAckPacket || CKademlia::GetPrefs()->GetFirewalled() || CUDPFirewallTester::IsFirewalledUDP(true))) { // if we're firewalled we send this tag, so the other client doesn't add us to his routing table (if UDP firewalled) and for statistics reasons (TCP firewalled) // 5 - reserved (!) // 1 - requesting HELLO_RES_ACK // 1 - TCP firewalled // 1 - UDP firewalled packetdata.WriteTag(CTagVarInt(TAG_KADMISCOPTIONS, (uint8_t)( (requestAckPacket ? 1 : 0) << 2 | (CKademlia::GetPrefs()->GetFirewalled() ? 1 : 0) << 1 | (CUDPFirewallTester::IsFirewalledUDP(true) ? 1 : 0) ))); } if (kadVersion >= 6) { if (cryptTargetID == NULL || *cryptTargetID == 0) { AddDebugLogLineN(logClientKadUDP, CFormat(wxT("Sending hello response to crypt enabled Kad Node which provided an empty NodeID: %s (%u)")) % KadIPToString(ip) % kadVersion); SendPacket(packetdata, opcode, ip, port, targetKey, NULL); } else { SendPacket(packetdata, opcode, ip, port, targetKey, cryptTargetID); } } else { SendPacket(packetdata, opcode, ip, port, 0, NULL); wxASSERT(targetKey.IsEmpty()); } } else { wxFAIL; } }
能夠看到,主要的信息有,端口號,KAD的版本,以及和TAG有關的一些信息等。CKademliaUDPListener::SendPacket()的執行,如咱們前面在 Linux下電騾aMule Kademlia網絡構建分析2 中看到的那樣,此處再也不贅述。
在CKademliaUDPListener::ProcessPacket()這個函數裏,能夠看到這樣的一個case(amule-2.3.1/src/kademlia/net/KademliaUDPListener.cpp,更詳細的事件傳遞過程,能夠來看 Linux下電騾aMule Kademlia網絡構建分析2):
case KADEMLIA2_HELLO_REQ: DebugRecv(Kad2HelloReq, ip, port); Process2HelloRequest(packetData, lenPacket, ip, port, senderKey, validReceiverKey); break;
// Used only for Kad2.0 bool CKademliaUDPListener::AddContact2(const uint8_t *data, uint32_t lenData, uint32_t ip, uint16_t& port, uint8_t *outVersion, const CKadUDPKey& udpKey, bool& ipVerified, bool update, bool fromHelloReq, bool* outRequestsACK, CUInt128* outContactID) { if (outRequestsACK != 0) { *outRequestsACK = false; } CMemFile bio(data, lenData); CUInt128 id = bio.ReadUInt128(); if (outContactID != NULL) { *outContactID = id; } uint16_t tport = bio.ReadUInt16(); uint8_t version = bio.ReadUInt8(); if (version == 0) { throw wxString(CFormat(wxT("***NOTE: Received invalid Kademlia2 version (%u) in %s")) % version % wxString::FromAscii(__FUNCTION__)); } if (outVersion != NULL) { *outVersion = version; } bool udpFirewalled = false; bool tcpFirewalled = false; uint8_t tags = bio.ReadUInt8(); while (tags) { CTag *tag = bio.ReadTag(); if (!tag->GetName().Cmp(TAG_SOURCEUPORT)) { if (tag->IsInt() && (uint16_t)tag->GetInt() > 0) { port = tag->GetInt(); } } else if (!tag->GetName().Cmp(TAG_KADMISCOPTIONS)) { if (tag->IsInt() && tag->GetInt() > 0) { udpFirewalled = (tag->GetInt() & 0x01) > 0; tcpFirewalled = (tag->GetInt() & 0x02) > 0; if ((tag->GetInt() & 0x04) > 0) { if (outRequestsACK != NULL) { if (version >= 8) { *outRequestsACK = true; } } else { wxFAIL; } } } } delete tag; --tags; } // check if we are waiting for informations (nodeid) about this client and if so inform the requester for (FetchNodeIDList::iterator it = m_fetchNodeIDRequests.begin(); it != m_fetchNodeIDRequests.end(); ++it) { if (it->ip == ip && it->tcpPort == tport) { //AddDebugLogLineN(logKadMain, wxT("Result Addcontact: ") + id.ToHexString()); uint8_t uchID[16]; id.ToByteArray(uchID); it->requester->KadSearchNodeIDByIPResult(KCSR_SUCCEEDED, uchID); m_fetchNodeIDRequests.erase(it); break; } } if (fromHelloReq && version >= 8) { // this is just for statistic calculations. We try to determine the ratio of (UDP) firewalled users, // by counting how many of all nodes which have us in their routing table (our own routing table is supposed // to have no UDP firewalled nodes at all) and support the firewalled tag are firewalled themself. // Obviously this only works if we are not firewalled ourself CKademlia::GetPrefs()->StatsIncUDPFirewalledNodes(udpFirewalled); CKademlia::GetPrefs()->StatsIncTCPFirewalledNodes(tcpFirewalled); } if (!udpFirewalled) { // do not add (or update) UDP firewalled sources to our routing table return CKademlia::GetRoutingZone()->Add(id, ip, port, tport, version, udpKey, ipVerified, update, true); } else { AddDebugLogLineN(logKadRouting, wxT("Not adding firewalled client to routing table (") + KadIPToString(ip) + wxT(")")); return false; } } 。。。。。。 // KADEMLIA2_HELLO_REQ // Used in Kad2.0 only void CKademliaUDPListener::Process2HelloRequest(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey, bool validReceiverKey) { DEBUG_ONLY( uint16_t dbgOldUDPPort = port; ) uint8_t contactVersion = 0; CUInt128 contactID; bool addedOrUpdated = AddContact2(packetData, lenPacket, ip, port, &contactVersion, senderKey, validReceiverKey, true, true, NULL, &contactID); // might change (udp)port, validReceiverKey wxASSERT(contactVersion >= 2); #ifdef __DEBUG__ if (dbgOldUDPPort != port) { AddDebugLogLineN(logClientKadUDP, CFormat(wxT("KadContact %s uses his internal (%u) instead external (%u) UDP Port")) % KadIPToString(ip) % port % dbgOldUDPPort); } #endif AddLogLineNS(wxT("") + CFormat(_("KadContact %s uses his UDP Port (%u) to send KADEMLIA2_HELLO_RES.")) % KadIPToString(ip) % port); DebugSend(Kad2HelloRes, ip, port); // if this contact was added or updated (so with other words not filtered or invalid) to our routing table and did not already send a valid // receiver key or is already verified in the routing table, we request an additional ACK package to complete a three-way-handshake and // verify the remote IP SendMyDetails(KADEMLIA2_HELLO_RES, ip, port, contactVersion, senderKey, &contactID, addedOrUpdated && !validReceiverKey); if (addedOrUpdated && !validReceiverKey && contactVersion == 7 && !HasActiveLegacyChallenge(ip)) { // Kad Version 7 doesn't support HELLO_RES_ACK but sender/receiver keys, so send a ping to validate DebugSend(Kad2Ping, ip, port); SendNullPacket(KADEMLIA2_PING, ip, port, senderKey, NULL); #ifdef __DEBUG__ CContact* contact = CKademlia::GetRoutingZone()->GetContact(contactID); if (contact != NULL) { if (contact->GetType() < 2) { AddDebugLogLineN(logKadRouting, wxT("Sending (ping) challenge to a long known contact (should be verified already) - ") + KadIPToString(ip)); } } else { wxFAIL; } #endif } else if (CKademlia::GetPrefs()->FindExternKadPort(false) && contactVersion > 5) { // do we need to find out our extern port? DebugSend(Kad2Ping, ip, port); SendNullPacket(KADEMLIA2_PING, ip, port, senderKey, NULL); } if (addedOrUpdated && !validReceiverKey && contactVersion < 7 && !HasActiveLegacyChallenge(ip)) { // we need to verify this contact but it doesn't support HELLO_RES_ACK nor keys, do a little workaround SendLegacyChallenge(ip, port, contactID); } // Check if firewalled if (CKademlia::GetPrefs()->GetRecheckIP()) { FirewalledCheck(ip, port, senderKey, contactVersion); } }
Process2HelloRequest ()函數主要作了兩件事情,
1. 調用CKademliaUDPListener::AddContact2()函數,添加聯繫人。
2. 調用CKademliaUDPListener::SendMyDetails()函數發送本節點的信息,只不過此次是包在一個KADEMLIA2_HELLO_RES消息裏的,其它的就與前面發送KADEMLIA2_HELLO_REQ消息的過程同樣了。
在CKademliaUDPListener::ProcessPacket()這個函數裏,能夠看到這樣的一個case(amule-2.3.1/src/kademlia/net/KademliaUDPListener.cpp,更詳細的事件傳遞過程,能夠來看 Linux下電騾aMule Kademlia網絡構建分析2):
case KADEMLIA2_HELLO_RES: DebugRecv(Kad2HelloRes, ip, port); Process2HelloResponse(packetData, lenPacket, ip, port, senderKey, validReceiverKey); break;
// KADEMLIA2_HELLO_RES // Used in Kad2.0 only void CKademliaUDPListener::Process2HelloResponse(const uint8_t *packetData, uint32_t lenPacket, uint32_t ip, uint16_t port, const CKadUDPKey& senderKey, bool validReceiverKey) { CHECK_TRACKED_PACKET(KADEMLIA2_HELLO_REQ); // Add or Update contact. uint8_t contactVersion; CUInt128 contactID; bool sendACK = false; bool addedOrUpdated = AddContact2(packetData, lenPacket, ip, port, &contactVersion, senderKey, validReceiverKey, true, false, &sendACK, &contactID); if (sendACK) { // the client requested us to send an ACK packet, which proves that we're not a spoofed fake contact // fulfill his wish if (senderKey.IsEmpty()) { // but we don't have a valid sender key - there is no point to reply in this case // most likely a bug in the remote client AddDebugLogLineN(logClientKadUDP, wxT("Remote client demands ACK, but didn't send any sender key! (sender: ") + KadIPToString(ip) + wxT(")")); } else { CMemFile packet(17); packet.WriteUInt128(CKademlia::GetPrefs()->GetKadID()); packet.WriteUInt8(0); // no tags at this time DebugSend(Kad2HelloResAck, ip, port); SendPacket(packet, KADEMLIA2_HELLO_RES_ACK, ip, port, senderKey, NULL); } } else if (addedOrUpdated && !validReceiverKey && contactVersion < 7) { // even though this is supposably an answer to a request from us, there are still possibilities to spoof // it, as long as the attacker knows that we would send a HELLO_REQ (which in this case is quite often), // so for old Kad Version which doesn't support keys, we need SendLegacyChallenge(ip, port, contactID); } // do we need to find out our extern port? if (CKademlia::GetPrefs()->FindExternKadPort(false) && contactVersion > 5) { DebugSend(Kad2Ping, ip, port); SendNullPacket(KADEMLIA2_PING, ip, port, senderKey, NULL); } // Check if firewalled if (CKademlia::GetPrefs()->GetRecheckIP()) { FirewalledCheck(ip, port, senderKey, contactVersion); } }