Python多進程應用 進程,線程,GIL,Python多線程,生產者消費者模型都是什麼鬼

在我以前的一篇博文中詳細介紹了Python多線程的應用:html

 進程,線程,GIL,Python多線程,生產者消費者模型都是什麼鬼

可是因爲GIL的存在,使得python多線程沒有充分利用CPU的多核,爲了利用多核,我能夠採用多進程;python

1. 父進程與子進程

wiki上對於父進程與子進程的定義:windows

a)Parent process服務器

In Unix-like operating systems, every process except process 0 (the swapper) is created when another process executes the fork() system call. The process that invoked fork is the parent process and the newly created process is the child process. Every process (except process 0) has one parent process, but can have many child processes.網絡

In the Linux kernel, in which there is a very slim difference between processes and POSIX threads, there are two kinds of parent processes, namely real parent and parent. Parent is the process that receives the SIGCHLD signal on child's termination, whereas real parent is the thread that actually created this child process in a multithreaded environment. For a normal process, both these two values are same, but for a POSIX thread which acts as a process, these two values may be different.[1]多線程

b)Child processapp

A child process in computing is a process created by another process (the parent process). This technique pertains to multitasking operating systems, and is sometimes called a subprocess or traditionally a subtask.運維

There are two major procedures for creating a child process: the fork system call (preferred in Unix-like systems and the POSIX standard) and the spawn (preferred in the modern (NT) kernel of Microsoft Windows, as well as in some historical operating systems).async

即,Unix/Linux操做系統提供了一個fork()系統調用,用於建立子進程;fork()很是特殊。普通的函數調用,調用一次,返回一次,可是fork()調用一次,返回兩次,由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。對於返回值,子進程永遠返回0,而父進程返回子進程的ID。這樣作的理由是,一個父進程能夠fork出不少子進程,因此,父進程要記下每一個子進程的ID;函數

python的os模塊,就含有fork函數:

#!/bin/env python
#coding:utf-8

import os
import time

print('Process %s start...' % os.getpid())
pid = os.fork()
if pid == 0:
    print('i am child process %s and my parent is %s' % (os.getpid(), os.getppid()))
else:
    print('i %s just created a child process %s' % (os.getpid(), pid))

運行結果:

Process 3522 start...
i 3522 just created a child process 3523
i am child process 3523 and my parent is 3522

由於fork()調用一次,返回兩次,因此獲得上面的結果;這裏注意:因爲Windows沒有fork調用,上面的代碼在Windows上沒法運行;有了fork調用,一個進程在接到新任務時就能夠複製出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。

2. multiprocessing

上面說到windows沒有fork調用,那麼如何在windows上實現多進程呢?

經過multiprocess模塊,因爲Python是跨平臺的,天然也應該提供一個跨平臺的多進程支持。multiprocessing模塊就是跨平臺版本的多進程模塊

python中兩個經常使用來處理進程的模塊分別是subprocess和multiprocessing,其中subprocess一般用於執行外部程序,好比一些第三方應用程序,而不是Python程序。若是須要實現調用外部程序的功能,python的psutil模塊是更好的選擇,它不只支持subprocess提供的功能,並且還能對當前主機或者啓動的外部程序進行監控,好比獲取網絡、cpu、內存等信息使用狀況,在作一些自動化運維工做時支持的更加全面。multiprocessing是python的多進程模塊,主要經過啓動python進程,調用target回調函數來處理任務。

注意:multiprocessing的方法與threading的方法相似,因此咱們這裏只給出示例代碼,而不作詳細介紹;

1)multiprocessing基本使用

與threading相似,也是有兩種方式

a)直接調用

 1 from multiprocessing import Process, freeze_support
 2 import os
 3 
 4 processes = []
 5 
 6 def run(item):
 7     print('-'*50)
 8     print('child process %s id: %s'%(item, os.getpid()))
 9     print('child process %s parent id: %s' % (item, os.getppid()))
