01python 中的進程(python併發編程)

進程

什麼是進程?

        進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。
        狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。
        廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動。它是操做系統動態執行的基本單元,在傳統的操做系統中,進程既是基本的分配單元,也是基本的執行單元。

進程調度

要想多個進程交替運行,操做系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是須要遵循必定的法則,由此就有了進程的調度算法。
瞭解幾個相關的算法
  • 先來先服務算法
  • 短期優先調度算法
  • 時間片輪轉法
  • 多級反饋隊列
http://www.cnblogs.com/Eva-J/articles/8253549.html#_label11        ——詳見女神的博客

進程的並行與併發

並行 : 並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )
併發 : 併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。
區別:
並行是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
併發是從宏觀上,在一個時間段上能夠看出是同時執行的,好比一個服務器同時處理多個session。

 

同步,異步、阻塞與非阻塞

這張圖很關鍵html

 

  在瞭解其餘概念以前,咱們首先要了解進程的幾個狀態。在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。
  (1)就緒(Ready)狀態    當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。
  (2)執行/運行(Running)狀態    當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。
  (3)阻塞(Blocked)狀態    正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。

同步與異步

      所謂同步:就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。
        例如購物車與結算,必須現將商品添加至購物車才能結算!
  所謂異步:是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列
好比我去銀行辦理業務,可能會有兩種方式:
第一種 :選擇排隊等候;
第二種 :選擇取一個小紙條上面有個人號碼,等到排到我這一號時由櫃檯的人通知我輪到我去辦理業務了;

第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務狀況;

第二種:後者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)每每註冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這裏是櫃檯的人)經過某種機制(在這裏是寫在小紙條上的號碼,喊號)找到等待該事件的人。

 

阻塞與非阻塞

阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的

進程的建立與結束

進程的建立

multiprocess.process模塊介紹
 1 Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
 2 
 3 強調:
 4 1. 須要使用關鍵字的方式來指定參數
 5 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
 6 
 7 參數介紹:
 8 1 group參數未使用,值始終爲None
 9 2 target表示調用對象,即子進程要執行的任務
10 3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
11 4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
12 5 name爲子進程的名稱

實例代碼:python

 1 # 在python進程中開啓一個子進程
 2 import time
 3 from multiprocessing import Process
 4 
 5 
 6 def f(name):
 7     print("hello", name)
 8     print("我是子進程")
 9 
10 
11 if __name__ == '__main__':
12     p = Process(target=f, args=("pontoon", ))
13     p.start()
14     time.sleep(1)
15     print("執行主進程的內容了")
16 
17 >>>hello pontoon
18 我是子進程
19 執行主進程的內容了
 1 import os
 2 import time
 3 from multiprocessing import Process
 4 
 5 
 6 def func():
 7     print(54321)
 8     time.sleep(1)
 9     print("子進程:", os.getpid())
10     print("子進程的父進程:", os.getppid())
11     print(12345)
12 
13 
14 if __name__ == '__main__':
15     p = Process(target=func)
16     p.start()
17     print("*" * 20)
18     print("父進程:", os.getpid())  # 查看當前進程的進程號
19     print("父進程的父進程", os.getppid())
20     
21 
22 >>>********************
23 父進程: 2972
24 父進程的父進程 14060
25 54321
26 子進程: 13992
27 子進程的父進程: 2972
28 12345

多進程中的join()方法

進程中join()方法的用處,感知一個子進程的結束,將異步的程序變成同步的程序,在join()方法調用以前,子進程與主進程都是異步的,可是當調用了join()方法以後,那麼下面的代碼就變成了同步!web

 1 import os
 2 import time
 3 from multiprocessing import Process
 4 
 5 
 6 def func():
 7     print(54321)
 8     time.sleep(1)
 9     print("子進程:", os.getpid())
10     print("子進程的父進程:", os.getppid())
11     print(12345)
12 
13 
14 if __name__ == '__main__':
15     p = Process(target=func)
16     p.start()
17     p.join()        # 將異步的程序變成同步的
18     print("*" * 20)
19     print("進程結束了")  # 查看當前進程的進程號
20     
21 
22 # 代碼彷佛這樣執行纔是正常的,這就是join()方法的做用    
23 >>>54321
24 子進程: 10472
25 子進程的父進程: 8692
26 12345
27 ********************
28 進程結束了    

