多進程 (一) — 像線程同樣管理進程

因爲和線程類似,這裏的前幾個示例都是從線程示例中修改的。python

內容目錄

  1. multiprocessing基礎
  2. 可導入的目標函數
  3. 肯定當前進程
  4. 守護進程 Daemon
  5. 等待進程 join()
  6. 終止進程 terminate()
  7. 進程退出狀態
  8. 調試 log_to_stderr
  9. 子類化進程

1.multiprocessing基礎

生成進程的最簡單的方法是用目標函數實例化一個進程對象,並調用start()來讓它開始工做。bootstrap

import multiprocessing


def worker():
    """worker function"""
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

運行結果:併發

Worker
Worker
Worker
Worker
Worker

傳遞參數:app

import multiprocessing


def worker(num):
    """thread worker function"""
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

運行結果:函數

Worker: 1
Worker: 0
Worker: 3
Worker: 2
Worker: 4

2.可導入的目標函數

# main.py
import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()

其中worker()函數在multiprocessing_import_worker.py中定義:操作系統

# multiprocessing_import_worker.py
def worker():
    """worker function"""
    print('Worker')
    return

運行結果:線程

Worker
Worker
Worker
Worker
Worker

3.肯定當前進程

import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()

運行結果:能夠看到默認的進程名字對應於Process-3, 和線程很相似。調試

worker 1 Starting
Process-3 Starting
my_service Starting
worker 1 Exiting
Process-3 Exiting
my_service Exiting

4.守護進程 Daemon

默認狀況下主進程會在子進程所有執行完畢後才退出,若是子進程設置爲守護進程,便再也不阻塞主進程退出。
爲了將進程標記爲守護進程只需將daemon屬性設置爲True。默認狀況下,進程不是守護進程。日誌

import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

運行結果:能夠看到d進程還沒執行完主進程就退出了。code

Starting: daemon 24852
Starting: non-daemon 23248
Exiting : non-daemon 23248

在主程序退出以前,守護進程會自動終止,這將避免留下孤兒進程。

5.等待進程 join()

要等到進程完成工做再退出,請使用join()方法。

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

運行結果:

Starting: non-daemon
Exiting : non-daemon
Starting: daemon
Exiting : daemon

默認狀況下,join()無限期阻塞。也能夠使用超時參數。

6.終止進程 terminate()

在一個進程調用terminate()會殺死子進程

import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())

運行結果:在終止它以後,使用join()是很重要的,以便讓進程管理代碼有時間來更新對象的狀態以反映終止。

BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
TERMINATED: <Process(Process-1, started)> True
JOINED: <Process(Process-1, stopped[SIGTERM])> False

7.進程退出狀態

當進程退出時產生的狀態碼能夠經過exitcode屬性訪問。容許的範圍列在下面的表格中。

Exit Code Meaning
== 0 row 1 col 2
> 0 the process had an error, and exited with that code
< 0 the process had an error, and exited with that code
import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('There was an error!')


def terminated():
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))

運行結果:

Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
Process raises:
     exit_error.exitcode = 1
        exit_ok.exitcode = 0
Traceback (most recent call last):
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "E:\MyPython\image.py", line 19, in raises
    raise RuntimeError('There was an error!')
RuntimeError: There was an error!
   return_value.exitcode = 0
         raises.exitcode = 1
     terminated.exitcode = -15

exit(0):無錯誤退出
exit(1):有錯誤退出
退出代碼是告訴解釋器的(或操做系統)

8.調試 log_to_stderr

在調試併發性問題時,能夠使用multiprocessing所提供的對象的內部構件。有一個方便的模塊級函數爲logtostderr()。它使用logging設置一個記錄器對象,並添加一個處理程序,以便將日誌消息發送到標準錯誤通道。

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

運行結果:認狀況下,logging級別被設置爲NOTSET不會產生任何消息。

Doing some work
[INFO/Process-1] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

若想直接操做日誌請使用get_logger()獲取

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

運行結果:

[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

9.子類化進程

經過multiprocessing.Process能夠建立進程,也能夠經過自定義子類建立進程。

import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

運行結果:

In Worker-2
In Worker-4
In Worker-3
In Worker-1
In Worker-5
相關文章
相關標籤/搜索