運維學python之爬蟲中級篇(三)分佈式進程

相對線程來講,進程更穩定一些,線程只是在同一臺機器上利用多CPU,沒法實現不一樣機器的共享,而多進程則能夠實現分佈到不一樣機器去運行,應用到爬蟲上,就例如:咱們要爬取網站圖片,若是使用多進程的話,則一個進程負責獲取圖片url,將url放入隊列中,另外的進程負責讀取和下載,如今把這個進程作成分佈式,一臺機器負責抓取,另外一臺負責下載。windows

1 介紹

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分佈到多臺機器上。managers提供了一種建立數據的方法,能夠在不一樣的進程之間共享數據,包括在不一樣機器上運行的進程之間經過網絡共享。manage對象控制管理共享對象的服務器進程。其餘進程能夠經過使用代理訪問共享對象。服務器

2 服務進程

咱們先來看服務進程,服務進程主要工做:網絡

  • 服務進程負責啓動Queue
  • 把Queue註冊到網絡上
  • 往Queue裏面寫入任務
    咱們用代碼來看實際效果:
# -*- coding: utf-8 -*-

import queue
from multiprocessing.managers import BaseManager

# 建立task_queue和result_queue對了用來存聽任務和結果
task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    """
    繼承BaseManager
    """
    pass

# 把建立的兩個隊列註冊到網絡上,利用register方法,callable參數關聯對象
# 注意windows下綁定調用接口不能使用lambda
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定5000端口, 設置密鑰
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啓動queue,監聽通道
manager.start()
# 得到經過網絡訪問的對象,注意分佈式進程必須經過manager.get_task_queue()得到的Queue接口添加 
task = manager.get_task_queue()
result = manager.get_result_queue()
# 添加任務
for url in ['url_' + str(i) for i in range(10)]:
    print('put task %s...' % url)
    task.put(url)

# 獲取返回結果
for i in range(10):
    print('result is %s' % result.get(timeout=10))
# 關閉管理
manager.shutdown()
print('master exit')

3 任務進程

任務進程主要的工做以下:分佈式

  • 註冊獲取網絡上queue
  • 鏈接服務器
  • 從task隊列獲取任務,並寫入結果
    示例代碼以下:
# -*- coding: utf-8 -*-

import time
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    """
    繼承類BaseManager
    """
    pass

# 使用register註冊,獲取網絡queue名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 配置服務器ip並鏈接服務器
server_addr = '127.0.0.1'
print('connect to server %s...' % server_addr)
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網絡鏈接
m.connect()
# 獲取queue對象
task = m.get_task_queue()
result = m.get_result_queue()
# 從task獲取任務並將結果寫入result
while(not task.empty()):
    image_url = task.get(True, timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s ------>success' % image_url)
print('worker exit')

先啓動服務進程,結果以下:ide

put task url_0...
put task url_1...
put task url_2...
put task url_3...
put task url_4...
put task url_5...
put task url_6...
put task url_7...
put task url_8...
put task url_9...

再啓動任務進程,結果以下:網站

connect to server 127.0.0.1...
run task download url_0...
run task download url_1...
run task download url_2...
run task download url_3...
run task download url_4...
run task download url_5...
run task download url_6...
run task download url_7...
run task download url_8...
run task download url_9...
worker exit

再回過頭來看服務進程端輸出url

put task url_0...
put task url_1...
put task url_2...
put task url_3...
put task url_4...
put task url_5...
put task url_6...
put task url_7...
put task url_8...
put task url_9...
result is url_0 ------>success
result is url_1 ------>success
result is url_2 ------>success
result is url_3 ------>success
result is url_4 ------>success
result is url_5 ------>success
result is url_6 ------>success
result is url_7 ------>success
result is url_8 ------>success
result is url_9 ------>success
master exit

這個簡單的分佈式有什麼用呢?從例子也能夠看出,若是咱們設置多個任務進程,就能夠把任務分配到多臺機器上,服務端來添加url到隊列,任務端來下載並返回結果。而Queue之因此能經過網絡訪問,就是經過QueueManager實現的。線程

相關文章
相關標籤/搜索