進程

知識內容:html

1.進程概念python

2.建立進程linux

3.守護進程數據庫

4.進程同步編程

5.進程通訊json

6.進程間數據共享數組

7.進程池安全

 

參考:https://www.cnblogs.com/Eva-J/articles/8253549.html服務器

 

 

 

1.進程概念網絡

關於進程的概念詳細看此:http://www.javashuo.com/article/p-qgkfdsnq-q.html

multiprocess:

multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。

multiprocess分爲如下幾部分:

  • 建立進程部分:multiprocess.Process
  • 進程同步部分:multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event
  • 進程間通訊:multiprocess.Queue、multiprocess.Pipe
  • 進程之間數據共享:multiprocess.Manager
  • 進程池部分:multiprocess.Pool

 

 

2.建立進程

(1)Process

 1 Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
 2 
 3 強調:
 4 1. 須要使用關鍵字的方式來指定參數
 5 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
 6 
 7 參數介紹:
 8 group參數未使用,值始終爲None
 9 target表示調用對象,即子進程要執行的任務
10 args表示調用對象的位置參數元組,args=(1,2,'xxx',)
11 kwargs表示調用對象的字典,kwargs={'name':'xxx','age':18}
12 name爲子進程的名稱

方法介紹(下面的p是Process對象,也就是進程對象):

1 p.start():啓動進程,並調用該子進程中的p.run() 
2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
3 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
4 p.is_alive():若是p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程  

屬性介紹:

1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

(2)建立進程實例

 1 import os
 2 from multiprocessing import Process
 3 
 4 def func(args1, args2):
 5     print("子進程開始----------------------------")
 6     print("子進程: ", os.getpid())
 7     print("子進程的父進程: ", os.getppid())
 8     print("args: ", args1, args2)
 9     print(666)
10 
11 if __name__ == '__main__':      # main只在Windows上才須要
12     # 註冊:
13     # p是一個進程對象
14     p = Process(target=func, args=("參數1", "參數2"))
15     # 啓動進程:
16     # 注意不是當即執行!
17     p.start()
18     print("父進程開始----------------------------")
19     print("父進程: ", os.getpid())
20     print("父進程的父進程: ", os.getppid())
21     print('*' * 10)
 1 # join方法
 2 import time
 3 from multiprocessing import Process
 4 
 5 def func(arg1,arg2):
 6     print('*'*arg1)
 7     time.sleep(5)
 8     print('*'*arg2)
 9 
10 if __name__ == '__main__':
11     p = Process(target=func,args=(10,20))
12     p.start()
13     print('hahahaha')
14     p.join()     # 感知一個子進程的結束,將異步的程序改成同步
15     print('====== : 運行完了')
 1 # 同時建立多個進程
 2 from multiprocessing import Process
 3 
 4 def func(arg1, arg2):
 5     print('*' * arg1)
 6     print('*' * arg2)
 7 
 8 if __name__ == '__main__':
 9     p_lst = []
10     for i in range(10):
11         p = Process(target=func, args=(10 * i, 20 * i))
12         p_lst.append(p)
13         p.start()
14     for p in p_lst:
15         p.join()   # 以前的全部進程必須在這裏都執行完才能執行下面的代碼
16     print("運行完成了")

還能夠這樣建立子進程:

 1 from multiprocessing import Process
 2 
 3 class MyProcess(Process):
 4     def __init__(self, arg1, arg2):
 5         super().__init__()
 6         self.arg1 = arg1
 7         self.arg2 = arg2
 8 
 9     def run(self):
10         print(self.pid)
11         print(self.name)
12         print(self.arg1)
13         print(self.arg2)
14 
15 if __name__ == '__main__':
16     p1 = MyProcess(1, 2)
17     p1.start()
18     p2 = MyProcess(3, 4)
19     p2.start()
20 
21 # 自定義類 繼承Process類
22 # 必須實現一個run方法(就是上面的func函數),run方法中是在子進程中執行的代碼

 

(3)多進程寫文件

 1 import os
 2 from multiprocessing import Process
 3 
 4 def func(filename, content):
 5     with open(filename, 'w') as f:
 6         f.write(content*10*'*')
 7 
 8 if __name__ == '__main__':
 9     p_lst = []
