Python多進程-multiprocess

Python中主要經過 multiprocess 包來操做和管理進程。html

進程啓動方式

python 啓動進程方式1python

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p = Process(target=fork, args=('hello_1',))
    p.start()             # 啓動進程
    print("end...")

# 結果輸出
end...
subprocess: hello_1

 
Process類參數說明:安全

Process([ target [, name [, args [, kwargs]]]]])
target 表示子進程要執行的任務
args 表示調用對象的位置參數元組,args=(1,2,'hello',)
kwargs 表示調用對象的字典,kwargs={'name':'baby','age':18}
name 子進程的名稱

 
python 啓動進程方式2:服務器

import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, thread_name):
        super().__init__()
        self.thread_name = thread_name
    def run(self):
        time.sleep(2)
        print("subprocess: " + self.thread_name)

if __name__ == '__main__':
    p = MyProcess('hello_1')
    p.start()
    print("end...")

# 結果輸出
end...
subprocess: hello_1

Tip:兩種啓動進程的方式沒有優劣之分~網絡

join方法的應用

在主進程中經過 join 方法,可讓主進程等待子進程執行完畢後,再繼續往下執行併發

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p = Process(target=fork, args=('hello_1',))
    p.start()
    p.join()    # 等待子進程執行完畢
    print("end...")

# 結果輸出
subprocess: hello_1
end...

 
多個子進程同時運行app

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p_list = []
    for i in range(1, 4):
        p = Process(target=fork, args=('hello_' + str(i),))
        p.start()
        p_list.append(p)
    [p.join() for p in p_list]    # 等待子進程執行完畢
    print("end...")

# 結果輸出
subprocess: hello_1
subprocess: hello_2
subprocess: hello_3
end...

 
如上是經過第一種方式啓動子進程,使用繼承 Process 類的形式啓動子進程示例以下:dom

import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, thread_name):
        super().__init__()
        self.thread_name = thread_name

    def run(self):
        time.sleep(2)
        print("subprocess: " + self.thread_name)

if __name__ == '__main__':
    p_list = []
    for i in range(1, 4):
        p = MyProcess('hello_' + str(i))
        p.start()
        p_list.append(p)
    [p.join() for p in p_list]
    print("end...")

Process類的其餘相關方法和屬性

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p = Process(target=fork, args=('hello',))
    p.start()
    # 進程的名稱
    print(p.name)    # 輸出:Process-1
    # 布爾值,True 表示該進程爲守護進程,默認爲 False,這個值須要在 p.start() 以前設置
    print(p.daemon)  # 輸出:False
    # 進程的pid
    print(p.pid)      # 輸出:7980
    # 進程的身份驗證鍵,默認是由 os.urandom() 隨機生成的32字符的字符串。
    print(p.authkey)  # 輸出:b'\xf2M)\xc8\xf6\xae8\x0c\xbet\xbcAT\xad7%ig9zl\xe5|\xb5|\x7f\xa6\xab\x8a\x8a\x94:'
    # 查看進程是否還在運行,若還在運行,則返回 True
    print(p.is_alive())  # 輸出:True
    # 主進程等待子進程 p 執行結束,再繼續往下執行
    # p.join()
    # 強制終止子進程 p
    p.terminate()
    print('end...')

守護進程

import time
from multiprocessing import Process

def fork(thread_name):
    for i in range(5):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i))

if __name__ == '__main__':
    p = Process(target=fork, args=('hello',))
    p.start()
    time.sleep(2)
    print('end...')

# 輸出結果:
subprocess: hello...0
subprocess: hello...1
end...
subprocess: hello...2
subprocess: hello...3
subprocess: hello...4

 
能夠看到主進程的代碼先運行完畢,運行完成後,它會等待子進程執行完成後再結束。如果將子進程設置爲守護進程,則子進程會隨着主進程的代碼執行完畢而結束。注意守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children。異步

import time
from multiprocessing import Process

def fork(thread_name):
    for i in range(5):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i))

