Python Concurrency, Advanced Topics (Part 9)

1. Daemon Processes and Daemon Threads

1) The concept of a daemon process

What is a daemon process?
A daemon process dies as soon as the main process's code finishes running. It is essentially a child process that "guards" the main process.
Why use a daemon process?
A child process is started whenever the main process needs to run tasks concurrently. When the task run by that child process should only live as long as the main process itself, the child process should be made a daemon.

2) Creating a daemon process

from multiprocessing import Process
import time

def task(x):
    print('%s is running' % x)
    time.sleep(1)
    print('%s is done' % x)

if __name__ == '__main__':
    p = Process(target=task, args=('daemon process',))
    p.daemon = True  # must be set before p.start()
    p.start()
    time.sleep(3)
    print('')

3) The concept of a daemon thread

The main thread is not considered finished until all non-daemon (child) threads in the process have finished, because the main thread's lifetime represents the lifetime of the process, and the process should not exit until every non-daemon thread has done its work. Put simply: a daemon thread dies only after all non-daemon threads in the process have finished running.

4) Creating a daemon thread

from threading import Thread
import time

def task(x):
    print('%s is running' % x)
    time.sleep(3)
    print('%s is done' % x)

if __name__ == '__main__':
    t = Thread(target=task, args=('daemon thread',))
    t.daemon = True  # must be set before t.start()
    t.start()
    print('')

2. Mutexes, Semaphores, the GIL, Deadlock, and Recursive Locks

1) Why mutexes matter

A mutex turns part of the code run by processes/threads from concurrent execution into serial execution, sacrificing efficiency in exchange for data safety. A mutex cannot be acquired twice in a row with mutex.acquire(); other processes can only grab it after the holder releases it with mutex.release().

2) Process mutex: mutex = Lock()

from multiprocessing import Process, Lock
import json
import os
import time
import random

mutex = Lock()

def check():
    with open('db.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)
    print('%s tickets left: %s' % (os.getpid(), dic['count']))

def get():
    with open('db.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(1)
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.randint(1, 3))  # simulate network latency
        with open('db.json', 'wt', encoding='utf-8') as f:
            json.dump(dic, f)
        print('%s grabbed a ticket' % os.getpid())

def task(mutex):
    # checking runs concurrently
    check()
    # buying runs serially
    mutex.acquire()
    get()
    mutex.release()

if __name__ == '__main__':
    for i in range(7):
        p = Process(target=task, args=(mutex,))
        p.start()
        # p.join()  # would make all of task's code run serially

3) Semaphores: limit how many tasks can run at the same time

# e.g. a public restroom with 4 stalls: at most 4 people at a time
from multiprocessing import Process, Semaphore
import os
import time
import random

sm = Semaphore(4)

def go_wc(sm):
    sm.acquire()
    print('%s is in the restroom' % os.getpid())
    time.sleep(random.randint(1, 3))
    sm.release()

if __name__ == '__main__':
    for i in range(20):
        p = Process(target=go_wc, args=(sm,))
        p.start()

4) The thread problem: multiple threads modify the same data, so the data is not safe

from threading import Thread
import time

n = 100

def task():
    global n
    temp = n
    time.sleep(0.1)
    n = temp - 1

if __name__ == '__main__':
    t_l = []
    for i in range(100):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)  # typically prints 99, not 0: all threads read n before any of them wrote it back

5) Fixed with a thread mutex

from threading import Thread, Lock
import time

mutex = Lock()
n = 100

def task():
    global n
    with mutex:  # acquire the lock; it is released automatically on exit
        temp = n
        time.sleep(0.1)
        n = temp - 1

if __name__ == '__main__':
    t_l = []
    start_time = time.time()
    for i in range(50):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)  # 50: the decrements now run serially
    print(time.time() - start_time)

6) The GIL: deciding when to use threads and when to use processes

1. What is the GIL?
The GIL (Global Interpreter Lock) is essentially a mutex. It is a feature of the CPython interpreter, not of the Python language. Every process gets one GIL, which ensures that only one of the threads inside that process executes at any given time. This means CPython multithreading gives concurrency but never parallelism.
2. Why does the GIL exist?
Because CPython's garbage collector is not thread-safe.
3. The GIL vs. your own mutex
For any thread in a process to run, it must first grab the GIL; the GIL is effectively the permission to execute.
Rule of thumb: use multiprocessing for CPU-bound work and multithreading for IO-bound work.

7) CPU-bound work: processes vs. threads

Processes on a CPU-bound task exploit the multi-core CPU, but keep the process count to no more than about twice the number of cores, since each one consumes a lot of CPU. Elapsed time: 27.400567293167114 s

from multiprocessing import Process
import time

def task1():
    res = 1
    for i in range(100000000):
        res *= i

def task2():
    res = 1
    for i in range(100000000):
        res += i

def task3():
    res = 1
    for i in range(100000000):
        res -= i

def task4():
    res = 1
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p4 = Process(target=task4)
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time = time.time()
    print(stop_time - start_time)  # 27.400567293167114

Use multiprocessing for CPU-bound tasks.

Threads doing the same CPU-bound computation. Elapsed time: 86.84396719932556 s

import time
from threading import Thread

def task1():
    res = 1
    for i in range(100000000):
        res *= i

def task2():
    res = 1
    for i in range(100000000):
        res += i

def task3():
    res = 1
    for i in range(100000000):
        res -= i

def task4():
    res = 1
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    start_time = time.time()
    p1 = Thread(target=task1)
    p2 = Thread(target=task2)
    p3 = Thread(target=task3)
    p4 = Thread(target=task4)
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time = time.time()
    print(stop_time - start_time)  # 86.84396719932556

8) IO-bound work: processes vs. threads

Processes: total time 3.5172011852264404 s

import time
from multiprocessing import Process

def task1():
    time.sleep(3)

def task2():
    time.sleep(3)

def task3():
    time.sleep(3)

def task4():
    time.sleep(3)

if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p4 = Process(target=task4)
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time = time.time()
    print(stop_time - start_time)  # 3.5172011852264404

Threads have the edge here: total time 3.003171443939209 s

import time
from threading import Thread

def task1():
    time.sleep(3)

def task2():
    time.sleep(3)

def task3():
    time.sleep(3)

def task4():
    time.sleep(3)

if __name__ == '__main__':
    start_time = time.time()
    p1 = Thread(target=task1)
    p2 = Thread(target=task2)
    p3 = Thread(target=task3)
    p4 = Thread(target=task4)
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time = time.time()
    print(stop_time - start_time)  # 3.003171443939209

9) Deadlock: each thread needs the other's lock before it will release its own, so neither lock is ever released

from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s grabbed lock A' % self.name)
        mutexB.acquire()
        print('%s grabbed lock B' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('%s grabbed lock B' % self.name)
        time.sleep(1)
        mutexA.acquire()
        print('%s grabbed lock A' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(2):
        t = Mythread()
        t.start()

10) Recursive locks: RLock fixes the deadlock, because the thread holding it can call acquire() repeatedly

from threading import Thread, RLock
import time

mutexA = mutexB = RLock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s grabbed lock A' % self.name)
        mutexB.acquire()
        print('%s grabbed lock B' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('%s grabbed lock B' % self.name)
        time.sleep(1)
        mutexA.acquire()
        print('%s grabbed lock A' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(2):
        t = Mythread()
        t.start()

3. IPC, Queues, and the Producer-Consumer Model

IPC (inter-process communication) has two solutions: queues and pipes.
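The sections below only use queues. As a quick aside, a minimal pipe sketch (not from the original post, shown only to illustrate the second option) could look like this, using multiprocessing.Pipe:

from multiprocessing import Process, Pipe

def worker(conn):
    conn.send('hello from the child')   # send a picklable object through the pipe
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()    # two connected endpoints
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())           # blocks until the child sends something
    p.join()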

1) Queues are first in, first out; they are what the producer-consumer model is built on

from multiprocessing import Queue

q = Queue(maxsize=3)
q.put({'x': 1})
q.put(2)
q.put('third')
print(q.get())
print(q.get())
print(q.get())

With the default arguments, put() blocks once the queue is full. With q.put(1, block=False) a full queue raises an exception instead, which is equivalent to q.put_nowait(1).

timeout=3 sets a timeout; it only has meaning when block=True.
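A minimal sketch (not in the original) of the three behaviours described above; queue.Full is the exception raised when a bounded queue overflows:

import queue                      # for the Full exception class
from multiprocessing import Queue

q = Queue(maxsize=1)
q.put('a')                        # fits, returns immediately

try:
    q.put('b', block=False)       # same as q.put_nowait('b')
except queue.Full:
    print('queue is full, the non-blocking put raised immediately')

try:
    q.put('c', block=True, timeout=3)  # waits up to 3 seconds, then raises
except queue.Full:
    print('queue is still full after 3 seconds')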

2) Why the producer-consumer model

1. What is the producer-consumer model?
It is a pattern for structuring a problem. It contains two clearly separated roles:
  1. producers: tasks that create data
  2. consumers: tasks that process data
2. Why use it?
  1. It decouples the producer tasks from the consumer tasks.
  2. It balances the producers' rate of production against the consumers' rate of consumption.
Whenever a program clearly has two kinds of tasks to run concurrently, one producing data and the other processing it, the producer-consumer model can improve throughput.
3. How to use it
  producers ----> queue <---- consumers
About the queue:
  1. The queue lives in memory, so even an unbounded queue cannot hold data without limit.
  2. The queue is a medium for passing messages, so it should hold small pieces of data.

3) Producer-consumer with Queue: an incomplete version where the consumer hangs forever

from multiprocessing import Queue, Process
import time

def producer(name, q):
    for i in range(5):
        res = 'bun %s' % i
        time.sleep(0.5)
        print('\033[45mchef %s produced %s\033[0m' % (name, res))
        q.put(res)

def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(1)
        print('\033[47meater %s ate %s\033[0m' % (name, res))

if __name__ == '__main__':
    q = Queue()
    # producers
    p1 = Process(target=producer, args=('egon', q))
    # consumers
    c1 = Process(target=consumer, args=('alex', q))
    p1.start()
    c1.start()
    print('')

4) A low-tech fix: put a None on the queue as an end-of-stream signal

from multiprocessing import Queue, Process
import time

def producer(name, food, q):
    for i in range(5):
        res = '%s %s' % (food, i)
        time.sleep(0.5)
        print('\033[45mchef %s produced %s\033[0m' % (name, res))
        q.put(res)

def consumer(name, q):
    while True:
        res = q.get()
        if res is None:
            break  # end-of-stream signal
        time.sleep(1)
        print('\033[47meater %s ate %s\033[0m' % (name, res))

if __name__ == '__main__':
    q = Queue()
    # producers
    p1 = Process(target=producer, args=('egon', 'cake', q))
    p2 = Process(target=producer, args=('lxx', 'bread', q))
    p3 = Process(target=producer, args=('cxx', 'bomb', q))
    # consumers
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wcc', q))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)  # one None per consumer
    q.put(None)
    print('')

5) Final version: JoinableQueue, shutting consumers down as daemon processes

from multiprocessing import JoinableQueue, Process
import time

def producer(name, food, q):
    for i in range(5):
        res = '%s %s' % (food, i)
        time.sleep(0.5)
        print('\033[45mchef %s produced %s\033[0m' % (name, res))
        q.put(res)

def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(1)
        print('\033[47meater %s ate %s\033[0m' % (name, res))
        q.task_done()  # tell the queue one item has been fully consumed

if __name__ == '__main__':
    q = JoinableQueue()
    # producers
    p1 = Process(target=producer, args=('egon', 'cake', q))
    p2 = Process(target=producer, args=('lxx', 'bread', q))
    p3 = Process(target=producer, args=('cxx', 'bomb', q))
    # consumers
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wcc', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    # at this point all three producers have finished producing
    q.join()  # 1. the producers are done  2. the queue is empty, so the consumers are done too
    print('')

4. Thread Queues (the queue module)

1) Queue: first in, first out

import queue

q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

2) Stack (LifoQueue): first in, last out

import queue

q = queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

3) Priority queue: the highest-priority item comes out first

import queue

q = queue.PriorityQueue(3)
q.put((13, 'lxx'))
q.put((10, 'egon'))  # the number is the priority; smaller means higher priority
q.put((11, 'alex'))
print(q.get())
print(q.get())
print(q.get())

5. Process Pools and Thread Pools

1) The idea of a pool

1. What are process pools and thread pools?
A pool is a container that holds a fixed number of processes or threads.
2. Why use a pool?
To keep the number of concurrent processes or threads within what the machine can handle.
Why a process pool? For CPU-bound tasks, processes let you exploit multiple cores.
Why a thread pool? For IO-bound tasks, threads keep the overhead low. A small sketch of the "capped concurrency" idea follows.
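As a quick illustration (not from the original post), a pool with max_workers=3 only ever runs three of the submitted tasks at once; the remaining tasks wait for a free worker:

from concurrent.futures import ThreadPoolExecutor
import time

def task(i):
    print('task %s started' % i)
    time.sleep(1)
    return i

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=3)   # at most 3 worker threads at any time
    futures = [pool.submit(task, i) for i in range(10)]
    pool.shutdown(wait=True)                   # wait for all 10 tasks to finish
    print([f.result() for f in futures])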

2) Synchronous vs. asynchronous calls

Synchronous vs. asynchronous refers to two ways of submitting tasks.
Synchronous call: after submitting a task, wait in place until it finishes and you have its result/return value, then move on to the next line of code. Under synchronous calls the tasks run serially.
Asynchronous call: after submitting a task (e.g. future = p.submit(task, i), with the result kept in memory), do not wait; move straight on to the next line of code. Under asynchronous calls the tasks run concurrently.

3) Process pool, asynchronous submission

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def task(x):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return x ** 2

if __name__ == '__main__':
    p = ProcessPoolExecutor()  # with no argument the pool size defaults to the number of CPU cores
    futures = []               # collect the futures so the return values can be read later
    for i in range(10):
        future = p.submit(task, i)  # asynchronous submission
        futures.append(future)
    p.shutdown(wait=True)      # stop accepting new tasks; wait until the pool has finished everything
    for future in futures:
        print(future.result()) # print the return values
    print('')

4) Process pool, synchronous submission

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def task(x):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return x ** 2

if __name__ == '__main__':
    p = ProcessPoolExecutor()  # pool size defaults to the number of CPU cores
    for i in range(10):
        res = p.submit(task, i).result()  # calling .result() right away turns the submission into a synchronous call
        print(res)
    print('')

In short, the difference between synchronous and asynchronous use comes down to when you call obj.result().

5) Callbacks with a process pool: parsing task return values with add_done_callback(parse)

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import requests

def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(2)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
        return res

def parse(obj):
    res = obj.result()
    print('%s parsed [url], result length: %s' % (os.getpid(), len(res)))

if __name__ == '__main__':
    p = ProcessPoolExecutor(3)
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.taobao.com',
        'https://www.jd.com',
    ]
    for url in urls:
        p.submit(get, url).add_done_callback(parse)  # the callback fires automatically when the task finishes and receives the future object
    print('', os.getpid())

6) Callbacks with a thread pool: parsing task return values with add_done_callback(parse)

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import random

def task(x):
    print('%s is running' % current_thread().getName())
    time.sleep(random.randint(1, 3))
    return x ** 2

def parse(obj):
    res = obj.result()
    print('%s parsed result: %s' % (current_thread().getName(), res))

if __name__ == '__main__':
    t = ThreadPoolExecutor(3)
    for i in range(10):
        t.submit(task, i).add_done_callback(parse)

6. Additional Topics

1) Thread Event: one thread sends a signal, and other threads continue only after receiving it

from threading import Event, current_thread, Thread
import time

event = Event()  # the signalling event

def check():
    print('%s is checking whether the service is healthy....' % current_thread().name)
    time.sleep(3)
    event.set()  # send the signal

def connect():
    print('%s waiting to connect...' % current_thread().name)
    event.wait()  # wait for the signal
    print('%s starting to connect...' % current_thread().name)

if __name__ == '__main__':
    t1 = Thread(target=connect)
    t2 = Thread(target=connect)
    t3 = Thread(target=connect)
    c1 = Thread(target=check)
    t1.start()
    t2.start()
    t3.start()
    c1.start()

2) Same idea, but with a limited number of retries

from threading import Event, current_thread, Thread
import time

event = Event()

def check():
    print('%s is checking whether the service is healthy....' % current_thread().name)
    time.sleep(2)
    event.set()

def connect():
    count = 1
    while not event.is_set():
        if count == 4:
            print('too many attempts, please try again later')
            return
        print('%s connection attempt %s...' % (current_thread().name, count))
        event.wait(1)  # wait at most 1 second per attempt
        count += 1
    print('%s starting to connect...' % current_thread().name)

if __name__ == '__main__':
    t1 = Thread(target=connect)
    t2 = Thread(target=connect)
    t3 = Thread(target=connect)
    c1 = Thread(target=check)
    t1.start()
    t2.start()
    t3.start()
    c1.start()

7. Coroutines: Concurrency Within a Single Thread

Coroutines achieve concurrency inside a single thread. Concurrency means multiple tasks appear to run at the same time; its essence is switching plus saving state.
Concurrency vs. parallelism vs. serial execution:
  Concurrency: tasks look simultaneous; switching + saving state.
  Parallelism: tasks genuinely run at the same time, which requires multiple CPUs; 4 CPUs can run 4 tasks in parallel.
  Serial: one task runs completely before the next one starts.
One way to implement it: use yield to save state and switch back and forth between two tasks, which gives the effect of concurrency. If each task prints something, you can clearly see the two tasks' output interleaving, i.e. they run concurrently, as in the sketch below.
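A minimal yield-based sketch of "switching + saving state" (not in the original post; the task names are made up for illustration):

def consumer():
    while True:
        item = yield            # pause here; resume when .send() delivers a value
        print('consumer got %s' % item)

def producer():
    g = consumer()
    next(g)                     # prime the generator so it is paused at its first yield
    for i in range(3):
        print('producer made %s' % i)
        g.send(i)               # switch to the consumer, handing it a value; its state is saved between sends

producer()
# The output interleaves: producer made 0 / consumer got 0 / producer made 1 / ...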

1) gevent for single-threaded concurrency (coroutines): from gevent import monkey;monkey.patch_all() hooks blocking IO calls so a single thread can run tasks concurrently

from gevent import monkey;monkey.patch_all()
import gevent
import time

def eat(name):
    print('%s eat 1' % name)
    time.sleep(3)
    print('%s eat 2' % name)

def play(name):
    print('%s play 1' % name)
    time.sleep(5)
    print('%s play 2' % name)

g1 = gevent.spawn(eat, 'egon')
g2 = gevent.spawn(play, 'alex')
gevent.joinall([g1, g2])

2) DummyThread: spawn gives single-threaded concurrency with "fake" threads

from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time

def eat():
    print('%s eat 1' % current_thread().name)
    time.sleep(3)
    print('%s eat 2' % current_thread().name)

def play():
    print('%s play 1' % current_thread().name)
    time.sleep(5)
    print('%s play 2' % current_thread().name)

g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
print(current_thread().name)
gevent.joinall([g1, g2])

3) A socket server handling connections concurrently in a single thread, plus a client to stress-test it

Server (server_spawn):

from gevent import monkey, spawn;monkey.patch_all()
from socket import *

def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip, port, backlog=5):
    s = socket()
    s.bind((ip, port))
    s.listen(backlog)
    while True:
        conn, addr = s.accept()
        print(addr)
        # handle the connection in a coroutine
        g = spawn(talk, conn)
    s.close()

if __name__ == '__main__':
    spawn(server, '127.0.0.1', 8080).join()
    # server(('127.0.0.1',8080))

Client (client_Thread, 200 threads for load testing):

from threading import Thread, current_thread
from socket import *

def client():
    client = socket()
    client.connect(('127.0.0.1', 8080))
    while True:
        data = '%s hello' % current_thread().name
        client.send(data.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))

if __name__ == '__main__':
    for i in range(200):
        t = Thread(target=client)
        t.start()

4) Non-blocking network IO for single-threaded concurrency: s.setblocking(False), the same idea behind from gevent import monkey;monkey.patch_all()

Server (socket_server):

from socket import *

s = socket()
s.bind(('127.0.0.1', 8080))
s.listen(5)
s.setblocking(False)
r_list = []

while True:
    try:
        conn, addr = s.accept()
        r_list.append(conn)
    except BlockingIOError:
        print('free to go do other work')
        print('rlist: ', len(r_list))
        for conn in r_list:
            try:
                data = conn.recv(1024)
                conn.send(data.upper())
            except BlockingIOError:
                continue

Client (socket_client):

from socket import *
import os

client = socket()
client.connect(('127.0.0.1', 8080))
while True:
    data = '%s say hello' % os.getpid()
    client.send(data.encode('utf-8'))
    res = client.recv(1024)
    print(res.decode('utf-8'))

5) Non-blocking IO, revised: separate receiving messages from sending them

Server:

from socket import *

s = socket()
s.bind(('127.0.0.1', 8080))
s.listen(5)
s.setblocking(False)
r_list = []
w_list = []

while True:
    try:
        conn, addr = s.accept()
        r_list.append(conn)
    except BlockingIOError:
        print('free to go do other work')
        print('rlist: ', len(r_list))

        # receive messages
        del_rlist = []
        for conn in r_list:
            try:
                data = conn.recv(1024)
                if not data:
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list.append((conn, data.upper()))
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_rlist.append(conn)

        # send messages
        del_wlist = []
        for item in w_list:
            try:
                conn = item[0]
                res = item[1]
                conn.send(res)
                del_wlist.append(item)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_wlist.append(item)

        # clean up dead connections
        for conn in del_rlist:
            r_list.remove(conn)
        for item in del_wlist:
            w_list.remove(item)

Client:

from socket import *
import os

client = socket()
client.connect(('127.0.0.1', 8080))
while True:
    data = '%s say hello' % os.getpid()
    client.send(data.encode('utf-8'))
    res = client.recv(1024)
    print(res.decode('utf-8'))

6) IO multiplexing: using the select module to improve on the above

Server:

from socket import *
import select

s = socket()
s.bind(('127.0.0.1', 8080))
s.listen(5)
s.setblocking(False)
# print(s)
r_list = [s, ]
w_list = []
w_data = {}

while True:
    print('r_list being watched: ', len(r_list))
    print('w_list being watched: ', len(w_list))
    rl, wl, xl = select.select(r_list, w_list, [], )  # r_list=[server, conn]
    # print('rl: ', len(rl))  # rl=[conn, ]
    # print('wl: ', len(wl))

    # receive messages
    for r in rl:  # r is either the listening socket or a connection
        if r == s:
            conn, addr = r.accept()
            r_list.append(conn)
        else:
            try:
                data = r.recv(1024)
                if not data:
                    r.close()
                    r_list.remove(r)
                    continue
                # r.send(data.upper())
                w_list.append(r)
                w_data[r] = data.upper()
            except ConnectionResetError:
                r.close()
                r_list.remove(r)
                continue

    # send messages
    for w in wl:
        w.send(w_data[w])
        w_list.remove(w)
        w_data.pop(w)

Client:

from socket import *
import os

client = socket()
client.connect(('127.0.0.1', 8080))
while True:
    data = '%s say hello' % os.getpid()
    client.send(data.encode('utf-8'))
    res = client.recv(1024)
    print(res.decode('utf-8'))

8. Summary So Far; Project Section to Follow