10     for i in range(10):
11         p = Process(target=func, args=('info%s' % i, i))
12         p_lst.append(p)
13         p.start()
14     for p in p_lst:
15         p.join()   # 以前的全部進程必須在這裏都執行完才能執行下面的代碼
16     now_path = os.getcwd()
17     # print(now_path)
18     # 展現寫入文件以後文件夾中的全部的文件名
19     print([i for i in os.walk(now_path)])

 

注意:

在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。

所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。

 

 

3.守護進程

(1)什麼是守護進程

守護進程會隨着主進程的結束而結束

主進程建立守護進程:

  • 守護進程會在主進程代碼執行結束後就終止
  • 守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

 

(2)守護進程實例

 1 // 守護進程的啓動
 2 // 主進程代碼執行完後守護進程當即結束
 3 import time
 4 from multiprocessing import Process
 5 
 6 
 7 def func():
 8     while True:
 9         time.sleep(0.5)
10         print("我還活着")
11 
12 
13 def func2():
14     print("in func2 start")
15     time.sleep(8)
16     print("in func2 finish")
17 
18 
19 if __name__ == '__main__':
20     p = Process(target=func)
21     p.daemon = True                     # 設置子進程爲守護進程
22     p.start()
23     Process(target=func2).start()       # 開啓普通進程
24     i = 0
25     while i < 5:
26         print("我是socket server")
27         time.sleep(1)
28         i += 1
29 
30 # 守護進程 會隨着 主進程的代碼執行完畢 而結束

 

 

4.進程同步

(1)進程鎖

多進程實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題:

當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題

進程鎖實例 - 買票:

ticket:

1 {"ticket": 1}

lock.py:

 1 # 鎖 -> 購買火車票
 2 import json
 3 import time
 4 from multiprocessing import Process
 5 from multiprocessing import Lock
 6 
 7 def buy_ticket(i, lock):
 8     lock.acquire()  # 拿鑰匙進門
 9     with open('ticket') as f:
10         dic = json.load(f)
11         time.sleep(0.1)
12     if dic['ticket'] > 0:
13         dic['ticket'] -= 1
14         print('\033[32m%s買到票了\033[0m' % i)
15     else:
16         print('\033[31m%s沒買到票\033[0m' % i)
17     time.sleep(0.1)
18     with open('ticket', 'w') as f:
19         json.dump(dic, f)
20     lock.release()  # 還鑰匙
21 
22 if __name__ == '__main__':
23     lock = Lock()
24     for i in range(10):
25         p = Process(target=buy_ticket, args=(i, lock))      # 能在某一時刻操做文件的只能有一個進程
26         p.start()

結果:

 

可是注意:

 1 # 加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
 2 雖然能夠用文件共享數據實現進程間通訊,但問題是:
 3     效率低(共享數據基於文件,而文件是硬盤上的數據)
 4     須要本身加鎖處理
 5 
 6 # 所以咱們最好找尋一種解決方案可以兼顧:
 7     效率高(多個進程共享一塊內存的數據)
 8     幫咱們處理好鎖問題
 9 # 這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
10 # 隊列和管道都是將數據存放於內存中
11 # 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,咱們應該儘可能避免使用共享數據12 # 儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

 

(2)信號量

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。

假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。

 

實現:

信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是Dijkstra信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。

信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

 

信號量實例:

 1 import time
 2 import random
 3 from multiprocessing import Process
 4 from multiprocessing import Semaphore
 5 
 6 def ktv(i, sem):
 7     sem.acquire()       # 獲取鑰匙 -> 相似P操做 -> 申請一個資源 等待信號量
 8     print('%s走進ktv' % i)
 9     time.sleep(random.randint(1, 5))
10     print('%s走出ktv' % i)
11     sem.release()       # 釋放鑰匙 -> 相似V操做 -> 傳送一個資源 釋放信號量
12 
13 if __name__ == '__main__':
14     sem = Semaphore(4)              # 最多容納4個
15     for i in range(20):
16         p = Process(target=ktv, args=(i, sem))
17         p.start()

 

