Python基礎:之進程

1、進程

1.什麼是進程

程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。python

正在進行的一個過程或者說一個任務。而負責執行任務則是cpulinux

2.進程與程序的區別

程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。nginx

須要強調的是:同一個程序執行兩次,那也是兩個進程,好比打開暴風影音,雖然都是同一個軟件,可是一個能夠播放蒼井空,一個能夠播放飯島愛。web

3.併發與並行

 不管是並行仍是併發,在用戶看來都是'同時'運行的,無論是進程仍是線程,都只是一個任務而已,真是幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行一個任務shell

  並行:同時運行,只有具有多個cpu才能實現並行數據庫

      併發:是僞並行,即看起來是同時運行。單個cpu+多道技術就能夠實現併發,(並行也屬於併發)編程

      

 

      全部現代計算機常常會在同一時間作不少件事,一個用戶的PC(不管是單cpu仍是多cpu),均可以同時運行多個任務(一個任務能夠理解爲一個進程)。json

    啓動一個進程來殺毒(360軟件)windows

    啓動一個進程來看電影(暴風影音)安全

    啓動一個進程來聊天(騰訊QQ)

  全部的這些進程都需被管理,因而一個支持多進程的多道程序系統是相當重要的

      多道技術:內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另一個,使每一個進程各自運行幾十或幾百毫秒,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻能夠運行多個進程,這就給人產生了並行的錯覺,即僞併發,以此來區分多處理器操做系統的真正硬件並行(多個cpu共享同一個物理內存)

4.同步異步

同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到收到返回信息才繼續執行下去;

    異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。

    舉個例子,打電話時就是同步通訊,發短息時就是異步通訊。

5.進程的建立

 但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。

  而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程

  1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)

  2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)

  3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)

  4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)

  

  不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的:

  1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程)

  2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。

 

  關於建立的子進程,UNIX和windows

  1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。

  2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。

6.進程的終止

  • 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
  • 出錯退出(自願,python a.py中a.py不存在)
  • 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)
  • 被其餘進程殺死(非自願,如kill -9)

 7.進程的層次結構

 不管UNIX仍是windows,進程只有一個父進程,不一樣的是:

  1. 在UNIX中全部的進程,都是以init進程爲根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的全部成員。

  2. 在windows中,沒有進程層次的概念,全部的進程都是地位相同的,惟一相似於進程層次的暗示,是在建立進程時,父進程獲得一個特別的令牌(稱爲句柄),該句柄能夠用來控制子進程,可是父進程有權把該句柄傳給其餘子進程,這樣就沒有層次了。

8.進程狀態

 tail -f access.log |grep '404'

  執行程序tail,開啓一個子進程,執行程序grep,開啓另一個子進程,兩個進程之間基於管道'|'通信,將tail的結果做爲grep的輸入。

  進程grep在等待輸入(即I/O)時的狀態稱爲阻塞,此時grep命令都沒法運行

  其實在兩種狀況下會致使一個進程在邏輯上不能運行,

  1. 進程掛起是自身緣由,遇到I/O阻塞,便要讓出CPU讓其餘進程去執行,這樣保證CPU一直在工做

  2. 與進程無關,是操做系統層面,可能會由於一個進程佔用時間過多,或者優先級等緣由,而調用其餘的進程去使用CPU。

  於是一個進程由三種狀態

9.進程併發實現

 進程併發的實如今於,硬件中斷一個正在運行的進程,把此時進程運行的全部狀態保存下來,爲此,操做系統維護一張表格,即進程表(process table),每一個進程佔用一個進程表項(這些表項也稱爲進程控制塊)

  該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配情況、全部打開文件的狀態、賬號和調度信息,以及其餘在進程由運行態轉爲就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啓動時,就像從未被中斷過同樣。

10.python多進程模塊

10.1 multiprocessing模塊介紹

 

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了很是好用的多進程包multiprocessing。
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

10.2 Process建立進程的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹
1.group參數未使用,值始終爲None()
2.target表示調用對象,即子進程要執行的任務
3.args表示調用對象的位置參數元組,args=(1,2,'egon',)
4.kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5.name爲子進程的名稱

方法介紹

1.p.start():啓動進程,並調用該子進程中的p.run() 

2.p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  

3.p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖

4.p.is_alive():若是p仍然運行,返回True

5.p.join([timeout]):主進程等待p終止(強調:是主進程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

屬性介紹

1.p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

2.p.name:進程的名稱

3.p.pid:進程的pid

4.p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)

5.p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

10.3 Python開啓進程的兩種方式

注意:在windows中Process()必須放到# if __name__ == '__main__':下

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.

因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 
若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。

方式一