if __name__ == '__main__':
    p = Process(target=fork, args=('hello',))
    p.daemon = True   # 設置進程 p 爲守護進程
    p.start()
    time.sleep(2)
    print('end...')

# 輸出結果:
subprocess: hello...0
subprocess: hello...1
end...

 
值得注意的是:守護進程是在主進程代碼執行結束後就終止,即主進程的代碼執行完畢,守護進程就終止。來看以下示例:async

import time
from multiprocessing import Process

def fork_1(thread_name):
    for i in range(5):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i), end="\n")

def fork_2(thread_name):
    for i in range(7):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i), end="\n")

if __name__ == '__main__':
    p1 = Process(target=fork_1, args=('hello',))
    p2 = Process(target=fork_2, args=('hi',))
    p1.daemon = True   # 設置進程 p1 爲守護進程
    p1.start()
    p2.start()
    time.sleep(2)
    print('end...')

# 輸出結果:
subprocess: hello...0
subprocess: hi...0
subprocess: hello...1
subprocess: hi...1
end...
subprocess: hi...2
subprocess: hi...3
subprocess: hi...4
subprocess: hi...5
subprocess: hi...6

 
如上示例中,p1 爲守護進程,在主進程輸出 ‘end…’ 後,即主進程的代碼執行完畢後,守護進程 p1 就終止了。可是此時,主進程並無終止,它須要等待 p2 執行完畢以後再終止。

鎖 —— multiprocess.Lock

進程與進程之間數據是隔離的

from multiprocessing import Process

def fork(thread_name):
    global n
    print("subprocess: " + thread_name + "...n=" + str(n))
    n = 1
    print("subprocess: " + thread_name + "...n=" + str(n))

if __name__ == '__main__':
    n = 100
    p = Process(target=fork, args=('hello',))
    p.start()
    p.join()
    print("main...n=" + str(n))

# 輸出結果:
subprocess: hello...n=100
subprocess: hello...n=1
main...n=100

 
經過如上示例能夠看出,子進程 p 中的變量 n 和主進程中的變量 n 是兩個獨立的變量,存放在不一樣的內存空間,更改其中一個變量並不會影響另外一個變量的值。
 
要想在進程間共享數據,可經過 Manager 類實現。Manager 類中提供了不少能夠共享數據的數據類型,包括dict,list,Queue,Pipe 等。注意:Manager 中的數據是不安全的。當多個進程同時訪問共享數據的時候,就會產生數據安全問題。
 
多進程同時搶購餘票示例:

from multiprocessing import Process, Manager

def work(m_dict):
    if m_dict['count'] > 0:
        print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
        m_dict['count'] -= 1

if __name__ == '__main__':
    m = Manager()
    m_dict = m.dict({'count': 20})
    p_list = []
    for i in range(20):
        p = Process(target=work, args=(m_dict, ))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()
    print("end..." + str(m_dict['count']))

# 輸出結果:
32940 get ticket 20
32941 get ticket 19
32942 get ticket 18
32939 get ticket 17
32943 get ticket 16
32944 get ticket 15
32946 get ticket 14
32945 get ticket 13
32947 get ticket 12
32948 get ticket 11
32953 get ticket 11
32958 get ticket 9
32957 get ticket 8
32955 get ticket 7
32956 get ticket 7
32954 get ticket 6
32950 get ticket 5
32949 get ticket 5
32951 get ticket 3
32952 get ticket 2
end...1

 
輸出結果中 「ticket 11」 被購買了2次,能夠看到當多個進程對同一份數據進行操做的時候,就會引起數據安全問題。
在如上示例中,增長進程數據還有可能出現以下這樣的報錯:

AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

 
這個報錯的觸發緣由並無深究,極有多是 manager 內部緣由,在 manager 管理進程的同時不能夠進入主進程進行某些交互。能夠經過在子進程中 sleep 一下 來避免這個問題(這並非根本的解決方式)

import time, os
from multiprocessing import Process, Manager

