多任務fork、multiprocessing、進程池、進程間通訊-Queue

併發:一個處理器同時處理多個任務。python

並行:多個處理器或者是多核的處理器同時處理多個不一樣的任務.windows

 

fork建立子進程併發

import os
import time

#fork出一個子進程,子進程也從這一行開始執行
ret = os.fork()
if ret == 0:
    while True:
        print("---1---")
        time.sleep(1)
else:
    while True:
        print("---2---")
        time.sleep(1)

  輸出app

---2---
---1---
---1---
---2---
---2---
---1---
---1---
---2---
...

  

查看子進程iddom

import os
import time

ret = os.fork()
if ret != 0:
    print("---父進程---%d---"%os.getpid())
else:  #ret等於0的是子進程
    print("---子進程---%d---父進程---%d---"%(os.getpid(), os.getppid()))

  輸出async

---父進程---5142---
---子進程---5143---父進程---5142---

  

全局變量在多個進程之間不共享函數

import os
import time

g_num = 100
ret = os.fork()

if ret == 0:
    print("---process-1---")
    g_num += 1
    print("---process-1 g_num=%d---"%g_num)
else:
    time.sleep(3)
    print("---process-2---")
    print("---process-2 g_num=%d---"%g_num)

  輸出spa

---process-1---
---process-1 g_num=101---
---process-2---
---process-2 g_num=100---

  

 多個fork操作系統

import os
import time

ret = os.fork()
if ret != 0:
    print("---1---")
else:
    print("---2---")

ret = os.fork()
if ret != 0:
    print("---11---")
else:
    print("---22---")

  輸出對象

---1---
---2---
---11---
---22---
---22---
---11---

  

fork炸彈

import os
while True:
    os.fork()
print("---1---")

注意:

fork不能在windows下運行

fork建立的主進程不會等到子進程運行結束後再推出

 

 

process

Process語法結構以下:
Process([group[,target[,name[,args[,kwargs]]]]])
target:表示這個進程實例所調用對象;
args:表示調用對象的位置參數元組;
kwargs:表示調用對象的關鍵字參數字典;
name:爲當前進程實例的別名;
group:大多數狀況下用不到;

 

Process類經常使用方法:
is_alive():判斷進程實例是否還在執⾏;
join([timeout]):是否等待進程實例執行結束,或等待多少秒;
start():啓動進程實例(建立子進程);
run():若是沒有給定target參數,對這個對象調用start()方法時,就將執 ⾏對象中的run()⽅法;
terminate():無論任務是否完成,當即終止;

 

Process類經常使用屬性:
name:當前進程實例別名,默認爲Process-N,N爲從1開始遞增的整數;
pid:當前進程實例的PID值;

 

process建立子進程

 

process能夠在windows上運行

from multiprocessing import Process

import time

def test():
    while True:
        print("---test---")
        time.sleep(1)


if __name__ == "__main__":
    p = Process(target=test)
    p.start()
    while True:
        print("---main---")
        time.sleep(1)

  輸出

---main---
---test---
---main---
---test---
...

  

建立的子進程和主進程結束

from multiprocessing import Process
import time

def test():
    for i in range(5):
        print("---test---")
        time.sleep(1)
if __name__ == "__main__":
    p = Process(target=test)
    p.start() #讓這個進程開始執行test函數裏的代碼
    print("---main---")

  輸出

---main---
---test---
---test---
---test---
---test---
---test---

  

 給target函數傳參數

from multiprocessing import Process
import os

def test(num):
    print("pid=%d,ppid=%d,num=%d"%(os.getpid(),os.getppid(),num))

if __name__ == "__main__":
    p = Process(target=test, args=(100,))
    p.start()
    print("---main-pid=%d---"%os.getpid())

  輸出

---main-pid=14252---
pid=18284,ppid=14252,num=100

  

join

from multiprocessing import Process
import time

def test():
    for i in range(5):
        print("---%d---"%i)
        time.sleep(1)

if __name__ == "__main__":
    p = Process(target=test)
    p.start()
    p.join()#阻塞,子進程運行結束後,才向下繼續執行
    print("---main---")

  輸出

---0---
---1---
---2---
---3---
---4---
---main---

  

 第二種process建立子進程的方法

from multiprocessing import Process
import time

class MyNewProcess(Process):
    def run(self):
        while True:
            print("---1---")
            time.sleep(1)

if __name__ == "__main__":
    p = MyNewProcess()
    p.start()

    while True:
        print("---main---")
        time.sleep(1)

  輸出

---main---
---1---
---main---
---1---
---main---
---1---
---main---
---1---
...

  

 進程池

from multiprocessing import Pool
import os
import time
def test(num):
    print("pid=%d,ppid=%d,num=%d"%(os.getpid(),os.getppid(),num))
    time.sleep(1)


