Python的進程

進程

說明:本文是基於Py2.X環境,3.X在我電腦上出了些問題。二者差異並不大。安全

Python實現多進程的方式主要有兩種:一種方法是使用os模塊中的fork方法; 另外一種是使用multiprocessing模塊。這兩種方法的區別在於前者僅適用於Unix/Linux操做操做。對win是不支持的,然後者則是跨平臺的實現方式。app

使用os模塊中的fork方式實現多進程。

Unix/Linux操做系統提供了一個fork()系統調用,它很是特殊。普通的函數調用,調用一次,返回一次,可是fork()調用一次,返回兩次,由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。dom

子進程永遠返回0,而父進程返回子進程的ID。這樣作的理由是,一個父進程能夠fork出不少子進程,因此,父進程要記下每一個子進程的ID,而子進程只須要調用getppid()就能夠拿到父進程的ID。async

Python的os模塊封裝了常見的系統調用,其中就包括fork,能夠在Python程序中輕鬆建立子進程:ide

import os

print 'Process (%s) start...' % os.getpid()

pid = os.fork()

if pid == 0:

    print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())

else:

    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

獲得:

Process (2450) start...

I (2450) just created a child process (2451).

I am child process (2451) and my parent is 2450.

使用Multiprocessing查模塊建立多進程。

multiprocessing模塊提供了一個Process類來描述一個進程對象,建立子進程時,只須要傳入一個執行函數和函數的參數便可完成一個Process實例的建立,用start()方法啓動進程,用join()方法實現進程間的同步。join()方法能夠等待子進程結束後再繼續往下運行,一般用於進程間的同步。函數

# -*- coding:utf-8 -*-

from multiprocessing import Process

import os

# 子進程要執行的代碼

def run_proc(name):

    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__ == '__main__':

    print 'Parent process %s.' % os.getpid()

    p = Process(target=run_proc, args=('test',))

    print 'Process will start.'

    p.start()

    p.join()

    print 'Process end.'

獲得:

Parent process 2533.

Process will start.

Run child process test (2534)...

Process end.

####multiprocessing模塊提供了一個pool類來表明進程池對象url

Pool能夠提供指定數量的進程供用戶調用,默認大小是cpu的核數,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求,但若是池的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束纔會建立新的進程來處理它。操作系統

# -*- coding:utf-8 -*-

from multiprocessing import Pool

import os, time, random

def long_time_task(name):

    print 'Run task %s (%s)...' % (name, os.getpid())

    start = time.time()

    time.sleep(random.random() * 3)

    end = time.time()

    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__ == '__main__':

    print 'Parent process %s.' % os.getpid()

    p = Pool()

    for i in range(5):

        p.apply_async(long_time_task, args=(i,))

    print 'Waiting for all subprocesses done...'

    p.close()

    p.join()

    print 'All subprocesses done.'

獲得: 

Parent process 2541.

Waiting for all subprocesses done...

Run task 0 (2543)...

Run task 1 (2544)...

Run task 2 (2545)...

Run task 3 (2546)...

Task 0 runs 0.02 seconds.

Run task 4 (2543)...

Task 2 runs 0.60 seconds.

Task 4 runs 1.18 seconds.

Task 3 runs 1.26 seconds.

Task 1 runs 1.66 seconds.

All subprocesses done.

對Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close(),調用close()以後就不能繼續添加新的Process了。code

進程間的通訊

Process之間確定是須要通訊的,操做系統提供了不少機制來實現進程間的通訊。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。二者的區別在於Pipe經常使用於兩個進程間的通信而Queue用於多個進程間實現通信。對象

Queue通信

Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳輸,有兩個方法:put和get進行Queue操做。

  • put方法用以插入數據隊列中它能夠有兩個可選參數:blocked和timeout,若是blocked爲True(默認值)而且timeout是正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘空間,若是超時,會拋出Queue.Full異常,若是blocked爲False,但該Queue已滿,則會當即拋出Queue.Full異常。

  • Get方法用以從隊列讀取而且刪除一個元素。它能夠有兩個可選參數:blocked和timeout,若是blocked爲True(默認值)而且timeout是正值,那麼在等待時間內沒有取到任何元素會拋出Queue.Empty異常,若是blocked爲False,分兩種狀況:若是Queue有一個值木口月禾,則當即返回該值,不然若是隊列爲空,則當即拋出Queuq.Empty異常。
# -*- coding:utf-8 -*-

from multiprocessing import Process, Queue

import os, time, random

# 寫數據進程執行的代碼:

def write(q):

    for value in ['A', 'B', 'C']:

        print 'Put %s to queue...' % value

        q.put(value)

        time.sleep(random.random())

# 讀數據進程執行的代碼:

def read(q):

    while True:

        value = q.get(True)

        print 'Get %s from queue.' % value

if __name__ == '__main__':

    # 父進程建立Queue,並傳給各個子進程:

    q = Queue()

    pw = Process(target=write, args=(q,))

    pr = Process(target=read, args=(q,))

    # 啓動子進程pw,寫入:

    pw.start()

    # 啓動子進程pr,讀取:

    pr.start()

    # 等待pw結束:

    pw.join()

    # pr進程裏是死循環,沒法等待其結束,只能強行終止:

    pr.terminate()

獲得:

Put A to queue...

Get A from queue.

Put B to queue...

Get B from queue.

Put C to queue...

Get C from queue.
Pipes通信

Pipe經常使用來在兩個進程間進行通訊,兩個進程分別位於管道的兩端。

Pipe方法返回(conn1,conn2)表明一個管道的兩個端,Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發,若duplex爲False,conn1只負責接收消息,conn2只負責發送消息。send和recv方法分別是發送和接收消息的方法。例如,在全雙工模式下,能夠調用conn1.send發送消息,conn1.recv接收消息。若是沒有消息可接收,recv方法會一直阻塞。若是管道已經被關閉,那麼recv方法會拋出EOFError.

import multiprocessing

import random

import time, os

def proc_send(pipe, urls):

    for url in urls:

        print "process(%s) send:%s" % (os.getpid(), url)

        pipe.send(url)

        time.sleep(random.random())

def proc_recv(pipe):

    while True:

        print "Process(%s) rev:%s" % (os.getpid(), pipe.recv())

        time.sleep(random.random())

if __name__ == "__main__":

    pipe = multiprocessing.Pipe()

    p1 = multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10)]))

    p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1],))

    p1.start()

    p2.start()

    p1.join()

    p2.join()

獲得:

process(1134) send:url_0

Process(1135) rev:url_0

process(1134) send:url_1

Process(1135) rev:url_1

process(1134) send:url_2

Process(1135) rev:url_2

process(1134) send:url_3

Process(1135) rev:url_3

process(1134) send:url_4

Process(1135) rev:url_4

process(1134) send:url_5

Process(1135) rev:url_5

process(1134) send:url_6

Process(1135) rev:url_6

process(1134) send:url_7

Process(1135) rev:url_7

process(1134) send:url_8

Process(1135) rev:url_8

process(1134) send:url_9

Process(1135) rev:url_9
相關文章
相關標籤/搜索