multiprocessing- 基於進程的並行性

介紹
multiprocessing是一個使用相似於threading模塊的API支持生成進程的包。該multiprocessing軟件包提供本地和遠程併發,經過使用子進程而不是線程有效地支持 全局解釋器鎖。multiprocessing模塊充分利用給定機器上的多個處理器。它能夠在Unix和Windows上運行。python

該multiprocessing模塊還引入了threading模塊中沒有模擬的API 。一個主要的例子是該 Pool對象提供了一種方便的方法,能夠跨多個輸入值並行化函數的執行,跨過程分配輸入數據(數據並行)。如下示例演示了在模塊中定義此類函數的常見作法,以便子進程能夠成功導入該模塊。這個數據並行的基本例子使用Pool編程

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

>>>[1, 4, 9]
#encoding:utf-8
# __author__ = 'donghao'
# __time__ = 2019/4/1 11:27
from multiprocessing import Pool
import time
import os
# 進程池
# 大量進程建立,使用pool的方法

def worker(msg):
    start = time.time()
    print('%s開始執行,進程號%d'%(msg,os.getpid()))
    time.sleep(1)
    end = time.time()
    print('耗時%0.2f'%(end-start))


if __name__ == '__main__':
    po = Pool(3)
    for i in range(10):
        po.apply_async(worker, (i,))

    print('——tart____')
    po.close()  # 關閉進程池,關閉後再也不接受新的請求
    po.join()  # 等待全部的子進程執行完成,必須放到close以後

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
close() 關閉pool,使其不在接受新的任務。
terminate() 結束工做進程,不在處理未完成的任務。
join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate以後使用。安全

Process
multiprocessing,經過建立Process 對象而後調用其start()方法來生成進程。 Process 遵循API的threading.Thread服務器

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

顯示所涉及的各個進程ID網絡

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('父進程:', os.getppid())
    print('進程:', os.getpid())

def f(name):
    info('函數 f')
    print('我是', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('魯班七號',))
    p.start()
    p.join()
    
>>>
main line
module name: __main__
父進程: 1668
進程: 1368
函數 f
module name: __mp_main__
父進程: 1368
進程: 4644
我是 魯班七號

multiprocessing 支持進程之間的兩種通訊
隊列併發

這個Queue是近乎克隆的queue.Queue。例如:app

from multiprocessing import Process, Queue

def f(q):
    q.put(['魯班七號', '妲己', '後裔'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints ['魯班七號', '妲己', '後裔']
    p.join()

隊列是線程和進程安全的。async

管道函數

from multiprocessing import Process, Pipe

def f(conn):
    conn.send(['魯班七號', '妲己', '後裔'])
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=f,args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
    parent_conn.close()

返回的兩個鏈接對象Pipe()表示管道的兩端。每一個鏈接對象都有send()recv()方法(以及其餘)。請注意,若是兩個進程(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的數據可能會損壞。固然,同時使用管道的不一樣端的進程不存在損壞的風險spa

進程間的同步

multiprocessing包含全部同步原語的等價物threading。例如,可使用鎖來確保一次只有一個進程打印到標準輸出:

from multiprocessing import Process, Lock
def f(l, i):
    print('hello world', i)
if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

不使用來自不一樣進程的鎖輸出容易被混淆。

進程間共享狀態

在進行併發編程時,一般最好儘可能避免使用共享狀態。使用多個進程時尤爲如此。
可是,若是您確實須要使用某些共享數據,那麼 multiprocessing提供了幾種方法。

共享內存

可使用Value或 將數據存儲在共享存儲器映射中Array。例如,如下代碼

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

    
>>>
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

服務器進程
Manager()控制器返回的管理器對象控制一個服務器進程,該進程保存Python對象並容許其餘進程使用代理操做它們
經過返回的經理Manager()將支持類型

list,dict,Namespace,Lock, RLock,Semaphore,BoundedSemaphore, Condition,Event,Barrier, Queue,Value和Array

例如

from multiprocessing import Process, Manager

def f(d, l, kills):
    d['name'] = '程咬金'
    d['slogan'] = '真男人,必需要有強健的肌肉,身體和精神'
    d['裝備'] = None
    l.reverse()
    kills.append('後裔')

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        kills = manager.list(['達摩','魯班七號'])
        p = Process(target=f, args=(d, l, kills))
        p.start()
        p.join()

        print(d)
        print(l)
        print(kills)

>>>
{'name': '程咬金', 'slogan': '真男人,必需要有強健的肌肉,身體和精神', '裝備': None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
['達摩', '魯班七號', '後裔']

服務器進程管理器比使用共享內存對象更靈活,由於它們能夠支持任意對象類型。此外,單個管理器能夠經過網絡由不一樣計算機上的進程共享。可是,它們比使用共享內存慢。

daemon程序

# 不加daemon屬性
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("end!")
>>>
end!
work start:Mon Apr  1 16:08:40 2019
work end:Mon Apr  1 16:08:43 2019
#加上daemon屬性
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print("end!")
>>>
end!

注:因子進程設置了daemon屬性,主進程結束,它們就隨着結束了。

Event用來實現進程間同步通訊。

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 1))
    w1.start()
    w2.start()

    time.sleep(5)

    e.set()
    print("main: event is set")

>>>
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

文件拷貝器:

#encoding:utf-8
# __author__ = 'donghao'
# __time__ = 2019/4/1 14:14
from multiprocessing import pool,Manager,Queue
import os,time

def mycopy(old_file_name, new_file_name, filename, queue):
    f = open(old_file_name+'/' + filename,'rb')
    content = f.read()
    f.close()
    w = open(new_file_name+'/' + filename,'wb')
    w.write(content)
    w.close()
    queue.put(filename)

def main():
    old_file_name = input('請輸入文件名稱')
    path = os.listdir(old_file_name)
    length = len(path)
    po = pool.Pool(5)
    queue = Manager().Queue()
    try:
        new_file_name = old_file_name+'[副本]'
        os.mkdir(new_file_name)
    except:
        pass
    for filename in path:
        po.apply_async(mycopy,args=(old_file_name, new_file_name, filename, queue))
    po.close()
    copy_file_nums = 0
    while True:
        filename = queue.get()
        copy_file_nums += 1
        print('\r 拷貝進度: %0.2f %%'%(copy_file_nums*100/length),end='')
        if copy_file_nums >= length:
            break
    print('\n文件拷貝成功!')
    po.join()

if __name__ == '__main__':
    main()
相關文章
相關標籤/搜索