併發編程之——多進程

1、基本概念

1.1 進程

  其實進程就是正在進行的一個程序或者任務,而負責執行任務的是CPU,執行任務的地方是內存。跟程序相比,程序僅僅是一堆代碼而已,而程序運行時的過程纔是進程。另外同一個程序執行兩次就是兩個進程了。python

1.2 併發與並行

  不管是並行仍是併發,在用戶看來都是'同時'運行的,不論是進程仍是線程,都只是一個任務而已,真是幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行一個任務。對於「併發」而言,是僞並行,即看起來是同時運行,單個cpu+多道技術就能夠實現併發;而「並行」纔是真正意義上的「同時運行」——僅有多核纔可以實現「並行」。編程

  須要強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。json

2、Multiprocessing模塊

  python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu\_count\(\)查看),在python中大部分狀況須要使用多進程。安全

  Python提供了multiprocessing。 multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,>提供了Process、Queue、Pipe、Lock等組件。數據結構

2.1 開啓子進程的兩種方式

2.1.1 直接在Multiprocessing模塊中導入Process類,利用這個類實例化進程對象
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
#方式一
from multiprocessing import Process
import time
import os

def task(name):
    print('%s is running...'%name)
    print('子進程的id爲:',os.getpid())
    print('子進程的父進程的id爲:',os.getppid())
    time.sleep(3)
    print('%s is done'%name)


if __name__ == '__main__':
    #Process(target=task, kwargs={'name':'子進程1'})
    #獲得一個對象
    p = Process(target=task,args=('子進程1',))
    #start僅僅只是給操做系統發送了一個信號,發完信號之後父進程不會等子進程
    #是徹底獨立的兩個進程
    p.start()
    print('主進程')
    print('主進程的id爲:',os.getpid())
    print('主進程的父進程id爲:',os.getppid())
View Code
2.2.2 利用類的繼承,本身定義一個MyProcessing類,繼承自Process,可是須要注意的是:裏面必需要有一個名爲run的方法去執行主體:
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-

from multiprocessing import Process
import time

