複製代碼 1 import os 2 import time 3 from multiprocessing import Process 4 5 6 def f(name): 7 print('in f', os.getpid(), os.getppid()) 8 print('i am is son process', name) 9 10 11 if __name__ == '__main__': 12 p = Process(target=f, args=('小青',)) 13 p.start() # start不是運行一個程序,而是調用操做系統的命令,要建立子進程 14 15 print('我是主程序……') 》》》結果: 我是主程序…… in f 6016 12312 i am is son process 小青 結論:在另外一個地方開闢內存空間,執行f函數,主進程不會阻塞等待
1 def f(args): 2 print('in func 2', args, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('我在主進程中……') 7 p1 = Process(target=f, args=(666,)) 8 p1.start() 9 p2 = Process(target=f, args=(777,)) 10 p2.start() 11 print('我會在子進程前運行……,由於我和他們是隔離的,不會等他們') 12 13 ''' 14 結果: 15 我在主進程中…… 16 我會在子進程前運行……,由於我和他們是隔離的,不會等他們 17 in func 2 666 8144 7660 18 in func 2 777 10820 7660 19 '''
1 def fu(num): 2 print("我是進程 :%d " % num) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(), os.getppid()) 7 for i in range(10): 8 Process(target=fu, args=(i,)).start() 9 print('main 66666') 10 """ 11 結果: 12 in main 11984 2064 13 main 66666 14 我是進程 :1 15 我是進程 :0 16 我是進程 :8 17 我是進程 :7 18 我是進程 :4 19 我是進程 :2 20 我是進程 :9 21 我是進程 :3 22 我是進程 :6 23 我是進程 :5 24 p.start()並無當即執行,而是進入就緒隊列,等帶cpu調度,因此不是有序的 25 """
1 def g(args): 2 print("in g", args) 3 4 5 if __name__ == '__main__': 6 print('in main') 7 p = Process(target=g, args=(888,)) 8 p.start() 9 p.join() 10 print('i am in main process') 11 ''' 12 in main 13 in g 888 14 i am in main process 15 結:join老是等子進程執行完畢後再執行接下來的代碼 16 '''
1 def pro(i): 2 print('in func', i, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(),os.getppid()) 7 p_list = [] 8 for i in range(10): 9 p = Process(target=pro, args=(i,)) 10 p.start() # 不是運行一個程序,而是調用操做系統命令,要建立子進程,等待操做系統做業,非阻塞 11 p_list.append(p) 12 print(p_list) 13 for p in p_list: # 遍歷每一個子進程,每一個join一下,若是該子進程已經接收,join失效至關於pass,遍歷完成就能保證每一個子進程都結束了 14 p.join() # 阻塞,直到p這個子進程執行完畢以後再繼續執行 15 print('主進程……') 16 ''' 17 in main 1480 2064 18 [<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>, <Process(Process-6, started)>, <Process(Process-7, started)>, <Process(Process-8, started)>, <Process(Process-9, started)>, <Process(Process-10, started)>] 19 in func 3 6108 1480 20 in func 7 13756 1480 21 in func 5 12548 1480 22 in func 8 12116 1480 23 in func 4 10948 1480 24 in func 6 11744 1480 25 in func 9 11244 1480 26 in func 1 3968 1480 27 in func 2 9412 1480 28 in func 0 14024 1480 29 主進程…… 30 '''
1 # p.is_alive方法:查看子進程生存狀態 2 # p.terminate() 強制結束子進程--非阻塞 3 def gro(i): 4 time.sleep(1) 5 print('in func', i, os.getpid(), os.getppid()) 6 7 8 if __name__ == '__main__': 9 print("in main") 10 p1 = Process(target=gro, args=(1,)) 11 p1.start() 12 # time.sleep(2) # 若是等待一下子,就會執行函數,若是不等,就無論操做系統去建子進程,而直接執行後面的代碼,因此可能比建立子進程前就執行了 13 print(p1.is_alive()) # 檢測子進程是否還在執行任務 14 p1.terminate() # 強制結束子進程,非阻塞,不會等待狀態改變,會立刻執行後面代碼 15 print(p1.is_alive()) 16 print('主進程的代碼執行結束了……') 17 ''' 18 in main 19 True 20 True 21 主進程的代碼執行結束了…… 22 23 結:由於直接執行,主進程執行快些,子進程函數不會執行 24 '''
1 class Myprocess(Process): 2 def __init__(self, name): 3 super().__init__(self) # 需繼承父類的init方法 4 self.name = name # 添加須要本身的屬性 5 6 def run(self): 7 print(self.name) # 只有重寫run方法才能將參數傳入 8 print(os.getppid(), os.getpid()) 9 10 11 if __name__ == '__main__': 12 p = Myprocess('小強') 13 p.start()
1 from multiprocessing import Process 2 3 n = 100 4 5 6 def func(): 7 global n 8 n = n - 1 9 return 111 10 11 12 if __name__ == '__main__': 13 n_l = [] 14 for i in range(100): 15 p = Process(target=func) 16 p.start() 17 n_l.append(p) 18 for p in n_l: p.join() 19 print(n) 20 21 結果爲:100 22 23 總結:說明子進程沒法改變主進程的全局變量,本質是沒法自由通訊,但子進程中的n確定減小了,只是無法拿出來
1 from multiprocessing import Process 2 import time 3 def func1(): 4 print('begin') 5 time.sleep(3) 6 print('wawww') 7 8 # if __name__ == '__main__': 9 # p = Process(target=func1) 10 # # p.daemon = True 11 # p.start() 12 # time.sleep(1) 13 # print('in main') 14 ''' 15 結果: 16 begin 17 in main 18 19 結論:守護進程隨着主進程結束而結束,那怕守護進程任務沒有執行完畢 20 ''' 21 22 def f1(): 23 print('begin fun1') 24 time.sleep(3) 25 print('baidu') 26 27 def f2(): 28 while True: 29 print('in f2') 30 time.sleep(0.5) 31 32 if __name__ == '__main__': 33 Process(target=f1,).start() 34 p = Process(target=f2) 35 p.daemon = True 36 # 守護進程的屬性,默認是False,若是設置成True,就表示設置這個子進程爲一個守護進程 37 # 設置守護進程的操做應該在開啓子進程以前 38 p.start() 39 time.sleep(1) 40 print('in main') # 主進程in main執行完後,守護進程就會結束,但主進程並無結束而是等另外一個子進程結束後才結束 41 42 43 # 設置成守護進程以後 會有什麼效果呢? 44 # 守護進程會在主進程的代碼執行完畢以後直接結束,不管守護進程是否執行完畢 45 46 # 應用 47 # 報活 主進程還活着 48 # 100臺機器 100個進程 10000進程 49 # 應用是否在正常工做 - 任務管理器來查看 50 # 守護進程如何向監測機制報活???send/寫數據庫 51 # 爲何要用守護進程來報活呢?爲何不用主進程來工做呢??? 52 # 守護進程報活幾乎不佔用CPU,也不須要操做系統去調度 53 # 主進程不能嚴格的每60s就發送一條信息
1 import json 2 import time 3 from multiprocessing import Lock 4 from multiprocessing import Process 5 6 # 鎖 7 lock = Lock() # 創造了一把鎖 8 lock.acquire() # 獲取了這把鎖的鑰匙 9 lock.release() # 歸還這把鑰匙,其餘進程就能夠拿鎖了 10 11 12 # 搶票的故事 13 # 需求:每一個人都能查看餘票、買相同車次票同一刻只能一人買完,另外一人才能買 14 15 16 def search(i): 17 with open('db', encoding='utf-8') as f: 18 count_dic = json.load(f) 19 time.sleep(0.2) # 模擬網絡延遲 20 print('person %s 餘票:%s 張' % (i, count_dic.get('count'))) 21 return count_dic.get('count'), count_dic # 返回餘票數,及字典 22 23 24 def buy(i): 25 count, count_dict = search(i) 26 if count > 0: 27 count_dict['count'] -= 1 # 有票就能夠買 28 print('person %s 買票成功'% i) 29 time.sleep(2) 30 with open('db', 'w', encoding='utf-8') as f: 31 json.dump(count_dict, f) # 更改餘票額度 32 33 34 def task(i, lock): 35 search(i) 36 lock.acquire() # 若是以前已經被acquire了 且 沒有被release 那麼進程會在這裏阻塞 37 buy(i) 38 lock.release() 39 40 41 if __name__ == '__main__': 42 lock = Lock() 43 for i in range(1, 11): 44 Process(target=task, args=(i, lock)).start() 45 46 # 當多個進程共享一段數據的時候,數據會出現不安全的現象, 47 # 須要加鎖來維護數據的安全性 48 49 ''' 50 D:\install\Python36\python.exe D:/install/project/七、併發編程/三、鎖.py 51 person 6 餘票:5 張 52 person 5 餘票:5 張 53 person 8 餘票:5 張 54 person 2 餘票:5 張 55 person 4 餘票:5 張 56 person 10 餘票:5 張 57 person 1 餘票:5 張 58 person 9 餘票:5 張 59 person 3 餘票:5 張 60 person 7 餘票:5 張 61 person 6 餘票:5 張 62 person 6 買票成功 63 person 5 餘票:4 張 64 person 5 買票成功 65 person 8 餘票:3 張 66 person 8 買票成功 67 person 2 餘票:2 張 68 person 2 買票成功 69 person 4 餘票:1 張 70 person 4 買票成功 71 person 10 餘票:0 張 72 person 1 餘票:0 張 73 person 9 餘票:0 張 74 person 3 餘票:0 張 75 person 7 餘票:0 張 76 77 Process finished with exit code 0 78 '''
注:進程間的數據交互,本質也用到了socket通訊,不過都是本地的,基於文件的,能夠經過將py名寫成socket來看報錯得知。node
1 #加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
2 雖然能夠用文件共享數據實現進程間通訊,但問題是: 3 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 4 2.須要本身加鎖處理 5
6 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
7 隊列和管道都是將數據存放於內存中 8 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 9 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
1 互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。 2 假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。 3 實現: 4 信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。 5 信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念 6 7 8 import time, random 9 from multiprocessing import Semaphore, Process 10 11 # ktv 只有4個房間,即同時只能四我的進去,其餘人必須等其中的人出來才能進去 12 # 13 # sem = Semaphore(4) # 設置信號量個數,併發數 14 # sem.acquire() 15 # print('進去1我的,關門阻塞中') 16 # sem.acquire() 17 # print('進去第2我的,關門阻塞中') 18 # sem.acquire() 19 # print('進去第3我的,關門阻塞中') 20 # sem.acquire() 21 # print('進去第4我的,關門阻塞中') 22 # sem.release() # 必須歸還一把,才能繼續下面的代碼,否則一直阻塞中 23 # sem.acquire() 24 # print(6666) 25 # sem.release() 26 ''' 27 D:\install\Python36\python.exe D:/install/project/七、併發編程/四、信號量.py 28 進去1我的,關門阻塞中 29 進去第2我的,關門阻塞中 30 進去第3我的,關門阻塞中 31 進去第4我的,關門阻塞中 32 6666 33 34 Process finished with exit code 0 35 ''' 36 37 38 def ktv(num, sem): 39 sem.acquire() 40 print('person %s 進入了ktv' % num) 41 time.sleep(random.randint(1, 4)) 42 print('person %s 進出了ktv' % num) 43 sem.release() 44 45 46 if __name__ == '__main__': 47 sem = Semaphore(4) 48 for i in range(10): 49 Process(target=ktv, args=(i, sem)).start() 50 51 ''' 52 最開始是4個同時進入,以後又人出,纔能有人進 53 D:\install\Python36\python.exe D:/install/project/七、併發編程/四、信號量.py 54 person 2 進入了ktv 55 person 8 進入了ktv 56 person 9 進入了ktv 57 person 7 進入了ktv 58 person 2 進出了ktv 59 person 6 進入了ktv 60 person 7 進出了ktv 61 person 5 進入了ktv 62 person 8 進出了ktv 63 person 1 進入了ktv 64 person 9 進出了ktv 65 person 0 進入了ktv 66 person 1 進出了ktv 67 person 4 進入了ktv 68 person 6 進出了ktv 69 person 3 進入了ktv 70 person 0 進出了ktv 71 person 5 進出了ktv 72 person 4 進出了ktv 73 person 3 進出了ktv 74 75 Process finished with exit code 0 76 '''
事件:multiprocessing.Eventpython
定義:全局定義了一個‘flag’,若是標誌爲False,當程序執行event.wait方法時就會阻塞,若是爲True,那麼event.wait方法時便再也不阻塞。redis
1 from multiprocessing import Process, Event 2 import time, random 3 4 5 def car(e, n): 6 while True: 7 if not e.is_set(): # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色 8 print('\033[31m紅燈亮\033[0m,car%s等着' % n) 9 e.wait() # 阻塞,等待is_set()的值變成True,模擬信號燈爲綠色 10 print('\033[32m車%s 看見綠燈亮了\033[0m' % n) 11 time.sleep(random.randint(3, 6)) 12 if not e.is_set(): #若是is_set()的值是Flase,也就是紅燈,仍然回到while語句開始 13 continue 14 print('車開遠了,car', n) 15 break 16 17 18 def police_car(e, n): 19 while True: 20 if not e.is_set():# 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色 21 print('\033[31m紅燈亮\033[0m,car%s等着' % n) 22 e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s以後沒有等到綠燈就闖紅燈走了 23 if not e.is_set(): 24 print('\033[33m紅燈,警車先走\033[0m,car %s' % n) 25 else: 26 print('\033[33;46m綠燈,警車走\033[0m,car %s' % n) 27 break 28 29 30 31 def traffic_lights(e, inverval): 32 while True: 33 time.sleep(inverval) 34 if e.is_set(): 35 print('######', e.is_set()) 36 e.clear() # ---->將is_set()的值設置爲False 37 else: 38 e.set() # ---->將is_set()的值設置爲True 39 print('***********',e.is_set()) 40 41 42 if __name__ == '__main__': 43 e = Event() 44 for i in range(10): 45 p=Process(target=car,args=(e,i,)) # 建立是個進程控制10輛車 46 p.start() 47 48 for i in range(5): 49 p = Process(target=police_car, args=(e, i,)) # 建立5個進程控制5輛警車 50 p.start() 51 t = Process(target=traffic_lights, args=(e, 10)) # 建立一個進程控制紅綠燈 52 t.start() 53 54 print('============》')
from queue import Queue # 隊列 先進先出FIFO,有序 # 應用:維護秩序的時候用的比較多,買票,搶票 q = Queue(5) # 設置隊列的長度,即元素個數,即只能放入5個元素 ret = q.qsize() # 得到當前隊列中的元素個數,此方法不許,在多進程中,此刻獲取結果時,也許其餘進程向裏面加入了元素 q.put(1111) # 向隊列中放入對象,若是隊列已滿,則阻塞等待,一直到空間可用爲止 ''' 參數: item:項目、元素、對象 block:默認True,隊列滿一直等待阻塞,False則爲不阻塞,滿則直接主動自定義報錯 timeout:阻塞等待的時間,時間到了,還不能放,則報錯Queue.Empty異常 ''' q.put_nowait(2222) # 放入元素,滿了,不等直接報錯 q.get() # 返回q即隊列中的元素,隊列中爲空,則阻塞一直等待有值爲止,通向可設置timeout q.get_nowait() # 隊列爲空時,直接報錯 q.empty() # 判斷是否爲空,空則返回True,一樣在多進程中不許 q.full() # 判斷是否滿了,滿了則返回True,多進程中不許,主要是進程是異步操做
# 例子1 一進程放 一進程取 from multiprocessing import Queue, Process def con(q): print(q.get()) # 從隊列中拿,沒有直到等到有,因此那麼它比其餘進程快,最後也能拿到數據 def pro(q): q.put(112) # 向隊列中放入112 if __name__ == '__main__': q = Queue() p = Process(target=con, args=(q,)) p.start() p = Process(target=pro, args=(q,)) p.start() print('我在主進程中……') ''' 看出隊列能夠實現進程間的通訊 ''' # 主放, 子取 from multiprocessing import Queue, Process def f(q): print(q.get()) if __name__ == '__main__': q = Queue() Process(target=f, args=(q,)).start() # create son_process q.put(666) '''看出主進程可和子進程通訊'''
1 import time 2 import random 3 from multiprocessing import Process, Queue 4 5 6 def consumer(q,name): 7 while 1: 8 food = q.get() 9 print('%s 吃了 %s ' % (name, food)) 10 time.sleep(random.random()) 11 12 13 def producer(q,name,food,n=10): 14 for i in range(1, n): # 定義生產10個食物 15 time.sleep(random.random()) # 模擬生產慢,消費快 16 fd = food + str(i) 17 print('%s 生產了 %s' %(name,fd)) 18 q.put(fd) 19 20 if __name__ == '__main__': 21 q = Queue(10) 22 for person in range(6): # 定義消費者多 23 Process(target=consumer, args=(q, 'person'+ str(person))).start() 24 Process(target=producer, args=(q,'小強','米飯')).start() 25 Process(target=producer, args=(q,'小東','麪條')).start() 26 Process(target=producer, args=(q,'小hua','麪條')).start() 27 Process(target=producer, args=(q,'小cai','麪條')).start()
消費者一直在等着拿數據,生產者生產完了就結束了,生產者須要告訴消費者生產完了才合理,即向隊列中放入stop信號
1 def consumer(q,name): 2 while 1: 3 food = q.get() 4 if food == 'stop':break 5 print('%s 吃了 %s ' % (name, food)) 6 time.sleep(random.random()) 7 8 9 def producer(q,name,food,n=10): 10 for i in range(1, n): # 定義生產10個食物 11 time.sleep(random.random()) # 模擬生產慢,消費快 12 fd = food + str(i) 13 print('%s 生產了 %s' %(name,fd)) 14 q.put(fd) 15 q.put('stop') 16 17 if __name__ == '__main__': 18 q = Queue(10) 19 for person in range(4): # 定義消費者多 20 Process(target=consumer, args=(q, 'person'+ str(person))).start() 21 Process(target=producer, args=(q,'小強','米飯')).start() 22 Process(target=producer, args=(q,'小東','麪條')).start() 23 Process(target=producer, args=(q,'小hua','麪條')).start() 24 Process(target=producer, args=(q,'小cai','麪條')).start() 25 26 # 生產者加入結束信號,消費者收到後就結束,不存在還有進程在使用數據,由於隊列是先進先出的 27 28 # 且有多少個消費者就須要發多少個stop信號,否則就會致使有的的進程還在等待中
1 from multiprocessing import Queue, Process 2 import random, time 3 4 5 def consumer(q, name): 6 while 1: 7 food = q.get() 8 if food == 'stop': break 9 print('%s 吃了 %s ' % (name, food)) 10 time.sleep(random.random()) 11 12 13 def producer(q, name, food, n=10): 14 for i in range(1, n): # 定義生產10個食物 15 time.sleep(random.random()) # 模擬生產慢,消費快 16 fd = food + str(i) 17 print('%s 生產了 %s' % (name, fd)) 18 q.put(fd) 19 20 21 if __name__ == '__main__': 22 q = Queue(10) 23 for person in range(4): # 定義消費者4個 24 Process(target=consumer, args=(q, 'person'+ str(person))).start() 25 p1 = Process(target=producer, args=(q,'小強','米飯')) 26 p1.start() 27 p2 = Process(target=producer, args=(q,'小東','麪條')) 28 p2.start() 29 p1.join() # 保證p生產者結束 30 p2.join() 31 q.put('stop') # 必須得發四個stop信號 32 q.put('stop') 33 q.put('stop') 34 q.put('stop')
1 JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法: 2 3 q.task_done() 4 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。 5 6 q.join() 7 生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 8 下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
1 from multiprocessing import Process,JoinableQueue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 time.sleep(random.randint(1,3)) 7 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 8 q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了 9 10 def producer(name,q): 11 for i in range(10): 12 time.sleep(random.randint(1,3)) 13 res='%s%s' %(name,i) 14 q.put(res) 15 print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16 q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。 17 18 19 if __name__ == '__main__': 20 q=JoinableQueue() 21 #生產者們:即廚師們 22 p1=Process(target=producer,args=('包子',q)) 23 p2=Process(target=producer,args=('骨頭',q)) 24 p3=Process(target=producer,args=('泔水',q)) 25 26 #消費者們:即吃貨們 27 c1=Process(target=consumer,args=(q,)) 28 c2=Process(target=consumer,args=(q,)) 29 c1.daemon=True 30 c2.daemon=True 31 32 #開始 33 p_l=[p1,p2,p3,c1,c2] 34 for p in p_l: 35 p.start() 36 37 p1.join() 38 p2.join() 39 p3.join() 40 print('主') 41 42 #主進程等--->p1,p2,p3等---->c1,c2 43 #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據 44 #於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
注:棧 ==> 先進後出----算法算法
管道:multiprocessing.Pipe
數據庫
答:不安全是因多個進程可能會在同一刻同時去取同一個數據,也可能同一刻拿走一個數據,位置就空了,別的應該放在空位置上,
而因同一刻時,另外一個進程會認爲該位置上有數據,就放在後面了,這就會致使數據異常,解決方法就是在拿數據前加鎖,拿完歸還鎖。django
1 from multiprocessing import Pipe, Process 2 3 4 # 管道 5 # 隊列是基於管道實現的 6 # 隊列 進程間數據安全的 7 # 管道 進程間數據不安全的 8 # 隊列 = 管道 + 鎖 9 10 # left, right = Pipe() 11 # print(right.recv()) 12 # (<multiprocessing.connection.PipeConnection object at 0x000002817A7FB128>, <multiprocessing.connection.PipeConnection object at 0x000002817A65C128>) 13 # 管道對象返回的是一個元組 14 15 16 # 全部的IPC通訊都是經過socket實現的 17 18 # 左邊放,右邊出,一樣能夠左收,右發,全雙工模式 19 20 # 管道必須在建立進程前建立 21 def consumer(left, right): 22 left.close() # 消費者用右邊接那麼就把左邊關閉 23 while 1: 24 try: 25 print(right.recv()) 26 except EOFError: # 再也接不到數據了,從而報錯,才能退出 27 break 28 29 30 if __name__ == '__main__': 31 left, right = Pipe() 32 p = Process(target=consumer, args=(left, right)) 33 p.start() 34 right.close() # 用右邊發送,那麼左邊就關閉 35 for i in range(1, 11): 36 left.send(3333) 37 left.close() # 不用了就關閉 38 39 # EOF異常的觸發 40 # 在這一個進程中 若是不在用這個端點了,應該close 41 # 這一在recv的時候,若是其餘端點都被關閉了,就可以知道不會在有新的消息傳進來 42 # 此時就不會在這裏阻塞等待,而是拋出一個EOFError 43 # * close並非關閉了整個管道,而是修改了操做系統對管道端點的引用計數的處理
進程之間數據共享
消息傳遞的併發是趨勢
線程是經過線程集合,用消息隊列來交換數據
進程間應儘可能避免通訊,由於可能不安全,想安全就必須加鎖,加鎖就會影響效率。
redis分佈式、數據庫解決進程之間數據共享問題
'''
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此
'''
# Manager模塊
# 全部的數據類型 都可以進行數據共享
# 一部分都是不加鎖 不支持數據進程安全
# 不安全的解決辦法 加鎖
1 from multiprocessing import Manager,Process,Lock 2 def work(d,lock): 3 with lock: 4 d['count'] -= 1 5 6 if __name__ == '__main__': 7 lock = Lock() 8 m = Manager() 9 dic = m.dict({'count':100}) 10 p_l = [] 11 for i in range(100): # 開了100進程 12 p = Process(target=work, args=(dic, lock)) 13 p_l.append(p) 14 p.start() 15 for p in p_l: 16 p.join() 17 print(dic) 18 ''' 19 結果:{'count':0} 20 ''' 21 22 # with as 的機制 23 # __enter__ 24 # __exit__
1 import os 2 import time 3 from multiprocessing import Pool 4 print(os.cpu_count()) # 獲取cpu個數 5 6 7 def wahaha(): 8 time.sleep(1) 9 print(os.getpid()) 10 return True 11 12 13 if __name__ == '__main__': 14 p = Pool(5) # 進程池中進程數通常爲cpu個數或者cpu+1,不要超過10個 15 for i in range(20): 16 # p.apply(func=wahaha) # 同步,通常不用,還不如一個進程去循環作,進程池有返回值,基於ipc通訊,本身能夠經過q來通訊 17 p.apply_async(func=wahaha) # async 異步的 18 p.close() # 關閉進程池,進程池中的進程不工做了,讓任務不能再繼續提交了, 19 p.join() # 等待這個池中提交的任務都執行完,就結束
異步提交,不獲取返回值編程
def wahaha(): time.sleep(1) print(os.getpid()) if __name__ == '__main__': p = Pool(5) # CPU的個數 或者 +1 ret_l = [] for i in range(20): ret = p.apply_async(func = wahaha) # async 異步的 ret_l.append(ret) p.close() # 關閉 並非進程池中的進程不工做了 # 而是關閉了進程池,讓任務不能再繼續提交了 p.join() # 等待這個池中提交的任務都執行完 # # 表示等待全部子進程中的代碼都執行完 主進程才結束
異步提交,得到返回值,等待全部任務都執行完畢以後再統一獲取結果json
1 # 異步提交,獲取返回值,等待全部任務都執行完畢以後再統一獲取結果 2 def wahaha(): 3 time.sleep(1) 4 print(os.getpid()) 5 return True 6 7 if __name__ == '__main__': 8 p = Pool(5) # CPU的個數 或者 +1 9 ret_l = [] 10 for i in range(20): 11 ret = p.apply_async(func = wahaha) # async 異步的 12 ret_l.append(ret) 13 p.close() # 關閉 不是進程池中的進程不工做了 14 # 而是關閉了進程池,讓任務不能再繼續提交了 15 p.join() # 等待這個池中提交的任務都執行完 16 for ret in ret_l: 17 print(ret.get())
異步提交,得到返回值,一個任務執行完畢以後就能夠獲取到一個結果(順序是按照提交任務的順序)【用在任務關聯不大的時候】flask
1 def wahaha(): 2 time.sleep(1) 3 print(os.getpid()) 4 return True 5 6 if __name__ == '__main__': 7 p = Pool(5) # CPU的個數 或者 +1 8 ret_l = [] 9 for i in range(20): 10 ret = p.apply_async(func = wahaha) # async 異步的 11 ret_l.append(ret) 12 for ret in ret_l: 13 print(ret.get())
總結bootstrap
1 # 異步的 apply_async 2 # 1.若是是異步的提交任務,那麼任務提交以後進程池和主進程也異步了, 3 #主進程不會自動等待進程池中的任務執行完畢 4 # 2.若是須要主進程等待,須要p.join 5 # 可是join的行爲是依賴close 6 # 3.若是這個函數是有返回值的 7 # 也能夠經過ret.get()來獲取返回值 8 # 可是若是一邊提交一遍獲取返回值會讓程序變成同步的 9 # 因此要想保留異步的效果,應該講返回對象保存在列表裏,全部任務提交完成以後再來取結果 10 # 這種方式也能夠去掉join,來完成主進程的阻塞等待池中的任務執行完畢
進程池解決原生socket,同一時刻只能和一個客戶端鏈接【這種方式的弊端是同時最多隻能和進程池中的數量相同,其它用戶等待】
1 import socket 2 from multiprocessing import Pool 3 4 5 def talk(conn): 6 try: 7 while 1: 8 msg = conn.recv(1024).decode('utf-8') 9 print(msg) 10 conn.send(b'hello') 11 finally: 12 conn.close() 13 14 15 if __name__ == '__main__': 16 sk = socket.socket() 17 sk.bind(('127.0.0.1', 9999)) 18 sk.listen() 19 pool = Pool(5) 20 try: 21 while 1: 22 conn, addr = sk.accept() 23 pool.apply_async(talk, args=(conn,)) 24 finally: 25 conn.close() 26 sk.close()
1 import socket 2 3 ip_port = ('127.0.0.1', 9999) 4 sk = socket.socket() 5 sk.connect(ip_port) 6 7 while 1: 8 msg = input('>>>>:').strip() 9 if len(msg) == 0: continue 10 sk.send(msg.encode('utf-8')) 11 content = sk.recv(1024).decode('utf-8') 12 print(content)
Pool中的回調函數callback
1 import os 2 import time 3 import random 4 from multiprocessing import Pool 5 6 # 異步提交,獲取返回值,從頭至尾一個任務執行完畢以後就能夠獲取到一個結果 7 def wahaha(num): 8 time.sleep(random.random()) 9 print('pid : ',os.getpid(),num) 10 return num 11 12 def back(arg): 13 print('call_back : ',os.getpid(),arg) 14 15 if __name__ == '__main__': 16 print('主進程',os.getpid()) 17 p = Pool(5) # CPU的個數 或者 +1 18 for i in range(20): 19 ret = p.apply_async(func = wahaha,args=(i,),callback=back) # async 異步的 20 p.close() 21 p.join() 22 23 # 回調函數 _ 在主進程中執行 24 # 在發起任務的時候 指定callback參數 25 # 在每一個進程執行完apply_async任務以後,返回值會直接做爲參數傳遞給callback的函數,執行callback函數中的代碼 26 27 28 # 北京 30min 30min +5min 5min + 5min 29 # 建設 20min 直接辦 + 5min 5min 30 # 中國 1h 20min +5 25min + 5min 31 # 農業 2h 55min + 5min 55min + 5min 32 # 工商 15min 5min 15min + 5min 33 34 # 2h10min 35 # 2h05min
1 程序:是指令的集合,它是進程運行的靜態描述文本 2 進程:是程序的一次執行活動,是動態概念,計算機中最小資源分配單位 3 線程:cpu調度的最小單位,從屬於進程,任務的實際執行者
1 # 經過類Thread 2 import os 3 import time 4 from threading import Thread,get_ident 5 def func(name): 6 time.sleep(0.1) 7 print('線程:%s,該線程的進程id爲:%s,線程id爲:%s' %(name,os.getpid(),get_ident())) # get_ident()爲類get_ident中的方法,做用是獲取線程id 8 for i in range(10): 9 t = Thread(target=func, args=(i,)) 10 t.start() 11 12 》》》》結果: 13 線程:3,該線程的進程id爲:14140,線程id爲:13352 14 線程:0,該線程的進程id爲:14140,線程id爲:1672 15 線程:1,該線程的進程id爲:14140,線程id爲:15808 16 線程:2,該線程的進程id爲:14140,線程id爲:10136 17 線程:7,該線程的進程id爲:14140,線程id爲:4508 18 線程:6,該線程的進程id爲:14140,線程id爲:15068 19 線程:4,該線程的進程id爲:14140,線程id爲:13532 20 線程:5,該線程的進程id爲:14140,線程id爲:10836 21 線程:9,該線程的進程id爲:14140,線程id爲:9520 22 線程:8,該線程的進程id爲:14140,線程id爲:15568
1 # 自定義類建立多線程 【繼承Thread類,同時重些run方法,如需添加本身的屬性或者說加入參數繼承父類init方法】 2 import os 3 from threading import Thread, get_ident 4 5 6 class Mythread(Thread): 7 def __init__(self, args): 8 super().__init__() 9 self.args = args 10 11 def run(self): 12 """ 13 執行函數 14 :return: 15 """ 16 print(self.args) # 就能夠調用本身自定義的屬性了 17 print('in thread 子線程Id:', get_ident(), '進程Id:', os.getpid()) 18 19 20 print("父進程python解釋器:", os.getppid()) 21 print('進程即執行py文件:', os.getpid()) 22 print('主線程:', get_ident()) 23 t_obj = Mythread('china') 24 t_obj.start() 25 ''' 26 結果: 27 父進程python解釋器: 1168 28 進程及執行py文件: 2724 29 主線程: 7468 30 china 31 in thread 子線程Id: 8180 進程Id: 2724 32 '''
1 '''效率:多線程開閉切開銷遠遠小於進程隔了大幾百倍''' 2 3 def func(a): 4 a += 1 5 6 7 if __name__ == '__main__': 8 start = time.time() 9 t_lis = [] 10 for i in range(50): 11 t = Thread(target=func, args=(i,)) 12 t.start() 13 t_lis.append(t) 14 for t in t_lis:t.join() 15 print('主線程') 16 print('時間:%s' % str(time.time() - start)) 17 18 start = time.time() 19 t_lis = [] 20 for i in range(50): 21 t = Process(target=func, args=(i,)) 22 t.start() 23 t_lis.append(t) 24 for t in t_lis: t.join() 25 print('主進程') 26 print('時間:%s' % str(time.time() - start)) 27 28 ''' 29 效率測試: 30 主線程 31 時間:0.008229732513427734 32 主進程 33 時間:5.307320833206177 34 '''
1 '''線程間數據共享''' 2 from threading import Thread 3 4 n = 100 # 全局變量,存在主進程內存空間中 5 6 7 def func(): 8 global n 9 n -= 1 10 print(n) 11 12 13 if __name__ == '__main__': 14 t_li = [] 15 for i in range(100): 16 t = Thread(target=func) # 始終在同一個內存空間中 17 t.start() 18 t_li.append(t) 19 for t in t_li: 20 t.join() 21 print("線程:", n) 22 ''' 23 結果: 24 n = 0 25 每一個線程中的n從99,開始遞減 26 ''' 27 28 p_li = [] 29 for i in range(100): 30 p = Process(target=func,) # 分別建立100個獨立的內存空間,且相互獨立 31 p.start() 32 p_li.append(p) 33 for p in p_li: 34 p.join() 35 print('進程:',n) 36 ''' 37 結果: 38 n = 100 39 每一個進程中的n爲99 40 '''
1 '''守護線程等待主線程結束而結束, 2 # 主線程結束,必須等全部非守護線程結束,才能結束 3 # 主線程結束,進程就結束了,進程必須保證全部非守護線程結束才行''' 4 import os 5 import time 6 from threading import Thread 7 8 def f1(): 9 print(True) 10 time.sleep(0.5) 11 print(os.getpid()) 12 13 def f2(): 14 print('in f2 start') 15 time.sleep(3) 16 print('in f2 end') 17 print(os.getpid()) 18 19 t = Thread(target=f1) 20 t.setDaemon(True) 21 t.start() 22 23 t2 = Thread(target=f2) 24 t2.start() 25 print('主線程',os.getpid()) 26 27 ''' 28 True 29 主線程 1440 30 in f2 start 31 1440 32 in f2 end 33 1440 34 '''
場景: 多我的圍着桌子吃一盤面,必須拿到叉子和麪才能吃 若是一我的拿到了叉子,另外一我的拿到了面,就不能吃,就會致使僵直在一塊兒 import time from threading import Thread,Lock lock = Lock() noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s拿到了面' % name) fork_lock.acquire() print('%s拿到了叉子' % name) print('%s在吃麪'%name) time.sleep(0.5) fork_lock.release() # 0.01 noodle_lock.release() # 0.01 def eat2(name): fork_lock.acquire() # 0.01 print('%s拿到了叉子' % name) # 0.01 noodle_lock.acquire() print('%s拿到了面' % name) print('%s在吃麪'%name) time.sleep(0.5) noodle_lock.release() fork_lock.release() eat_lst = ['alex','wusir','太白','yuan'] for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2 Thread(target=eat1,args=(name,)).start() Thread(target=eat2,args=(name,)).start() 結果: china拿到了面 china拿到了叉子 china在吃麪 china拿到了叉子 usa拿到了面 --------程序開始卡死
1 # 遞歸鎖解決死鎖問題 2 import time 3 from threading import Thread, RLock 4 5 lock = RLock() 6 7 8 def eat1(name): 9 lock.acquire() 10 print('%s拿到了面' % name) 11 lock.acquire() 12 print('%s拿到了叉子' % name) 13 print('%s在吃麪' % name) 14 time.sleep(0.5) 15 lock.release() # 0.01 16 lock.release() # 0.01 17 18 19 def eat2(name): 20 lock.acquire() # 0.01 21 print('%s拿到了叉子' % name) # 0.01 22 lock.acquire() 23 print('%s拿到了面' % name) 24 print('%s在吃麪' % name) 25 time.sleep(0.5) 26 lock.release() 27 lock.release() 28 29 30 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen'] 31 for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2 32 Thread(target=eat1, args=(name,)).start() 33 Thread(target=eat2, args=(name,)).start()
1 import time 2 from threading import Thread,Lock 3 lock = Lock() 4 def eat1(name): 5 lock.acquire() 6 print('%s拿到了面' % name) 7 print('%s拿到了叉子' % name) 8 print('%s在吃麪'%name) 9 time.sleep(0.5) 10 lock.release() # 0.01 11 12 def eat2(name): 13 lock.acquire() # 0.01 14 print('%s拿到了叉子' % name) # 0.01 15 print('%s拿到了面' % name) 16 print('%s在吃麪'%name) 17 time.sleep(0.5) 18 lock.release() 19 20 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen'] 21 for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2 22 Thread(target=eat1,args=(name,)).start() 23 Thread(target=eat2,args=(name,)).start() 24 25 26 》》》: 27 china拿到了面 28 china拿到了叉子 29 china在吃麪 30 china拿到了叉子 31 china拿到了面 32 china在吃麪 33 beijing拿到了面 34 beijing拿到了叉子 35 beijing在吃麪 36 beijing拿到了叉子 37 beijing拿到了面 38 beijing在吃麪 39 shanghai拿到了面 40 shanghai拿到了叉子 41 shanghai在吃麪 42 shanghai拿到了叉子 43 shanghai拿到了面 44 shanghai在吃麪 45 shenzhen拿到了面 46 shenzhen拿到了叉子 47 shenzhen在吃麪 48 shenzhen拿到了叉子 49 shenzhen拿到了面 50 shenzhen在吃麪
1 # 進程中建立信號量,與開啓進程池效率對比 2 # 池效率高於信號量 3 def ktv1(sem, i): 4 sem.acquire() 5 i += 1 6 sem.release() 7 8 9 def ktv2(i): 10 i += 1 11 12 13 # process 14 if __name__ == '__main__': 15 sem = Semaphore(5) # 同時只執行5個任務 16 start_time = time.time() 17 p_list = [] 18 for i in range(20): # 開啓20個任務 19 p = Process(target=ktv1, args=(sem, i)) # 開啓進程20個 20 p.start() 21 p_list.append(p) 22 for p in p_list: p.join() 23 print('process_semaphore:', time.time() - start_time) 24 25 pool = Pool(5) # 開啓進程池,同一時間執行5個任務 26 start_time1 = time.time() 27 pool_list = [] 28 for i in range(20): # 開啓20個任務 29 ret = pool.apply_async(func=ktv2, args=(i,)) # 異步 30 pool_list.append(ret) 31 pool.close() # 關閉進程池,再也不受理任務 32 pool.join() # 等待全部進程池中的任務結束 33 print('process_pool:', time.time() - start_time1) 34 ''' 35 process_semaphore: 2.2986388206481934 36 process_pool: 0.5303816795349121 37 ''' 38 39 ========================================== 40 # thread 類中沒有線程池 41 # 但concurrent.futures中有 42 from threading import Thread,Semaphore,currentThread 43 from concurrent.futures import ThreadPoolExecutor 44 def f(sem,i): 45 sem.acquire() 46 i += 1 47 # print("sem",currentThread().getName()) # 獲取線程名 48 sem.release() 49 50 def f2(i): 51 i += 1 52 # print("pool",currentThread().getName()) 53 54 start = time.time() 55 t_sem = Semaphore(5) # 線程信號量, 同時5個任務 56 t_list = [] 57 for i in range(20): # 20個任務,開啓20個線程 58 t = Thread(target=f, args=(t_sem, i)) 59 t.start() 60 t_list.append(t) 61 for t in t_list:t.join() 62 print("in thread sem:" , time.time() - start) 63 64 start = time.time() 65 t_pool = ThreadPoolExecutor(5) 66 67 for i in range(20): 68 ret = t_pool.submit(f2,i) 69 t_pool.shutdown() 70 end = time.time() 71 print("in thread pool:", end- start) 72 ''' 73 in thread: 0.00498652458190918 74 in thread pool: 0.001001596450805664 75 '''
1 '''事件:event,一個任務依賴另外一個任務的狀態才進行下一步 2 wait 等待事件內部的信號變成True就不阻塞了 3 set 將標誌改成True 4 clear 改爲False 5 is_set 查看標誌是否爲True 6 ''' 7 # 數據庫鏈接 8 import time 9 import random 10 from threading import Event,Thread 11 12 13 def check(e): 14 '''檢測是否可以連通數據庫,網絡''' 15 print('正在檢測兩臺機器之間的網絡狀況……') 16 time.sleep(random.randint(2,5)) 17 e.set() # 改爲True,非阻塞 18 19 20 def connet_db(e): 21 print("status:", e.is_set()) 22 e.wait() 23 print("status:", e.is_set()) 24 print('鏈接數據庫……') 25 print('鏈接數據庫成功~~~') 26 27 # e = Event() 28 # Thread(target=connet_db, args=(e,)).start() 29 # Thread(target=check, args=(e,)).start() 30 ''' 31 status: False 32 正在檢測兩臺機器之間的網絡狀況…… 33 status: True 34 鏈接數據庫…… 35 鏈接數據庫成功~~~ 36 ''' 37 38 39 def check(e): 40 '''檢測是否可以連通數據庫,網絡''' 41 print('正在檢測兩臺機器之間的網絡狀況……') 42 time.sleep(random.randint(2,5)) 43 e.set() # 改爲True,非阻塞 44 45 def connet_db(e): 46 '''3次鏈接不上就退出''' 47 n = 0 48 while n < 3: 49 if e.is_set(): 50 break # 退出循環,執行鏈接庫 51 else: 52 e.wait(timeout=0.5) 53 n += 1 54 if n == 3: 55 raise TimeoutError 56 print('鏈接數據庫……') 57 print('鏈接數據庫成功~~~') 58 59 60 e = Event() 61 Thread(target=connet_db, args=(e,)).start() 62 Thread(target=check, args=(e,)).start() 63 64 ''' 65 正在檢測兩臺機器之間的網絡狀況…… 66 Exception in thread Thread-1: 67 Traceback (most recent call last): 68 File "D:\install\Python36\lib\threading.py", line 916, in _bootstrap_inner 69 self.run() 70 File "D:\install\Python36\lib\threading.py", line 864, in run 71 self._target(*self._args, **self._kwargs) 72 File "D:/install/project/七、併發編程/十一、threading_信號量.py", line 140, in connet_db 73 raise TimeoutError 74 TimeoutError 75 '''
1 # from multiprocessing import Queue,JoinableQueue # 進程IPC隊列 2 from queue import Queue # 線程隊列 先進先出的 3 from queue import LifoQueue # 後進先出的 4 #方法: put get put_nowait get_nowait full empty qsize 5 # 隊列Queue 6 # 先進先出 7 # 自帶鎖 數據安全 8 # 棧 LifoQueue 9 # 後進先出 10 # 自帶鎖 數據安全 11 # lq = LifoQueue(5)
1 Python提供的Condition對象提供了對複雜線程同步問題的支持。 2 Condition被稱爲條件變量,除了提供與Lock相似的acquire和release方法外,還提供了wait和notify方法。 3 線程首先acquire一個條件變量,而後判斷一些條件。 4 若是條件不知足則wait; 5 若是條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
1 from threading import Condition 2 # acquire 3 # release 4 # wait 阻塞 5 # notify 讓wait解除阻塞的工具 6 # wait仍是notify在執行這兩個方法的先後 必須執行acquire和release 7 from threading import Condition,Thread 8 def func(con,i): 9 con.acquire() 10 # 判斷某條件 11 con.wait() 12 print('threading : ',i) 13 con.release() 14 15 con = Condition() 16 for i in range(20): 17 Thread(target=func,args=(con,i)).start() 18 con.acquire() 19 # 幫助wait的子線程處理某個數據直到知足條件 20 con.notify_all() 21 con.release() 22 while True: 23 num = int(input('num >>>')) 24 con.acquire() 25 con.notify(num) 26 con.release()
1 from threading import Timer 2 3 4 def func(): 5 print('執行我啦') 6 7 8 # interval 時間間隔 9 Timer(0.2, func).start() # 定時器 10 # 建立線程的時候,就規定它多久以後去執行
1 #1 介紹
2 concurrent.futures模塊提供了高度封裝的異步調用接口 3 ThreadPoolExecutor:線程池,提供異步調用 4 ProcessPoolExecutor: 進程池,提供異步調用 5 Both implement the same interface, which is defined by the abstract Executor class. 6
7 #2 基本方法
8 #submit(fn, *args, **kwargs)
9 異步提交任務 10
11 #map(func, *iterables, timeout=None, chunksize=1)
12 取代for循環submit的操做 13
14 #shutdown(wait=True)
15 至關於進程池的pool.close()+pool.join()操做 16 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 17 wait=False,當即返回,並不會等待池內的任務執行完畢 18 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 19 submit和map必須在shutdown以前 20
21 #result(timeout=None)
22 取得結果 23
24 #add_done_callback(fn)
25 回調函數