開博第一篇:DHT 爬蟲的學習記錄

通過一段時間的研究和學習,大體瞭解了DHT網絡的一些信息,大部分仍是參會別人的相關代碼,一方面主要對DHT爬蟲原理感興趣,最主要的是爲了學習python,大部分是別人的東西原理仍是引用別人的吧html

DHT網絡爬蟲的實現 | 學步園   http://www.xuebuyuan.com/1287052.htmlnode

DHT協議原理以及一些重點分析:python

   要作DHT的爬蟲,首先得透徹理解DHT,這樣才能知道在什麼地方究竟該應用什麼算法去解決問題。關於DHT協議的細節以及重要的參考文章,請參考文末1mysql

   DHT協議做爲BT協議的一個輔助,是很是好玩的。它主要是爲了在BT正式下載時獲得種子或者BT資源。傳統的網絡,須要一臺中央服務器存放種子或者BT資源,不只浪費服務器資源,還容易出現單點的各類問題,而DHT網絡則是爲了去中心化,也就是說任意時刻,這個網絡總有節點是亮的,你能夠去詢問問這些亮的節點,從而將本身加入DHT網絡。linux

   要實現DHT協議的網絡爬蟲,主要分3步,第一步是獲得資源信息(infohash,160bit,20字節,能夠編碼爲40字節的十六進制字符串),第二步是確認這些infohash是有效的,第三步是經過有效的infohash下載到BT的種子文件,從而獲得對這個資源的完整描述。算法

   其中第一步是其餘節點用DHT協議中的get_peers方法向爬蟲發送請求獲得的,第二步是其餘節點用DHT協議中的announce_peer向爬蟲發送請求獲得的,第三步能夠有幾種方式獲得,好比能夠去一些保存種子的網站根據infohash直接下載到,或者經過announce_peer的節點來下載到,具體如何實現,能夠取決於你本身的爬蟲。sql

   DHT協議中的主要幾個操做:數據庫

 

   主要負責經過UDP與外部節點交互,封裝4種基本操做的請求以及相應。數組

   ping:檢查一個節點是否「存活」tomcat

   在一個爬蟲裏主要有兩個地方用到ping,第一是初始路由表時,第二是驗證節點是否存活時

   find_node:向一個節點發送查找節點的請求

   在一個爬蟲中主要也是兩個地方用到find_node,第一是初始路由表時,第二是驗證桶是否存活時

   get_peers:向一個節點發送查找資源的請求

   在爬蟲中有節點向本身請求時不只像個正常節點同樣作出迴應,還須要以此資源的info_hash爲機會盡量多的去認識更多的節點。如圖,get_peers實際上最後一步是announce_peer,可是由於爬蟲不能announce_peer,因此實際上get_peers退化成了find_node操做。

   announce_peer:向一個節點發送本身已經開始下載某個資源的通知

   爬蟲中不能用announce_peer,由於這就至關於通報虛假資源,對方很容易從上下文中判斷你是否通報了虛假資源從而把你禁掉

   DHT協議中有幾個重點的須要澄清的地方:

   1. node與infohash一樣使用160bit的表示方式,160bit意味着整個節點空間有2^160 = 730750818665451459101842416358141509827966271488,是48位10進制,也就是說有百億億億億億個節點空間,這麼大的節點空間,是足夠存放你的主機節點以及任意的資源信息的。

   2. 每一個節點有張路由表。每張路由表由一堆K桶組成,所謂K桶,就是桶中最多隻能放K個節點,默認是8個。而桶的保存則是相似一顆前綴樹的方式。至關於一張8桶的路由表中最多有160-4個K桶。

   3. 根據DHT協議的規定,每一個infohash都是有位置的,所以,兩個infohash之間就有距離一說,而兩個infohash的距離就能夠用異或來表示,即infohash1 xor infohash2,也就是說,高位同樣的話,他們的距離就近,反之則遠,這樣能夠快速的計算兩個節點的距離。計算這個距離有什麼用呢,在DHT網絡中,若是一個資源的infohash與一個節點的infohash越近則該節點越有可能擁有該資源的信息,爲何呢?能夠想象,由於人人都用一樣的距離算法去遞歸的詢問離資源接近的節點,而且只要該節點作出了迴應,那麼就會獲得一個announce信息,也就是說跟資源infohash接近的節點就有更大的機率拿到該資源的infohash

   4. 根據上述算法,DHT中的查詢是跳躍式查詢,能夠迅速的跨越的的節點桶而接近目標節點桶。之因此在遠處可以大幅度跳躍,而在近處只能小幅度跳躍,緣由是每一個節點的路由表中離自身越接近的節點保存得越多,以下圖

   5. 在一個DHT網絡中當爬蟲並不容易,不像普通爬蟲同樣,看到資源就能夠主動爬下來,相反,由於獲得資源的方式(get_peers, announce_peer)都是被動的,因此爬蟲的方式就有些變化了,爬蟲所要作的事就是像個正常節點同樣去響應其餘節點的查詢,而且獲得其餘節點的迴應,把其中的數據收集下來就算是完成工做了。而爬蟲惟一能作的,是儘量的去多認識其餘節點,這樣,纔能有更多其餘節點來向你詢問。

   6. 有人說,那麼我把DHT爬蟲的K桶中的容量K增大是否是就能增長獲得資源的機會,其實否則,以前也分析過了,DHT爬蟲最重要的信息來源全是被動的,由於你不能增大別人的K,因此距離遠的節點保存你自身的機率就越小,固然距離遠的節點去請求你的機率相對也比較小。

   一些主要的組件(實際實現更加複雜一些,有其餘的模塊,這裏僅列舉主要幾個):

   DHT crawler

   這個就是DHT爬蟲的主邏輯,爲了簡化多線程問題,跟server用了生產者消費者模型,負責消費,而且複用server的端口。

   主要任務就是負責初始化,包括路由表的初始化,以及初始的請求。另外負責處理全部進來的消息事件,因爲生產者消費者模型的使用,裏面的操做都基本上是單線程的,簡化了很多問題,並且相信也比上鎖要提高速度(固然了,加鎖這步按理是放到了queue這裏了,不過對於這種生產者源源不斷生產的類型,能夠用ring-buffer大幅提高性能)。

   DHT server

   這裏是DHT爬蟲的服務器端,DHT網絡中的節點不單是client,也是server,因此要有server擔當生產者的角色,最初也是每一個消費者對應一個生產者,但實際上發現能夠利用IO多路複用來達到消息事件的目的,這樣一來大大簡化了系統中線程的數量,若是client能夠的話,也應該用一樣的方式來組織,這樣系統的速度應該會快不少。(還沒有驗證)

   DHT route table

   主要負責路由表的操做。

   路由表有以下操做:

   init:剛建立路由表時的操做。分兩種狀況:

   1. 若是以前已經初始化過,而且將上次路由表的數據保存下來,則只須要讀入保存數據。

   2. 若是以前沒有初始化過,則首先應當初始化。

   首先,應當有一個接入點,也就是說,你要想加進這個網絡,必須認識這個網絡中某個節點i並將i加入路由表,接下來對i用find_node詢問本身的hash_info,這裏巧妙的地方就在於,理論上經過必定數量的詢問就會找到離本身距離很近的節點(也就是通過必定步驟就會收斂)。find_node目的在於儘量早的讓本身有數據,而且讓網絡上別的節點知道本身,若是別人不認識你,就不會發送消息過來,意味着你也不能獲取到想要的信息。

   search:比較重要的方法,主要使用它來定位當前infohash所在的桶的位置。會被其餘各類代理方法調用到。

   findNodes:找到路由表中與傳入的infohash最近的k個節點

   getPeer:找到待查資源是否有peer(便是否有人在下載,也就是是否有人announce過)

   announcePeer:通知該資源正在被下載

   DHT bucket:

   acitiveNode:邏輯比較多,分以下幾點。

 

        1. 查找所要添加的節點對應路由表的桶是否已經滿,若是未滿,添加節點

        2. 若是已經滿,檢查該桶中是否包含爬蟲節點本身,若是不包含,拋棄待添加節點

        3. 若是該桶中包含本節點,則平均分裂該桶

   其餘的諸如locateNode