#用類的繼承方式實現
class MyProcessing(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    #注意名字必須叫run
    def run(self):
        print('%s is running......'%self.name)
        time.sleep(3)
        print('%s is done...'%self.name)


if __name__ == '__main__':
    p = MyProcessing('進程1')
    p.start()

    print('主進程')
View Code

2.2 Process類實例化出對象的join方法 

  在主進程運行過程當中若是想併發地執行其餘的任務,咱們能夠開啓子進程,此時主進程的任務與子進程的任務分兩種狀況多線程

  狀況一:在主進程的任務與子進程的任務彼此獨立的狀況下,主進程的任務先執行完畢後,主進程還須要等待子進程執行完畢,而後統一回收資源。併發

  狀況二:若是主進程的任務在執行到某一個階段時,須要等待子進程執行完畢後才能繼續執行,就須要有一種機制可以讓主進程檢測子進程是否運行完畢,在子進程執行完畢後才繼續執行,不然一直在原地阻塞,這就是join方法的做用ide

# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process
import time
import os

def task(name,n):
    print('%s is running...'%name)
    time.sleep(n)
    print('%s is done...'% name)


if __name__ == '__main__':
    start_time = time.time()
    # for i in range(5,8):
    #     p = Process(target=task,args=('p%s'%(i+1),i))
    #     p.start()
        #p.join()
    p1 = Process(target=task,args=('p1',5))
    p2 = Process(target=task,args=('p2',2))
    p3 = Process(target=task,args=('p3',3))
    #start僅僅是向操做系統發出信號,具體誰先執行不必定,由操做系統決定
    p1.start()
    p2.start()
    p3.start()
    #保證有序,看着像「串行」,但實際上仍是並行:最後一行的運行時間能夠驗證
    p1.join()
    p2.join()
    p3.join()

    print('主進程開啓,id爲:',os.getpid())
    #打印出來的結果可知,程序仍然是併發執行的,不是串行執行的
    print('運行時間:',time.time()-start_time)
View Code

  關於join方法,須要注意的一點是:雖然咱們看着像「串行」,但實際上仍是並行:由上面程序最後一行的運行時間能夠驗證:函數

3、互斥鎖

3.1 簡介

  雖然進程之間數據不共享,可是能夠共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂。優化

  如何控制,就是加鎖處理。而互斥鎖的意思就是互相排斥,若是把多個進程比喻爲多我的,互斥鎖的工做原理就是多我的都要去爭搶同一個資源:衛生間,一我的搶到衛生間後上一把鎖,其餘人都要等着,等到這個完成任務後釋放鎖,其餘人才有可能有一個搶到......因此互斥鎖的原理,就是把併發改爲穿行,下降了效率,但保證了數據安全不錯亂

  這裏有一個利用互斥鎖模擬搶票的程序(whw.json文件的內容爲:{"count": 2}):

# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process,Lock
import json
import time

#查票
def search(name):
    time.sleep(1)
    with open('whw.json','r') as f:
        ticket_dict =  json.load(f)
        print('<%s>查看到餘票爲:<%s>'%(name,ticket_dict['count']))

#買票
def get(name):
    time.sleep(1)
    f = open('whw.json','r')
    ticket_dict =  json.load(f)
    print('<%s>查看到餘票還剩餘:<%s>'%(name,ticket_dict['count']))
    if ticket_dict['count'] > 0:
        ticket_dict['count'] -= 1
        print('<%s>購票成功!' % name)
        time.sleep(1)
    else:
        print('餘票不足~購票失敗')
    f.close()
    #保存
    f_new = open('whw.json','w')
    json.dump(ticket_dict,f_new)
    f_new.close()


def task(name,mutex):
    search(name)
    #在購票前加鎖
    mutex.acquire()
    get(name)
    #釋放鎖
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(5):
        p = Process(target=task,args=('路人%s'%(i+1),mutex))
        p.start()
View Code

  結果展現:

3.2關於互斥鎖與join的區別:

  用一句話來簡單歸納:「互斥鎖」是將代碼的「局部變成串行」,而若是用join的話會整個功能代碼變爲串行,因此對於本例而言互斥鎖要靈活一些。

4、隊列

4.1 簡介

  對於多進程有一個問題須要咱們考慮:是否有一種方案可以同時兼顧一下兩點:一是效率高(多個進程共享一塊內存數據),另一點是可以幫咱們處理好鎖的問題。

  答案就是~~利用隊列!

  首先,隊列是將數據存到內存中處理,這就知足了「效率高」這個要求,另外,隊列是基於「管道+鎖」設計的,因此另一點也知足了。事實上,隊列纔是進程間通訊(IPC)的最佳選擇

  另外須要你們注意的是:隊列是一種先進先出的數據結構

  建立隊列用如下方式:

# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Queue

#隊列中不該該放大文件,發的只是精簡的消息
#能夠不指定大小,但最終受限於內存的大小
q = Queue(3)
q.put('hello')
q.put({'a':1})
q.put(3333333)
#判斷一下隊列滿沒有
print(q.full())
#取出來~先進先出
print(q.get())
print(q.get())
print(q.get())
View Code

4.2 隊列的應用——生產者消費者模型

  「生產者消費者模型」是併發編程的很是重要的一個模型,也是隊列的一個很是重要的應用之一:

  

  上圖是一個簡單的生產者與消費者模型:生產者將生產的DATA先放到隊列裏,消費者從隊列中獲取生產者生產的數據,這樣使得程序的耦合性大大下降,並且也平衡了生產者與消費者之間的速度差:

  具體代碼以下:

# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process,Queue
import time

def producer(q):
    for i in range(5):
        res = '包子%s'%i
        time.sleep(0.5)
        print('生產者生產了%s'%res)
        q.put(res)

def consumer(q):
    while 1:
        res = q.get()
        if res is None:
            break
        time.sleep(1)
        print('消費者吃了%s'%res)


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    #有兩個消費者,須要最後put兩次None
    q.put(None)
    q.put(None)
    #print('主進程'.center(20,'*'))
View Code

  實現效果以下:

  固然上面的代碼能夠利用「守護進程」優化一下(做爲了解),將消費者進程設置爲守護進程,隨着主程序進程一塊兒消除:

# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process,JoinableQueue
import time

def producer(q):
    for i in range(5):
        res = '包子%s'%i
        time.sleep(0.5)
        print('生產者生產了%s'%res)
        q.put(res)
    q.join()

def consumer(q):
    while 1:
        res = q.get()
        if res is None:
            break
        time.sleep(1)
        print('消費者吃了%s'%res)
        q.task_done()

if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    #將消費者進程設置爲守護進程,隨着主程序一塊兒消除
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
View Code

5、其餘補充

5.1 須要注意的一點

  進程之間的內存空間是相互隔離的,看以下程序:

from multiprocessing import Process

n = 100

def work():
    global n
    n = 0
    print('子進程內的n爲:',n)


if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    print('主進程的n爲:',n)

  運行結果爲:

相關文章
相關標籤/搜索