if __name__ == "__main__":
    pool = Pool(3)

    for i in range(10):
        print("---%d---"%i)
        pool.apply_async(test,(i,))


    pool.close()
    pool.join() #join保證子進程運行結束後,進程池才退出

 

 

進程間通訊-Queue
Process之間有時須要通訊,操做系統提供了不少機制來實現進程間的通訊。

可使⽤multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue 自己是⼀個消息列隊程序。

初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最⼤可接收 的消息數量,或數量爲負值,那麼就表明可接受的消息數量沒有上限(直到 內存的盡頭);
Queue.qsize():返回當前隊列包含的消息數量;
Queue.empty():若是隊列爲空,返回True,反之False ;
Queue.full():若是隊列滿了,返回True,反之False;
Queue.get([block[, timeout]]):獲取隊列中的⼀條消息,而後將其從列隊 中移除,block默認值爲True;

  • 1)若是block使⽤默認值,且沒有設置timeout(單位秒),消息列隊若是爲 空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息爲⽌, 若是設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋 出"Queue.Empty"異常;
  • 2)若是block值爲False,消息列隊若是爲空,則會⽴刻拋 出"Queue.Empty"異常;

Queue.get_nowait():至關Queue.get(False);

Queue.put(item,[block[, timeout]]):將item消息寫⼊隊列,block默認值 爲True;

  • 1)若是block使⽤默認值,且沒有設置timeout(單位秒),消息列隊若是已 經沒有空間可寫⼊,此時程序將被阻塞(停在寫⼊狀態),直到從消息列隊 騰出空間爲⽌,若是設置了timeout,則會等待timeout秒,若還沒空間,則拋 出"Queue.Full"異常;
  • 2)若是block值爲False,消息列隊若是沒有空間可寫⼊,則會⽴刻拋 出"Queue.Full"異常;

Queue.put_nowait(item):至關Queue.put(item, False);

實例一

from	multiprocessing	import	
Queue q=Queue(3)	#初始化⼀個Queue對象,最多可接收三條put消息 
q.put("消息1")	
q.put("消息2") 
print(q.full())	#False 
q.put("消息3") 
print(q.full())	#True

#由於消息列隊已滿下⾯的try都會拋出異常,第⼀個try會等待2秒後再拋出異常,第⼆個Try會⽴ 
try:				
    q.put("消息4",True,2) 
except:				
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())

try:
    q.put_nowait("消息4") 
except:
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())

#推薦的⽅式,先判斷消息列隊是否已滿,再寫⼊ 
if not	q.full():
    q.put_nowait("消息4")
#讀取消息時,先判斷消息列隊是否爲空,再讀取 
if not q.empty():
	for	i in range(q.qsize()):
		print(q.get_nowait())

  輸出

False 
True 
消息列隊已滿,現有消息數量:3 
消息列隊已滿,現有消息數量:3 
消息1 
消息2 
消息3

  

實例二

from multiprocessing import Process, Queue
import os, time, random


# 寫數據進程執行的代碼:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# 讀數據進程執行的代碼:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break


if __name__ == '__main__':
    # 父進程建立Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啓動子進程pw,寫入:
    pw.start()
    # 等待pw結束:
    pw.join()
    # 啓動子進程pr,讀取:
    pr.start()
    pr.join()
    # pr進程裏是死循環,沒法等待其結束,只能強行終止:
    print("")
    print("全部數據都寫入而且讀完")

  輸出

Put A to queue...
Put B to queue...
Put C to queue...
Get A from queue.
Get B from queue.
Get C from queue.

全部數據都寫入而且讀完

  

進程池中的Queue
若是要使⽤Pool建立進程,就須要使⽤multiprocessing.Manager()中的 Queue(),⽽不是multiprocessing.Queue(),不然會獲得⼀條以下的錯誤信息:

# 修改import中的Queue爲Manager
from multiprocessing import Manager, Pool
import os


def reader(q):
    print("reader啓動(%s),父進程爲(%s) " % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到消息:%s " % q.get(True))


def writer(q):
    print("writer啓動(%s),父進程爲(%s) " % (os.getpid(), os.getppid()))
    for i in "dongGe":
        q.put(i)


if __name__ == "__main__":
    print("(%s)	start " % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue來初始化
    po = Pool()
    # 使用阻塞模式建立進程,這樣就不須要在reader中使用死循環了,可讓writer徹底執行完
    po.apply(writer, (q,))
    po.apply(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

  輸出

(15004)	start 
writer啓動(10232),父進程爲(15004) 
reader啓動(1612),父進程爲(15004) 
reader從Queue獲取到消息:d 
reader從Queue獲取到消息:o 
reader從Queue獲取到消息:n 
reader從Queue獲取到消息:g 
reader從Queue獲取到消息:G 
reader從Queue獲取到消息:e 
(15004) End
相關文章
相關標籤/搜索