Python併發編程之進程

1、理論概念

  一、定義

    進程(Process 也能夠稱爲重量級進程)是程序的一次執行。在每一個進程中都有本身的地址空間、內存、數據棧以及記錄運行的輔助數據,它是系統進行資源分配和調度的一個獨立單位。python

  二、並行和併發

    並行:並行是指多個任務同一時間執行;linux

    併發:是指在資源有限的狀況下,兩個任務相互交替着使用資源;編程

  三、同步和異常

    同步是指多個任務在執行時有一個前後的順序,必須是一個任務執行完成另一個任務才能執行;json

    異步是指多個任務在執行時沒有前後順序,多個任務能夠同時執行;
  四、同步/異步/阻塞/非阻塞/windows

    同步阻塞:這個阻塞的造成效率是最低的;好比你在下載一個東西是,你一直盯着下載進度條,到達100%時下載完成;安全

        同步體如今:你等待下載完成通知;網絡

        阻塞體如今:等待下載的過程當中,不能作別的事情數據結構

    同步非阻塞:你在下載東西時,你把任務提交後就去幹別的事情了,只是每過一段時間就看一下是否是下載完成;多線程

        同步體如今:等待下載完成通知;併發

        非阻塞提如今:等待下載完成通知過程當中,去幹別的事情了,只是時不時會瞄一眼進度條;

    異步阻塞:你在下載東西時換了一個如今使用的客戶端好比迅雷,下載完成後會有一個提示音,可是這時候你仍然一直在等待那個完成後的提示音;

        異步體如今:下載完成時有提示音;

        阻塞體如今:等待下載完成提示音時,不作任何事情;

    異步非阻塞:你然然使用的是迅雷下載軟件,這時候你把下載任務提交後就去幹別的事情去了,當你聽到‘叮’之後就知道下載完成;

        異步體如今:下載完成叮一聲完成通知

        非阻塞體如今:等待下載完成「叮」一聲通知過程當中,去幹別的任務了,只須要接收「叮」聲通知便可;

2、進程的建立與結束

  multiprocessing模塊:multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。

  Process模塊的各類方法介紹

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹: group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name爲子進程的名稱 p.start():啓動進程,並調用該子進程中的p.run() p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 p.is_alive():若是p仍然運行,返回True p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

  在windows中使用process注意事項:

    在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。

  process模塊建立進程:

#!/usr/bin/env python # -*- coding:utf-8 -*-  #Author: caoyf
import time from multiprocessing import Process def func(name): print('hello %s'%name) print('我是子進程') if __name__ == '__main__': p = Process(target=func,args=('caoyf',))  #在實例化時候,args的參數必須是一個元祖形式(註冊一個子進程)
    p.start() #啓動一個子進程
    time.sleep(3) print('執行主進程內容了')
建立第一個進程

  多個進程同時運行,子進程的執行順序不是根據啓動的順序來決定的;

#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: caoyf
import time from multiprocessing import Process def func(name): print('hello %s'%name) time.sleep(2) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=('caoyf',)) p.start() p_lst.append(p) for p in p_lst: p.join()  # 是感知一個子進程的結束,將異步的程序改成同步
    print('父進程在運行')
多個進程同時運行

  另外一種開啓進程的方法,繼承process的形式

#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: caoyf
import time import os from multiprocessing import Process class Func(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(os.getpid()) print('%s正在和小明聊天'%self.name) if __name__ == '__main__': p1 = Func('caoyf') p2 = Func('Zhao') p1.start() p2.start() p1.join() p2.join()
繼承的方式開啓進程

  守護進程:會隨着主進程的結束而結束,進程之間是相互獨立的,主進程的代碼運行結束,守護進程也會隨即結束

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author: caoyf
import  time
import os
from multiprocessing import  Process
def foo():
    print('start123')
    time.sleep(2)
    print('end123')

def func():
    print('start456')
    time.sleep(5)
    print('end456')
if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=func)
    p1.daemon = True
    p1.start()
    p2.start()
    time.sleep(0.1)
    print('main------------')#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,由於主進程打印main---
    # -時,p1也執行了,可是隨即被終止.