join()方法須要注意的地方
 

 



同時開啓多個子進程
 1 def func(a, b):
 2     print(a)
 3     time.sleep(3)
 4     print(b)
 5 
 6 
 7 if __name__ == '__main__':
 8     p = Process(target=func, args=(10, 20))
 9     p.start()
10     p = Process(target=func, args=(11, 21))
11     p.start()
12     p = Process(target=func, args=(12, 22))
13     p.start()
14     p = Process(target=func, args=(13, 23))
15     p.start()
16 
17     print("*" * 20)
18     # join()之上的程序是異步的,join()以後的方法變成了同步
19     p.join()
20     print("進程結束了")  # 查看當前進程的進程號
21     
22 
23 >>>********************
24 10
25 11
26 12
27 13
28 20
29 21
30 22
31 23
32 進程結束了
使用for循環開啓多個子進程

for 循環配合p.join()方法值得注意的地方算法

 1 import os
 2 import time
 3 from multiprocessing import Process
 4 
 5 
 6 def func(a, b):
 7     print("*" * a)
 8     time.sleep(3)
 9     print("*" * b)
10 
11 
12 if __name__ == '__main__':
13     
14     for i in range(10):
15         p = Process(target=func, args=(5, 10))
16         p.start()
17         
18     print("*" * 20)
19     # join()之上的程序是異步的,join()以後的方法變成了同步
20     p.join()    
21     print("進程結束了")  # 查看當前進程的進程號
22     
23 >>>*****
24 *****
25 *****
26 *****
27 *****
28 *****
29 *****
30 ********************
31 *****
32 *****
33 *****
34 **********
35 **********
36 **********
37 **********
38 **********
39 **********
40 **********
41 **********
42 進程結束了            # 出問題了
43 **********
44 **********
 修改之
 1 import time
 2 from multiprocessing import Process
 3 
 4 
 5 def func(a, b):
 6     print("*" * a)
 7     time.sleep(6)
 8     print("*" * b)
 9 
10 
11 if __name__ == '__main__':
12     p_list = []
13     for i in range(5):
14         p = Process(target=func, args=(5, 10))
15         p_list.append(p)
16         p.start()
17     [p.join() for p in p_list]      
18     print("進程結束了")  # 查看當前進程的進程號
19 
20 >>>*****
21 *****
22 *****
23 *****
24 *****
25 **********
26 **********
27 **********
28 **********
29 **********
30 進程結束了    
 [p.join() for p in p_list] 的引用場景
 1 # 需求想500個文件裏面寫數據,用異步的方式實現
 2 import os
 3 from multiprocessing import Process
 4 
 5 
 6 def func(file_name, contents):
 7     with open(file_name, 'w') as f:
 8         f.write(contents * '+')
 9 
10 
11 if __name__ == '__main__':
12     p_list = []
13     for i in range(5):
14         p = Process(target=func, args=("info{0}".format(i), i))
15         p_list.append(p)
16         p.start()
17     [p.join() for p in p_list]
18     print([i for i in os.walk(r'G:\進線程')])

 

開啓多進程的第二種方式      —— 繼承

最簡易的
 1 import os
 2 from multiprocessing import Process
 3 
 4 
 5 # 開啓多進程的第二種方式
 6 class MyProcess(Process):
 7     def run(self):
 8         print(os.getpid())
 9 
10 
11 if __name__ == '__main__':
12     print("主:", os.getpid())
13     p1 = MyProcess()
14     p1.start()
15     p2 = MyProcess()
16     p2.start()
17 
18 >>>主: 7824
19 12324
20 14660  
 1 # Process 類中的屬性一覽
 2 
 3 class Process(object):
 4     def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
 5         self.name = ''             # 進程名
 6         self.daemon = False        # 
 7         self.authkey = None        #
 8         self.exitcode = None
 9         self.ident = 0
