使用 libtorrent 的python綁定庫實現一個dht網絡爬蟲,抓取dht網絡中的磁力連接。html
在P2P網絡中,經過種子文件下載資源時,要知道資源在P2P網絡中哪些計算機中,這些傳輸資源的計算機稱做peer。在傳統的P2P網絡中,使用tracker服務器跟蹤資源的peer。要下載資源,首先須要取得這些peer。node
tracker服務器面臨一些版權和法律問題。因而出現了DHT,它把tracker上的資源peer信息分散到了整個網絡中。dht網絡是由分佈 式節點構成,節點(node)是實現了DHT協議的p2p客戶端。P2P客戶端程序既是peer也是node。DHT網絡有多種算法,經常使用的有 Kademlia。python
P2P客戶端使用種子文件下載資源時,若是沒有tracker服務器,它就向DHT網絡查詢資源的peer列表, 而後從peer下載資源。linux
資源的標識在DHT網絡中稱爲infohash,是一個經過sha1算法獲得的20字節長的字符串。infohash是使用種子文件的文件描述信息 計算獲得。磁力連接是把infohash編碼成16進制字符串獲得。P2P客戶端使用磁力連接,下載資源的種子文件,而後根據種子文件下載資源。git
Kademlia是DHT網絡的一種實現, 具體的算法參見:DHT協議github
KRPC 是節點之間的交互協議,使用UDP來傳送。算法
包括4種請求:ping,find_node,get_peer,announce_peer。其中get_peer和announce_peer是節點間查詢資源的主要消息。json
主要的思路就是假裝爲p2p客戶端,加入dht網絡,收集dht網絡中的get_peer和announce_peer消息,這些消息是其餘node發送給假裝的p2p客戶端的udp消息。服務器
linux 系統網絡
python 2.7
libtorrent 庫的python綁定
twisted 網絡庫
防火牆開啓固定的udp和tcp端口
libtorrent庫是p2p下載的客戶端庫,有豐富的接口,能夠用來開發下載p2p網絡上的資源。它有python的綁定庫,本爬蟲就是使用它的python庫開發的。
在libtorrent中有幾個概念須要解釋下。 session 至關於p2p客戶端,session開啓一個tcp和一個udp端口,用來與其餘p2p客戶端交換數據。能夠在一個進程內定義多個session,也就是多個p2p客戶端,來加快收集速度。
alert是libtorrent中用來收集各類消息的隊列,每一個session都有一個本身的alert消息隊列。KRPC協議的get_peer和announce_peer消息也是從這個隊列中獲取,就是用這兩個消息收集磁力連接的。
爬蟲實現的主要代碼比較簡單
# 事件通知處理函數 def _handle_alerts(self, session, alerts): while len(alerts): alert = alerts.pop() # 獲取dht_announce_alert和dht_get_peer_alert消息 # 從這兩消息收集磁力連接 if isinstance(alert, lt.add_torrent_alert): alert.handle.set_upload_limit(self._torrent_upload_limit) alert.handle.set_download_limit(self._torrent_download_limit) elif isinstance(alert, lt.dht_announce_alert): info_hash = alert.info_hash.to_string().encode('hex') if info_hash in self._meta_list: self._meta_list[info_hash] += 1 else: self._meta_list[info_hash] = 1 self._current_meta_count += 1 elif isinstance(alert, lt.dht_get_peers_alert): info_hash = alert.info_hash.to_string().encode('hex') if info_hash in self._meta_list: self._meta_list[info_hash] += 1 else: self._infohash_queue_from_getpeers.append(info_hash) self._meta_list[info_hash] = 1 self._current_meta_count += 1 def start_work(self): '''主工做循環,檢查消息,顯示狀態''' # 清理屏幕 begin_time = time.time() show_interval = self._delay_interval while True: for session in self._sessions: session.post_torrent_updates() # 從隊列中獲取信息 self._handle_alerts(session, session.pop_alerts()) time.sleep(self._sleep_time) if show_interval > 0: show_interval -= 1 continue show_interval = self._delay_interval # 統計信息顯示 show_content = ['torrents:'] interval = time.time() - begin_time show_content.append(' pid: %s' % os.getpid()) show_content.append(' time: %s' % time.strftime('%Y-%m-%d %H:%M:%S')) show_content.append(' run time: %s' % self._get_runtime(interval)) show_content.append(' start port: %d' % self._start_port) show_content.append(' collect session num: %d' % len(self._sessions)) show_content.append(' info hash nums from get peers: %d' % len(self._infohash_queue_from_getpeers)) show_content.append(' torrent collection rate: %f /minute' % (self._current_meta_count * 60 / interval)) show_content.append(' current torrent count: %d' % self._current_meta_count) show_content.append(' total torrent count: %d' % len(self._meta_list)) show_content.append('\n') # 存儲運行狀態到文件 try: with open(self._stat_file, 'wb') as f: f.write('\n'.join(show_content)) with open(self._result_file, 'wb') as f: json.dump(self._meta_list, f) except Exception as err: pass # 測試是否到達退出時間 if interval >= self._exit_time: # stop break # 天天結束備份結果文件 self._backup_result() # 銷燬p2p客戶端 for session in self._sessions: torrents = session.get_torrents() for torrent in torrents: session.remove_torrent(torrent)
在個人一臺512M內存,單cpu機器上。爬蟲剛開始運行稍慢,運行幾分鐘後收集速度穩定在 180個每分鐘,1小時採集10000左右。
運行狀態
run times: 12 torrents: pid: 11480 time: 2014-08-18 22:45:01 run time: day: 0, hour: 0, minute: 12, second: 25 start port: 32900 collect session num: 20 info hash nums from get peers: 2222 torrent collection rate: 179.098480 /minute current torrent count: 2224 total torrent count: 58037
完整的代碼參見:https://github.com/blueskyz/DHTCrawler
還包括一個基於twisted的監控進程,用來查看爬蟲狀態,在爬蟲進程退出後從新啓動。
原文連接:python開發的 dht網絡爬蟲