python開發的 dht網絡爬蟲

使用 libtorrent 的python綁定庫實現一個dht網絡爬蟲,抓取dht網絡中的磁力連接。html


dht 網絡簡介

p2p網絡

在P2P網絡中,經過種子文件下載資源時,要知道資源在P2P網絡中哪些計算機中,這些傳輸資源的計算機稱做peer。在傳統的P2P網絡中,使用tracker服務器跟蹤資源的peer。要下載資源,首先須要取得這些peer。node


dht網絡

tracker服務器面臨一些版權和法律問題。因而出現了DHT,它把tracker上的資源peer信息分散到了整個網絡中。dht網絡是由分佈 式節點構成,節點(node)是實現了DHT協議的p2p客戶端。P2P客戶端程序既是peer也是node。DHT網絡有多種算法,經常使用的有 Kademlia。python


dht網絡下載

P2P客戶端使用種子文件下載資源時,若是沒有tracker服務器,它就向DHT網絡查詢資源的peer列表, 而後從peer下載資源。linux


Magnet是磁力連接

資源的標識在DHT網絡中稱爲infohash,是一個經過sha1算法獲得的20字節長的字符串。infohash是使用種子文件的文件描述信息 計算獲得。磁力連接是把infohash編碼成16進制字符串獲得。P2P客戶端使用磁力連接,下載資源的種子文件,而後根據種子文件下載資源。git


Kademlia 算法

Kademlia是DHT網絡的一種實現, 具體的算法參見:DHT協議github


KRPC 協議

KRPC 是節點之間的交互協議,使用UDP來傳送。算法

包括4種請求:ping,find_node,get_peer,announce_peer。其中get_peer和announce_peer是節點間查詢資源的主要消息。json


dht 爬蟲原理

主要的思路就是假裝爲p2p客戶端,加入dht網絡,收集dht網絡中的get_peer和announce_peer消息,這些消息是其餘node發送給假裝的p2p客戶端的udp消息。服務器


本文dht爬蟲的實現

爬蟲運行環境

  1. linux 系統網絡

  2. python 2.7

  3. libtorrent 庫的python綁定

  4. twisted 網絡庫

  5. 防火牆開啓固定的udp和tcp端口


libtorrent 庫的介紹

libtorrent庫是p2p下載的客戶端庫,有豐富的接口,能夠用來開發下載p2p網絡上的資源。它有python的綁定庫,本爬蟲就是使用它的python庫開發的。

在libtorrent中有幾個概念須要解釋下。 session 至關於p2p客戶端,session開啓一個tcp和一個udp端口,用來與其餘p2p客戶端交換數據。能夠在一個進程內定義多個session,也就是多個p2p客戶端,來加快收集速度。

alert是libtorrent中用來收集各類消息的隊列,每一個session都有一個本身的alert消息隊列。KRPC協議的get_peer和announce_peer消息也是從這個隊列中獲取,就是用這兩個消息收集磁力連接的。


主要實現代碼

爬蟲實現的主要代碼比較簡單

# 事件通知處理函數
    def _handle_alerts(self, session, alerts):
        while len(alerts):
            alert = alerts.pop()
            # 獲取dht_announce_alert和dht_get_peer_alert消息
            # 從這兩消息收集磁力連接
            if isinstance(alert, lt.add_torrent_alert):
                alert.handle.set_upload_limit(self._torrent_upload_limit)
                alert.handle.set_download_limit(self._torrent_download_limit)
            elif isinstance(alert, lt.dht_announce_alert):
                info_hash = alert.info_hash.to_string().encode('hex')
                if info_hash in self._meta_list:
                    self._meta_list[info_hash] += 1
                else:
                    self._meta_list[info_hash] = 1
                    self._current_meta_count += 1
            elif isinstance(alert, lt.dht_get_peers_alert):
                info_hash = alert.info_hash.to_string().encode('hex')
                if info_hash in self._meta_list:
                    self._meta_list[info_hash] += 1
                else:
                    self._infohash_queue_from_getpeers.append(info_hash)
                    self._meta_list[info_hash] = 1
                    self._current_meta_count += 1

    def start_work(self):
        '''主工做循環,檢查消息,顯示狀態'''
        # 清理屏幕
        begin_time = time.time()
        show_interval = self._delay_interval
        while True:
            for session in self._sessions:
                session.post_torrent_updates()
                # 從隊列中獲取信息
                self._handle_alerts(session, session.pop_alerts())
            time.sleep(self._sleep_time)
            if show_interval > 0:
                show_interval -= 1
                continue
            show_interval = self._delay_interval

            # 統計信息顯示
            show_content = ['torrents:']
            interval = time.time() - begin_time
            show_content.append('  pid: %s' % os.getpid())
            show_content.append('  time: %s' %
                                time.strftime('%Y-%m-%d %H:%M:%S'))
            show_content.append('  run time: %s' % self._get_runtime(interval))
            show_content.append('  start port: %d' % self._start_port)
            show_content.append('  collect session num: %d' %
                                len(self._sessions))
            show_content.append('  info hash nums from get peers: %d' %
                                len(self._infohash_queue_from_getpeers))
            show_content.append('  torrent collection rate: %f /minute' %
                                (self._current_meta_count * 60 / interval))
            show_content.append('  current torrent count: %d' %
                                self._current_meta_count)
            show_content.append('  total torrent count: %d' %
                                len(self._meta_list))
            show_content.append('\n')

            # 存儲運行狀態到文件
            try:
                with open(self._stat_file, 'wb') as f:
                    f.write('\n'.join(show_content))
                with open(self._result_file, 'wb') as f:
                    json.dump(self._meta_list, f)
            except Exception as err:
                pass

            # 測試是否到達退出時間
            if interval >= self._exit_time:
                # stop
                break

            # 天天結束備份結果文件
            self._backup_result()

        # 銷燬p2p客戶端
        for session in self._sessions:
            torrents = session.get_torrents()
            for torrent in torrents:
                session.remove_torrent(torrent)


運行效率

在個人一臺512M內存,單cpu機器上。爬蟲剛開始運行稍慢,運行幾分鐘後收集速度穩定在 180個每分鐘,1小時採集10000左右。

運行狀態

run times: 12

torrents:
  pid: 11480  time: 2014-08-18 22:45:01
  run time: day: 0, hour: 0, minute: 12, second: 25
  start port: 32900
  collect session num: 20
  info hash nums from get peers: 2222
  torrent collection rate: 179.098480 /minute
  current torrent count: 2224
  total torrent count: 58037


爬蟲完整代碼

完整的代碼參見:https://github.com/blueskyz/DHTCrawler

還包括一個基於twisted的監控進程,用來查看爬蟲狀態,在爬蟲進程退出後從新啓動。


原文連接:python開發的 dht網絡爬蟲

相關文章
相關標籤/搜索