Multiprocessing (Part 2): Signaling and Process Control

Contents:

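  1. multiprocessing.Queue()
  2. JoinableQueue
  3. Signaling Between Processes: Event
  4. Controlling Access to Resources: Lock
  5. Synchronizing Operations: Condition
  6. Controlling Concurrent Access to Resources: Semaphore
  7. Managing Shared State: Manager
  8. Shared Namespace: mgr.Namespace()
  9. Process Pools: multiprocessing.Pool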

1. multiprocessing.Queue()

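As with threads, a common pattern with multiple processes is to split a job among several workers so that they run in parallel. Using multiple processes effectively usually requires some communication between them, so that work can be divided up and the results aggregated. A simple way to do this is to pass messages back and forth through a multiprocessing.Queue(). Any object that can be serialized with pickle can be passed through the queue.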

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()



if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

Output: while q is empty, q.get() blocks, so the worker waits until the main process puts an object on the queue.

Doing something fancy in Process-1 for Fancy Dan
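The example above only sends data one way. A minimal sketch of the round trip the opening paragraph describes, assuming a second queue for results and None as an end-of-work marker (square_worker, square_all and get_or_default are hypothetical names chosen for this illustration). It also shows that get(timeout=...) raises queue.Empty instead of blocking forever.

```python
import multiprocessing
import queue  # multiprocessing.Queue raises queue.Empty on timeout


def square_worker(tasks, results):
    """Hypothetical worker: square numbers until it receives None."""
    while True:
        n = tasks.get()              # blocks while the queue is empty
        if n is None:                # end-of-work marker
            break
        results.put((n, n * n))


def square_all(numbers, num_workers=2):
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=square_worker, args=(tasks, results))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    for n in numbers:
        tasks.put(n)
    for _ in workers:
        tasks.put(None)              # one marker per worker
    # Read the results before join(): a process that still has items
    # buffered for a queue may not exit until they are consumed.
    collected = dict(results.get(timeout=10) for _ in numbers)
    for w in workers:
        w.join()
    return [collected[n] for n in numbers]


def get_or_default(q, default, timeout=0.1):
    """Return the next item, or default if nothing arrives in time."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return default


if __name__ == '__main__':
    print(square_all([1, 2, 3, 4]))                          # [1, 4, 9, 16]
    print(get_or_default(multiprocessing.Queue(), 'empty'))  # empty
```

Results are tagged with their input so they can be put back in order, since the workers may finish in any order.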

2. JoinableQueue

In addition to the methods of a Queue object, a JoinableQueue instance q has:

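  • q.task_done(): called by a consumer to signal that an item returned by q.get() has been processed. If it is called more times than there were items removed from the queue, a ValueError is raised.
  • q.join(): called by a producer to block until every item in the queue has been processed. It blocks until q.task_done() has been called once for each item that was put in the queue.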

The special value None is used as a "poison pill" that tells each worker to exit:

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            # next_task is a Task instance; printing it calls __str__()
            print('{}: {}'.format(proc_name, next_task))
            # calling next_task() invokes __call__()
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

Output:

Creating 8 consumers
Consumer-4: 0 * 0
Consumer-1: 1 * 1
Consumer-2: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-1: 6 * 6
Consumer-6: 7 * 7
Consumer-4: 8 * 8
Consumer-2: 9 * 9
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-8: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 7 * 7 = 49
Result: 9 * 9 = 81
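The example depends on the bookkeeping described in the two bullets: every put() adds one unfinished task, and join() returns only once task_done() has been called for each of them. A small single-process sketch of that counter (task_done_demo is a hypothetical name), which also triggers the ValueError from one task_done() call too many:

```python
import multiprocessing


def task_done_demo():
    """Walk through JoinableQueue's unfinished-task counter in one process."""
    q = multiprocessing.JoinableQueue()
    for item in ('a', 'b'):
        q.put(item)                  # each put() adds one unfinished task
    received = [q.get(), q.get()]    # get() alone does not mark anything done
    q.task_done()
    q.task_done()                    # the counter is back to zero
    q.join()                         # returns immediately: nothing is pending
    try:
        q.task_done()                # a third call has no matching item
    except ValueError as err:
        return received, str(err)
    return received, None


if __name__ == '__main__':
    items, error = task_done_demo()
    print(items)                     # ['a', 'b']
    print(error)
```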

3. Signaling Between Processes: Event

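The Event class provides a simple way to communicate state information between processes.
When wait() times out, it returns without raising an error. The caller is responsible for checking the state of the event with is_set().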

import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()
    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')

Output:

main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True
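In Python 3, wait() also returns the state of the flag (False when it timed out), so a caller such as wait_for_event_timeout() can use that return value directly. A single-process sketch (wait_reports_timeout is a hypothetical helper):

```python
import multiprocessing


def wait_reports_timeout(timeout=0.1):
    e = multiprocessing.Event()
    timed_out = e.wait(timeout)      # flag not set: returns False, no error
    e.set()
    already_set = e.wait(timeout)    # flag set: returns True at once
    e.clear()                        # reset the flag so it can be reused
    return timed_out, already_set, e.is_set()


if __name__ == '__main__':
    print(wait_reports_timeout())    # (False, True, False)
```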

4. Controlling Access to Resources: Lock

When a single resource must be shared among several processes, a lock can be used to prevent conflicting access.

import multiprocessing
import sys


def worker_with(lock):
    with lock:
        sys.stdout.write('Lock acquired via with\n')


def worker_no_with(lock):
    lock.acquire()
    try:
        sys.stdout.write('Lock acquired directly\n')
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(
        target=worker_with,
        args=(lock,),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock,),
    )

    w.start()
    nw.start()

    w.join()
    nw.join()

Output:

Lock acquired via with
Lock acquired directly
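The example above only protects writes to stdout. A lock matters most around a read-modify-write on shared data. A sketch assuming a shared multiprocessing.Value counter (add_many and run_counter are illustrative names):

```python
import multiprocessing


def add_many(counter, lock, times):
    for _ in range(times):
        # Value() has its own lock, but it only guards each single read or
        # write of .value; "+=" is a read followed by a write, so the whole
        # step is done while holding the shared lock.
        with lock:
            counter.value += 1


def run_counter(num_workers=4, times=1000):
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value('i', 0)   # shared C int, starts at 0
    workers = [
        multiprocessing.Process(target=add_many, args=(counter, lock, times))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.value


if __name__ == '__main__':
    print(run_counter())             # 4000
```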

5. Synchronizing Operations: Condition

cond.wait() blocks until another process calls cond.notify_all(), which signals that the waiting processes may continue.

import multiprocessing
import time


def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()


def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

Output: In this example, the two stage_2 processes run the second stage of the work in parallel, but only after the first stage is done.

Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running
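The time.sleep(1) in the example is what makes it work: if s1 called notify_all() before a stage_2 process reached cond.wait(), that notification would be lost and the process would wait forever. A sketch of a more robust variant, assuming a shared flag stored in a multiprocessing.Value and using Condition.wait_for(), which checks the predicate before it waits (run_stages and the done counter are hypothetical additions):

```python
import multiprocessing


def stage_1(cond, ready):
    with cond:
        ready.value = 1              # record the state change under the lock
        cond.notify_all()


def stage_2(cond, ready, done):
    with cond:
        # wait_for() re-checks the predicate, so it returns immediately
        # if stage_1 already ran before this process started waiting
        cond.wait_for(lambda: ready.value == 1)
    with done.get_lock():
        done.value += 1


def run_stages(num_waiters=2):
    cond = multiprocessing.Condition()
    ready = multiprocessing.Value('i', 0)
    done = multiprocessing.Value('i', 0)
    s1 = multiprocessing.Process(target=stage_1, args=(cond, ready))
    waiters = [
        multiprocessing.Process(target=stage_2, args=(cond, ready, done))
        for _ in range(num_waiters)
    ]
    s1.start()                       # started first on purpose: no sleep()
    for w in waiters:
        w.start()
    s1.join()
    for w in waiters:
        w.join()
    return done.value


if __name__ == '__main__':
    print(run_stages())              # 2
```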

6. Controlling Concurrent Access to Resources: Semaphore

Sometimes it is useful to allow more than one worker to access a resource at a time, while still limiting how many can do so.

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n")
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()

Output:

Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-1acquire
Process-1release

Process-5acquire
Process-4release

Process-5release
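The output suggests that no more than two processes hold the semaphore at once. A sketch that measures this directly, using two shared counters (active and peak, names chosen for this example) that are updated only while the semaphore is held:

```python
import multiprocessing
import time


def limited_worker(sem, active, peak):
    with sem:                            # blocks while `limit` holders exist
        with active.get_lock():
            active.value += 1
            with peak.get_lock():
                peak.value = max(peak.value, active.value)
        time.sleep(0.1)                  # pretend to use the resource
        with active.get_lock():
            active.value -= 1


def peak_concurrency(limit=2, num_workers=5):
    sem = multiprocessing.Semaphore(limit)
    active = multiprocessing.Value('i', 0)   # current holders
    peak = multiprocessing.Value('i', 0)     # highest count observed
    procs = [
        multiprocessing.Process(target=limited_worker, args=(sem, active, peak))
        for _ in range(num_workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value


if __name__ == '__main__':
    print(peak_concurrency())            # never more than 2
```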

7. Managing Shared State: Manager

Information shared through a Manager is visible to all processes.

import multiprocessing
import pprint


def worker(d, key, value):
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)

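Output: A dictionary created through the Manager is shared, and updates made in any process are visible in all of them. Lists are supported as well. (The key order reflects the order in which the processes happened to write.)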

Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}
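The same pattern with mgr.list(), this time using the Manager as a context manager so that its server process is shut down at the end. A sketch; record and collect are illustrative names:

```python
import multiprocessing


def record(shared, key, value):
    # the proxy forwards append() to the manager's server process
    shared.append((key, value))


def collect(n):
    with multiprocessing.Manager() as mgr:
        shared = mgr.list()
        jobs = [
            multiprocessing.Process(target=record, args=(shared, i, i * 2))
            for i in range(n)
        ]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        # shared[:] copies the data out before the manager shuts down;
        # sorted because the append order depends on scheduling
        return sorted(shared[:])


if __name__ == '__main__':
    print(collect(4))                # [(0, 0), (1, 2), (2, 4), (3, 6)]
```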

8. Shared Namespace: mgr.Namespace()

Besides dictionaries and lists, a Manager can also create a shared namespace.

import multiprocessing


def producer(ns, event):
    ns.value = 'This is the value'
    event.set()


def consumer(ns, event):
    try:
        print('Before event: {}'.format(ns.value))
    except Exception as err:
        print('Before event, error:', str(err))
    event.wait()
    print('After event:', ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

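Output: A value assigned to the mgr.Namespace() in one process can be read by the other processes once it has been set; reading it before then raises an error.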

Before event, error: 'Namespace' object has no attribute 'value'
After event: This is the value

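It is important to know that updates to the contents of mutable values stored in mgr.Namespace() are not propagated automatically: reading an attribute returns a copy of the value, so appending to that copy does not change what the manager holds.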

import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('This is the value')
    event.set()


def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

Output:

Before event: []
After event : []
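One way to make such an update visible is to fetch the value, modify the local copy, and assign it back to the attribute. A sketch of that approach (copy_semantics is a hypothetical helper; if several processes update the same attribute concurrently, this read-modify-write would also need a lock):

```python
import multiprocessing


def copy_semantics(ns):
    """Show why ns.my_list.append() is lost, within a single process."""
    ns.my_list = []
    ns.my_list.append('lost')        # appends to a temporary copy only
    before = ns.my_list
    items = ns.my_list               # fetch a copy...
    items.append('kept')             # ...change it locally...
    ns.my_list = items               # ...and assign it back to publish it
    return before, ns.my_list


def producer(ns, event):
    items = ns.my_list
    items.append('This is the value')
    ns.my_list = items               # reassignment is what propagates
    event.set()


if __name__ == '__main__':
    with multiprocessing.Manager() as mgr:
        print(copy_semantics(mgr.Namespace()))  # ([], ['kept'])
        ns = mgr.Namespace()
        ns.my_list = []
        event = multiprocessing.Event()
        p = multiprocessing.Process(target=producer, args=(ns, event))
        p.start()
        event.wait()
        p.join()
        print(ns.my_list)            # ['This is the value']
```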

9. Process Pools: multiprocessing.Pool

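The Pool class can be used to manage a fixed number of workers for simple cases where the work can be broken up and distributed among the workers independently.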

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    # In Python 3, map() is lazy: this prints a map object, not a list
    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)

Output: The return values of the jobs are collected and returned as a list.

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x000000000256A080>
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-8
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
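Besides map(), a Pool offers apply_async() for a single call and imap_unordered() for results in completion order, and it can be used as a context manager. A sketch with the same do_calculation as above (pool_variants is a hypothetical helper). Note that leaving the with-block calls terminate(), unlike the explicit close()/join() pair in the example, so results must be fetched inside the block.

```python
import multiprocessing


def do_calculation(data):
    return data * 2


def pool_variants(inputs):
    # Leaving the with-block calls terminate(), not close()/join(),
    # so every result is fetched before the block ends.
    with multiprocessing.Pool(processes=2) as pool:
        mapped = pool.map(do_calculation, inputs)            # ordered list
        single = pool.apply_async(do_calculation, (21,)).get(timeout=10)
        # imap_unordered yields results as they finish, in any order
        unordered = sorted(pool.imap_unordered(do_calculation, inputs))
    return mapped, single, unordered


if __name__ == '__main__':
    print(pool_variants(range(5)))
    # ([0, 2, 4, 6, 8], 42, [0, 2, 4, 6, 8])
```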

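By default, a pool creates a fixed number of worker processes and passes jobs to them until there is no more work. Setting the maxtasksperchild parameter tells the pool to restart a worker process after it has finished that many tasks, which keeps long-running workers from consuming ever more system resources.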

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    # In Python 3, map() is lazy: this prints a map object, not a list
    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)

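Output: The pool restarts workers once they have completed their allotted tasks, even when there is no more work. In this output, 9 workers are created even though there are only 10 tasks: each worker completes at most two tasks and is then replaced, whether or not work remains.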

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x00000000025CA080>
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-8
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-9
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
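One way to watch maxtasksperchild at work is to record which worker ran each item. A sketch (worker_name and workers_used are hypothetical helpers) that passes chunksize=1 so that each input is a separate task; otherwise map() may group several inputs into one task, and the limit would count those groups:

```python
import multiprocessing


def worker_name(_):
    return multiprocessing.current_process().name


def workers_used(num_tasks, maxtasks):
    with multiprocessing.Pool(processes=2, maxtasksperchild=maxtasks) as pool:
        # chunksize=1 makes each item its own task, so maxtasksperchild
        # counts items rather than batches of items
        names = pool.map(worker_name, range(num_tasks), chunksize=1)
    return len(set(names))


if __name__ == '__main__':
    print(workers_used(6, 1))        # 6: every task gets a fresh worker
    print(workers_used(6, None))     # at most 2: workers are never replaced
```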