10 
11 def main():
12     #打印主進程進程號
13     print('main process id: ', os.getpid())
14     #建立多個子進程
15     for item in range(2):
16         p = Process(target=run, args=(item, ))
17         processes.append(p)
18         print('child process %s name: %s' % (item, p.name))
19         print('child process %s id: %s' % (item, p.pid))
20 
21     for item in processes:
22         item.start()
23 
24     for item in processes:
25         item.join()
26 
27 if __name__ == '__main__':
28     main()
29     freeze_support()

b)面向對象方式調用

 1 from multiprocessing import Process, freeze_support
 2 import os
 3 
 4 processes = []
 5 
 6 class MyProcess(Process):
 7     def __init__(self, func, item):
 8         super(MyProcess, self).__init__()
 9         self.__func = func
10         self.__item = item
11 
12     def run(self):
13         self.__func(self.__item)
14 
15 def proc(item):
16     print('-'*50)
17     print('child process %s id: %s'%(item, os.getpid()))
18     print('child process %s parent id: %s' % (item, os.getppid()))
19 
20 def main():
21     #打印主進程進程號
22     print('main process id: ', os.getpid())
23     #建立多個子進程
24     for item in range(2):
25         p = MyProcess(proc, item)
26         processes.append(p)
27         print('child process %s name: %s' % (item, p.name))
28         print('child process %s id: %s' % (item, p.pid))
29 
30     for item in processes:
31         item.start()
32 
33     for item in processes:
34         item.join()
35 
36 if __name__ == '__main__':
37     main()
38     freeze_support()

注:2.7中,if __name__ == '__main__'的代碼塊中必須加上freeze_support(),python3好像不須要了

結果:

main process id:  10972
child process 0 name: MyProcess-1
child process 0 id: None
child process 1 name: MyProcess-2
child process 1 id: None
--------------------------------------------------
child process 0 id: 10636
child process 0 parent id: 10972
--------------------------------------------------
child process 1 id: 8076
child process 1 parent id: 10972

2)daemon屬性設置

 1 from multiprocessing import Process
 2 import time
 3 
 4 processes = []
 5 
 6 def run(item):
 7     time.sleep(1)
 8     print('item: ', item)
 9 
10 def main():
11     #建立多個子進程
12     for item in range(2):
13         p = Process(target=run, args=(item, ))
14         processes.append(p)
15         p.daemon = True
16 
17     for item in processes:
18         item.start()
19 
20     print('all done')
21 
22 if __name__ == '__main__':
23     main()

結果:

all done

注意daemon和threading的方式不一樣,這裏是直接設置屬性,而不是調用方法;另外要在start前設置daemon;

3)進程同步

既然進程之間不共享數據,爲何還有進程同步問題呢?若是多個進程打開同一個文件,在同一個屏幕輸出呢?這些仍是須要進程同步的,經過Lock

4)Semaphore

同threading.Semaphore()用法相同,只是建立的Semaphore須要做爲參數傳入子進程,由於進程間不共享資源

5)Event

同threading.Event()用法相同,只是建立的Event須要做爲參數傳入子進程

6)進程間通信

由於進程之間不共享資源,咱們先看一個例子證實一下:

 1 from multiprocessing import Process
 2 
 3 processes = []
 4 data_list = []
 5 
 6 def run(lst, item):
 7     lst.append(item)
 8     print('%s : %s' % (item, lst))
 9 
10 def main():
11     for item in range(4):
12         p = Process(target=run, args=(data_list, item))
13         processes.append(p)
14 
15     for item in processes:
16         item.start()
17 
18     for item in processes:
19         item.join()
20 
21     print('final lst: ', data_list)
22 
23 if __name__ == '__main__':
24     main()

結果:

1 : [1]
2 : [2]
0 : [0]
3 : [3]
final lst:  []

因此必須經過第三方實現進程間通信,下面介紹3種方法

a)Queue