def work(m_dict):
    time.sleep(0.5)       # sleep 0.5 s,能夠繞過這個問題
    if m_dict['count'] > 0:
        print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
        m_dict['count'] -= 1
...

 
如上的數據安全問題,能夠在子進程中加鎖來解決,即在同一時刻,僅容許一個進程執行 lock.acquire() 和 lock.release() 之間的代碼

import os
from multiprocessing import Process, Manager, Lock

def work(m_dict, lock):
    lock.acquire()
    if m_dict['count'] > 0:
        print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
        m_dict['count'] -= 1
    lock.release()

if __name__ == '__main__':
    m = Manager()
    m_dict = m.dict({'count': 20})
    lock = Lock()
    p_list = []
    for i in range(20):
        p = Process(target=work, args=(m_dict, lock))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()
    print("end..." + str(m_dict['count']))

# 輸出結果:
33240 get ticket 20
33242 get ticket 19
33241 get ticket 18
33243 get ticket 17
33244 get ticket 16
33245 get ticket 15
33247 get ticket 14
33246 get ticket 13
33249 get ticket 12
33248 get ticket 11
33250 get ticket 10
33251 get ticket 9
33252 get ticket 8
33257 get ticket 7
33258 get ticket 6
33253 get ticket 5
33254 get ticket 4
33255 get ticket 3
33259 get ticket 2
33256 get ticket 1
end...0

 
Manager() 是經過共享進程來實現多進程之間數據共享。Manager() 返回的對象控制了一個 server 進程,這個 server 進程容許其餘進程經過 proxies 來訪問。多進程之間數據共享,除了 Manager() 外,還有 Value 、 Array,Value 和 Array 是經過共享內存的方式實現數據共享,一樣爲了保證數據安全,常常和同步互斥鎖配合使用。
關於 Value 、 Array 的具體使用方式可參閱 https://www.cnblogs.com/gengyi/p/8661235.html
 
使用 Value 實現上述的搶票示例:

import os
from multiprocessing import Process, Value, Lock

def work(count, lock):
    lock.acquire()
    if count.value > 0:
        print("%s get ticket %d" % (str(os.getpid()), count.value))
        count.value -= 1
    lock.release()

if __name__ == '__main__':
    count = Value('l', 50)
    lock = Lock()
    p_list = []
    for i in range(50):
        p = Process(target=work, args=(count, lock))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()
    print("end..." + str(count.value))

隊列 —— multiprocess.Queue

from multiprocessing import Queue

queue = Queue(3)    # 建立隊列:Queue([maxsize]),maxsize 表示隊列的最大長度
queue.put('a')
queue.put('b')
queue.put('c')
print(queue.full())    # 輸出 True,表示隊列已經滿了
# 若隊列已經滿了,繼續向隊列中插入數據,則程序會阻塞在這裏,直到隊列的另外一端有數據被取出,新的數據才能插入
# put 方法有兩個可選參數:block 和 timeout。
# block 默認爲 True,表示會阻塞 timeout 指定的時間,若是超時,會拋出 Queue.Full 異常。若是 block 爲 False,在 put 時 隊列已滿,則會當即拋出 Queue.Full 異常。
# timeout 默認爲 None,表示會一直阻塞。
# queue.put('d')
# queue.put_nowait()   # 等同於 queue.put(block = False)
print(queue.get())     # 'a'
print(queue.get())     # 'b'
print(queue.get())     # 'c'
print(queue.empty())   # 輸出 True,表示隊列已空
# 若隊列已空,繼續從該隊列中 get 數據,則程序會阻塞在這裏,直到隊列中新插入了數據。
# get 方法也有兩個參數:block 和 timeout,通 put 方法
# block 默認爲 True,表示會阻塞 timeout 指定的時間,若是 timeout 之間以內仍是沒有獲取到數據,會拋出 Queue.Empty 異常。block 爲 False 時,若隊列中有數據,則會當即返回數據,若是隊列爲空,則會當即拋出 Queue.Empty 異常.
# timeout 默認爲 None,表示會一直阻塞。
# queue.get(False)
# queue.get_nowait()    # 等同於 queue.get(block = False)
# print(queue.qsize())  # 獲取隊列的長度,某些系統上,此方法可能引起NotImplementedError異常。
# q.close()             # 關閉隊列

 
生產者和消費者示例