10         self.pid = 0               # 進程號
11         self.sentinel = None
12         self.is_alive = ''         # 判斷子進程是否活着
13         self.terminate = ''        # 結束一個子進程
升級,傳參數
 1 import os
 2 from multiprocessing import Process
 3 
 4 
 5 # 開啓多進程的第二種方式
 6 class MyProcess(Process):
 7     def __init__(self, args1, args2):
 8         super(MyProcess, self).__init__()
 9         self.arg1 = args1
10         self.arg2 = args2
11 
12     def run(self):
13         print(self.pid)
14         print(self.name)
15         print(self.arg1)
16         print(self.arg2)
17 
18 
19 if __name__ == '__main__':
20     print("主:", os.getpid())
21     p1 = MyProcess(10, 20)
22     p1.start()
23     p2 = MyProcess(11, 22)
24     p2.start()
25     
26 >>>主: 6916
27 9288
28 MyProcess-1
29 10
30 20
31 7412
32 MyProcess-2
33 11
34 22    

多進程之間數據是隔離的

  • 進程與進程之間數據是隔離的json

  • 父進程與子進程之間的數據也是隔離的windows

 1 from multiprocessing import Process
 2 
 3 def work():
 4     global n
 5     n=0
 6     print('子進程內: ',n)
 7 
 8 
 9 if __name__ == '__main__':
10     n = 100
11     p=Process(target=work)
12     p.start()
13     p.join()        # 執行玩子進程以後在執行父進程
14     print('主進程內: ',n)
15     
16 >>>子進程內:  0
17 主進程內:  100
18 
19 # 在子進程中設置了全局變量可是在主進程中的n的值並無發生改變,得出結論:主進程與子進程之間的數據也是隔離的
使用socket實現聊天併發
# server端

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start進程必定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()

 

# client端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
 

守護進程

什麼是守護進程?

        守護進程是一種新的進程,本身開的一個子進程轉換成的。當在一個主進程中開啓子進程的時候,想要子進程隨着主進程的結束而結束,那麼就要將子進程變成一個守護進程。安全

關鍵字子進程轉換成的,那麼該如何轉換啦?
        先看一個現象,代碼以下
import time
from multiprocessing import Process


# 守護進程
def func():
    while 1:
        time.sleep(0.5)
        print('2222')


if __name__ == '__main__':
    p = Process(target=func)
    # p.deamon = True
    p.start()
    i = 0
    while i < 4:
        print('11111111')
        time.sleep(1)
        i += 1
        
# 查看輸出的結果
>>>11111111
2222
11111111
2222
2222
11111111
2222
2222
11111111
2222
2222
2222
2222
2222
...

 

 

上面的2會一直執行下去。這顯然不是咱們但願看見的效果,我但願當主進程執行結束以後,子進程也會跟着結束。守護進程就是作這件事的。如何來設置守護進程 。很簡單加上一段代碼!服務器

 1 import time
 2 from multiprocessing import Process
 3 
 4 
 5 # 守護進程
 6 def func():
 7     while 1:
 8         time.sleep(0.5)
 9         print('2222')
10 
11 
12 if __name__ == '__main__':
13     p = Process(target=func)
14     p.daemon = True     # 在start以前加
15     p.start()
16     i = 0
17     while i < 4:
18         print('11111111')
19         time.sleep(1)
20         i += 1
21 
22 >>>11111111
23 2222
24 11111111
25 2222
26 2222
27 11111111
28 2222
29 2222
30 11111111
31 2222
32 2222

改進:添加守護進程網絡

 1 import time
 2 from multiprocessing import Process
 3 
 4 
 5 def func():
 6     while 1:
 7         time.sleep(0.5)
 8         print('2222')
 9 
10 
11 if __name__ == '__main__':
12     p = Process(target=func)
13     p.daemon = True     # 在start以前加
14     p.start()
15     i = 0
16     while i < 4:
17         print('11111111')
18         time.sleep(1)
19         i += 1
20 
21 >>>11111111
22 2222
23 11111111
24 2222
25 2222
26 11111111
27 2222
28 2222
29 11111111
30 2222
31 2222        