用法與queue.Queue在多線程中的應用相同,只是建立的queue要做爲參數傳入子進程

 1 from multiprocessing import Process, Queue
 2 import time
 3 
 4 q = Queue(10)
 5 
 6 def put(q):
 7     for i in range(3):
 8         q.put(i)
 9     print('queue size after put: %s' % q.qsize())
10 
11 def get(q):
12     print('queue size before get: %s' % q.qsize())
13     while not q.empty():
14         print('queue get: ', q.get())
15 
16 def main():
17     p_put = Process(target=put, args=(q,))
18     p_get = Process(target=get, args=(q,))
19     p_put.start()
20     time.sleep(1)
21     p_get.start()
22     p_get.join()
23     print('all done')
24 
25 if __name__ == '__main__':
26     main()

結果:

queue size after put: 3
queue size before get: 3
queue get:  0
queue get:  1
queue get:  2
all done

b)Pipe

Pipe方法返回(conn1, conn2)表明一個管道的兩個端。Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發。duplex爲False,conn1只負責接受消息,conn2只負責發送消息。
send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,能夠調用conn1.send發送消息,conn1.recv接收消息。若是沒有消息可接收,recv方法會一直阻塞。若是管道已經被關閉,那麼recv方法會拋出EOFError。
 1 import multiprocessing
 2 import time
 3 
 4 pipe = multiprocessing.Pipe()
 5 
 6 def send(pipe):
 7     for i in range(5):
 8         print("send: %s" % (i,))
 9         pipe.send(i)
10         time.sleep(0.2)
11 
12 def recv_1(pipe):
13     while True:
14         print("rev_1:", pipe.recv())
15         time.sleep(1)
16 
17 def recv_2(pipe):
18     while True:
19         print("rev_2:", pipe.recv())
20         time.sleep(1)
21 
22 def main():
23     p_send = multiprocessing.Process(target=send, args=(pipe[0],))
24     p_recv_1 = multiprocessing.Process(target=recv_1, args=(pipe[1],))
25     p_recv_2 = multiprocessing.Process(target=recv_2, args=(pipe[1],))
26 
27     p_send.start()
28     p_recv_1.start()
29     p_recv_2.start()
30 
31     p_send.join()
32     p_recv_1.join()
33     p_recv_2.join()
34 
35 if __name__ == "__main__":
36     main()

結果:

send: 0
rev_1: 0
send: 1
rev_2: 1
send: 2
send: 3
send: 4
rev_1: 2
rev_2: 3
rev_1: 4

c)Manager

至關至關給力,上面的Queue,Pipe僅僅能夠傳遞數據,而不能作到數據共享(不一樣進程修改同一份數據),可是Manger能夠作到數據共享

看一下官方文檔:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventQueueValue and Array.

from multiprocessing import Process, Manager

def run(d, l):
    d['name'] = 'winter'
    l.reverse()


def main():
    p = Process(target=run, args=(d, l, ))
    p.start()
    p.join()

    print('final dict: ', d)
    print('final list: ', l)

if __name__ == "__main__":
    mgmt = Manager()
    d = mgmt.dict()
    l = mgmt.list(range(10))
    main()

注意:mgmt = Manger()必須放在if __name__ == "__main__"的代碼塊中,否則報freeze_support()的錯誤

並且,注意這裏:

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

還能夠在不一樣主機之間共享數據;

7)進程池Pool

若是要啓動大量的子進程,能夠用進程池pool批量建立子進程:Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,纔會建立新的進程來執行。

有兩種方法:阻塞方法Pool.apply()和非阻塞方法Pool.apply_async()

a)阻塞方法Pool.apply()

import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    res_list = []
    # 建立一個進程總數爲3的進程池
    pool = multiprocessing.Pool(3)
    for member in name_list:
        # 建立子進程,並執行,不須要start
        res = pool.apply(func, (member,))
        print(res)
    pool.close()
    # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool
    pool.join()
    print("all done...")

結果:

