Python分佈式進程

分佈式進程:

分佈式進程是指的是將Process進程分佈到多臺機器上,充分利用多臺機器的性能完成複雜的任務。在Thread和Process中,應當優選Process,由於Process更穩定,並且,Process能夠分佈到多臺機器上,而Thread最多隻能分佈到同一臺機器的多個CPU上。python

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分佈到多臺機器上。一個服務進程能夠做爲調度者,將任務分佈到其餘多個進程中,依靠網絡通訊。因爲managers模塊封裝很好,沒必要了解網絡通訊的細節,就能夠很容易地編寫分佈式多進程程序。linux

舉個例子:作爬蟲程序時,經常會遇到這樣的場景,咱們想抓取圖片的連接地址,將連接地址存放到Queue中,另外的進程負責從Queue中讀取連接地址進行下載和存儲到本地。如今把這個過程作成分佈式,一臺機器上的進程負責抓取連接,其它機器上的進程負責下載存儲,那麼遇到的主要問題是將Queue暴露到網絡中,讓其它機器進程均可以訪問,分佈式進程就是將這一個過程進行了封裝,咱們能夠將這個過程稱爲本隊列的網絡化。windows

建立分佈式進程須要一個服務進程與任務進程:服務器

服務進程建立:

  • 創建隊列Queue,用來進行進程間的通訊。服務進程建立任務隊列task_queue,用來做爲 傳遞任務給任務進程的通道;服務進程建立結果隊列result_queue,做爲任務進程完成任務後回覆服務進程的通道。在分佈式多進程環境下,必須經過由Queuemanager得到Queue接口來添加任務.
  • 把第一步中創建的隊列在網絡上註冊,暴露給其它進程(主機),註冊後得到網絡隊列,至關於本隊隊列的映像.
  • 創建一個險象(Queuemanager(BaseManager))實例manager,綁定端口和驗證口令。
  • 啓動第三步中創建的實例,即啓動管理manager,監管信息通道
  • 經過管理實例的方法得到經過網絡訪問的Queue對象,即再把網絡隊列實體化成可使用的本地隊列.
  • 建立任務到"本地"隊列中,自動上傳任務到網絡隊列中,分配給任務進程進行處理。

注意:我這裏是基於window操做系統的,linux系統會有所不一樣網絡

# coding:utf-8
# taskManager.py for win

import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# 任務個數
task_num = 10

# 定義收發隊列
task_queue = Queue.Queue(task_num)
result_queue = Queue.Queue(task_num)

def get_task():
    return task_queue

def get_result():
    return result_queue

# 建立相似的QueueManager

class QueueManager(BaseManager):
    pass

def win_run():
    # windows下綁定調用接口不能使用lambda,因此只能先定義函數再綁定
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # 綁定端口並設置驗證口令,windows下須要填寫IP地址,Linux下不填,默認爲本地
    manager = QueueManager(address=('127.0.0.1', 4000), authkey='qty')

    # 啓動
    manager.start()

    # 經過網絡獲取任務隊列和結果隊列
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    try:

        # 添加任務
        for i in range(10):
            print 'put task %s...' % i
            task.put(i)
        print 'try get result...'

        for i in range(10):
            print 'result is %s' % result.get(timeout=10)

    except:
        print 'manage error'
    finally:
        # 必定要關閉,不然會報管理未關閉的錯誤
        manager.shutdown()
        print 'master exit!'

if __name__ == '__main__':
    # windows下多進程可能會出現問題,添加這句能夠緩解
    freeze_support()
    win_run()

任務進程

  • 使用QueueManager 註冊用於獲取Queue的方法名稱,任務進程只能經過名稱來在網絡上獲取Queue
  • 鏈接服務器中,端口和驗證口令注意保持與服務進程中徹底一致
  • 從網絡上獲取Queue,進行本地化
  • 從Task隊列獲取任務,並把結果result隊列
# coding:utf-8
import time
from multiprocessing.managers import BaseManager

# 建立相似的QueueManager:
class QueueManager(BaseManager):
    pass

# 第一步:使用QueueManager註冊用於獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 第二步:鏈接服務器
server_addr = '127.0.0.1'
print "Connect to server %s" % server_addr

# 端口和驗證口令注意保持與服務進程徹底一致
m = QueueManager(address=(server_addr, 4000), authkey='qty')

# 從網絡鏈接
m.connect()

# 第三步:獲取Queue的對象
task = m.get_task_queue()
result = m.get_result_queue()

# 第四步:從task隊列獲取任務,並把結果寫入result隊列:

while not task.empty():
    index = task.get(True, timeout=10)
    print 'run task download %s' % str(index)
    result.put('%s---->success ' % str(index))

# 處理結束
print 'worker exit.'

###執行結果分佈式

先運行:服務進程獲得結果ide

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...

再當即運行:任務進程獲得結果,防止進程走完後得不到結果,這裏必定要當即執行函數

Connect to server 127.0.0.1
run task download 0
run task download 1
run task download 2
run task download 3
run task download 4
run task download 5
run task download 6
run task download 7
run task download 8
run task download 9
worker exit.

最後再回頭看服務進程窗口的結果性能

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
result is 0---->success 
result is 1---->success 
result is 2---->success 
result is 3---->success 
result is 4---->success 
result is 5---->success 
result is 6---->success 
result is 7---->success 
result is 8---->success 
result is 9---->success 
master exit!

這就是一個簡單但真正的分佈式計算,把代碼稍加改造,啓動多個worker,就把任務分佈到幾臺甚至幾十臺機器上,實現大規模的分佈式爬蟲操作系統

相關文章
相關標籤/搜索