from multiprocessing import Process, Queue
import time

def producer(name, production, queue):
    for i in range(2):
        time.sleep(0.5)
        queue.put(production + '_' + str(i))
        print('%s produce %s' % (name, production + '_' + str(i)), end="\n")

def consumer(name, queue):
    while True:
        data = queue.get()
        if data is None: break      # None 爲結束信號
        time.sleep(0.3)
        print('%s consume %s' % (name, data), end="\n")

if __name__ == '__main__':
    queue = Queue()
    p_list = []
    for index, f in enumerate(['apple', 'pear', 'peach']):
        p = Process(target=producer, args=('producer_' + str(index), f, queue))
        p_list.append(p)
        p.start()
    Process(target=consumer, args=('consumer_1', queue)).start()
    Process(target=consumer, args=('consumer_2', queue)).start()
    [p.join() for p in p_list]
    # 有2個消費者,則發送2次 None
    queue.put(None)
    queue.put(None)

# 輸出結果:
producer_1 produce pear_0
producer_2 produce peach_0
producer_0 produce apple_0
consumer_2 consume peach_0
consumer_1 consume pear_0
producer_1 produce pear_1
producer_2 produce peach_1
producer_0 produce apple_1
consumer_2 consume apple_0
consumer_1 consume peach_1
consumer_2 consume pear_1
consumer_1 consume apple_1

 
經過向隊列中插入 None,來告訴消費者生產已經結束。這是一種比較低端的實現方式。
JoinableQueue 類是 Queue 類的擴展,JoinableQueue 類中的 task_done() 方法爲消費者調用方法,表示從隊列中獲取的項目(queue.get() 獲取的數據)已經被處理;JoinableQueue 類中的 join() 方法爲生產者調用的方法,生產者在調用 join() 方法後會被阻塞,直到隊列中的每一個項目都被調用 queue.task_done() 方法爲止。
 
以下示例是經過 task_done() 方法 和 join() 方法來實現相似於上述的發送結束信號機制。

from multiprocessing import Process, JoinableQueue
import time

def producer(name, production, queue):
    for i in range(2):
        time.sleep(0.5)
        queue.put(production + '_' + str(i))
        print('%s produce %s' % (name, production + '_' + str(i)), end="\n")
    queue.join()

def consumer(name, queue):
    while True:
        data = queue.get()
        time.sleep(0.3)
        print('%s consume %s' % (name, data), end="\n")
        queue.task_done()

if __name__ == '__main__':
    queue = JoinableQueue()
    p_list = []
    for index, f in enumerate(['apple', 'pear', 'peach']):
        p = Process(target=producer, args=('producer_' + str(index), f, queue))
        p_list.append(p)
        p.start()
    c1 = Process(target=consumer, args=('consumer_1', queue))
    c2 = Process(target=consumer, args=('consumer_2', queue))
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    [p.join() for p in p_list]
    print('end...')

 
輸出結果與上一個示例一致。這裏將 2個 consumer 設置爲守護進程,在等待 producer 完成後,也隨主進程的結束而結束。

管道

管道的使用:

from multiprocessing import Process, Pipe

def func(pro, con):
    pro.close()
    while True:
        try:
            print(con.recv())
        except EOFError:
            con.close()
            break

if __name__ == '__main__':
    pro, con = Pipe()        # pro, con 分別表示管道的兩端
    Process(target=func, args=(pro, con)).start()
    con.close()             # 這裏也能夠不關閉
    for i in range(5):
        pro.send(i)
    pro.close()