replaceNode
updateNode
removeNode
,就不一一說明了

   DHT torrent parser  

   主要從bt種子文件中解析出如下幾個重要的信息:name,size,file list(sub file name, sub file size),比較簡單,用bencode方向解碼就好了

   Utils

   distance:計算兩個資源之間的距離。在kad中用a xor b表示

 

   爲了增長難度,選用了不太熟悉的語言python,結果步步爲營,可是也感慨python的簡潔強大。在實現中,也碰到不少有意思的問題。好比如何保存一張路由表中的全部桶,以前想出來幾個辦法,甚至爲了節省資源,打算用bit數組+dict直接保存,可是由於估計最終的幾個操做不是很方便直觀容易出錯而放棄,選用的結構就是前綴樹,操做起來果真是沒有障礙;

   在超時問題上,好比桶超時和節點超時,一直在思考一個高效可是比較優雅的作法,能夠用一個同步調用而後等待它的超時,可是顯然很低效,尤爲我沒有用更多線程的狀況,一旦阻塞了就等於該端口全部事件都被阻塞了。因此必須用異步操做,可是異步操做很難去控制它的精確事件,固然,我能夠在每一個事件來的時候檢查一遍是否超時,可是顯然也是浪費和低效。那麼,剩下的只有採用跟tomcat相似的方式了,增長一個線程來監控,固然,這個監控線程最好是全局的,能監控全部crawler中全部事務的超時。另外,超時若是控制不當,容易致使內存沒有回收以致於內存泄露,也值得注意。超時線程是否會與其餘線程互相影響也應當仔細檢查。

   最初超時的控制沒處理好,出現了ping storm,運行必定時間後大多數桶已經滿了,若是按照協議中的方式去跑的話會發現大量的事件都是在ping以確認這個節點是否ok以致於大量的cpu用於處理ping和ping響應。深刻理解後發現,檢查節點狀態是不須要的,由於節點狀態只是爲了提供給詢問的人一些好的節點,既然如此,能夠將每次過來的節點替換當前桶中最老的節點,如此一來,咱們將老是保存着最新的節點。

   搜索算法也是比較讓我困惑的地方,簡而言之,搜索的目的並非真正去找資源,而是去認識那些可以保存你的節點。爲何說是可以保存你,由於離你越遠,桶的數量越少,這樣一來,要想進他們的桶中去相對來講就比較困難,因此搜索的目標按理應該是附近的節點最好,可是不能排除遠方節點也可能保存你的狀況,這種狀況會發生在遠方節點初始化時或者遠方節點的桶中節點超時的時候,但總而言之,機率要小些。因此搜索算法也不該該不作判斷就胡亂搜索,可是也不該該將搜索的距離嚴格限制在附近,因此這是一個權衡問題,暫時沒有想到好的方式,以爲暫時讓距離遠的以必定機率發生,而距離近的必然發生

   還有一點,就是搜索速度問題,由於DHT網絡的這種結構,決定了一個節點所認識的其餘節點必然是有限的附近節點,因而每一個節點在必定時間段內能拿到的資源數必然是有限的,因此應當分配多個節點同時去抓取,而抓取資源的數量很大程度上就跟分配節點的多少有關了。

   最後一個值得優化的地方是findnodes方法,以前的方式是把一個桶中全部數據拿出來排序,而後取其中前K個返回回去,可是實際上咱們作了不少額外的工做,這是經典的topN問題,使用排序明顯是浪費時間的,由於這個操做很是頻繁,因此即使全部保存的節點加起來不多((160 - 4) * 8),也會必定程度上增長時間。而採用的算法是在一篇論文《可擴展的DHT網絡爬蟲設計和優化》中找到的,基本公式是IDi = IDj xor 2 ^(160 - i),這樣,已知IDi和i就能知道IDj,若已知IDi和IDj就能知道i,經過這種方式,能夠快速的查找該桶A附近的其餘桶(顯然是離桶A層次最近的桶中的節點距離A次近),比起所有遍歷再查找效率要高很多。

  

    dht協議http://www.bittorrent.org/beps/bep_0005.html 及其翻譯http://gobismoon.blog.163.com/blog/static/5244280220100893055533/

 