守護進程

 

3、進程同步(multiprocessing.Lock\Spemaphore\Event)

  鎖(Lock):

    資源是有限的,多個進程若是對同一個對象進行操做,則有可能形成資源的爭用,甚至致使死鎖,在併發進程中就能夠利用鎖進行操做來避免訪問的衝突;

    加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,可是速度就變慢了,但犧牲了速度卻保證了數據安全。

    雖然能夠用文件共享數據實現進程間通訊,但問題是:

      1.效率低(共享數據基於文件,而文件是硬盤上的數據)

      2.須要本身加鎖處理

 

    咱們能夠模擬一個火車搶票的過程,當過個客戶同時對一個程序發起訪問時,假設此時有5張票,有10我的搶

from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[32m購票成功\033[0m')
    else:
        print('\033[31m購票失敗\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()
搶火車票

  信號量:

    信號量Semaphore是同時容許必定數量的線程更改數據 。

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
import  time
import  random
from multiprocessing import Semaphore
from multiprocessing import Process
def f(i,a):
    a.acquire()
    print('%s走進了房間'%i)
    time.sleep(random.randint(1,5))
    print('%s走出了房間'%i)
    a.release()
if __name__ == '__main__':
    a = Semaphore(5)
    for i in range(10):
        p = Process(target=f,args=(i,a))
        p.start()
信號量

  事件:

用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
            clear:將「Flag」設置爲False
            set:將「Flag」設置爲True
#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
from multiprocessing import Event,Process
import random
import time
def cars(a,i):
    if not a.is_set():
        print('car%s在等待'%i)
        a.wait()
    print('\033[31mcar%s經過\033[0m' % i)
def f(a):
    while True:
        if a.is_set():
            a.clear()
            print('\033[31m紅燈亮了\033[0m')

        else:
            a.set()
            print('\033[32m綠燈亮了\033[0m')
        time.sleep(2)
if __name__ == '__main__':
    a = Event()
    p = Process(target=f,args=(a,))
    p.start()
    for i in range(20):
        car = Process(target=cars,args=(a,i))
        car.start()
        time.sleep(random.random())
事件/紅綠燈實例

4、進程間通訊---隊列和管道

  隊列Queue:適用於多線程編程的先進先出數據結構,能夠用來安全的傳遞多線程信息。

  經過隊列實現了 主進程與子進程的通訊 子進程與子進程之間的通訊

q=Queue(10)     #實例化一個對象,容許隊列對多10個元素
q.put() #放入隊列
q.get() #從隊列中取出

  假設如今有一個隊伍,隊伍裏最多隻能站5我的,可是有15我的想要進去

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
from multiprocessing import Process
from multiprocessing import Queue
def getin(q):     #進入隊伍的子進程
    for i in range(15):
        q.put(i)
        # print(q)
def getout(q):    #離開隊伍的子進程
    for i in range(6):
        print(q.get())
if __name__=='__main__':
    q=Queue(5)      #隊伍內最多能夠容納的人數
    p=Process(target=getin,args=(q,))     #進入隊伍的進程
    p.start()
    p2=Process(target=getout,args=(q,))   #離開隊伍的進程
    p2.start()
隊列實例

  管道(Pipes)

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
from multiprocessing import Process,Pipe,Manager,Lock
import time
import  random
# 管道 進程之間建立的一條管道,默認是全雙工模式,兩頭均可以進和出,
# 注意 必須在產生Process對象以前產生管道
# 若是在Pipe括號裏面填寫False後就變成了單雙工,
# 左邊的只能收,右邊的只能發,recv(接收),send(發送)
#若是沒有消息能夠接收,recv會一直阻塞,若是鏈接的另一段關閉後,
#recv會拋出EOFError錯誤
# close 關閉鏈接
#下面的實例是在Pipe的括號裏填寫和不填寫False的區別
# from multiprocessing import Process,Pipe
# def func(pro):
#     pro.send('hello')
#     pro.close()
#
# if __name__=='__main__':
#     con,pro =  Pipe(False)
#     p = Process(target=func,args=(pro,))
#     p.start()
#     print(con.recv())
#     p.join()
# 模擬recv阻塞狀況
# def func(con,pro):
#     con.close()
#     while 1:
#         try:
#             print(pro.recv())
#         except EOFError:
#             pro.close()
#             break
#
#
# if __name__=='__main__':
#     con,pro =  Pipe()
#     p = Process(target=func,args=(con,pro,))
#     p.start()
#     pro.close()
#     con.send('aaaaa')
#     con.close()
#     p.join()

# 利用管道實現生產者和消費者
# def sc(con,pro,name,food):
#     con.close()
#     for i in range(5):
#         time.sleep(random.random())
#         f = '%s生產了%s%s'%(name,food,i)
#         print(f)
#         pro.send(f)
# def xf(con,pro,name):
#     pro.close()
#     while 1:
#         try:
#             baozi = con.recv()
#             print('%s消費了%s'%(name,baozi))
#         except EOFError:
#             break
# if __name__=='__main__':
#     con,pro = Pipe()
#     p1 = Process(target=sc,args=(con,pro,'caoyf','包子'))
#     c1 = Process(target=xf,args=(con,pro,'zhoaf'))
#     p1.start()
#     c1.start()
#     con.close()
#     pro.close()
#     p1.join()
管道

  數據共享:

    隊列和管道只是實現了數據的傳遞,尚未實現數據的共享,如實現數據共享,就要用到Managers(注:進程間通訊應該儘可能避免使用共享數據的方式

 
 

from multiprocessing import Process,Manager
import os

 
 

def f(dict1,list1):
dict1[os.getpid()] = os.getpid() # 往字典裏放當前PID
list1.append(os.getpid()) # 往列表裏放當前PID
print(list1)

 
 

if __name__ == "__main__":
with Manager() as manager:
d = manager.dict() #生成一個字典,可在多個進程間共享和傳遞
l = manager.list(range(5)) #生成一個列表,可在多個進程間共享和傳遞
p_list = []
for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p) # 存進程列表
for res in p_list:
res.join()
print('\n%s' %d) #若要保證數據安全,須要加鎖lock=Lock()

 

  進程池

    對於須要使用幾個甚至十幾個進程時,咱們使用Process仍是比較方便的,可是若是要成百上千個進程,用Process顯然太笨了,multiprocessing提供了Pool類,即如今要講的進程池,可以將衆多進程放在一塊兒,設置一個運行進程上限,每次只運行設置的進程數,等有進程結束,再添加新的進程

  • Pool(processes =num):設置運行進程數,當一個進程運行完,會添加新的進程進去
  • apply_async:異步,串行
  • apply:同步,並行
  • close():關閉pool,不能再添加新的任務
複製代碼
import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i += 1

if __name__ == '__main__':
    p = Pool(5)          # 建立了5個進程
    start = time.time()
    p.map(func,range(1000))  
    p.close()                        # 是不容許再向進程池中添加任務
    p.join()                        #阻塞等待 執行進程池中的全部任務直到執行結束
    print(time.time() - start)
    start = time.time()
    l = []
    for i in range(1000):
        p = Process(target=func,args=(i,))  # 建立了一百個進程
        p.start()
        l.append(p)
    [i.join() for i in l]
    print(time.time() - start)

回調函數:
 
 
複製代碼
import os
import time
from multiprocessing import Pool
# 參數 概念 回調函數
def func(i):    # 多進程中的io多,分出去一部分
    print('子進程%s:%s'%(i,os.getpid()))
    return i*'*'

def call(arg):   # 回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值
    print('回調 :',os.getpid())
    print(arg)

if __name__ == '__main__':
    print('主進程',os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply_async(func,args=(i,),callback=call)  #callback 回調函數 :主進程執行 參數是子進程執行的函數的返回值
    p.close()
    p.join()
相關文章
相關標籤/搜索