有上面的現象引出結論:session

  • 主進程會隨着子進程代碼的結束而結束(子進程若是是個死循環,那麼程序會一致執行下去)
  • 守護進程會隨着    主進程的代碼的執行完畢,而結束。
  • p.terminate()       結束一個子進程,並不會當即生效。

進程鎖

什麼叫作進程鎖?

進程鎖適用於多進程的程序,就是將異步的程序變成同步的

        模擬一個現象來引出鎖的概念。

        春運到了你們都在搶火車票。100我的搶一張火車票(併發的過程)。但最後只能賣出去一張(心塞~)。

買票的過程用代碼來表示就是這樣的

#文件db的內容爲:{"count":1}
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[43m購票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task)
        p.start()

引出概念爲子進程加鎖

# 由併發變成了串行,犧牲了運行效率,但避免了競爭
import random
from multiprocessing import Process,Lock

def func(n, l):
    l.acquire()
    time.sleep(random.random())
    print("爲子線程加鎖了%s" % n)
    l.release()


if __name__ == '__main__':
    l = Lock()
    for i in range(3):
        p = Process(target=func, args=(i, l))
        p.start()

>>>爲子線程加鎖了0
爲子線程加鎖了1
爲子線程加鎖了2        
        
# --------------------------------------------------------------

def func(n):
    time.sleep(random.random())
    print("爲子線程加鎖了%s" % n)


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=func, args=(i))
        p.start()

>>>爲子線程加鎖了1
爲子線程加鎖了2         
爲子線程加鎖了0



# 對比兩段代碼的執行結果——加鎖使得子進程變成了‘有序’的狀態了

加鎖實現春運買票

 1 #文件db的內容爲:{"count":5}
 2 #注意必定要用雙引號,否則json沒法識別
 3 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
 4 from multiprocessing import Process,Lock
 5 import time,json,random
 6 def search():
 7     dic=json.load(open('db'))
 8     print('\033[43m剩餘票數%s\033[0m' %dic['count'])
 9 
10 def get():
11     dic=json.load(open('db'))
12     time.sleep(random.random()) #模擬讀數據的網絡延遲
13     if dic['count'] >0:
14         dic['count']-=1
15         time.sleep(random.random()) #模擬寫數據的網絡延遲
16         json.dump(dic,open('db','w'))
17         print('\033[32m購票成功\033[0m')
18     else:
19         print('\033[31m購票失敗\033[0m')
20 
21 def task(lock):
22     search()
23     lock.acquire()
24     get()
25     lock.release()
26 
27 if __name__ == '__main__':
28     lock = Lock()
29     for i in range(100): #模擬併發100個客戶端搶票
30         p=Process(target=task,args=(lock,))
31         p.start()

鎖的總結

1 #加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
2 雖然能夠用文件共享數據實現進程間通訊,但問題是:
3 1.效率低(共享數據基於文件,而文件是硬盤上的數據)
4 2.須要本身加鎖處理
5 
6 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
7 隊列和管道都是將數據存放於內存中
8 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
9 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

多進程中的組件

信號量(規定了鎖的個數)

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。

 1 import time
 2 import random
 3 from multiprocessing import Process
 4 from multiprocessing import Semaphore
 5 
 6 
 7 # 信號量
 8 def ktv(i, sem):
 9     sem.acquire()           # 獲取鑰匙
10     print('%s走進KTV' % i)
11     time.sleep(random.randint(1, 5))
12     print('%s走出了KTV' % i)
13     sem.release()
14 
15 
16 if __name__ == '__main__':
17     sem = Semaphore(4)      # 設置4個鎖
18     for i in range(20):
19         p = Process(target=ktv, args=(i, sem))
20         p.start()
21         
22 >>>0走進KTV
23 1走進KTV
24 2走進KTV
25 3走進KTV
26 1走出了KTV
27 4走進KTV
28 2走出了KTV
29 6走進KTV
30 0走出了KTV
31 ...

感受有點意思

事件

