阻塞:當程序執行過程當中遇到了IO操做,在執行IO操做時,程序沒法繼續執行其餘代碼,稱爲阻塞。python
非阻塞:程序在正常運行沒有遇到IO操做,或者經過某種方式使程序即便遇到了也不會停在原地,還能夠執行其餘操做,以提升CPU的佔用率。mysql
同步指調用:發起任務後必須在原地等待任務執行完成功能,才能繼續執行,好比進行一億次計算,在原地等待,但沒有IO操做,不是阻塞sql
異步指調用:發起任務後不用等待任務執行,能夠當即開啓執行其餘操做。其實就是開啓一個線程或進程服務器
同步會有等待的效果可是這和阻塞是徹底不一樣的,阻塞時程序會被剝奪CPU執行權,而同步調用不會併發
程序中的異步調用並獲取結果方式1:app
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time pool = ThreadPoolExecutor(3) def task(i): time.sleep(0.01) print(current_thread().name,"working..") return i ** i if __name__ == '__main__': objs = [] for i in range(3): res_obj = pool.submit(task,i) # 異步方式提交任務# 會返回一個對象用於表示任務結果 objs.append(res_obj) # 該函數默認是阻塞的 會等待池子中全部任務執行結束後執行 pool.shutdown(wait=True) # 從結果對象中取出執行結果 for res_obj in objs: print(res_obj.result()) print("over")
程序中的異步調用並獲取結果方式2:dom
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time pool = ThreadPoolExecutor(3) def task(i): time.sleep(0.01) print(current_thread().name,"working..") return i ** i if __name__ == '__main__': objs = [] for i in range(3): res_obj = pool.submit(task,i) # 會返回一個對象用於表示任務結果 print(res_obj.result()) #result是同步的一旦調用就必須等待 任務執行完成拿到結果 print("over")
異步回調指的是:在發起一個異步任務的同時指定一個函數,在異步任務完成時會自動的調用這個函數異步
異步效率要高於同步,可是異步任務將致使一個問題就是任務的發起方不知道任務什麼時候處理完畢
解決方案:
1. 輪詢:重複的每隔一段時間就問一次,可是效率低,沒法及時拿到任務結果
2. 讓任務的發起方主動通知(異步回調),推薦方式,能夠及時拿到任務結果函數
以前在使用線程池或進程池提交任務時,若是想要處理任務的執行結果則必須調用result函數或是shutdown函數,而它們都是是阻塞的,會等到任務執行完畢後才能繼續執行,這樣一來在這個等待過程當中就沒法執行其餘任務,下降了效率,因此須要一種方案,即保證解析結果的線程不用等待,又能保證數據可以及時被解析,該方案就是異步回調url
在線程中使用:
在線程池中使用:
再來看一個案例:
在編寫爬蟲程序時,一般都是兩個步驟:
1.從服務器下載一個網頁文件
2.讀取而且解析文件內容,提取有用的數據
按照以上流程能夠編寫一個簡單的爬蟲程序
要請求網頁數據則須要使用到第三方的請求庫requests能夠經過pip或是pycharm來安裝,在pycharm中點擊settings->解釋器->點擊+號->搜索requests->安裝
import requests,re,os,random,time from concurrent.futures import ProcessPoolExecutor def get_data(url): print("%s 正在請求%s" % (os.getpid(),url)) time.sleep(random.randint(1,2)) response = requests.get(url) print(os.getpid(),"請求成功 數據長度",len(response.content)) #parser(response) # 3.直接調用解析方法 哪一個進程請求完成就那個進程解析數據 強行使兩個操做耦合到一塊兒了 return response def parser(obj): data = obj.result() htm = data.content.decode("utf-8") ls = re.findall("href=.*?com",htm) print(os.getpid(),"解析成功",len(ls),"個連接") if __name__ == '__main__': pool = ProcessPoolExecutor(3) urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.python.org", "https://www.tmall.com", "https://www.mysql.com", "https://www.apple.com.cn"] # objs = [] for url in urls: # res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將致使全部請求任務不能併發 # parser(res) obj = pool.submit(get_data,url) # obj.add_done_callback(parser) # 4.使用異步回調,保證了數據能夠被及時處理,而且請求和解析解開了耦合 # objs.append(obj) # pool.shutdown() # 2.等待全部任務執行結束在統一的解析 # for obj in objs: # res = obj.result() # parser(res) # 1.請求任務能夠併發 可是結果不能被及時解析 必須等全部請求完成才能解析 # 2.解析任務變成了串行,
總結:異步回調使用方法就是在提交任務後獲得一個Futures對象,調用對象的add_done_callback來指定一個回調函數,
若是把任務比喻爲燒水,沒有回調時就只能守着水壺等待水開,有了回調至關於換了一個會響的水壺,燒水期間可用做其餘的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回調。
注意:
1.Queue先進先出隊列
與多進程中的Queue使用方式徹底相同,區別僅僅是不能被多進程共享。
q = Queue(3) q.put(1) q.put(2) q.put(3) print(q.get(timeout=1)) print(q.get(timeout=1)) print(q.get(timeout=1))
2.LifoQueue 後進先出隊列
該隊列能夠模擬堆棧,實現先進後出,後進先出
lq = LifoQueue() lq.put(1) lq.put(2) lq.put(3) print(lq.get()) print(lq.get()) print(lq.get())
3.PriorityQueue 優先級隊列
該隊列能夠爲每一個元素指定一個優先級,這個優先級能夠是數字,字符串或其餘類型,可是必須是能夠比較大小的類型,取出數據時會按照從小到大的順序取出
pq = PriorityQueue() # 數字優先級 pq.put((10,"a")) pq.put((11,"a")) pq.put((-11111,"a")) print(pq.get()) print(pq.get()) print(pq.get()) # 字符串優先級 pq.put(("b","a")) pq.put(("c","a")) pq.put(("a","a")) print(pq.get()) print(pq.get()) print(pq.get())
事件表示在某個時間發生了某個事情的通知信號,用於線程間協同工做。
由於不一樣線程之間是獨立運行的狀態不可預測,因此一個線程與另外一個線程間的數據是不一樣步的,當一個線程須要利用另外一個線程的狀態來肯定本身的下一步操做時,就必須保持線程間數據的同步,Event就能夠實現線程間同步
Event象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
可用方法:
event.isSet():返回event的狀態值; event.wait():將阻塞線程;知道event的狀態爲True event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
使用案例:
# 在連接mysql服務器前必須保證mysql已經啓動,而啓動須要花費一些時間,因此客戶端不能當即發起連接 須要等待msyql啓動完成後當即發起連接 from threading import Event,Thread import time boot = False def start(): global boot print("正正在啓動服務器.....") time.sleep(5) print("服務器啓動完成!") boot = True def connect(): while True: if boot: print("連接成功") break else: print("連接失敗") time.sleep(1) Thread(target=start).start() Thread(target=connect).start() Thread(target=connect).start()
使用Event改造後:
from threading import Event,Thread import time e = Event() def start(): global boot print("正正在啓動服務器.....") time.sleep(3) print("服務器啓動完成!") e.set() def connect(): e.wait() print("連接成功") Thread(target=start).start() Thread(target=connect).start() Thread(target=connect).start()
增長需求,每次嘗試連接等待1秒,嘗試次數爲3次
from threading import Event,Thread import time e = Event() def start(): global boot print("正正在啓動服務器.....") time.sleep(5) print("服務器啓動完成!") e.set() def connect(): for i in range(1,4): print("第%s次嘗試連接" % i) e.wait(1) if e.isSet(): print("連接成功") break else: print("第%s次連接失敗" % i) else: print("服務器未啓動!") Thread(target=start).start() Thread(target=connect).start() # Thread(target=connect).start()