python學習三十八-九天(python程序中進程的操做)

主要內容

python的multiprocess模塊和用法

 

在python程序中的進程操做

  以前咱們已經瞭解了不少進程相關的理論知識,瞭解進程是什麼應該再也不困難了,剛剛咱們已經瞭解了,運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所學的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。html

multiprocess模塊

      仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。python

multiprocess.process模塊

process模塊介紹

process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。linux

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:
group參數未使用,值始終爲None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=(1,2,'egon',)
kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
name爲子進程的名稱
p.start():啓動進程,並調用該子進程中的p.run() 
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程  

方法介紹
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

屬性介紹
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。

使用process模塊建立進程

在一個python進程中開啓子進程,start方法和併發效果。編程

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    print('我是子進程')

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    time.sleep(1)
    print('執行主進程的內容了')
    

在python中啓動的第一個子進程

2.子進程中進行傳參操做json

import os
import time
from multiprocessing import Process

def process1(n,name,num = 20):
    print('process1 : ',os.getpid())
    print('n : ',n,name,num)
    time.sleep(10)

if __name__ == '__main__':
    print(os.getpid())
    p = Process(target=process1,args=[1,'alex',30])
    p.start()
 
 

# 主進程默認會等待子進程執行完畢以後才結束
# 主進程和子進程之間的代碼是異步的
# 爲何主進程要等待子進程結束 回收一些子進程的資源
# 開啓一個進程是有時間開銷的 :操做系統響應開啓進程指令,給這個進程分配必要的資源安全

import os
import time
from multiprocessing import Process

def func():
    print(os.getpid(),os.getppid())
    time.sleep(1)

if __name__ == '__main__':
    print(os.getpid(),os.getppid())  # process id,parent process id
    Process(target=func).start()    # func
    print('*'*20)
    time.sleep(0.5)
    print('*'*40)

3. join的用法服務器

import os
from multiprocessing import Process

def func(exp):
    print(os.getpid(),os.getppid())
    result = eval(exp)
    with open('file','w') as f:
        f.write(str(result))

if __name__ == '__main__':
    print(os.getpid(),os.getppid())  # process id,parent process id
    # 3*5+5/6
    p = Process(target=func,args=['3*5'])    # func
    p.start()
    ret = 5/6
    p.join()  # join方法可以檢測到p進程是否已經執行完了,阻塞知道p執行結束
    with open('file') as f:
        result = f.read()
    ret = ret + int(result)
    print(ret)

4.開啓多個子進程網絡

import os
import time
from multiprocessing import Process

def process(n):
    print(os.getpid(),os.getppid())
    time.sleep(1)
    print(n)
if __name__ == '__main__':
    p_lst = []
    for i in range(10):
        p = Process(target=process,args=[i,])
        p.start()
        p_lst.append(p)
    for p in p_lst:p.join()   # 檢測p是否結束 若是沒有結束就阻塞直到結束 若是已經結束了就不阻塞
    print('求和')
開啓多個子進程

5.開啓子進程的第二種方式併發

除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式app

import os
from multiprocessing import Process
class Myprocess(Process):
    def __init__(self,*args):
        super().__init__()
        self.args = args
    def run(self):
        print(os.getpid(),self.name,self.pid)
        for name in self.args:
            print('%s和女主播聊天'%name)

if __name__ == '__main__':
    print(os.getpid())
    p = Myprocess('yuan','wusir')
    p.start()   # 在執行start的時候,會幫咱們主動執行run方法中的內容

6.進程之間的數據隔離問題

from multiprocessing import Process

def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主進程內: ',n)

進程之間的數據隔離問題

守護進程

會隨着主進程的結束而結束。

主進程建立守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)


p=Myprocess('哪吒')
p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
p.start()
time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id
print('')

守護進程的啓動
from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止.

主進程代碼執行結束守護進程當即結束
from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s還在和網紅臉聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive()) #結果爲True

print('開始')
print(p1.is_alive()) #結果爲False

進程對象的其餘方法:terminate,is_alive
class Myprocess(Process):
    def __init__(self,person):
        self.name=person   # name屬性是Process中的屬性,標示進程的名字
        super().__init__() # 執行父類的初始化方法會覆蓋name屬性
        #self.name = person # 在這裏設置就能夠修改進程名字了
        #self.person = person #若是不想覆蓋進程名,就修改屬性名稱就能夠了
    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        # print('%s正在和網紅臉聊天' %self.person)
        time.sleep(random.randrange(1,5))
        print('%s正在和網紅臉聊天' %self.name)
        # print('%s正在和網紅臉聊天' %self.person)


p1=Myprocess('哪吒')
p1.start()
print(p1.pid)    #能夠查看子進程的進程id

進程對象的其餘屬性:pid和name

 

進程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

鎖 —— multiprocess.Lock

      經過剛剛的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題。

  當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work,args=(i,))
        p.start()

多進程搶佔輸出資源
# 由併發變成了串行,犧牲了運行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()

使用鎖維護執行順序

上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。

  接下來,咱們以模擬搶票爲例,來看看數據安全的重要性

#文件db的內容爲:{"count":1}
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[43m購票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task)
        p.start()

多進程同時搶購餘票
#文件db的內容爲:{"count":5}
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[32m購票成功\033[0m')
    else:
        print('\033[31m購票失敗\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()

使用鎖來保證數據安全
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理

#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

總結:

# 同步控制
# 只要用到了鎖 鎖以內的代碼就變成同步的了
# 鎖 :控制一段代碼 同一時間 只能被一個進程執行

信號量 —— multiprocess.Semaphore(瞭解)

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 佔到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每一個人在ktv中待的時間不一樣
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

例子

事件 —— multiprocess.Event(瞭解)

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

clear:將「Flag」設置爲False
set:將「Flag」設置爲True

事件介紹

# 事件
# wait的方法 根據一個狀態來決定本身是否要阻塞
# 狀態相關的方法
# set 將狀態改成T
# clear 將狀態改成F
# is_set 判斷當前的狀態是否爲T

import time
import random
from multiprocessing import Process,Event

def car(i,e):  # 感知狀態的變化
    if not e.is_set():    # 當前這個事件的狀態若是是False
        print('car%s正在等待'%i)  # 這輛車正在等待經過路口
    e.wait()    # 阻塞 直到有一個e.set行爲  # 等紅燈
    print('car%s經過路口'%i)

def traffic_light(e):  # 修改事件的狀態
    print('\033[1;31m紅燈亮\033[0m')
    # 事件在創立之初的狀態是False,至關於我程序中的紅燈
    time.sleep(2)  # 紅燈亮2s
    while True:
        if not e.is_set():  # False
            print('\033[1;32m綠燈亮\033[0m')
            e.set()
        elif e.is_set():
            print('\033[1;31m紅燈亮\033[0m')
            e.clear()
        time.sleep(2)

if __name__ == '__main__':
    e = Event()
    Process(target=traffic_light,args=[e,]).start()
    for i in range(50):
        time.sleep(random.randrange(0,5,2))
        Process(target=car,args=[i,e]).start()
相關文章
相關標籤/搜索