事件:經過一個信號     控制多個程序的執行或者阻塞    
            一個事件被建立以後默認是阻塞狀態
 1 from multiprocessing import Event
 2 
 3 
 4 # 一個信號可使全部的進程都進入阻塞狀態,也能夠控制全部的進程接觸阻塞。
 5 # 一個事件被建立以後默認是阻塞狀態
 6 e = Event()                         # 建立了一個事件
 7 print(e.is_set())                   # 查看是否阻塞,默認是阻塞狀態的
 8 e.set()                             # 設置阻塞
 9 print(e.is_set())
10 e.wait()                            # 根據e.is_set()的值決定是否阻塞
11 print("wahaha")
12 e.clear()                           # 清除阻塞
13 print(e.is_set())
14 e.wait()
15 print('shuangwaiwai')               # 阻塞了就不會在打印了
案例:紅綠燈事件
 1 from multiprocessing import Event
 2 
 3 
 4 def cars(e, i):
 5     if not e.is_set():
 6         print('car%i在等待' % i)
 7         e.wait()
 8     print("car%i經過" % i)
 9 
10 
11 # 紅綠燈事件
12 def light(e):
13     while True:
14         if e.is_set():
15             e.clear()
16             print("紅燈亮了")
17         else:
18             e.set()
19             print('綠燈亮了')
20         time.sleep(2)
21 
22 
23 if __name__ == '__main__':
24     e = Event()
25     traffic = Process(target=light, args=(e, ))
26     traffic.start()
27     for i in range(20):
28         car = Process(target=cars, args=(e, i))
29         car.start()
30         time.sleep(random.random())

 

進程之間的通訊——隊列和管道

隊列

 1 from multiprocessing import Queue, Process
 2 
 3 
 4 def produce(q):
 5     q.put("hello")    # 向對列中放入數據
 6 
 7 
 8 def consume(q):
 9     print(q.get())    # 向隊列中取數據
10 
11 
12 if __name__ == '__main__':
13     q = Queue()
14     p = Process(target=produce, args=(q,))
15     p.start()
16     c = Process(target=consume, args=(q, ))
17     c.start()

生產者消費者模型

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

 1 from multiprocessing import Process, Queue
 2 import random
 3 import time
 4 
 5 
 6 def consumer(q, name):
 7     # 定義消費者
 8     while True:
 9         food = q.get()
10         if food is None:
11             print("%s獲取到了一個空" % name)
12             break
13         print("%s消費了%s" % (name, food))
14         time.sleep(random.randint(1, 3))
15 
16 
17 def produce(name, food, q):
18     # 定義生產者
19     for i in range(10):
20         time.sleep(random.randint(1, 3))
21         f = "{0}生產了{1}{2}".format(name, food, i)
22         print(f)
23         q.put(f)
24 
25 
26 if __name__ == '__main__':
27     q = Queue(10)
28     p1 = Process(target=produce, args=("lee", "包子", q))
29     p2 = Process(target=produce, args=("dan", "泔水", q))
30     c1 = Process(target=consumer, args=(q, 'pon'))
31     c2 = Process(target=consumer, args=(q, 'toon'))
32     p1.start()
33     p2.start()
34     c1.start()
35     c2.start()
36     p1.join()
37     p2.join()
38     q.put(None)
39     q.put(None)
使用JoinableQueue改進代碼
 1 # 升級版
 2 from multiprocessing import Process, JoinableQueue
 3 import random
 4 import time
 5 
 6 
 7 def consumer(q, name):
 8     while True:
 9         food = q.get()
10         print("%s消費了%s" % (name, food))
11         time.sleep(random.randint(1, 3))
12         q.task_done()    # 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。
13 
14 
15 def produce(name, food, q):
16     # 定義生產者
17     for i in range(10):
18         time.sleep(random.randint(1, 3))
19         f = "{0}生產了{1}{2}".format(name, food, i)
20         print(f)
21         q.put(f)
22     q.join()        # 當隊列中全部的包子都被消費玩了,程序執行完
23 
24 
25 if __name__ == '__main__':
26     q = JoinableQueue(20)
27     p1 = Process(target=produce, args=("lee", "包子", q))
28     p2 = Process(target=produce, args=("dan", "泔水", q))
29     c1 = Process(target=consumer, args=(q, 'pon'))
30     c2 = Process(target=consumer, args=(q, 'toon'))
31     p1.start()
32     p2.start()
33     c1.daemon = True    # 建立一個守護進程  循環的案例中,隨着主進程的結束,子進程也會跟着結束   因此他的做用是判斷 主進程是否結束
34     c2.daemon = True    # 建立一個守護進程  循環的案例中,隨着主進程的結束,子進程也會跟着結束
35     c1.start()
36     c2.start()
37     p1.join()           # join執行完,那麼主進程就執行完
38     p2.join()           # join執行完,那麼主進程就執行完

 