爬蟲源碼參考別人的,非原創,只爲學習

 

 

  1 #encoding: utf-8
  2 
  3 from hashlib import sha1
  4 from random import randint
  5 from struct import unpack, pack
  6 from socket import inet_aton, inet_ntoa
  7 from bisect import bisect_left
  8 from threading import Timer
  9 from time import sleep
 10 
 11 from bencode import bencode, bdecode
 12 
 13 BOOTSTRAP_NODES = [
 14     ("router.bittorrent.com", 6881),
 15     ("dht.transmissionbt.com", 6881),
 16     ("router.utorrent.com", 6881)
 17 ] 
 18 TID_LENGTH = 4
 19 KRPC_TIMEOUT = 10
 20 REBORN_TIME = 5 * 60
 21 K = 8
 22 
 23 def entropy(bytes):
 24     s = ""
 25     for i in range(bytes):
 26         s += chr(randint(0, 255))
 27     return s
 28 
 29     # """把爬蟲"假裝"成正常node, 一個正常的node有ip, port, node ID三個屬性, 由於是基於UDP協議,   
 30     # 因此向對方發送信息時, 即便沒"明確"說明本身的ip和port時, 對方天然會知道你的ip和port,   
 31     # 反之亦然. 那麼咱們自身node就只須要生成一個node ID就行, 協議裏說到node ID用sha1算法生成,   
 32     # sha1算法生成的值是長度是20 byte, 也就是20 * 8 = 160 bit, 正好如DHT協議裏說的那範圍: 0 至 2的160次方,   
 33     # 也就是總共能生成1461501637330902918203684832716283019655932542976個獨一無二的node.   
 34     # ok, 因爲sha1老是生成20 byte的值, 因此哪怕你寫SHA1(20)或SHA1(19)或SHA1("I am a 2B")均可以,   
 35     # 只要保證大大下降與別人重複概率就行. 注意, node ID非十六進制,   
 36     # 也就是說非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA這個樣子, 即非hash.hexdigest(). """
 37 def random_id():
 38     hash = sha1()
 39     hash.update( entropy(20) )
 40     return hash.digest()
 41 
 42 def decode_nodes(nodes):
 43     n = []
 44     length = len(nodes)
 45     if (length % 26) != 0: 
 46         return n
 47     for i in range(0, length, 26):
 48         nid = nodes[i:i+20]
 49         ip = inet_ntoa(nodes[i+20:i+24])
 50         port = unpack("!H", nodes[i+24:i+26])[0]
 51         n.append( (nid, ip, port) )
 52     return n
 53 
 54 def encode_nodes(nodes):
 55     strings = []
 56     for node in nodes:
 57         s = "%s%s%s" % (node.nid, inet_aton(node.ip), pack("!H", node.port))
 58         strings.append(s)
 59 
 60     return "".join(strings)
 61 
 62 def intify(hstr):
 63     #"""這是一個小工具, 把一個node ID轉換爲數字. 後面會頻繁用到.""" 
 64     return long(hstr.encode('hex'), 16)    #先轉換成16進制, 再變成數字
 65 
 66 def timer(t, f):
 67     Timer(t, f).start()
 68 
 69 
 70 class BucketFull(Exception):
 71     pass
 72 
 73 
 74 class KRPC(object):
 75     def __init__(self):
 76         self.types = {
 77             "r": self.response_received,
 78             "q": self.query_received
 79         }
 80         self.actions = {
 81             "ping": self.ping_received,
 82             "find_node": self.find_node_received,
 83             "get_peers": self.get_peers_received,
 84             "announce_peer": self.announce_peer_received,
 85         }
 86 
 87         self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 88         self.socket.bind(("0.0.0.0", self.port))
 89 
 90     def response_received(self, msg, address):
 91         self.find_node_handler(msg)
 92 
 93     def query_received(self, msg, address):
 94         try:
 95             self.actions[msg["q"]](msg, address)
 96         except KeyError:
 97             pass
 98 
 99     def send_krpc(self, msg, address):
