python多進程multiprocessing

  Multiprocessing是一個相似於threading模塊的生成多進程的包,這個包提供了本地和遠程的進程併發。使用multiprocessing可以有效的解決python由於在GIL(全局解釋鎖)下在CPU密集型任務中的瓶頸問題,容許使用多核處理器來運行python腳本程序。官方介紹https://docs.python.org/2/library/multiprocessing.html。
html

  multiprocessing生成進程大體是一下的流程:python

  1.經過 multiprocessing.Process生成進程對象linux

  2.調用start()方法啓動進程安全

  3.調用join()方法,阻塞主進程知道 子進程執行完畢 數據結構

下面實例代碼展現如何經過multiprocessing建立子進程執行任務併發

 

# -*- coding: utf-8 -*-

import multiprocessing
import os


def func(m):
    print("called function in process : %s , process id is %s" % (str(m), str(os.getpid())))


if __name__ == '__main__':
    ProcessJob = []
    for i in range(5): # 依次開啓5個進程
        p = multiprocessing.Process(target=func, args=(i,))
        ProcessJob.append(p)
        p.start() # 啓動進程
        p.join() # 阻塞進程直至 當前進程中的任務完成

結果:    app

  

進程的命名dom

對與每一個進程在建立的時候,能夠經過name參數對進程名稱進行命名 例如異步

# -*- coding: utf-8 -*-

import multiprocessing
import os


def func(m):
    name = multiprocessing.current_process().name
    print("current process name is " + name)
    print("called function in process : %s , process id is %s" % (str(m), str(os.getpid())))


if __name__ == '__main__':
    ProcessJob = []
    for i in range(5): # 依次開啓5個進程
        p = multiprocessing.Process(target=func,name='PROCESS_NAME_' + str(i), args=(i,))
        ProcessJob.append(p)
        p.start() # 啓動進程
        p.join() # 阻塞進程直至 當前進程中的任務完成

 

 

在子類中使用進程 async

  在子類使用進程,可使類繼承自multiprocessing.Process達到效果

# -*- coding: utf-8 -*-

import multiprocessing
import os


class MyProcess(multiprocessing.Process):
    def run(self): # 重寫 run函數
        print("called rub method in process: %s, process id is : %s" % (multiprocessing.current_process().name, os.getpid()))


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = MyProcess() # 建立對象
        jobs.append(p)
        p.start() # 啓動進程對象
        p.join() # 阻塞進程對象

結果:

called rub method in process: MyProcess-1, process id is : 1524
called rub method in process: MyProcess-2, process id is : 1525
called rub method in process: MyProcess-3, process id is : 1526
called rub method in process: MyProcess-4, process id is : 1527
called rub method in process: MyProcess-5, process id is : 1528

 

 

進程間如何交換數據對象

多進程間進行交換數據, multiprocessing提供了兩個數據通道,一個是隊列,一個是通道。

1. 經過隊列進行數據交換 

建立一個進程共享的隊列,這個共享隊列是線程安全與進程安全的 在操做該共享隊列的時候不須要經過鎖來保持訪問的安全性

經過multiprocessing.Queue()來建立共享隊列。

2.經過管道進行數據交換

管道建立以後會返回一對鏈接對象, 每一個對象都有send/receive 方法 , 實現了進程間的通訊

以下例子:

# -*- coding: utf-8 -*-

import multiprocessing
import os


def create_items(pipe):
    output_pipe , _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()


def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item*item)
    except EOFError as err:
        output_pipe.close()

if __name__ == '__main__':
    pipe_1 = multiprocessing.Pipe(True) # 建立 通道
    process_pipe_1 = multiprocessing.Process(target= create_items, args=(pipe_1, )) #建立進程1 給通道1裏載入數據

    process_pipe_1.start()

    pipe_2 = multiprocessing.Pipe(True) # 建立通道2

    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2, )) #將通道1裏的數據接收以後 放入 通道2

    process_pipe_2.start()
  pipe_1[0].close()
  pipe_2[0].close()
  try: 
    while True:
      print(pipe_2[1].recv())
    except EOFError as err:
      print("end")

 

進程的同步

當多個進程協同來完成一個任務的時候,多個進程須要共享數據, 按照必定的順序來訪問數據完成任務,須要用到同步原語來控制獲取數據的順序,保證任務的正常進行。

1.Lock:使用acquire和release來管理共享資源的訪問