管道

管道是什麼?

Linux進程間通訊方式的一種,管道有兩端,讀端和寫端。建立管道,而後從父進程fork出子進程,
父進程和子進程擁有共同的讀寫文件描述符,能夠實現子進程寫文件,父進程讀文件的操做。

 

 1 # 管道
 2 from multiprocessing import Pipe, Process
 3 
 4 
 5 def func(conn1, conn2):
 6     conn2.close()
 7     while True:
 8         try:
 9             msg = conn1.recv()
10             print(msg)
11         except EOFError:
12             conn1.close()
13             break
14 
15 
16 if __name__ == '__main__':
17     conn1, conn2 = Pipe()
18     Process(target=func, args=(conn1, conn2)).start()
19     conn1.close()
20     for i in range(20):
21         conn2.send("吃了麼")
22     conn2.close()

進程之間的數據共享Manager模塊

 1 from multiprocessing import Manager, Process
 2 
 3 
 4 def main(dic):
 5     dic['count'] -= 1
 6     print(dic)
 7 
 8 
 9 if __name__ == '__main__':
10     m = Manager()
11     dic = m.dict({'count': 100})
12     p_lst = []
13     p = Process(target=main, args=(dic, ))
14     p.start()
15     p.join()
16     print('主線程:', dic)
17 
18 
19 >>>{'count': 99}
20 主線程: {'count': 99}

 上面的作到了數據共享,but:

 1 from multiprocessing import Manager, Process
 2 
 3 
 4 def main(dic):
 5     dic['count'] -= 1
 6 
 7 
 8 if __name__ == '__main__':
 9     m = Manager()
10     dic = m.dict({'count': 100})
11     p_list = []
12     for i in range(30):
13         p = Process(target=main, args=(dic,))
14         p.start()
15         p_list.append(p)
16     for i in p_list: p.join()
17     print('主程序', dic)
18 
19 >>>主程序 {'count': 70}
玄學開始了
 1 from multiprocessing import Manager, Process, Lock
 2 
 3 
 4 def main(dic, lock):
 5     lock.acquire()
 6     dic['count'] -= 1
 7     lock.release()
 8 
 9 
10 if __name__ == '__main__':
11     m = Manager()
12     lock = Lock()
13     dic = m.dict({'count': 100})
14     p_list = []
15     for i in range(30):
16         p = Process(target=main, args=(dic, lock))
17         p.start()
18         p_list.append(p)
19     for i in p_list: p.join()
20     print('主程序', dic)
21 
22 >>>主程序 {'count': 70}
加鎖
好 這個時候來一波總結
 
 
 
 
 

進程池

什麼是進程池?爲何要用進程池?

        在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?

        在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。

 1 # 進程池
 2 import time
 3 from multiprocessing import Pool, Process
 4 
 5 
 6 def func(n):
 7     for i in range(5):
 8         print(n+1)
 9 
10 
11 if __name__ == '__main__':
12     start = time.time()
13     pool = Pool(5)
14     pool.map(func, range(100))
15     t1 = time.time() - start
16 
17     start = time.time()
18     p_List = []
19     for i in range(100):
20         p = Process(target=func, args=(i, ))
21         p_List.append(p)
22         p.start()
23     for p in p_List:
24         p.join()
25     t2 = time.time() - start
26     print(t1, t2)
27     
28     
29 >>>0.22115707397460938 6.861854076385498        # 差距是至關的明顯
 1 import os, time
 2 from multiprocessing import Pool
 3 
 4 
 5 def func(n):
 6     print('start func%s' % n, os.getpid())
 7     time.sleep(3)  # 模擬代碼執行
 8     print('end func%s' % n, os.getpid())
 9 