100         try:
101             self.socket.sendto(bencode(msg), address)
102         except:
103             pass
104 
105 
106 class Client(KRPC):
107     def __init__(self, table):
108         self.table = table
109 
110         timer(KRPC_TIMEOUT, self.timeout)
111         timer(REBORN_TIME, self.reborn)
112         KRPC.__init__(self)
113 
114     def find_node(self, address, nid=None):
115         nid = self.get_neighbor(nid) if nid else self.table.nid
116         tid = entropy(TID_LENGTH)
117         
118         msg = {
119             "t": tid,
120             "y": "q",
121             "q": "find_node",
122             "a": {"id": nid, "target": random_id()}
123         }
124         self.send_krpc(msg, address)
125 
126     def find_node_handler(self, msg):
127         try:
128             nodes = decode_nodes(msg["r"]["nodes"])
129             for node in nodes:
130                 (nid, ip, port) = node
131                 if len(nid) != 20: continue
132                 if nid == self.table.nid: continue
133                 self.find_node( (ip, port), nid )
134         except KeyError:
135             pass
136 
137     def joinDHT(self):
138         for address in BOOTSTRAP_NODES: 
139             self.find_node(address)
140 
141     def timeout(self):
142         if len( self.table.buckets ) < 2:
143             self.joinDHT()
144         timer(KRPC_TIMEOUT, self.timeout)
145 
146     def reborn(self):
147         self.table.nid = random_id()
148         self.table.buckets = [ KBucket(0, 2**160) ]
149         timer(REBORN_TIME, self.reborn)
150 
151     def start(self):
152         self.joinDHT()
153 
154         while True:
155             try:
156                 (data, address) = self.socket.recvfrom(65536)
157                 msg = bdecode(data)
158                 self.types[msg["y"]](msg, address)
159             except Exception:
160                 pass
161 
162     def get_neighbor(self, target):
163         return target[:10]+random_id()[10:]
164 
165 
166 class Server(Client):
167     def __init__(self, master, table, port):
168         self.table = table
169         self.master = master
170         self.port = port
171         Client.__init__(self, table)
172 
173     def ping_received(self, msg, address):
174         try:
175             nid = msg["a"]["id"]
176             msg = {
177                 "t": msg["t"],
178                 "y": "r",
179                 "r": {"id": self.get_neighbor(nid)}
180             }
181             self.send_krpc(msg, address)
182             self.find_node(address, nid)
183         except KeyError:
184             pass
185 
186     def find_node_received(self, msg, address):
187         try:
188             target = msg["a"]["target"]
189             neighbors = self.table.get_neighbors(target)
190             
191             nid = msg["a"]["id"]
192             msg = {
193                 "t": msg["t"],
194                 "y": "r",
195                 "r": {
196                     "id": self.get_neighbor(target), 
197                     "nodes": encode_nodes(neighbors)
198                 }
199             }
200             self.table.append(KNode(nid, *address))
201             self.send_krpc(msg, address)
202             self.find_node(address, nid)
203         except KeyError:
204             pass
205 
206     def get_peers_received(self, msg, address):
207         try:
208             infohash = msg["a"]["info_hash"]
209 
210             neighbors = self.table.get_neighbors(infohash)
211 
212             nid = msg["a"]["id"]
213             msg = {
214                 "t": msg["t"],
215                 "y": "r",
216                 "r": {
217                     "id": self.get_neighbor(infohash), 
218                     "nodes": encode_nodes(neighbors)
219                 }
220             }
221             self.table.append(KNode(nid, *address))
222             self.send_krpc(msg, address)
223             self.master.log(infohash)
224             self.find_node(address, nid)
225         except KeyError:
226             pass
227 
228     def announce_peer_received(self, msg, address):
229         try:
230             infohash = msg["a"]["info_hash"]
231             nid = msg["a"]["id"]
232 
233             msg = { 
234                 "t": msg["t"],
235                 "y": "r",
236                 "r": {"id": self.get_neighbor(infohash)}
237             }
238 
239             self.table.append(KNode(nid, *address))
240             self.send_krpc(msg, address)
241             self.master.log(infohash)
242             self.find_node(address, nid)
243         except KeyError:
244             pass
245 # 該類只實例化一次. 
246 class KTable(object):
247     # 這裏的nid就是經過node_id()函數生成的自身node ID. 協議裏說道, 每一個路由表至少有一個bucket,   
248  #    還規定第一個bucket的min=0, max=2^160次方, 因此這裏就給予了一個buckets屬性來存儲bucket, 這個是列表.
249     def __init__(self, nid):
250         self.nid = nid
251         self.buckets = [ KBucket(0, 2**160) ]
252 
253     def append(self, node):
254         index = self.bucket_index(node.nid)
255         try:
256             bucket = self.buckets[index]
257             bucket.append(node)
258         except IndexError:
259             return
260         except BucketFull:
261             if not bucket.in_range(self.nid): 
262                 return
263             self.split_bucket(index)
264             self.append(node)
265 
266 
267         # 返回與目標node ID或infohash的最近K個node.  
268  
269         # 定位出與目標node ID或infohash所在的bucket, 若是該bucuck有K個節點, 返回.   
270         # 若是不夠到K個節點的話, 把該bucket前面的bucket和該bucket後面的bucket加起來, 只返回前K個節點.  
271         # 仍是不到K個話, 再重複這個動做. 要注意不要超出最小和最大索引範圍.  
272         # 總之, 無論你用什麼算法, 想盡辦法找出最近的K個節點.  
273     def get_neighbors(self, target):
274         nodes = []
275         if len(self.buckets) == 0: return nodes
276         if len(target) != 20 : return nodes
277 
278         index = self.bucket_index(target)
279         try:
280             nodes = self.buckets[index].nodes
281             min = index - 1
282             max = index + 1
283 
284             while len(nodes) < K and ((min >= 0) or (max < len(self.buckets))):
285                 if min >= 0:
286                     nodes.extend(self.buckets[min].nodes)
287 
288                 if max < len(self.buckets):
289                     nodes.extend(self.buckets[max].nodes)
290 
291                 min -= 1
292                 max += 1
293 
294             num = intify(target)
295             nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid)))
296             return nodes[:K] #K是個常量, K=8 
297         except IndexError:
298             return nodes
299 
300     def bucket_index(self, target):
301         return bisect_left(self.buckets, intify(target))
302 
303 
304         # 拆表  
305  
306         # index是待拆分的bucket(old bucket)的所在索引值.   
307         # 假設這個old bucket的min:0, max:16. 拆分該old bucket的話, 分界點是8, 而後把old bucket的max改成8, min仍是0.   
308         # 建立一個新的bucket, new bucket的min=8, max=16.  
309         # 而後根據的old bucket中的各個node的nid, 看看是屬於哪一個bucket的範圍裏, 就裝到對應的bucket裏.   
310         # 各回各家,各找各媽.  
311         # new bucket的所在索引值就在old bucket後面, 即index+1, 把新的bucket插入到路由表裏. 
312     def split_bucket(self, index):
313         old = self.buckets[index]
314         point = old.max - (old.max - old.min)/2
315         new = KBucket(point, old.max)
316         old.max = point
317         self.buckets.insert(index + 1, new)
318         for node in old.nodes[:]:
319             if new.in_range(node.nid):
320                 new.append(node)
321                 old.remove(node)
322 
323     def __iter__(self):
324         for bucket in self.buckets:
325             yield bucket
326 
327 
328 class KBucket(object):
329     __slots__ = ("min", "max", "nodes")
330 
331     # min和max就是該bucket負責的範圍, 好比該bucket的min:0, max:16的話,   
332     # 那麼存儲的node的intify(nid)值均爲: 0到15, 那16就不負責, 這16將會是該bucket後面的bucket的min值.   
333     # nodes屬性就是個列表, 存儲node. last_accessed表明最後訪問時間, 由於協議裏說到,   
334     # 當該bucket負責的node有請求, 迴應操做; 刪除node; 添加node; 更新node; 等這些操做時,   
335     # 那麼就要更新該bucket, 因此設置個last_accessed屬性, 該屬性標誌着這個bucket的"新鮮程度". 用linux話來講, touch一下.  
336     # 這個用來便於後面說的定時刷新路由表.  
337 
338     def __init__(self, min, max):
339         self.min = min
340         self.max = max
341         self.nodes = []
342 
343 
344     # 添加node, 參數node是KNode實例.  
345 
346     # 若是新插入的node的nid屬性長度不等於20, 終止.  
347     # 若是滿了, 拋出bucket已滿的錯誤, 終止. 通知上層代碼進行拆表.  
348     # 若是未滿, 先看看新插入的node是否已存在, 若是存在, 就替換掉, 不存在, 就添加,  
349     # 添加/替換時, 更新該bucket的"新鮮程度".  
350     def append(self, node):
351         if node in self:
352             self.remove(node)
353             self.nodes.append(node)
354         else:
355             if len(self) < K:
356                 self.nodes.append(node)
357             else:
358                 raise BucketFull
359 
360     def remove(self, node):
361         self.nodes.remove(node)
362 
363     def in_range(self, target):
364         return self.min <= intify(target) < self.max
365 
366     def __len__(self):
367         return len(self.nodes)
368 
369     def __contains__(self, node):
370         return node in self.nodes
371 
372     def __iter__(self):
373         for node in self.nodes:
374             yield node
375 
376     def __lt__(self, target):
377         return self.max <= target
378 
379 
380 class KNode(object):
381      # """  
382   #       nid就是node ID的簡寫, 就不取id這麼模糊的變量名了. __init__方法至關於別的OOP語言中的構造方法,   
383   #       在python嚴格來講不是構造方法, 它是初始化, 不過, 功能差很少就行.  
384   #       """ 
385     __slots__ = ("nid", "ip", "port")
386     
387     def __init__(self, nid, ip, port):
388         self.nid = nid
389         self.ip = ip
390         self.port = port
391 
392     def __eq__(self, other):
393         return self.nid == other.nid
394 
395 
396 
397 #using example
398 class Master(object):
399     def __init__(self, f):
400         self.f = f
401 
402     def log(self, infohash):
403         self.f.write(infohash.encode("hex")+"\n")
404         self.f.flush()
405 try:
406     f = open("infohash.log", "a")
407     m = Master(f)
408     s = Server(Master(f), KTable(random_id()), 8001)
409     s.start()     
410 except KeyboardInterrupt:
411     s.socket.close()
412     f.close()

 