# 輸出結果:
0
1
2
3
4

 
傳給進程的 conn(管道鏈接)是不會相互影響的,在一個進程中關閉了管道,並不會影響這個管道在另外一個進程中的使用。如果在一個進程中,管道的一端沒有被用到,那麼就應該將這一端關閉。例如在生產者中,應該關閉管道的 con 端(右端),在消費者中應該關閉管道的 pro 端(左端)。
 
當管道全部的入口都已經關閉(上述示例中,主進程和子進程中管道的入口都爲 pro),消費者繼續接收數據(調用 recv() 方法),當管道中已經沒有數據時,就會拋出 EOFError。
 
若是管道有入口沒有關閉,且該入口沒有在向管道發送數據,那麼消費者就會阻塞在 recv() 方法上。
如上示例是經過 拋出 EOFError 錯誤來結束管道,還有另外一種方式,就是經過管道中的數據(例如向管道中傳遞None)來結束管道

from multiprocessing import Process, Pipe

def func(con):
    while True:
        data = con.recv()
        if data is None: break
        print(data)

if __name__ == '__main__':
    pro, con = Pipe()        # con, pro 分別表示管道的兩端
    Process(target=func, args=(con,)).start()
    for i in range(5):
        pro.send(i)
    pro.send(None)

 
多個消費者消費管道中的數據示例(加鎖):

from multiprocessing import Process, Pipe, Lock
import time

def producer(pro, con, name, production):
    con.close()
    for i in range(4):
        time.sleep(0.5)
        pro.send(production + str(i))
        print('%s produce %s' % (name, production + '_' + str(i)), end="\n")
    pro.close()

def consumer(pro, con, name, lock):
    pro.close()
    while True:
        lock.acquire()
        try:
            data = con.recv()
            time.sleep(0.3)
            print('%s consume %s' % (name, data), end="\n")
        except EOFError:
            con.close()
            break
        finally:
            lock.release()

if __name__ == '__main__':
    pro, con = Pipe()
    lock = Lock()
    Process(target=producer, args=(pro, con, 'producer', 'apple')).start()
    Process(target=consumer, args=(pro, con, 'c_1', lock)).start()
    Process(target=consumer, args=(pro, con, 'c_2', lock)).start()
    pro.close()
    con.close()

 
pipe(管道)是進程數據不安全的,隊列進程之間是數據安全的,由於隊列的實現就是基於管道和鎖實現的。因此管道極少被用到,生產環境中 pipe 通常也不多被用到,使用較多的通常會是隊列服務器,例如 rabbitmq,kafka…...

信號量

信號量也是一種鎖,信號量與互斥鎖區別在於,互斥鎖的 acquire() 方法和 release() 方法之間,僅容許一個線程(或進程)執行,而信號量可容許多個線程(或進程)執行。信號量的一種應用就是控制併發執行的線程(或進程)數。

from multiprocessing import Process, Semaphore
import time

def func(semaphore, name):
    if semaphore.acquire():
        print(name)
        time.sleep(2)
        semaphore.release()

if __name__ == '__main__':
    semaphore = Semaphore(3)
    for i in range(9):
        Process(target=func, args=(semaphore, 'process_' + str(i), )).start()

事件

Python中的事件(Event)主要用於主線程(進程)控制其餘線程(進程)的執行,其主要方法包括 set、wait、clear,is_set。
若事件(Event)的標記取值爲 False,則線程(進程)會阻塞在 event.wait() 方法,event.wait() 還能夠設置一個參數 timeout,在等待 timeout 指定的時間後中止阻塞,繼續運行。
 
方法說明:

event.set():將 event 的標記設置爲 True,全部 阻塞在 event.wait() 的線程(進程)都會繼續執行
event.clear():將 event 的標記設置爲 False。
event.is_set():判斷 event 的標誌是否爲 True。

 
以下示例,在主進程中控制子進程在什麼時候繼續向下執行。例如在主進程的 time.sleep(3) 處能夠執行一些檢測工做,確保子進程的運行,若檢測沒有問題則繼續子進程的運行。

from multiprocessing import Process, Event
import time

def worker(name, event):
    print('Process_%s is ready' % name)
    event.wait()
    print('Process_%s is running' % name)

