event 事件用來控制線程的執行,mysql
由一些線程去控制另外一些線程。sql
使用threading庫中的Event對象,對象中包含了一個可由線程設置的信號標誌,容許線程等待某些事件的發生服務器
在初始狀況下,Event對象中的信號標誌位False,若是有線程等待一個Event對象,這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至標誌爲真。一個線程若是將一個Event對象信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。多線程
# coding=utf-8 from threading import Event from threading import Thread import time # 調用類實例化出對象 e = Event() # 若程序中有以下代碼,即爲False,阻塞 # e.wait() # 若程序中有以下代碼,則將其餘線程的False改成True,進入就緒態和運行態 # e.set() # 模擬一個紅綠燈 def light(): print("紅燈亮") time.sleep(5) # 開始發信號給其餘線程,告訴其餘線程準備執行 e.set() print("綠燈亮") # 模擬一個個汽車 def car(): print("正在等紅燈") e.wait() print("汽車開始起步") t1 = Thread(target=light) t1.start() for i in range(10): t2 = Thread(target=car) t2.start() 紅燈亮 正在等紅燈 正在等紅燈 正在等紅燈 正在等紅燈 正在等紅燈 正在等紅燈 正在等紅燈 正在等紅燈 正在等紅燈 正在等紅燈 綠燈亮 汽車開始起步 汽車開始起步 汽車開始起步 汽車開始起步 汽車開始起步 汽車開始起步 汽車開始起步 汽車開始起步 汽車開始起步 汽車開始起步
e.wait()
:False,爲阻塞狀態併發
e.set():True,將其餘線程的False改成True,進入就緒態和運行態
app
e.clear():回覆event的狀態值爲False
dom
e.isSet():返回event的狀態值
異步
例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做socket
from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError('連接超時') print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1 print('<%s>連接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
在剛開始學多進程或多線程時,咱們火燒眉毛地基於多進程或多線程實現併發的套接字通訊,然而這種實現方式的致命缺陷是:服務的開啓的進程數或線程數都會隨着併發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,因而咱們必須對服務端開啓的進程數或線程數加以控制,讓機器在一個本身能夠承受的範圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質仍是基於多進程,只不過是對開啓進程的數目加上了限制
進程池和線程池:
用來控制當前程序容許建立(進程/線程)的數量
進程池和線程池的做用:
保證在硬件容許的範圍內建立(進程/線程)的數量
# coding=utf-8 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time # 進程池能夠加參數 表示開啓進程數 # 若不寫默認以CPU的個數限制進程數 # ProcessPoolExecutor() # 線程池能夠加參數,表示開啓的線程數 # 若不寫默認以CPU 的個數 *5 限制線程數 # ThreadPoolExecutor() # 建立5個線程 pool = ThreadPoolExecutor(5) def task(res): print("線程任務開始") time.sleep(1) # print("線程任務結束") return 123 # 回調函數 def call_back(res): # print(type(res)) # 注意 回調函數接收一個參數 是 接收線程執行完的結果,用.result()接收 # 獲得的數據能夠拿一個變量名保存,新的變量名不要與回調函數參數同樣 res2 = res.result() print(res2) for i in range(13): # 異步提交任務,每次併發執行最多隻能有5個 pool.submit(task,1).add_done_callback(call_back) # 全部線程任務結束後執行下面代碼 pool.shutdown() print("線程執行完畢了")
from concurrent.futures:提供了異步調用的接口
ProcessPoolExecutor():限制開啓的進程數,若不寫參數默認以CPU的個數限制進程數
ThreadPoolExecutor():限制開啓的線程數,若不寫參數默認以CPU的個數 * 5 限制線程數
pool.submit(函數名,參數):異步提交任務,限制每次併發執行最多的線程個數
add_done_callback:回調函數,線程執行完畢的函數返回值能夠傳到回調函數中,.result()獲取線程執行返回的結果
pool.shutdown():全部線程任務執行完畢後執行線程池關閉,執行下面的代碼,至關於進程池的pool.close()+pool.join()操做
進程池:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(1) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
能夠爲進程/線程池內的每一個進程/線程綁定一個函數,該函數在進程或線程的任務執行完畢後自動觸發,並接收線程任務的返回值當作參數,該函數就叫作回調函數
from concurrent.futures import ThreadPoolExecutor import requests import re import uuid pool = ThreadPoolExecutor(200) # 1.發送請求函數 def get_page(url): response = requests.get(url) return response # 2.解析主頁獲取視頻ID號 def parse_index(response): id_list = re.findall('<a href="video_(.*?)".*?>',response.text,re.S) return id_list # 3.解析視頻詳情頁獲取真實 視頻連接 def parse_detail(res): response = res.result() movie_detail_url = re.findall('srcUrl="(.*?)"', response.text, re.S)[0] print(f'往視頻連接: {movie_detail_url}發送請求...') # 異步往視頻詳情頁連接發送請求,把結果交給 pool.submit(get_page, movie_detail_url).add_done_callback(save_movie) return movie_detail_url # 4.往真實視頻連接發送請求,獲取數據並保存到本地 def save_movie(res): movie_response = res.result() name = str(uuid.uuid4()) print(f'{name}.mp4視頻開始保存...') with open(f'{name}.mp4', 'wb') as f: f.write(movie_response.content) print('視頻下載完畢!') if __name__ == '__main__': # 1.訪問主頁獲取數據 index_response = get_page('https://www.pearvideo.com/') # # 2.解析主頁獲取全部的視頻id號 id_list = parse_index(index_response) # 3.循環對每一個視頻詳情頁連接進行拼接 for id in id_list: detail_url = 'https://www.pearvideo.com/video_' + id # 異步提交爬取視頻詳情頁,把返回的數據,交給parse_detail(回調函數) pool.submit(get_page, detail_url).add_done_callback(parse_detail)
進程:資源單位
線程:執行單位
協程:在單線程下實現併發
協程即 基於單線程來實現併發,即只用一個主線程的狀況下實現併發,是一種用戶態的輕量級線程,是由用戶程序本身控制調度的一張程序。
併發的概念:切換 + 保存狀態
cpu正在運行一個任務,會有兩種狀況下切走去執行其餘任務(切換操做由操做系統強制控制即多道技術),一種狀況是該任務發生了阻塞,另一種狀況是該任務執行時間過長,cpu會把使用權切斷。
yield 代碼級別的控制,能夠保存當前狀態
# 基於yield 併發執行 import time # 任務1:接收數據,處理數據 def cousumer(): while True: x = yield def producer(): g = cousumer() next(g) for i in range(10000000): g.send(i) # time.sleep(1) # 併發去執行,可是若是遇到IO就會阻塞住 # 並不會切換到該線程內其餘任務去執行 start = time.time() # 基於yield保存狀態,實現兩個任務直接來回切換,即併發的效果 producer() stop = time.time() print(stop - start) # 2.9251673221588135
以上例子對純計算密集型任務來講,對於單線程下,咱們不可避免程序中出現io操做,但若是咱們能在本身的程序中(即用戶程序級別,而非操做系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另一個任務去計算,這樣就保證了該線程可以最大限度地處於就緒態,即隨時均可以被cpu執行的狀態,至關於咱們在用戶程序級別將本身的io操做最大限度地隱藏起來,從而能夠迷惑操做系統,讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給咱們的線程。
總結協程特色:
一、 必須在只有一個單線程裏實現併發
二、 修改共享數據不須要枷鎖
三、 用戶程序裏本身保存多個控制流的上下文棧
from gevent import monkey from gevent import spawn,joinall import time # gevent 是一個第三方模塊,能夠幫你監聽IO操做,並切換 # 監聽該程序下全部的IO操做 monkey.patch_all() def func1(): print("1") # 模擬IO操做 time.sleep(1) def func2(): print("2") time.sleep(2) def func3(): print("3") time.sleep(3) start = time.time() # 實現單線程下,遇到IO,保存狀態 + 切換 s1 = spawn(func1) s2 = spawn(func2) s3 = spawn(func3) # 發送信號,在單線程狀況下至關於等待本身執行完畢以後再退出 joinall([s1,s2,s3]) end_time = time.time() print(end_time - start) # 6.013344049453735
經過手動模擬操做系統「多道技術」,實現切換 + 保存狀態
優勢:在IO密集型的狀況下會提升效率
缺點:在計算密集型的狀況下,來回切換,反而效率更低
如何實現協程:切換 + 保存狀態
yield:保存狀態
併發:切換
server端: # coding=utf-8 from gevent import monkey monkey.patch_all() import socket import time from threading import Thread from gevent import spawn,joinall server = socket.socket() server.bind(("127.0.0.1",8888)) server.listen(5) print("啓動服務端...") # 線程任務,接收客戶端消息與發送消息給客戶端 def work1(conn): while True: try: data = conn.recv(1024).decode("utf-8") if not data:break conn.send(data.encode("utf-8")) except Exception as e: print(e) break conn.close() def work2(): while True: conn,addr = server.accept() spawn(work1,conn) if __name__ == '__main__': s1 = spawn(work2) s1.join()
client端: # coding=utf-8 import socket import time from threading import Thread,current_thread def client(): client = socket.socket() client.connect(("127.0.0.1",8888)) print("啓動客戶端...") num = 0 while True: send_data = f"{current_thread().name} {num}" client.send(send_data.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) num += 1 for i in range(30): t = Thread(target=client) t.start()