2. 事件:實現了進程間的簡單通訊, 一個進程發出事件 ,其餘一個進程或多個進程響應事件。經過Event對象來實現

3.信號量: 用越共享的公共資源。例如一個資源支持同時訪問的的數量。

4.屏障: 使全部的涉及屏障的進程到達某一條件後,在執行,屏障以前和以後的代碼不可以並行執行

詳細可參見 https://docs.python.org/2/library/multiprocessing.html#multiprocessing.managers.SyncManager

 

multiprocessing中的managers

這個managers是multiprocessing提供的一種管理器,用來協調多進程之間的共享信息,能夠用來實現分佈式進程之間的數據共享,協做完成任務。

例如,咱們在 一臺機器上的任務隊列中發佈任務, 遠程進程獲取 任務,執行完任務以後再將輸入回放到結果隊列, 不單單是隊列 ,還能夠是字典,list等其餘的數據結構 

下面是一個 隊列的例子:

server端:

# -*- coding: utf-8 -*-

import multiprocessing
from multiprocessing.managers import BaseManager
import random, time
import queue

# 繼承自BaseManager 用於註冊 下面的兩個隊列
class QueueManager(BaseManager):
    pass


task_queue = queue.Queue() # 申明兩個隊列 一個任務隊列 一個結果隊列
result_queue = queue.Queue()


def return_task_queue():
    global task_queue
    return task_queue


def return_result_queue():
    global result_queue
    return result_queue


if __name__ == '__main__':
    QueueManager.register('get_task_from_queue', callable=return_task_queue)#  註冊兩個隊列
    QueueManager.register('put_result_to_queue', callable=return_result_queue)
    # 綁定端口以及驗證碼(window平臺下須要寫127.0.0.1, linux下能夠爲空 或者0.0.0.0, authkey 不能直接填字符串 會提示沒有正確的編碼 能夠用b'abc' 的方式 或者'abc'.encode('utf-8'))
    mng = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    # 啓動
    mng.start()

    task = mng.get_task_from_queue()
    result = mng.put_result_to_queue()

    # 在任務隊列中放入任務
    for i in range(10):
        n = random.randint(0, 1000)
        print("had put task into task_queue %s" % str(n))
        task.put(n)

    # 等待 任務完成以後放入 結果隊列 取出打印
    while True:
        if result.empty():
            print("i am wait for result")
            time.sleep(1)
        else:
            rep = result.get()
            print("had get result from result_queue %s" % str(rep))


    # mng.shutdown()

client端:

# -*- coding: utf-8 -*-

import time, sys, queue
import math
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


task_queue = queue.Queue()
result_queue = queue.Queue()


def return_task_queue():
    global task_queue
    return task_queue


def return_result_queue():
    global result_queue
    return result_queue

if __name__ == "__main__":
    server_addr = '127.0.0.1'

    QueueManager.register('get_task_from_queue', callable=return_task_queue)
    QueueManager.register('put_result_to_queue', callable=return_result_queue)

    mng = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    mng.connect()

    task_get = mng.get_task_from_queue()
    result_put = mng.put_result_to_queue()

    while True:
        if task_get.empty():
            print("wait for task")
            time.sleep(1)
        else:
            m = task_get.get(timeout = 1)
            rep = math.sqrt(m)
            time.sleep(1)
            result_put.put(str(rep))

 

multiprocessing中的進程池

進程的頻繁 建立和銷燬時很是耗費資源的 , multiprocessing.Pool提供給用戶一個常駐的進程池,當有任務來臨時,有空閒的進程則執行任務,沒有空閒的任務的時候,等待進程池中有空閒進程後,分配空閒進程給該任務執行

# -*- coding: utf-8 -*-

import multiprocessing
import time
import random


def func_square(x):
    print("process is exc %s", multiprocessing.current_process().name)
    time.sleep(2)
    return x*x


if __name__ == "__main__":
    res = []
    pool = multiprocessing.Pool(4)

    for i in range(10):
        res.append(pool.apply_async(func=func_square, args=(random.randint(0, 100), ))) # 異步執行 

    pool.close()
    pool.join()

    print("result is:")
    for r in res:
        print(r.get())

代碼中的 Pool.apply_async是並行執行 Pool.apply是阻塞的同步執行, 相似的還有Pool.map, Pool.map_async.

相關文章
相關標籤/搜索