if __name__ == '__main__':
    event = Event()
    for i in range(0, 2):
        Process(target=worker, args=(i, event)).start()
    time.sleep(3)
    event.set()

# 結果輸出:
Process_0 is ready
Process_1 is ready
Process_0 is running
Process_1 is running

 
如上示例,若主進程一直沒有容許子進程繼續執行(例如檢測工做沒有經過),則子進程會一直阻塞在 event.wait() 這兒,咱們但願在子進程阻塞過程當中會有持續的提示信息,這個能夠經過設置 event.wait 方法的 timeout 參數實現。

from multiprocessing import Process, Event
import time

def worker(name, event):
    while not event.is_set():
        print('Process_%s is ready' % name)
        event.wait(1)
    print('Process_%s is running' % name)

if __name__ == '__main__':
    event = Event()
    for i in range(0, 2):
        Process(target=worker, args=(i, event)).start()
    time.sleep(3)
    event.set()

# 結果輸出:
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_1 is running
Process_0 is running

進程池

進程的建立和銷燬都須要消耗系統資源,且每一臺服務器的 cpu 核心數有限,建立過多的進程反而會下降執行效率。這裏就可使用進程池,進程池一啓動就會建立固定數量的進程,有執行須要了,就從進程池中獲取一個進程處理對應的任務,處理完成後,進程不會被銷燬,而是放回進程池中。若是同時須要執行的任務過多,沒有獲取到進程的任務須要等待,等有空閒的進程了才能運行。
 
進程池節省了操做系統在建立和銷燬進程上所花去的開銷,也限制了同一時間可以運行的進程總數,在必定程度上提高了多進程的執行效率。
 
以下示例是使用進程池啓動進程和直接啓動進程的效率差距:

from multiprocessing import Process, Pool
import time

def m_add(a):
    return a ** a

if __name__ == '__main__':
    # print(os.cpu_count())    # 調試環境的 cpu 核數爲 8
    # 建立進程池
    pool = Pool(8)
    start_t1 = time.time()
    # 使用進程池啓動進程
    res = pool.map(m_add, range(500))
    print(time.time() - start_t1)
    p_list = []
    start_t2 = time.time()
    # 直接啓動進程
    for i in range(500):
        p = Process(target=m_add, args=(i, ))
        p_list.append(p)
        p.start()
    for p in p_list: p.join()
    print(time.time() - start_t2)

# 輸出結果:
0.003328084945678711
0.6395020484924316

 
建立進程池:

Pool([numprocess  [,initializer [, initargs]]]):
    numprocess:進程池中的固定繼承數,默認爲 cpu 核心數(os.cpu_count())
    initializer:每次啓動進程須要執行的可調用對象
    initargs:傳遞給 initializer 的參數

 
Pool 的經常使用方法:

map(func, iterable):異步提交任務。iterable 爲一個可迭代對象,這個可迭代對象的長度是多少,就啓動多少個子進程,且可迭代對象的每個元素會做爲參數傳遞給 func。注意,使用 map 方法開啓子進程,只能傳遞一個參數,若子進程須要多個參數,則這個參數可使用 元組;將全部子進程的返回結果以列表的形式返回。
apply(func [, args [, kwargs]]):同步提交任務,返回子進程的執行結果。若是須要併發地執行 func,必須從不一樣線程中調用同一個進程池的 apply() 方法;
apply_async(func [, args [, kwargs]]):異步提交任務,返回 AsyncResult 類的實例,從 AsyncResult 實例中獲取執行結果。與 map 方法的區別是,apply_async 方法能夠爲所欲爲地傳遞參數;
close():結束進程池接受任務;
jion():感知進程池中的任務執行結束。即全部提交進來的任務都已經執行完畢,且沒有新的任務提交進來。

Tip:進程池能夠有返回值,這是進程池特有的,可是直接起進程,是作不到有返回值的。
 