種子從迅雷下,初期爲學習從http://torrage.com/sync/下的infohash,去重用了別人寫的Bloom Filter算法,數據庫用Mysql,建表語句以下,其中uinthash是根據infohash的頭四個字節和最後四個字節組成的一個int整數,先這樣設計,看後期查詢的時候用獲得不,總以爲用infohash來查很慢

 1 CREATE TABLE `torrentinfo` (
 2   `id` int(11) NOT NULL AUTO_INCREMENT,
 3   `infohash` char(40) NOT NULL DEFAULT '',
 4   `filename` varchar(128) DEFAULT NULL,
 5   `filelength` bigint(11) DEFAULT NULL,
 6   `recvtime` datetime DEFAULT NULL,
 7   `filecontent` text,
 8   `uinthash` int(11) unsigned NOT NULL DEFAULT '0',
 9   PRIMARY KEY (`id`),
10   KEY `uinthash_index` (`uinthash`)
11 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

 

Thunder.py

  1 # _*_ coding: utf-8 _*_
  2 import socket
  3 import os,glob
  4 import time as time_p
  5 import requests
  6 from bencode import bdecode, BTL
  7 from torrent import *
  8 import threading, signal
  9 import MySQLdb
 10 from BloomFilter import *
 11 
 12 class Thunder(object):
 13     def __init__(self):
 14         self.connstr={'host':'127.0.0.1','user':'root','passwd':'123456','port':3306,'charset':"UTF8"}
 15     def download(self, infohash):
 16         try:
 17             tc = self._download(infohash)
 18             if(tc==-1):
 19                 return
 20             tc = bdecode(tc)
 21             info = torrentInfo(tc)
 22             # print info['name']
 23             # print info['length']
 24             # print info['files']
 25             uint=int(infohash[:4]+infohash[-4:],16)
 26             time_now=time_p.strftime('%Y-%m-%d %H:%M:%S',time_p.localtime(time_p.time()))
 27             sql="insert into torrentinfo(infohash,filename,filelength,recvtime,filecontent,uinthash) values('%s','%s','%d','%s','%s','%d')"%(infohash,MySQLdb.escape_string(info['name']),info['length'],time_now,MySQLdb.escape_string(info['files']),uint)
 28             self.executeSQL(sql)
 29         except Exception,e:
 30             print  e
 31             pass
 32 
 33     def openConnection(self):
 34         try:
 35             self.conn=MySQLdb.connect(**self.connstr)
 36             self.cur=self.conn.cursor()
 37             self.conn.select_db('dht')
 38         except MySQLdb.Error,e:
 39             print 'mysql error %d:%s'%(e.args[0],e.args[1])
 40 
 41 
 42     def executeSQL(self,sql):
 43         try:
 44             self.cur.execute(sql)
 45             self.conn.commit()
 46         except MySQLdb.Error,e:
 47             print 'mysql error %d:%s'%(e.args[0],e.args[1])
 48     def closeConnection(self):
 49         try:
 50             self.cur.close()
 51             self.conn.close()
 52         except MySQLdb.Error,e:
 53             print 'mysql error %d:%s'%(e.args[0],e.args[1])    
 54 
 55     def _download(self, infohash):
 56         infohash = infohash.upper()
 57         start = infohash[0:2]
 58         end = infohash[-2:]
 59         url = "http://bt.box.n0808.com/%s/%s/%s.torrent" % (start, end, infohash)
 60         headers = {
 61             "Referer": "http://bt.box.n0808.com"
 62         }
 63         try:
 64             r = requests.get(url, headers=headers, timeout=10)
 65             if r.status_code == 200:
 66                 # f=open("d:\\"+infohash+'.torrent','wb')
 67                 # f.write(r.content)
 68                 # f.close()
 69                 return r.content
 70         except (socket.timeout, requests.exceptions.Timeout), e:
 71             pass
 72         return -1
 73 
 74 class torrentBean(object):
 75     """docstring for torrentBean"""
 76     __slots__=('infohash','filename','recvtime','filecontent','uinthash')
 77 
 78     def __init__(self, infohash,filename,recvtime,filecontent,uinthash):
 79         super(torrentBean, self).__init__()
 80         self.infohash = infohash
 81         self.filename = filename
 82         self.recvtime = recvtime
 83         self.filecontent = filecontent
 84         self.uinthash = uinthash
 85 
 86 
 87 bf = BloomFilter(0.001, 1000000)
 88 a=Thunder()
 89 a.openConnection()
 90 # info_hash="a02d2735e6e1daa6f7d58f21bd7340a7b7c4b7a5"
 91 # info_hash='cf3a6a4f07da0b90beddae838462ca0012bef285'
 92 # a.download('cf3a6a4f07da0b90beddae838462ca0012bef285')
 93 
 94 
 95 files=glob.glob('./*.txt')
 96 for fl in files:
 97     print os.path.basename(fl)
 98     f=open(fl,'r')
 99     for line in f:
100         infohash=line.strip('\n')
101         if not bf.is_element_exist(infohash):
102             bf.insert_element(infohash)
103             a.download(infohash)
104 a.closeConnection()

 

  torrent種子文件通過bencode解析,獲取key爲info對應value值,種子大體的格式以下,有亂碼,不影響觀看

{
    'files': [{
        'path': ['PGD660.avi'],
        'length': 1367405512,
        'filehash': 'J\xef\xfe\xb3K\xd4g\x8d\x07m\x03\xbb\xb3\xadt\xa1\xa0\xf0\xec\xab',
        'ed2k': '/\xfb\xe55#n\xbd1\xb6\x1c\x0f\xf3\xe4\x9dP\xfb',
        'path.utf-8': ['PGD660.avi']
    }, {
        'path': ['PGD660B.jpg'],
        'length': 135899,
        'filehash': '*$O\x17w\xe9E\x95>O\x1f\xfb\x0e\x9b\x16\x15B\\Q\x9d',
        'ed2k': 'T/L*\xbb\x8e.\xe2d\xddu\nR\x07\xca\x19',
        'path.utf-8': ['PGD660B.jpg']
    }, {
        'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'],
        'length': 472,
        'filehash': '&\xa92\xb7\xdd8\xeel3\xcc-S\x07\xb5e\xd35\xc0\xb7r',
        'ed2k': '\x13\xd2 a\x0cA\xb4\xf2X\x12\xea\xd4\xe8\xac`\x92',
        'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht']
    }, {
        'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'],
        'length': 363,
        'filehash': '\x96nA*\xe2\xb6Y+[\xe3\xaf\xd4\x14A\x94\xf5@\xcd\xc1\x91',
        'ed2k': '8V\xa6X\xd9\x82l\xdbNO8\xe8D\xe9E\xed',
        'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt']
    }, {
        'path': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'],
        'length': 475,
        'filehash': '\xec\xde\xeb-6\x86\x1avB\xdd\xd8q\x8b\x8f\xc06\xf0XX\x0e',
        'ed2k': '\xa7\x8dU\xfd\xfc=\x12\x15>yE\x8f&A\xc2u',
        'path.utf-8': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht']
    }, {
        'path': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'],
        'length': 478,
        'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7",
        'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08',
        'path.utf-8': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht']
    }, {
        'path': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'],
        'length': 478,
        'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7",
        'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08',
        'path.utf-8': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht']
    }],
    'publisher': 'yoy123',
    'piece length': 524288,
    'name': 'PGD660 \xe6\x83\xb3\xe8\xa9\xa6\xe8\x91\x97\xe5\x85\xa8\xe5\x8a\x9b\xe6\x93\x8d\xe6\x93\x8d\xe7\x9c\x8b\xe9\x80\x99\xe5\x80\x8b\xe6\xb7\xab\xe8\x95\xa9\xe7\xbe\x8e\xe5\xa5\xb3\xe5\x97\x8e \xe5\xb0\x8f\xe5\xb7\x9d\xe3\x81\x82\xe3\x81\x95\xe7\xbe\x8e',
    'publisher.utf-8': 'yoy123',
}