10 
11 if __name__ == '__main__':
12     p = Pool(5)
13     for i in range(10):
14         p.apply_async(func, args=(i, ))
15     p.close()  # 結束進程池接收任務
16     p.join()  # 感知進程池中的任務執行結束
17 
18 >>>start func0 17124
19 start func1 14436
20 start func2 8872
21 start func3 7396
22 start func4 1268
23 end func0 17124
24 start func5 17124
25 end func1 14436
26 start func6 14436
27 end func2 8872
28 start func7 8872
29 end func3 7396
30 start func8 7396
31 end func4 1268
32 start func9 1268
33 end func5 17124
34 end func6 14436
35 end func7 8872
36 end func8 7396
37 end func9 1268
開啓進程池
實現一個基於進程池的TCP
 1 # 基於進程池的tcp
 2 # client 端
 3 import socket
 4 
 5 
 6 sk = socket.socket()
 7 sk.connect(('127.0.0.1', 8080))
 8 
 9 ret = sk.recv(1024).decode('utf-8')
10 print(ret)
11 msg = input('>>>').encode('utf-8')
12 sk.send(msg)
13 sk.close()
 1 # server 端
 2 import socket
 3 
 4 
 5 sk = socket.socket()
 6 sk.bind(('127.0.0.1', 8080))
 7 sk.listen()
 8 while True:
 9     conn, addr = sk.accept()
10     conn.send(b'hello')
11     print(conn.recv(1024).decode('utf-8'))
12     conn.close()
13 sk.close()
改進
 1 # server 端
 2 import socket
 3 from multiprocessing import Pool
 4 
 5 
 6 def func(conn):
 7     conn.send(b'hello')
 8     print(conn.recv(1024).decode('utf-8'))
 9     conn.close()
10 
11 
12 if __name__ == '__main__':
13     p = Pool(5)
14     sk = socket.socket()
15     sk.bind(('127.0.0.1', 8080))
16     sk.listen()
17     while True:
18         conn, addr = sk.accept()
19         p.apply_async(func, args=(conn, ))
20     sk.close()
進程池的返回值
 1 # 進程池的返回值
 2 from multiprocessing import Pool
 3 def func(i):
 4     return i*i
 5 
 6 
 7 if __name__ == '__main__':
 8     p = Pool(5)
 9     ret_list = []
10     for i in range(10):
11         ret = p.apply_async(func, args=(i, ))
12         # print(ret)    獲得的是對象
13         ret_list.append(ret)
14     for ret in ret_list: print(ret.get())  # 等着func的計算結果
15 
16 >>>0
17 1
18 4
19 9
20 16
21 25
22 36
23 49
24 64
25 81       
進程池的回調函數
 1 # 進程池的回調函數
 2 from multiprocessing import Pool
 3 
 4 
 5 def func1(i):
 6     print('in func1')
 7     return i*i
 8 
 9 def func2(n):
10     print('in func2' )
11     print(n)
12 
13 
14 if __name__ == '__main__':
15     p = Pool(5)
16     p.apply_async(func1, args=(10, ), callback=func2)
17     p.close()
18     p.join()
19 
20 >>>in func1
21 in func2
22 100    

 進程池的執行位置

 1 # 進程池的回調函數
 2 import os
 3 from multiprocessing import Pool
 4 
 5 
 6 def func1(i):
 7     print('in func1', os.getpid())
 8     return i*i
 9 
10 
11 def func2(n):
12     print('in func2', os.getpid())
13     print(n)
14 
15 
16 if __name__ == '__main__':
17     print('主進程:', os.getpid())
18     p = Pool(5)
19     p.apply_async(func1, args=(10, ), callback=func2)   # 注意這裏回調函數的執行位置,實在主進程中執行的
20     p.close()
21     p.join()
22 
23 >>>主進程: 13732
24 in func1 5292
25 in func2 13732
26 100
利用進程池起爬蟲的案例
相關文章
相關標籤/搜索