apply 方法應用:

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = []
    for i in range(7):
        res = pool.apply(worker, args=(i, ))    # 返回的 res 便是子進程的返回結果
        res_list.append(res)
    print(res_list)
    print('...end')

# 輸出結果:
worker_0 running, pid: 20584
worker_1 running, pid: 20585
worker_2 running, pid: 20586
worker_3 running, pid: 20584
worker_4 running, pid: 20585
worker_5 running, pid: 20586
worker_6 running, pid: 20584
[0, 1, 4, 9, 16, 25, 36]
...end

 
在同一個線程中使用 pool.apply 方法提交任務,是提交一個,執行一個,執行完成後才能繼續提交下一個任務。如上輸出結果也是逐個輸出。
 
apply_async 方法應用:

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = []
    for i in range(7):
        res = pool.apply_async(worker, args=(i, ))   # res 爲 AsyncResult 類的實例
        res_list.append(res)
    pool.close()
    pool.join()
    for i in res_list:
        print(i.get())
    print('...end')

# 輸出結果:
worker_0 running, pid: 20598
worker_1 running, pid: 20599
worker_2 running, pid: 20600
worker_3 running, pid: 20598
worker_4 running, pid: 20599
worker_5 running, pid: 20600
worker_6 running, pid: 20599
0
1
4
9
16
25
36
...end

 
經過 AsyncResult 對象的 get 方法獲取返回值,get 方法會阻塞,即阻塞到子進程執行完畢,而後獲取其返回值。
通常使用 apply_async 方法 異步提交任務,須要在主進程中感知任務結束(join方法),而且在 join 方法前面結束進程池接受任務(close方法)
 
map 方法應用:

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = pool.map(worker, range(7))
    for i in res_list:
        print(i)
    print('...end')

# 輸出結果:
worker_0 running, pid: 20713
worker_1 running, pid: 20714
worker_2 running, pid: 20715
worker_3 running, pid: 20714
worker_4 running, pid: 20713
worker_5 running, pid: 20715
worker_6 running, pid: 20715
0
1
4
9
16
25
36
...end

 
map 方法自帶 join 方法和 close 方法,map 方法啓動子進程後,就不容許再提交任務,且 map 方法會阻塞,直到子進程所有執行完畢,且將全部子進程的返回結果以列表的形式返回。
如果不想阻塞在 map 方法,則可使用 map_async,只是用了 map_async 方法,須要本身進行 close 和 join。

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = pool.map_async(worker, range(7))
    pool.close()
    pool.join()
    for i in res_list.get():
        print(i)
    print('...end')

 
返回結果與上述一致。

回調函數

進程池中一個進程處理完任務以後,這進程能夠調用一個函數去處理該進程返回的結果,這個函數就是回調函數。回調函數的主要做用是告訴主進程,這裏已經執行完畢,主進程能夠針對返回結果繼續後續的處理。相對於主進程輪詢等待子進程的返回結果,利用回調函數能夠提升程序的執行效率。
 
注意回調函數是由主進程執行的,能夠將一些比較耗IO的操做放到進程池中執行,由主進程統一處理它們的返回結果。
 
回調函數簡單示例:

from multiprocessing import Pool

def func(info):
    print('...' + str(info))

def worker(i):
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = []
    for i in range(7):
        res = pool.apply_async(worker, args=(i, ), callback=func)
        res_list.append(res)
    pool.close()
    pool.join()
    print('~end')

# 輸出結果:
...0
...4
...9
...1
...16
...36
...25
~end

 
以下示例中,能夠將具體的業務放在 worker 方法中,例如從網絡上爬取數據,而後統一由回調函數 func 寫到一個文件中。

from multiprocessing import Pool

def func(info):
    with open('abc.txt', 'a+') as f:
        f.writelines(str(info) + '\n')
def worker(i):
    return i * i

if __name__ == '__main__':
    pool = Pool()
    for i in range(10):
        pool.apply_async(worker, (i,), callback=func)
    pool.close()
    pool.join()

 .................^_^

相關文章
相關標籤/搜索