def piao(name):
print("%s is piaoing"%name)
time.sleep(2)
print("%s is piao end"%name)
if __name__ == '__main__':
p1 = Process(target=piao,args=('egon',),name='p1')
p1.start()#啓動主進程調用子進程中的run方法
print('p1 name is %s'%p1.name)#此處代碼和start同時運行的並行
print('父類')##此處代碼和start同時運行的並行

方式二

繼承Process,重寫run方法,繼承Process類必定要重寫run方法,init初始化時super父類init方法在定義本身得

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print("%s is piaoing" % self.name)
        # time.sleep(1)
        print("%s is piao end"%self.name)
if __name__ == '__main__':
    p1=Piao('han')
    p1.start()#自動化調用run方法
    print('父類1')
    p2=Piao('cai')
    p2.start()
    print("父類2")

 練習socket併發通信

方式一

SERVER
from multiprocessing import Process from socket import * server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,addr): while True: #通信循環 try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': while True: #連接循環 conn,addr=server.accept() p=Process(target=talk,args=(conn,addr)) p.start()

 

方式二 

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
from socket import *
class socket_server(Process):
    def __init__(self):
        super(socket_server, self).__init__()

    def run(self):
        server = socket(AF_INET, SOCK_STREAM)
        server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server.bind(('127.0.0.1', 8081))
        server.listen(5)
        conn, addr = server.accept()
        while True: #通信循環
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
if __name__ == '__main__':
    p1 = socket_server()
    p1.start()

 

客戶端  

#!/usr/bin/python
# -*- coding:utf-8 -*-
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))
while True:
    msg=input('>>').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

10.4進程方法屬性

#p.join(),是父進程在等p的結束,是父進程阻塞在原地,而p仍然在後臺運行
def piao(name): print('%s is piaoing' % name) time.sleep(random.randint(1,3)) print('%s is piao end' % name) if __name__ == '__main__': p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeiqi',)) p4=Process(target=piao,args=('yuanhao',)) p_l=[p1,p2,p3,p4] for p in p_l: p.start() for p in p_l: p.join()#父進程等待p運行結束後才執行住進程 print('主進程')
#有的同窗會有疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎?
#固然不是了,必須明確:p.join()是讓誰等?
#很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,

#詳細解析以下:
#進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了
#而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待
# 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
#進程對象的其餘方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive()) #結果爲True

print('開始')
print(p1.is_alive()) #結果爲False

  

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing  import Process
import time
import random
import os
class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

if __name__ == '__main__':

    p=Piao('egon')
    p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程死,p跟着一塊兒死
    p.start()
    p.join(0.0001) #等待p中止,等0.0001秒就再也不等了
    print('開始')
    print(1)
    print(p.is_alive())

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #因此加到這裏,會覆蓋咱們的self.name=name

        #爲咱們開啓的進程設置名字的作法
        super().__init__()

        self.name = name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)