start: winter
end: winter
start: elly
end: elly
start: james
end: james
start: yule
end: yule
all done...

發現,阻塞方式下,進程是一個一個執行的,仍是串行,因此apply用的少;

注意兩點:

1. 進程池執行子進程不須要start;

2. 調用join()以前必須先調用close(),調用close()以後就不能繼續添加新的Process了;

b)非阻塞方法Pool.apply_async()

import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

def func_exp(msg):
    print('callback: %s' % msg)

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    res_list = []
    # 建立一個進程總數爲3的進程池
    pool = multiprocessing.Pool()
    for member in name_list:
        # 建立子進程,並執行,不須要start
        res = pool.apply_async(func, (member,), callback=func_exp)
        #注意這裏是append了res,不是res.get(),否則又要阻塞了
        res_list.append(res)
    for res_mem in res_list:
        print(res_mem.get())
    pool.close()
    # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool
    pool.join()
    print("all done...")

結果:

start: winter
start: elly
start: james
start: yule
callback: end: winter
end: winter
callback: end: elly
end: elly
callback: end: james
end: james
callback: end: yule
end: yule
all done...

結果分析:

1. 能夠看到非阻塞狀況下,充分利用了多核,實現了並行;

2. apply_async方法含有callback參數,能夠用於回調

3.爲何apply方法是阻塞的呢?到底阻塞在了哪裏呢?同時apply_async方法作了什麼改進呢?

查看apply方法源碼:

def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `func(*args, **kwds)`.
    '''
    assert self._state == RUN
    return self.apply_async(func, args, kwds).get()

apply方法最終執行了self.apply_async(func, args, kwds).get(),一樣調用了apply_async()方法,只是對結果執行了get()方法;阻塞就是阻塞在了這裏;

那我修改一下apply_async()的代碼是否是可讓apply_async()能夠變成阻塞的呢?試一下

 1 import multiprocessing
 2 import time
 3 
 4 def func(name):
 5     print("start: %s" % name)
 6     time.sleep(2)
 7     return 'end: %s' % name
 8 
 9 def func_exp(msg):
10     print('callback: %s' % msg)
11 
12 if __name__ == "__main__":
13     name_list = ['winter', 'elly', 'james', 'yule']
14     # 建立一個進程總數爲3的進程池
15     pool = multiprocessing.Pool()
16     for member in name_list:
17         # 建立子進程,並執行,不須要start
18         res = pool.apply_async(func, (member,), callback=func_exp)
19         print(res.get())
20     pool.close()
21     # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool
22     pool.join()
23     print("all done...")

注意紅色部分是我修改的編碼,結果果真變成了阻塞狀態:

start: winter
callback: end: winter
end: winter
start: elly
callback: end: elly
end: elly
start: james
callback: end: james
end: james
start: yule
callback: end: yule
end: yule
all done...

c)進程池該設置多少個進程數?

既然多進程能夠利用多核,那麼是否是建立越多的進程越好呢?不是的,由於進程的切換成本高,因此數量太多的進程來回切換反而會下降效率!

進程數是一個經驗值,和系統的硬件資源有很大關係;最優的進程數須要經過不斷調整得出;

Pool建立時,進程池的進程數默認大小爲CPU的邏輯CPU數目(內核線程數);

經驗上來講:

進程數與CPU核數比例1:1比較好,對於支持多線程的模型,線程數通常推薦的至少是1:1.5,這樣能夠留一部分線程來作IO。Python的多進程通常要麼是作純計算,要麼是協程模型(沒有IO等待時間,或者等待時間不多),要麼在進程內再使用多線程(很是不推薦,須要瞭解fork機制),這樣每一個核一個進程通常足夠了,進程切換的開銷略大,數量太多的話來回切換反而會下降效率。不過有種狀況例外,若是磁盤IO比較多,通常即便是協程,磁盤IO也是同步的,這時候多增長一些進程數也許有幫助。
相關文章
相關標籤/搜索