總結:信號量是用鎖的原理實現的,內置了一個計數器,在同一時間 只能有指定數量的進程執行某一段被控制住的代碼

 

(3)事件

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear

事件處理的機制:全局定義一個「Flag」,而後經過如下三個方法進行事件處理:

  • clear:將「Flag」設置爲False
  • wait:若是「Flag」值爲 False那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True那麼event.wait 方法時便再也不阻塞。
  • set:將「Flag」設置爲True
 1 from multiprocessing import Event
 2 
 3 # 經過一個信號 來控制 多個進程 同時 執行或者阻塞   ->  事件
 4 # 一個信號可使全部的進程都進入阻塞狀態   也能夠控制全部的進程解除阻塞
 5 e = Event()  # 建立了一個事件
 6 print(e.is_set())   # 查看一個事件的狀態, 事件建立後默認被設置成阻塞
 7 e.set()      # 將這個事件的狀態改成True
 8 print(e.is_set())
 9 e.wait()     # 是依據e.is_set()的值來決定是否阻塞的
10 print(123456)
11 e.clear()    # 將這個事件的狀態改成False
12 print(e.is_set())
13 e.wait()     # 等待 事件的信號被變成True
14 print('*'*10)
15 
16 # 執行結果:
17 # False
18 # True
19 # 123456
20 # False

 

事件處理實例:

 1 # 紅綠燈事件
 2 import time
 3 import random
 4 from multiprocessing import Event, Process
 5 
 6 def cars(e, i):
 7     if not e.is_set():
 8         print('car%i在等待' % i)
 9         e.wait()  # 阻塞 直到獲得一個 事件狀態變成 True 的信號
10     print('\033[0;32;40mcar%i經過\033[0m' % i)
11 
12 def light(e):
13     while True:
14         if e.is_set():
15             e.clear()
16             print('\033[31m紅燈亮了\033[0m')
17         else:
18             e.set()
19             print('\033[32m綠燈亮了\033[0m')
20         time.sleep(2)
21 
22 if __name__ == '__main__':
23     e = Event()
24     traffic = Process(target=light, args=(e,))
25     traffic.start()
26     for i in range(20):
27         car = Process(target=cars, args=(e, i))
28         car.start()
29         time.sleep(random.random())

 

 

5.進程間通訊 - 隊列和管道

進程間通訊 - IPC(Inter-Process Communication)

(1)隊列

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞

 1 # 隊列方法:
 2 Queue([maxsize]) 
 3 建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現
 4 
 5 Queue的實例q具備如下方法:
 6 q.get( [ block [ ,timeout ] ] ) 
 7 返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止
 8 block: 控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)
 9 timeout: 可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常
10 
11 q.get_nowait( )   同q.get(False)方法
12 
13 q.put(item [, block [,timeout ] ] ) 
14 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止
15 block: 控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)
16 timeout: 指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常
17 
18 q.qsize() 
19 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
20 
21 q.empty() 
22 若是調用此方法時 q爲空,返回True
23 若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目
24 
25 q.full() 
26 若是q已滿,返回爲True
27 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)
28 
29 
30 # 其餘方法:
31 q.close() 
32 關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。
33 關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
34 
35 q.cancel_join_thread() 
36 不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。
37 
38 q.join_thread() 
39 鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。

 

實例 - 父進程和子進程之間的通訊:

 1 # 父進程和子進程之間的通訊
 2 from multiprocessing import Process, Queue
 3 
 4 
 5 def produce(que):
 6     que.put('hello')
 7 
 8 
 9 def consumer(que):
10     print(que.get())
11 
12 
13 if __name__ == '__main__':
14     q = Queue()
15     p = Process(target=produce, args=(q, ))
16     p.start()
17     c = Process(target=consumer, args=(q, ))
18     c.start()

 

實例 - 批量生產數據放入隊列再批量獲取結果:

 1 # 批量生產數據放入隊列再批量獲取結果
 2 import os
 3 import time
 4 import multiprocessing
 5 
 6 # 向queue中輸入數據的函數
 7 def inputQ(queue):
 8     info = str(os.getpid()) + '(put):' + str(time.asctime())
 9     queue.put(info)