if __name__ == '__main__':

    p=Piao('egon')
    p.start()
    print('開始')
    print(p.pid) #查看pid

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
def piao(name):
    print('%s is piaoing' % name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' % name)
if __name__ == '__main__':
    p1=Process(target=piao,args=('egon',))
    p1.name = 'hanjialong'#修改進程名稱
    # p1.daemon=True
    p1.start()

    p1.terminate()
    print(p1.is_alive())
    time.sleep(1)
    print(p1.is_alive())

    print('主進程')

    print(p1.name)
    print(p1.pid)

循環開啓進程

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
def foo(name):
    print(name)

if __name__ == '__main__':
    for i in range(1,100):
        p = Process(target=foo, args=('進程%s' % i,))
        p.start()
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,
part1:共享同一打印終端,發現會有多行內容打印到一行的現象(多個進程共享並搶佔同一個打印終端,亂了)
#多進程共享一個打印終端(用python2測試看兩個進程同時往一個終端打印,出現打印到一行的錯誤)
from multiprocessing import Process
import time
class Logger(Process):
    def __init__(self):
        super(Logger,self).__init__()
    def run(self):
        print(self.name)


for i in range(1000000):
    l=Logger()
    l.start()

 線程共享同一個文件,實驗:

既然能夠用文件共享數據,那麼爲何通信不用文件做爲介質呢?

1.效率問題,使用文件速度太慢

2.須要本身加鎖處理

from multiprocessing import Process
import time
import random
#多進程共享一套文件系統

def work(filename,msg):
    with open(filename,'a',encoding='utf-8') as f:
        f.write(msg)

if __name__ == '__main__':

    for i in range(5):
        p=Process(target=work,args=('a.txt','進程%s\n' %str(i)))
        p.start()
        p.join()
        work('a.txt',"主進程%s\n"%str(i))

11 進程間通訊(IPC)

1.隊列方式

 進程彼此之間互相隔離,要實現進程間通訊,即IPC,multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

建立隊列的類(底層就是以管道和鎖定的方式實現)

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

參數介紹

maxsize是隊列中容許最大項數,省略則無大小限制。 

方法介紹 

 

1.q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。

2.q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.

3.q.get_nowait():同q.get(False) q.put_nowait():同q.put(False)

4.q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目

5.q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。

6.q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

 

瞭解

1 q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞

2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

3 q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用
4.q.cancel_join_thread方法能夠禁止這種行爲

應用 

 

 
 
multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,可是隊列接口
#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
#隊列,先進先出
q=Queue(3)#設置隊列長度爲3

q.put({'a':1})
q.put('b')
q.put('c')
print(q.full())#此時隊列已滿因此返回True
# q.put('d',False) #等同於q.put_nowait('d')
print(q.get())
q.put('d',timeout=1)#當超過1秒尚未空間存儲就會報錯
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
# print(q.get(timeout=2))#在2秒內尚未可取的就會報錯
# print(q.get())
# print(q.get())
# print(q.empty())
print(q.get(block=False))
# print(q.get_nowait())

10.6 生產者消費者模型 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        print("\033[45m消費者拿到了:%s\033[0m"%res)
def producer(seq,q):
    for item in seq:
        time.sleep(random.randint(1,3))
        print("\033[46m生產者生產了:%s\033[0m"%item)
        q.put(item)
if __name__ == '__main__':
    q=Queue()
    seq=('包子%s' %i for i in range(10))
    producer(seq,q)
    print('主線程')
    c =Process(target=consumer,args=(q,))
    c.start()

主線程等待消費者結束(生產者發送結束信號給消費者)

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        # time.sleep(random.randint(1,3))
        res=q.get()
        print("\033[45m消費者拿到了:%s\033[0m"%res)
def producer(seq,q):
    for item in seq:
        # time.sleep(random.randint(1,3))
        print("\033[46m生產者生產了:%s\033[0m"%item)
        q.put(item)
if __name__ == '__main__':
    q=Queue()
    c =Process(target=consumer,args=(q,))
    c.start()
    seq=('包子%s' %i for i in range(10))
    producer(seq,q)
    c.join()
    print('主線程')

JoinableQueue類

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

    參數介紹:

    maxsize是隊列中容許最大項數,省略則無大小限制。    

  方法介紹:

    JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
from multiprocessing import Process,JoinableQueue
import time
import random

def consumer(q,name):
    while True:
        # time.sleep(random.randint(1,3))
        res=q.get()
        q.task_done()
        print('\033[41m消費者%s拿到了%s\033[0m' %(name,res))

def producer(seq,q,name):
    for item in seq:
        # time.sleep(random.randint(1,3))
        q.put(item)
        print('\033[42m生產者%s生產了%s\033[0m' %(name,item))
    q.join()
    print('============>>')

if __name__ == '__main__':
    q=JoinableQueue()
    c=Process(target=consumer,args=(q,'egon'),)
    c.daemon=True #設置守護進程,主進程結束c就結束
    c.start()

    seq=['包子%s' %i for i in range(10)]
    p=Process(target=producer,args=(seq,q,'廚師1'))
    p.start()

    # master--->producer----->q--->consumer(10次task_done)
    p.join() #主進程等待p結束,p等待c把數據都取完,c一旦取完數據,p.join就是再也不阻塞,進
    # 而主進程結束,主進程結束會回收守護進程c,並且c此時也沒有存在的必要了
    print('主進程')

 一個生產者多個消費者

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        time.sleep(random.randint(1,2))
        res=q.get()
        print('\033[45m%s拿到了 %s\033[0m' %(name,res))
        q.task_done()


def producer(seq,q):
    for item in seq:
        time.sleep(random.randrange(1,2))
        q.put(item)
        print('\033[46m生產者作好了 %s\033[0m' %item)
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=('包子%s' %i for i in range(10))

    p1=Process(target=consumer,args=('消費者1',q,))
    p2=Process(target=consumer,args=('消費者2',q,))
    p3=Process(target=consumer,args=('消費者3',q,))
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()

    producer(seq,q)

    print('主線程')

也能夠開啓一個新的子進程當生產者,不用主線程當生產者

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        # time.sleep(random.randint(1,2))
        res=q.get()
        print('\033[45m%s拿到了 %s\033[0m' %(name,res))
        q.task_done()


def producer(seq,q):
    for item in seq:
        # time.sleep(random.randrange(1,2))
        q.put(item)
        print('\033[46m生產者作好了 %s\033[0m' %item)
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=['包子%s' %i for i in range(10)] #在windows下沒法傳入生成器,咱們能夠用列表解析測試

    p1=Process(target=consumer,args=('消費者1',q,))
    p2=Process(target=consumer,args=('消費者2',q,))
    p3=Process(target=consumer,args=('消費者3',q,))
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()

    # producer(seq,q) #也能夠是下面三行的形式,開啓一個新的子進程當生產者,不用主線程當生產者
    p4=Process(target=producer,args=(seq,q))
    p4.start()
    p4.join()
    print('主線程')

2.管道方式

管道也能夠說是隊列的另一種形式,下面咱們就開始介紹基於管道實現金城之間的消息傳遞

建立管道的類:

Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道

參數介紹:

dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。

 方法介紹:

 conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。

 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
1.conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
2.conn1.fileno():返回鏈接使用的整數文件描述符
3.conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 4.conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
5.conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 6.conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。

基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的):

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主進程')