解析代碼torrent.py

 1 # _*_ coding: utf-8 _*_
 2 from time import time
 3 
 4 def torrentInfo(torrentContent):
 5     metadata = torrentContent["info"]
 6     print metadata
 7     info = {
 8         "name": getName(metadata),
 9         "length": calcLength(metadata),
10         "timestamp": getCreateDate(torrentContent),
11         "files": extraFiles(metadata)
12     }
13     return info
14 
15 def calcLength(metadata):
16     length = 0
17     try:
18         length = metadata["length"]
19     except KeyError:
20         try:
21             for file in metadata["files"]:
22                 length += file["length"]
23         except KeyError:
24             pass
25     return length
26 
27 def extraFiles(metadata):
28     files = []
29     try:
30         for file in metadata["files"]:
31             path = file["path.utf-8"]
32             size=file['length']
33             if len(path) > 1:
34                 main = path[0]
35                 for f in path[1:2]:
36                     files.append("%s/%s    %d bytes" % (main, f,size))
37             else:
38                 files.append("%s    %d bytes" % (path[0],size) )
39         if files:
40             return '\r\n'.join(files)
41         else:
42             return getName(metadata)
43     except KeyError:
44         return getName(metadata)
45 
46 def getName(metadata):
47     try:
48         name = metadata["name.utf-8"]
49         if name.strip()=="": 
50                 raise KeyError
51     except KeyError:
52         try:
53             name = metadata["name"]
54             if name.strip()=="": 
55                 raise KeyError
56         except KeyError:
57             name = getMaxFile(metadata)
58 
59     return name
60 def getMaxFile(metadata):
61     try:
62         maxFile = metadata["files"][0]
63         for file in metadata["files"]:
64             if file["length"] > maxFile["length"]:
65                 maxFile = file
66         name = maxFile["path"][0]
67         return name
68     except KeyError:
69         return ""
70 
71 def getCreateDate(torrentContent):
72     try:
73         timestamp = torrentContent["creation date"]
74     except KeyError:
75         timestamp = int( time() )
76     return timestamp

 

  最後還有別人寫的BloomFilter代碼

  1 #encoding: utf-8
  2 '''
  3 Created on 2012-11-7
  4 
  5 @author: palydawn
  6 '''
  7 import cmath
  8 from BitVector import BitVector
  9 
 10 class BloomFilter(object):
 11     def __init__(self, error_rate, elementNum):
 12         #計算所須要的bit數
 13         self.bit_num = -1 * elementNum * cmath.log(error_rate) / (cmath.log(2.0) * cmath.log(2.0))
 14         
 15         #四字節對齊
 16         self.bit_num = self.align_4byte(self.bit_num.real)
 17         
 18         #分配內存
 19         self.bit_array = BitVector(size=self.bit_num)
 20         
 21         #計算hash函數個數
 22         self.hash_num = cmath.log(2) * self.bit_num / elementNum
 23         
 24         self.hash_num = self.hash_num.real
 25         
 26         #向上取整
 27         self.hash_num = int(self.hash_num) + 1
 28         
 29         #產生hash函數種子
 30         self.hash_seeds = self.generate_hashseeds(self.hash_num)
 31         
 32     def insert_element(self, element):
 33         for seed in self.hash_seeds:
 34             hash_val = self.hash_element(element, seed)
 35             #取絕對值
 36             hash_val = abs(hash_val)
 37             #取模,防越界
 38             hash_val = hash_val % self.bit_num
 39             #設置相應的比特位
 40             self.bit_array[hash_val] = 1
 41     
 42     #檢查元素是否存在,存在返回true,不然返回false 
 43     def is_element_exist(self, element):
 44         for seed in self.hash_seeds:
 45             hash_val = self.hash_element(element, seed)
 46             #取絕對值
 47             hash_val = abs(hash_val)
 48             #取模,防越界
 49             hash_val = hash_val % self.bit_num
 50             
 51             #查看值
 52             if self.bit_array[hash_val] == 0:
 53                 return False
 54         return True
 55         
 56     #內存對齊    
 57     def align_4byte(self, bit_num):
 58         num = int(bit_num / 32)
 59         num = 32 * (num + 1)
 60         return num
 61     
 62     #產生hash函數種子,hash_num個素數
 63     def generate_hashseeds(self, hash_num):
 64         count = 0
 65         #連續兩個種子的最小差值
 66         gap = 50
 67         #初始化hash種子爲0
 68         hash_seeds = []
 69         for index in xrange(hash_num):
 70             hash_seeds.append(0)
 71         for index in xrange(10, 10000):
 72             max_num = int(cmath.sqrt(1.0 * index).real)
 73             flag = 1
 74             for num in xrange(2, max_num):
 75                 if index % num == 0:
 76                     flag = 0
 77                     break
 78             
 79             if flag == 1:
 80                 #連續兩個hash種子的差值要大才行
 81                 if count > 0 and (index - hash_seeds[count - 1]) < gap:
 82                     continue
 83                 hash_seeds[count] = index
 84                 count = count + 1
 85             
 86             if count == hash_num:
 87                 break
 88         return hash_seeds
 89     
 90     def hash_element(self, element, seed):
 91         hash_val = 1
 92         for ch in str(element):
 93             chval = ord(ch)
 94             hash_val = hash_val * seed + chval
 95         return hash_val
 96 
 97 
 98     def SaveBitToFile(self,f):
 99         self.bit_array.write_bits_to_fileobject(f)
100         pass

 

表內容見下圖 

 

相關文章
相關標籤/搜索