10 
11 # 向queue中輸出數據的函數
12 def outputQ(queue):
13     info = queue.get()
14     print('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))
15 
16 if __name__ == '__main__':
17     multiprocessing.freeze_support()
18     record1 = []        # store input processes
19     record2 = []        # store output processes
20     queue = multiprocessing.Queue(3)
21 
22     # 輸入進程
23     for i in range(10):
24         process = multiprocessing.Process(target=inputQ, args=(queue,))
25         process.start()
26         record1.append(process)
27 
28     # 輸出進程
29     for i in range(10):
30         process = multiprocessing.Process(target=outputQ, args=(queue,))
31         process.start()
32         record2.append(process)
33 
34     for p in record1:
35         p.join()
36 
37     for p in record2:
38         p.join()

 

(2)生產者消費者模型

生產者消費者模型見此:http://www.javashuo.com/article/p-vjlqkpfo-c.html

 

(3)管道

 1 #建立管道的類:
 2 Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
 3 
 4 #參數介紹:
 5 dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送
 6 
 7 
 8 #主要方法:
 9 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
10 conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError
11 
12 
13 #其餘方法:
14 conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
15 conn1.fileno():返回鏈接使用的整數文件描述符
16 conn1.poll([timeout]):若是鏈接上的數據可用,返回True
17   timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達
18 
19 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息
20   maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常
21 
22 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區
23   buffer是支持緩衝區接口的任意對象
24   offset是緩衝區中的字節偏移量
25   size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 
26 
27 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)
28   offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常
 1 # 管道簡單使用
 2 from multiprocessing import Pipe, Process
 3 
 4 def func(conn):
 5     conn.send('吃了麼')
 6 
 7 if __name__ == '__main__':
 8     conn1, conn2 = Pipe()
 9     Process(target=func, args=(conn1, )).start()
10     print(conn2.recv())
 1 # EOFError
 2 from multiprocessing import Pipe, Process
 3 
 4 def func(conn1, conn2):
 5     conn2.close()
 6     while True:
 7         try:
 8             msg = conn1.recv()
 9             print(msg)
10         except EOFError:
11             conn1.close()
12             break
13 
14 if __name__ == '__main__':
15     conn1, conn2 = Pipe()
16     Process(target=func, args=(conn1, conn2)).start()
17     conn1.close()
18     for i in range(20):
19         conn2.send('吃了麼')
20     conn2.close()

 

 

6.進程間數據共享 - Manager

(1)進程間通訊

展望將來,基於消息傳遞的併發編程是大勢所趨

即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。

這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。

進程間應儘可能避免通訊,即使須要通訊也應該選擇進程安全的工具(好比隊列)來避免加鎖帶來的問題。固然可使用數據庫來解決如今進程之間的數據共享問題。

 

(2)Manager模塊

進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的;雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此

 1 # Manager實例:
 2 from multiprocessing import Manager, Process, Lock
 3 
 4 def main(dic, lock):        # 加鎖確保進程之間的數據共享萬無一失
 5     lock.acquire()
 6     dic['count'] -= 1
 7     lock.release()
 8 
 9 if __name__ == '__main__':
10     m = Manager()
11     l = Lock()
12     dic = m.dict({'count': 100})
13     p_lst = []
14     for i in range(50):
15         p = Process(target=main, args=(dic, l))
16         p.start()
17         p_lst.append(p)
18     for i in p_lst:
19         i.join()
20     print('主進程', dic)

固然上面的寫法也能夠更高級:

 1 # Manager實例
 2 from multiprocessing import Manager, Process, Lock
 3 def work(d, lock):
 4     with lock:  # 不加鎖而操做共享的數據,確定會出現數據錯亂
 5         d['count'] -= 1
 6 
 7 if __name__ == '__main__':
 8     lock=Lock()
 9     with Manager() as m:
10         dic=m.dict({'count':100})
11         p_l=[]
12         for i in range(100):
13             p=Process(target=work,args=(dic,lock))
14             p_l.append(p)
15             p.start()
16         for p in p_l:
17             p.join()
18         print(dic)

 

 