注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常。所以在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。

管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序,以下

#注意:send()和recv()方法使用pickle模塊對對象進行序列化。
from multiprocessing import Process,Pipe import time,os def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() except EOFError: server.close() break res=x+y server.send(res) print('server done') if __name__ == '__main__': server,client=Pipe() c1=Process(target=adder,args=((server,client),'c1')) c1.start() server.close() client.send((10,20)) print(client.recv()) client.close() c1.join() print('主進程')

12 進程間通訊方式三:共享數據

12 .1進程共享數據

from multiprocessing import Manager,Process
import os
def work(d,l):
    l.append(os.getpid())
    d[os.getpid()]=os.getpid()

if __name__ == '__main__':
    m=Manager()
    l=m.list(['init',])
    d=m.dict({'name':'egon'})


    # p1=Process(target=work,args=(d,l))
    # p2=Process(target=work,args=(d,l))
    # p3=Process(target=work,args=(d,l))
    # p4=Process(target=work,args=(d,l))
    # p5=Process(target=work,args=(d,l))
    #
    # p_l=[p1,p2,p3,p4,p5]
    # for p in p_l:
    #     p.start()
    #
    # for p in p_l:
    #     p.join()

    p_l=[]
    for i in range(5):
        p=Process(target=work,args=(d,l))
        p_l.append(p)
        p.start()

    for p in p_l:
        p.join()
    print(d)
    print(l)

12.2  進程同步鎖 

加鎖的目的是爲了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。

在4.4小節咱們學習了隊列,隊列是管道+鎖實現的,於是咱們無需考慮複雜的鎖問題。

可是在4.3小節中咱們介紹到,進程之間數據隔離,可是共享一套文件系統,於是能夠經過文件來實現進程直接的通訊,但問題是必須本身加鎖處理

因此,就讓咱們幫文件當作數據庫,模擬搶票(Lock互斥鎖)

#文件db的內容爲:{"count":1}
#注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #買票
    # lock.acquire()
    with lock:
        with open(filename,encoding='utf-8') as f:
            dic=json.loads(f.read())
            # print('剩餘票數: %s' % dic['count'])
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模擬網絡延遲
            with open(filename,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('%s 購票成功' %os.getpid())
        else:
            print('%s 購票失敗' %os.getpid())
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=('db',lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print('主線程')

13 進程池 

13.1 Pool類

在使用Python進行系統管理時,特別是同時操做多個文件目錄或者遠程控制多臺主機,並行操做能夠節約大量的時間。若是操做的對象數目不大時,還能夠直接使用Process類動態的生成多個進程,十幾個還好,可是若是上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。 
Pool類能夠提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,若是池尚未滿,就會建立一個新的進程來執行請求。若是池滿,請求就會告知先等待,直到池中有進程結束,纔會建立新的進程來執行這些請求。
下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Lock
import json
import time
import random
def work(dbfile,name,lock):
    # lock.acquire()
    with lock:
        with open(dbfile,encoding='utf-8') as f:
            dic=json.loads(f.read())

        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模擬網絡延遲
            with open(dbfile,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('\033[43m%s 搶票成功\033[0m' %name)
        else:
            print('\033[45m%s 搶票失敗\033[0m' %name)
    # lock.release()


if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=('a.txt','用戶%s' %i,lock))
        p_l.append(p)
        p.start()


    for p in p_l:
        p.join()
    print('主進程')

使用進程池進程socket通信 

客戶端

#客戶端
#!/usr/bin/Python # -*- coding:utf-8 -*- from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

服務端

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool
from socket import *
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8081))
server.listen(5)

def talk(conn,addr):
print(os.getpid())
while True: #通信循環
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
pool=Pool()
res_l=[]
while True: #連接循環
conn,addr=server.accept()
# print(addr)
# pool.apply(talk,args=(conn,addr))
res=pool.apply_async(talk,args=(conn,addr))
res_l.append(res)
# print(res_l)

13.2 回調函數

相關文章
相關標籤/搜索