通過一段時間的研究和學習,大體瞭解了DHT網絡的一些信息,大部分仍是參會別人的相關代碼,一方面主要對DHT爬蟲原理感興趣,最主要的是爲了學習python,大部分是別人的東西原理仍是引用別人的吧html
DHT網絡爬蟲的實現 | 學步園 http://www.xuebuyuan.com/1287052.htmlnode
DHT協議原理以及一些重點分析:python
要作DHT的爬蟲,首先得透徹理解DHT,這樣才能知道在什麼地方究竟該應用什麼算法去解決問題。關於DHT協議的細節以及重要的參考文章,請參考文末1mysql
DHT協議做爲BT協議的一個輔助,是很是好玩的。它主要是爲了在BT正式下載時獲得種子或者BT資源。傳統的網絡,須要一臺中央服務器存放種子或者BT資源,不只浪費服務器資源,還容易出現單點的各類問題,而DHT網絡則是爲了去中心化,也就是說任意時刻,這個網絡總有節點是亮的,你能夠去詢問問這些亮的節點,從而將本身加入DHT網絡。linux
要實現DHT協議的網絡爬蟲,主要分3步,第一步是獲得資源信息(infohash,160bit,20字節,能夠編碼爲40字節的十六進制字符串),第二步是確認這些infohash是有效的,第三步是經過有效的infohash下載到BT的種子文件,從而獲得對這個資源的完整描述。算法
其中第一步是其餘節點用DHT協議中的get_peers方法向爬蟲發送請求獲得的,第二步是其餘節點用DHT協議中的announce_peer向爬蟲發送請求獲得的,第三步能夠有幾種方式獲得,好比能夠去一些保存種子的網站根據infohash直接下載到,或者經過announce_peer的節點來下載到,具體如何實現,能夠取決於你本身的爬蟲。sql
DHT協議中的主要幾個操做:數據庫
主要負責經過UDP與外部節點交互,封裝4種基本操做的請求以及相應。數組
ping:檢查一個節點是否「存活」tomcat
在一個爬蟲裏主要有兩個地方用到ping,第一是初始路由表時,第二是驗證節點是否存活時
find_node:向一個節點發送查找節點的請求
在一個爬蟲中主要也是兩個地方用到find_node,第一是初始路由表時,第二是驗證桶是否存活時
get_peers:向一個節點發送查找資源的請求
在爬蟲中有節點向本身請求時不只像個正常節點同樣作出迴應,還須要以此資源的info_hash爲機會盡量多的去認識更多的節點。如圖,get_peers實際上最後一步是announce_peer,可是由於爬蟲不能announce_peer,因此實際上get_peers退化成了find_node操做。
announce_peer:向一個節點發送本身已經開始下載某個資源的通知
爬蟲中不能用announce_peer,由於這就至關於通報虛假資源,對方很容易從上下文中判斷你是否通報了虛假資源從而把你禁掉
DHT協議中有幾個重點的須要澄清的地方:
1. node與infohash一樣使用160bit的表示方式,160bit意味着整個節點空間有2^160 = 730750818665451459101842416358141509827966271488,是48位10進制,也就是說有百億億億億億個節點空間,這麼大的節點空間,是足夠存放你的主機節點以及任意的資源信息的。
2. 每一個節點有張路由表。每張路由表由一堆K桶組成,所謂K桶,就是桶中最多隻能放K個節點,默認是8個。而桶的保存則是相似一顆前綴樹的方式。至關於一張8桶的路由表中最多有160-4個K桶。
3. 根據DHT協議的規定,每一個infohash都是有位置的,所以,兩個infohash之間就有距離一說,而兩個infohash的距離就能夠用異或來表示,即infohash1 xor infohash2,也就是說,高位同樣的話,他們的距離就近,反之則遠,這樣能夠快速的計算兩個節點的距離。計算這個距離有什麼用呢,在DHT網絡中,若是一個資源的infohash與一個節點的infohash越近則該節點越有可能擁有該資源的信息,爲何呢?能夠想象,由於人人都用一樣的距離算法去遞歸的詢問離資源接近的節點,而且只要該節點作出了迴應,那麼就會獲得一個announce信息,也就是說跟資源infohash接近的節點就有更大的機率拿到該資源的infohash
4. 根據上述算法,DHT中的查詢是跳躍式查詢,能夠迅速的跨越的的節點桶而接近目標節點桶。之因此在遠處可以大幅度跳躍,而在近處只能小幅度跳躍,緣由是每一個節點的路由表中離自身越接近的節點保存得越多,以下圖
5. 在一個DHT網絡中當爬蟲並不容易,不像普通爬蟲同樣,看到資源就能夠主動爬下來,相反,由於獲得資源的方式(get_peers, announce_peer)都是被動的,因此爬蟲的方式就有些變化了,爬蟲所要作的事就是像個正常節點同樣去響應其餘節點的查詢,而且獲得其餘節點的迴應,把其中的數據收集下來就算是完成工做了。而爬蟲惟一能作的,是儘量的去多認識其餘節點,這樣,纔能有更多其餘節點來向你詢問。
6. 有人說,那麼我把DHT爬蟲的K桶中的容量K增大是否是就能增長獲得資源的機會,其實否則,以前也分析過了,DHT爬蟲最重要的信息來源全是被動的,由於你不能增大別人的K,因此距離遠的節點保存你自身的機率就越小,固然距離遠的節點去請求你的機率相對也比較小。
一些主要的組件(實際實現更加複雜一些,有其餘的模塊,這裏僅列舉主要幾個):
DHT crawler:
這個就是DHT爬蟲的主邏輯,爲了簡化多線程問題,跟server用了生產者消費者模型,負責消費,而且複用server的端口。
主要任務就是負責初始化,包括路由表的初始化,以及初始的請求。另外負責處理全部進來的消息事件,因爲生產者消費者模型的使用,裏面的操做都基本上是單線程的,簡化了很多問題,並且相信也比上鎖要提高速度(固然了,加鎖這步按理是放到了queue這裏了,不過對於這種生產者源源不斷生產的類型,能夠用ring-buffer大幅提高性能)。
DHT server:
這裏是DHT爬蟲的服務器端,DHT網絡中的節點不單是client,也是server,因此要有server擔當生產者的角色,最初也是每一個消費者對應一個生產者,但實際上發現能夠利用IO多路複用來達到消息事件的目的,這樣一來大大簡化了系統中線程的數量,若是client能夠的話,也應該用一樣的方式來組織,這樣系統的速度應該會快不少。(還沒有驗證)
DHT route table:
主要負責路由表的操做。
路由表有以下操做:
init:剛建立路由表時的操做。分兩種狀況:
1. 若是以前已經初始化過,而且將上次路由表的數據保存下來,則只須要讀入保存數據。
2. 若是以前沒有初始化過,則首先應當初始化。
首先,應當有一個接入點,也就是說,你要想加進這個網絡,必須認識這個網絡中某個節點i並將i加入路由表,接下來對i用find_node詢問本身的hash_info,這裏巧妙的地方就在於,理論上經過必定數量的詢問就會找到離本身距離很近的節點(也就是通過必定步驟就會收斂)。find_node目的在於儘量早的讓本身有數據,而且讓網絡上別的節點知道本身,若是別人不認識你,就不會發送消息過來,意味着你也不能獲取到想要的信息。
search:比較重要的方法,主要使用它來定位當前infohash所在的桶的位置。會被其餘各類代理方法調用到。
findNodes:找到路由表中與傳入的infohash最近的k個節點
getPeer:找到待查資源是否有peer(便是否有人在下載,也就是是否有人announce過)
announcePeer:通知該資源正在被下載
DHT bucket:
acitiveNode:邏輯比較多,分以下幾點。
1. 查找所要添加的節點對應路由表的桶是否已經滿,若是未滿,添加節點
2. 若是已經滿,檢查該桶中是否包含爬蟲節點本身,若是不包含,拋棄待添加節點
3. 若是該桶中包含本節點,則平均分裂該桶
其餘的諸如locateNode,
replaceNode, updateNode,
removeNode,就不一一說明了
DHT torrent parser:
主要從bt種子文件中解析出如下幾個重要的信息:name,size,file list(sub file name, sub file size),比較簡單,用bencode方向解碼就好了
Utils:
distance:計算兩個資源之間的距離。在kad中用a xor b表示
爲了增長難度,選用了不太熟悉的語言python,結果步步爲營,可是也感慨python的簡潔強大。在實現中,也碰到不少有意思的問題。好比如何保存一張路由表中的全部桶,以前想出來幾個辦法,甚至爲了節省資源,打算用bit數組+dict直接保存,可是由於估計最終的幾個操做不是很方便直觀容易出錯而放棄,選用的結構就是前綴樹,操做起來果真是沒有障礙;
在超時問題上,好比桶超時和節點超時,一直在思考一個高效可是比較優雅的作法,能夠用一個同步調用而後等待它的超時,可是顯然很低效,尤爲我沒有用更多線程的狀況,一旦阻塞了就等於該端口全部事件都被阻塞了。因此必須用異步操做,可是異步操做很難去控制它的精確事件,固然,我能夠在每一個事件來的時候檢查一遍是否超時,可是顯然也是浪費和低效。那麼,剩下的只有採用跟tomcat相似的方式了,增長一個線程來監控,固然,這個監控線程最好是全局的,能監控全部crawler中全部事務的超時。另外,超時若是控制不當,容易致使內存沒有回收以致於內存泄露,也值得注意。超時線程是否會與其餘線程互相影響也應當仔細檢查。
最初超時的控制沒處理好,出現了ping storm,運行必定時間後大多數桶已經滿了,若是按照協議中的方式去跑的話會發現大量的事件都是在ping以確認這個節點是否ok以致於大量的cpu用於處理ping和ping響應。深刻理解後發現,檢查節點狀態是不須要的,由於節點狀態只是爲了提供給詢問的人一些好的節點,既然如此,能夠將每次過來的節點替換當前桶中最老的節點,如此一來,咱們將老是保存着最新的節點。
搜索算法也是比較讓我困惑的地方,簡而言之,搜索的目的並非真正去找資源,而是去認識那些可以保存你的節點。爲何說是可以保存你,由於離你越遠,桶的數量越少,這樣一來,要想進他們的桶中去相對來講就比較困難,因此搜索的目標按理應該是附近的節點最好,可是不能排除遠方節點也可能保存你的狀況,這種狀況會發生在遠方節點初始化時或者遠方節點的桶中節點超時的時候,但總而言之,機率要小些。因此搜索算法也不該該不作判斷就胡亂搜索,可是也不該該將搜索的距離嚴格限制在附近,因此這是一個權衡問題,暫時沒有想到好的方式,以爲暫時讓距離遠的以必定機率發生,而距離近的必然發生
還有一點,就是搜索速度問題,由於DHT網絡的這種結構,決定了一個節點所認識的其餘節點必然是有限的附近節點,因而每一個節點在必定時間段內能拿到的資源數必然是有限的,因此應當分配多個節點同時去抓取,而抓取資源的數量很大程度上就跟分配節點的多少有關了。
最後一個值得優化的地方是findnodes方法,以前的方式是把一個桶中全部數據拿出來排序,而後取其中前K個返回回去,可是實際上咱們作了不少額外的工做,這是經典的topN問題,使用排序明顯是浪費時間的,由於這個操做很是頻繁,因此即使全部保存的節點加起來不多((160 - 4) * 8),也會必定程度上增長時間。而採用的算法是在一篇論文《可擴展的DHT網絡爬蟲設計和優化》中找到的,基本公式是IDi = IDj xor 2 ^(160 - i),這樣,已知IDi和i就能知道IDj,若已知IDi和IDj就能知道i,經過這種方式,能夠快速的查找該桶A附近的其餘桶(顯然是離桶A層次最近的桶中的節點距離A次近),比起所有遍歷再查找效率要高很多。
dht協議http://www.bittorrent.org/beps/bep_0005.html 及其翻譯http://gobismoon.blog.163.com/blog/static/5244280220100893055533/
爬蟲源碼參考別人的,非原創,只爲學習
1 #encoding: utf-8 2 3 from hashlib import sha1 4 from random import randint 5 from struct import unpack, pack 6 from socket import inet_aton, inet_ntoa 7 from bisect import bisect_left 8 from threading import Timer 9 from time import sleep 10 11 from bencode import bencode, bdecode 12 13 BOOTSTRAP_NODES = [ 14 ("router.bittorrent.com", 6881), 15 ("dht.transmissionbt.com", 6881), 16 ("router.utorrent.com", 6881) 17 ] 18 TID_LENGTH = 4 19 KRPC_TIMEOUT = 10 20 REBORN_TIME = 5 * 60 21 K = 8 22 23 def entropy(bytes): 24 s = "" 25 for i in range(bytes): 26 s += chr(randint(0, 255)) 27 return s 28 29 # """把爬蟲"假裝"成正常node, 一個正常的node有ip, port, node ID三個屬性, 由於是基於UDP協議, 30 # 因此向對方發送信息時, 即便沒"明確"說明本身的ip和port時, 對方天然會知道你的ip和port, 31 # 反之亦然. 那麼咱們自身node就只須要生成一個node ID就行, 協議裏說到node ID用sha1算法生成, 32 # sha1算法生成的值是長度是20 byte, 也就是20 * 8 = 160 bit, 正好如DHT協議裏說的那範圍: 0 至 2的160次方, 33 # 也就是總共能生成1461501637330902918203684832716283019655932542976個獨一無二的node. 34 # ok, 因爲sha1老是生成20 byte的值, 因此哪怕你寫SHA1(20)或SHA1(19)或SHA1("I am a 2B")均可以, 35 # 只要保證大大下降與別人重複概率就行. 注意, node ID非十六進制, 36 # 也就是說非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA這個樣子, 即非hash.hexdigest(). """ 37 def random_id(): 38 hash = sha1() 39 hash.update( entropy(20) ) 40 return hash.digest() 41 42 def decode_nodes(nodes): 43 n = [] 44 length = len(nodes) 45 if (length % 26) != 0: 46 return n 47 for i in range(0, length, 26): 48 nid = nodes[i:i+20] 49 ip = inet_ntoa(nodes[i+20:i+24]) 50 port = unpack("!H", nodes[i+24:i+26])[0] 51 n.append( (nid, ip, port) ) 52 return n 53 54 def encode_nodes(nodes): 55 strings = [] 56 for node in nodes: 57 s = "%s%s%s" % (node.nid, inet_aton(node.ip), pack("!H", node.port)) 58 strings.append(s) 59 60 return "".join(strings) 61 62 def intify(hstr): 63 #"""這是一個小工具, 把一個node ID轉換爲數字. 後面會頻繁用到.""" 64 return long(hstr.encode('hex'), 16) #先轉換成16進制, 再變成數字 65 66 def timer(t, f): 67 Timer(t, f).start() 68 69 70 class BucketFull(Exception): 71 pass 72 73 74 class KRPC(object): 75 def __init__(self): 76 self.types = { 77 "r": self.response_received, 78 "q": self.query_received 79 } 80 self.actions = { 81 "ping": self.ping_received, 82 "find_node": self.find_node_received, 83 "get_peers": self.get_peers_received, 84 "announce_peer": self.announce_peer_received, 85 } 86 87 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 88 self.socket.bind(("0.0.0.0", self.port)) 89 90 def response_received(self, msg, address): 91 self.find_node_handler(msg) 92 93 def query_received(self, msg, address): 94 try: 95 self.actions[msg["q"]](msg, address) 96 except KeyError: 97 pass 98 99 def send_krpc(self, msg, address): 100 try: 101 self.socket.sendto(bencode(msg), address) 102 except: 103 pass 104 105 106 class Client(KRPC): 107 def __init__(self, table): 108 self.table = table 109 110 timer(KRPC_TIMEOUT, self.timeout) 111 timer(REBORN_TIME, self.reborn) 112 KRPC.__init__(self) 113 114 def find_node(self, address, nid=None): 115 nid = self.get_neighbor(nid) if nid else self.table.nid 116 tid = entropy(TID_LENGTH) 117 118 msg = { 119 "t": tid, 120 "y": "q", 121 "q": "find_node", 122 "a": {"id": nid, "target": random_id()} 123 } 124 self.send_krpc(msg, address) 125 126 def find_node_handler(self, msg): 127 try: 128 nodes = decode_nodes(msg["r"]["nodes"]) 129 for node in nodes: 130 (nid, ip, port) = node 131 if len(nid) != 20: continue 132 if nid == self.table.nid: continue 133 self.find_node( (ip, port), nid ) 134 except KeyError: 135 pass 136 137 def joinDHT(self): 138 for address in BOOTSTRAP_NODES: 139 self.find_node(address) 140 141 def timeout(self): 142 if len( self.table.buckets ) < 2: 143 self.joinDHT() 144 timer(KRPC_TIMEOUT, self.timeout) 145 146 def reborn(self): 147 self.table.nid = random_id() 148 self.table.buckets = [ KBucket(0, 2**160) ] 149 timer(REBORN_TIME, self.reborn) 150 151 def start(self): 152 self.joinDHT() 153 154 while True: 155 try: 156 (data, address) = self.socket.recvfrom(65536) 157 msg = bdecode(data) 158 self.types[msg["y"]](msg, address) 159 except Exception: 160 pass 161 162 def get_neighbor(self, target): 163 return target[:10]+random_id()[10:] 164 165 166 class Server(Client): 167 def __init__(self, master, table, port): 168 self.table = table 169 self.master = master 170 self.port = port 171 Client.__init__(self, table) 172 173 def ping_received(self, msg, address): 174 try: 175 nid = msg["a"]["id"] 176 msg = { 177 "t": msg["t"], 178 "y": "r", 179 "r": {"id": self.get_neighbor(nid)} 180 } 181 self.send_krpc(msg, address) 182 self.find_node(address, nid) 183 except KeyError: 184 pass 185 186 def find_node_received(self, msg, address): 187 try: 188 target = msg["a"]["target"] 189 neighbors = self.table.get_neighbors(target) 190 191 nid = msg["a"]["id"] 192 msg = { 193 "t": msg["t"], 194 "y": "r", 195 "r": { 196 "id": self.get_neighbor(target), 197 "nodes": encode_nodes(neighbors) 198 } 199 } 200 self.table.append(KNode(nid, *address)) 201 self.send_krpc(msg, address) 202 self.find_node(address, nid) 203 except KeyError: 204 pass 205 206 def get_peers_received(self, msg, address): 207 try: 208 infohash = msg["a"]["info_hash"] 209 210 neighbors = self.table.get_neighbors(infohash) 211 212 nid = msg["a"]["id"] 213 msg = { 214 "t": msg["t"], 215 "y": "r", 216 "r": { 217 "id": self.get_neighbor(infohash), 218 "nodes": encode_nodes(neighbors) 219 } 220 } 221 self.table.append(KNode(nid, *address)) 222 self.send_krpc(msg, address) 223 self.master.log(infohash) 224 self.find_node(address, nid) 225 except KeyError: 226 pass 227 228 def announce_peer_received(self, msg, address): 229 try: 230 infohash = msg["a"]["info_hash"] 231 nid = msg["a"]["id"] 232 233 msg = { 234 "t": msg["t"], 235 "y": "r", 236 "r": {"id": self.get_neighbor(infohash)} 237 } 238 239 self.table.append(KNode(nid, *address)) 240 self.send_krpc(msg, address) 241 self.master.log(infohash) 242 self.find_node(address, nid) 243 except KeyError: 244 pass 245 # 該類只實例化一次. 246 class KTable(object): 247 # 這裏的nid就是經過node_id()函數生成的自身node ID. 協議裏說道, 每一個路由表至少有一個bucket, 248 # 還規定第一個bucket的min=0, max=2^160次方, 因此這裏就給予了一個buckets屬性來存儲bucket, 這個是列表. 249 def __init__(self, nid): 250 self.nid = nid 251 self.buckets = [ KBucket(0, 2**160) ] 252 253 def append(self, node): 254 index = self.bucket_index(node.nid) 255 try: 256 bucket = self.buckets[index] 257 bucket.append(node) 258 except IndexError: 259 return 260 except BucketFull: 261 if not bucket.in_range(self.nid): 262 return 263 self.split_bucket(index) 264 self.append(node) 265 266 267 # 返回與目標node ID或infohash的最近K個node. 268 269 # 定位出與目標node ID或infohash所在的bucket, 若是該bucuck有K個節點, 返回. 270 # 若是不夠到K個節點的話, 把該bucket前面的bucket和該bucket後面的bucket加起來, 只返回前K個節點. 271 # 仍是不到K個話, 再重複這個動做. 要注意不要超出最小和最大索引範圍. 272 # 總之, 無論你用什麼算法, 想盡辦法找出最近的K個節點. 273 def get_neighbors(self, target): 274 nodes = [] 275 if len(self.buckets) == 0: return nodes 276 if len(target) != 20 : return nodes 277 278 index = self.bucket_index(target) 279 try: 280 nodes = self.buckets[index].nodes 281 min = index - 1 282 max = index + 1 283 284 while len(nodes) < K and ((min >= 0) or (max < len(self.buckets))): 285 if min >= 0: 286 nodes.extend(self.buckets[min].nodes) 287 288 if max < len(self.buckets): 289 nodes.extend(self.buckets[max].nodes) 290 291 min -= 1 292 max += 1 293 294 num = intify(target) 295 nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid))) 296 return nodes[:K] #K是個常量, K=8 297 except IndexError: 298 return nodes 299 300 def bucket_index(self, target): 301 return bisect_left(self.buckets, intify(target)) 302 303 304 # 拆表 305 306 # index是待拆分的bucket(old bucket)的所在索引值. 307 # 假設這個old bucket的min:0, max:16. 拆分該old bucket的話, 分界點是8, 而後把old bucket的max改成8, min仍是0. 308 # 建立一個新的bucket, new bucket的min=8, max=16. 309 # 而後根據的old bucket中的各個node的nid, 看看是屬於哪一個bucket的範圍裏, 就裝到對應的bucket裏. 310 # 各回各家,各找各媽. 311 # new bucket的所在索引值就在old bucket後面, 即index+1, 把新的bucket插入到路由表裏. 312 def split_bucket(self, index): 313 old = self.buckets[index] 314 point = old.max - (old.max - old.min)/2 315 new = KBucket(point, old.max) 316 old.max = point 317 self.buckets.insert(index + 1, new) 318 for node in old.nodes[:]: 319 if new.in_range(node.nid): 320 new.append(node) 321 old.remove(node) 322 323 def __iter__(self): 324 for bucket in self.buckets: 325 yield bucket 326 327 328 class KBucket(object): 329 __slots__ = ("min", "max", "nodes") 330 331 # min和max就是該bucket負責的範圍, 好比該bucket的min:0, max:16的話, 332 # 那麼存儲的node的intify(nid)值均爲: 0到15, 那16就不負責, 這16將會是該bucket後面的bucket的min值. 333 # nodes屬性就是個列表, 存儲node. last_accessed表明最後訪問時間, 由於協議裏說到, 334 # 當該bucket負責的node有請求, 迴應操做; 刪除node; 添加node; 更新node; 等這些操做時, 335 # 那麼就要更新該bucket, 因此設置個last_accessed屬性, 該屬性標誌着這個bucket的"新鮮程度". 用linux話來講, touch一下. 336 # 這個用來便於後面說的定時刷新路由表. 337 338 def __init__(self, min, max): 339 self.min = min 340 self.max = max 341 self.nodes = [] 342 343 344 # 添加node, 參數node是KNode實例. 345 346 # 若是新插入的node的nid屬性長度不等於20, 終止. 347 # 若是滿了, 拋出bucket已滿的錯誤, 終止. 通知上層代碼進行拆表. 348 # 若是未滿, 先看看新插入的node是否已存在, 若是存在, 就替換掉, 不存在, 就添加, 349 # 添加/替換時, 更新該bucket的"新鮮程度". 350 def append(self, node): 351 if node in self: 352 self.remove(node) 353 self.nodes.append(node) 354 else: 355 if len(self) < K: 356 self.nodes.append(node) 357 else: 358 raise BucketFull 359 360 def remove(self, node): 361 self.nodes.remove(node) 362 363 def in_range(self, target): 364 return self.min <= intify(target) < self.max 365 366 def __len__(self): 367 return len(self.nodes) 368 369 def __contains__(self, node): 370 return node in self.nodes 371 372 def __iter__(self): 373 for node in self.nodes: 374 yield node 375 376 def __lt__(self, target): 377 return self.max <= target 378 379 380 class KNode(object): 381 # """ 382 # nid就是node ID的簡寫, 就不取id這麼模糊的變量名了. __init__方法至關於別的OOP語言中的構造方法, 383 # 在python嚴格來講不是構造方法, 它是初始化, 不過, 功能差很少就行. 384 # """ 385 __slots__ = ("nid", "ip", "port") 386 387 def __init__(self, nid, ip, port): 388 self.nid = nid 389 self.ip = ip 390 self.port = port 391 392 def __eq__(self, other): 393 return self.nid == other.nid 394 395 396 397 #using example 398 class Master(object): 399 def __init__(self, f): 400 self.f = f 401 402 def log(self, infohash): 403 self.f.write(infohash.encode("hex")+"\n") 404 self.f.flush() 405 try: 406 f = open("infohash.log", "a") 407 m = Master(f) 408 s = Server(Master(f), KTable(random_id()), 8001) 409 s.start() 410 except KeyboardInterrupt: 411 s.socket.close() 412 f.close()
種子從迅雷下,初期爲學習從http://torrage.com/sync/下的infohash,去重用了別人寫的Bloom Filter算法,數據庫用Mysql,建表語句以下,其中uinthash是根據infohash的頭四個字節和最後四個字節組成的一個int整數,先這樣設計,看後期查詢的時候用獲得不,總以爲用infohash來查很慢
1 CREATE TABLE `torrentinfo` ( 2 `id` int(11) NOT NULL AUTO_INCREMENT, 3 `infohash` char(40) NOT NULL DEFAULT '', 4 `filename` varchar(128) DEFAULT NULL, 5 `filelength` bigint(11) DEFAULT NULL, 6 `recvtime` datetime DEFAULT NULL, 7 `filecontent` text, 8 `uinthash` int(11) unsigned NOT NULL DEFAULT '0', 9 PRIMARY KEY (`id`), 10 KEY `uinthash_index` (`uinthash`) 11 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Thunder.py
1 # _*_ coding: utf-8 _*_ 2 import socket 3 import os,glob 4 import time as time_p 5 import requests 6 from bencode import bdecode, BTL 7 from torrent import * 8 import threading, signal 9 import MySQLdb 10 from BloomFilter import * 11 12 class Thunder(object): 13 def __init__(self): 14 self.connstr={'host':'127.0.0.1','user':'root','passwd':'123456','port':3306,'charset':"UTF8"} 15 def download(self, infohash): 16 try: 17 tc = self._download(infohash) 18 if(tc==-1): 19 return 20 tc = bdecode(tc) 21 info = torrentInfo(tc) 22 # print info['name'] 23 # print info['length'] 24 # print info['files'] 25 uint=int(infohash[:4]+infohash[-4:],16) 26 time_now=time_p.strftime('%Y-%m-%d %H:%M:%S',time_p.localtime(time_p.time())) 27 sql="insert into torrentinfo(infohash,filename,filelength,recvtime,filecontent,uinthash) values('%s','%s','%d','%s','%s','%d')"%(infohash,MySQLdb.escape_string(info['name']),info['length'],time_now,MySQLdb.escape_string(info['files']),uint) 28 self.executeSQL(sql) 29 except Exception,e: 30 print e 31 pass 32 33 def openConnection(self): 34 try: 35 self.conn=MySQLdb.connect(**self.connstr) 36 self.cur=self.conn.cursor() 37 self.conn.select_db('dht') 38 except MySQLdb.Error,e: 39 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 40 41 42 def executeSQL(self,sql): 43 try: 44 self.cur.execute(sql) 45 self.conn.commit() 46 except MySQLdb.Error,e: 47 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 48 def closeConnection(self): 49 try: 50 self.cur.close() 51 self.conn.close() 52 except MySQLdb.Error,e: 53 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 54 55 def _download(self, infohash): 56 infohash = infohash.upper() 57 start = infohash[0:2] 58 end = infohash[-2:] 59 url = "http://bt.box.n0808.com/%s/%s/%s.torrent" % (start, end, infohash) 60 headers = { 61 "Referer": "http://bt.box.n0808.com" 62 } 63 try: 64 r = requests.get(url, headers=headers, timeout=10) 65 if r.status_code == 200: 66 # f=open("d:\\"+infohash+'.torrent','wb') 67 # f.write(r.content) 68 # f.close() 69 return r.content 70 except (socket.timeout, requests.exceptions.Timeout), e: 71 pass 72 return -1 73 74 class torrentBean(object): 75 """docstring for torrentBean""" 76 __slots__=('infohash','filename','recvtime','filecontent','uinthash') 77 78 def __init__(self, infohash,filename,recvtime,filecontent,uinthash): 79 super(torrentBean, self).__init__() 80 self.infohash = infohash 81 self.filename = filename 82 self.recvtime = recvtime 83 self.filecontent = filecontent 84 self.uinthash = uinthash 85 86 87 bf = BloomFilter(0.001, 1000000) 88 a=Thunder() 89 a.openConnection() 90 # info_hash="a02d2735e6e1daa6f7d58f21bd7340a7b7c4b7a5" 91 # info_hash='cf3a6a4f07da0b90beddae838462ca0012bef285' 92 # a.download('cf3a6a4f07da0b90beddae838462ca0012bef285') 93 94 95 files=glob.glob('./*.txt') 96 for fl in files: 97 print os.path.basename(fl) 98 f=open(fl,'r') 99 for line in f: 100 infohash=line.strip('\n') 101 if not bf.is_element_exist(infohash): 102 bf.insert_element(infohash) 103 a.download(infohash) 104 a.closeConnection()
torrent種子文件通過bencode解析,獲取key爲info對應value值,種子大體的格式以下,有亂碼,不影響觀看
{ 'files': [{ 'path': ['PGD660.avi'], 'length': 1367405512, 'filehash': 'J\xef\xfe\xb3K\xd4g\x8d\x07m\x03\xbb\xb3\xadt\xa1\xa0\xf0\xec\xab', 'ed2k': '/\xfb\xe55#n\xbd1\xb6\x1c\x0f\xf3\xe4\x9dP\xfb', 'path.utf-8': ['PGD660.avi'] }, { 'path': ['PGD660B.jpg'], 'length': 135899, 'filehash': '*$O\x17w\xe9E\x95>O\x1f\xfb\x0e\x9b\x16\x15B\\Q\x9d', 'ed2k': 'T/L*\xbb\x8e.\xe2d\xddu\nR\x07\xca\x19', 'path.utf-8': ['PGD660B.jpg'] }, { 'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'], 'length': 472, 'filehash': '&\xa92\xb7\xdd8\xeel3\xcc-S\x07\xb5e\xd35\xc0\xb7r', 'ed2k': '\x13\xd2 a\x0cA\xb4\xf2X\x12\xea\xd4\xe8\xac`\x92', 'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'] }, { 'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'], 'length': 363, 'filehash': '\x96nA*\xe2\xb6Y+[\xe3\xaf\xd4\x14A\x94\xf5@\xcd\xc1\x91', 'ed2k': '8V\xa6X\xd9\x82l\xdbNO8\xe8D\xe9E\xed', 'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'] }, { 'path': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'], 'length': 475, 'filehash': '\xec\xde\xeb-6\x86\x1avB\xdd\xd8q\x8b\x8f\xc06\xf0XX\x0e', 'ed2k': '\xa7\x8dU\xfd\xfc=\x12\x15>yE\x8f&A\xc2u', 'path.utf-8': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'] }, { 'path': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'], 'length': 478, 'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7", 'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08', 'path.utf-8': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'] }, { 'path': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'], 'length': 478, 'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7", 'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08', 'path.utf-8': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'] }], 'publisher': 'yoy123', 'piece length': 524288, 'name': 'PGD660 \xe6\x83\xb3\xe8\xa9\xa6\xe8\x91\x97\xe5\x85\xa8\xe5\x8a\x9b\xe6\x93\x8d\xe6\x93\x8d\xe7\x9c\x8b\xe9\x80\x99\xe5\x80\x8b\xe6\xb7\xab\xe8\x95\xa9\xe7\xbe\x8e\xe5\xa5\xb3\xe5\x97\x8e \xe5\xb0\x8f\xe5\xb7\x9d\xe3\x81\x82\xe3\x81\x95\xe7\xbe\x8e', 'publisher.utf-8': 'yoy123', }
解析代碼torrent.py
1 # _*_ coding: utf-8 _*_ 2 from time import time 3 4 def torrentInfo(torrentContent): 5 metadata = torrentContent["info"] 6 print metadata 7 info = { 8 "name": getName(metadata), 9 "length": calcLength(metadata), 10 "timestamp": getCreateDate(torrentContent), 11 "files": extraFiles(metadata) 12 } 13 return info 14 15 def calcLength(metadata): 16 length = 0 17 try: 18 length = metadata["length"] 19 except KeyError: 20 try: 21 for file in metadata["files"]: 22 length += file["length"] 23 except KeyError: 24 pass 25 return length 26 27 def extraFiles(metadata): 28 files = [] 29 try: 30 for file in metadata["files"]: 31 path = file["path.utf-8"] 32 size=file['length'] 33 if len(path) > 1: 34 main = path[0] 35 for f in path[1:2]: 36 files.append("%s/%s %d bytes" % (main, f,size)) 37 else: 38 files.append("%s %d bytes" % (path[0],size) ) 39 if files: 40 return '\r\n'.join(files) 41 else: 42 return getName(metadata) 43 except KeyError: 44 return getName(metadata) 45 46 def getName(metadata): 47 try: 48 name = metadata["name.utf-8"] 49 if name.strip()=="": 50 raise KeyError 51 except KeyError: 52 try: 53 name = metadata["name"] 54 if name.strip()=="": 55 raise KeyError 56 except KeyError: 57 name = getMaxFile(metadata) 58 59 return name 60 def getMaxFile(metadata): 61 try: 62 maxFile = metadata["files"][0] 63 for file in metadata["files"]: 64 if file["length"] > maxFile["length"]: 65 maxFile = file 66 name = maxFile["path"][0] 67 return name 68 except KeyError: 69 return "" 70 71 def getCreateDate(torrentContent): 72 try: 73 timestamp = torrentContent["creation date"] 74 except KeyError: 75 timestamp = int( time() ) 76 return timestamp
最後還有別人寫的BloomFilter代碼
1 #encoding: utf-8 2 ''' 3 Created on 2012-11-7 4 5 @author: palydawn 6 ''' 7 import cmath 8 from BitVector import BitVector 9 10 class BloomFilter(object): 11 def __init__(self, error_rate, elementNum): 12 #計算所須要的bit數 13 self.bit_num = -1 * elementNum * cmath.log(error_rate) / (cmath.log(2.0) * cmath.log(2.0)) 14 15 #四字節對齊 16 self.bit_num = self.align_4byte(self.bit_num.real) 17 18 #分配內存 19 self.bit_array = BitVector(size=self.bit_num) 20 21 #計算hash函數個數 22 self.hash_num = cmath.log(2) * self.bit_num / elementNum 23 24 self.hash_num = self.hash_num.real 25 26 #向上取整 27 self.hash_num = int(self.hash_num) + 1 28 29 #產生hash函數種子 30 self.hash_seeds = self.generate_hashseeds(self.hash_num) 31 32 def insert_element(self, element): 33 for seed in self.hash_seeds: 34 hash_val = self.hash_element(element, seed) 35 #取絕對值 36 hash_val = abs(hash_val) 37 #取模,防越界 38 hash_val = hash_val % self.bit_num 39 #設置相應的比特位 40 self.bit_array[hash_val] = 1 41 42 #檢查元素是否存在,存在返回true,不然返回false 43 def is_element_exist(self, element): 44 for seed in self.hash_seeds: 45 hash_val = self.hash_element(element, seed) 46 #取絕對值 47 hash_val = abs(hash_val) 48 #取模,防越界 49 hash_val = hash_val % self.bit_num 50 51 #查看值 52 if self.bit_array[hash_val] == 0: 53 return False 54 return True 55 56 #內存對齊 57 def align_4byte(self, bit_num): 58 num = int(bit_num / 32) 59 num = 32 * (num + 1) 60 return num 61 62 #產生hash函數種子,hash_num個素數 63 def generate_hashseeds(self, hash_num): 64 count = 0 65 #連續兩個種子的最小差值 66 gap = 50 67 #初始化hash種子爲0 68 hash_seeds = [] 69 for index in xrange(hash_num): 70 hash_seeds.append(0) 71 for index in xrange(10, 10000): 72 max_num = int(cmath.sqrt(1.0 * index).real) 73 flag = 1 74 for num in xrange(2, max_num): 75 if index % num == 0: 76 flag = 0 77 break 78 79 if flag == 1: 80 #連續兩個hash種子的差值要大才行 81 if count > 0 and (index - hash_seeds[count - 1]) < gap: 82 continue 83 hash_seeds[count] = index 84 count = count + 1 85 86 if count == hash_num: 87 break 88 return hash_seeds 89 90 def hash_element(self, element, seed): 91 hash_val = 1 92 for ch in str(element): 93 chval = ord(ch) 94 hash_val = hash_val * seed + chval 95 return hash_val 96 97 98 def SaveBitToFile(self,f): 99 self.bit_array.write_bits_to_fileobject(f) 100 pass
表內容見下圖