7.進程池 

(1)進程池的概念

在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?

  • 首先建立進程須要消耗時間,銷燬進程也須要消耗時間
  • 另外即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以不能無限制的根據任務開啓或者結束進程

因而就有了進程池的概念:

  • 定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務
  • 若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行
  • 也就是說池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果

 

(2)multiprocess.Pool模塊

 1 Pool([numprocess  [,initializer [, initargs]]]):建立進程池
 2   numprocess: 要建立的進程數,若是省略,將默認使用cpu_count()的值
 3   initializer: 是每一個工做進程啓動時要執行的可調用對象,默認爲None
 4   initargs: 是要傳給initializer的參數組
 5   
 6 
 7 基本方法:
 8   p. map(func, iterable, chunksize=None):  第一個參數是函數,第二個參數是一個迭代器,將迭代器中的數字做爲參數依次傳入函數中執行,返回值爲全部結果的[]
 9   p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果
10   '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
11 
12   p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果
13   '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
14    
15   p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 
16   P.join():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
17 
18 
19 其餘方法:
20   方法apply_async()和map_async()的返回值是AsyncResul的實例obj。
21   實例obj具備如下方法:
22     obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
23     obj.ready():若是調用完成,返回True
24     obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
25     obj.wait([timeout]):等待結果變爲可用。
26     obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

 

(3)代碼實例

進程池和多進程效率對比:

 1 import time
 2 from multiprocessing import Pool, Process
 3 
 4 def func(n):
 5     for v in range(10):
 6         n+1
 7 
 8 if __name__ == '__main__':
 9     start = time.time()
10     pool = Pool(5)
11     pool.map(func, range(100))
12     t1 = time.time() - start
13 
14     start = time.time()
15     p_list = []
16     for i in range(100):
17         p = Process(target=func, args=(i, ))
18         p_list.append(p)
19         p.start()
20     for p in p_list:
21         p.join()
22     t2 = time.time() - start
23     print(t1, t2)

結果以下:

 

進程池的同步調用和異步調用:

 1 import os
 2 import time
 3 from multiprocessing import Pool
 4 
 5 def func(n):
 6     print("start func %s" % n, os.getpid())
 7     time.sleep(1)
 8     print("end func %s" % n, os.getpid())
 9     return n*2
10 
11 # 測試進程池的apply方法 -> 同步調用
12 def test_apply(p):
13     res_list = []
14     for i in range(10):
15         res = p.apply(func, args=(i, ))
16         res_list.append(res)
17     print(res_list)
18 
19 # 測試進程池的apply_async方法 -> 異步調用
20 def test_apply_async(p):
21     res_list = []
22     for i in range(10):
23         res = p.apply_async(func, args=(i, ))
24         res_list.append(res)
25         # 異步運行,根據進程池中有的進程數,每次最多5個子進程在異步執行  返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
26         # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束  而是執行完一個就釋放一個進程,這個進程就去接收新的任務 
27         # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用join,等待進程池內任務都處理完,而後能夠用get收集結果
28         # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
29     p.close()
30     p.join()
31     for res in res_list:
32         print(res, res.get())
33         # 使用get來獲取apply_async的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
34 
35 if __name__ == '__main__':
36     pool = Pool(5)
37     test_apply(pool)
38     # test_apply_async(pool)

 

進程池的回調函數:

1 # 關於回調函數
2 # 須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
3 
4 # 咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果
 1 # 下面的回調函數 -> 先執行func1 而後把func1的結果做爲參數傳給func2
 2 import os
 3 from multiprocessing import Pool
 4 
 5 def func1(n):                           # func1在子進程中執行
 6     print('in func1', os.getpid())
 7     return n * n
 8 
 9 def func2(nn):                          # func2在主進程中執行
10     print('in func2', os.getpid())
11     print(nn)
12 
13 if __name__ == '__main__':
14     print('主進程 :', os.getpid())
15     p = Pool(5)
16     for i in range(10):
17         p.apply_async(func1, args=(10,), callback=func2)
18     p.close()
19     p.join()
相關文章
相關標籤/搜索