阻塞:程序遇到IO阻塞,程序遇到IO立馬會中止(掛起),cpu立刻切換,等IO結束後再執行python
非阻塞:程序沒有IO或者遇到IO經過某種手段讓CPU去執行同一個線程裏面的其餘的任務,儘量的佔用CPUgit
# 異步回收任務的方式一: 將全部任務的結果統一收回 from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f'{os.getpid()} is running') time.sleep(random.randint(1, 3)) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) lst = [] for i in range(10): res = p.submit(task) # 異步發出 lst.append(res) # print(res.result()) # 在這裏result()就會變成同步 p.shutdown(wait=True) # 1.阻止再向進程池投放新的任務 # 2.wait=True 一個任務完成了就減一,直至爲0才執行下一行 for res in lst: print(res.result())
# 瀏覽器作的事情很簡單,封裝一些頭部,發一個請求到服務器,服務器拿到請求信息,分析信息,分析正確以後,給瀏覽器返回一個文件,瀏覽器將這個文件的代碼渲染就成了網頁 # 爬蟲: 利用requests模塊,模擬瀏覽器,封裝頭給服務器發送請求,騙過服務器,服務器也給你返回一個文件, # 爬蟲拿到文件進行數據清洗,獲取想要的信息 # 爬蟲: 分兩步 # 第一步: 爬取服務端的文件(IO阻塞) # 第二步: 拿到文件,進行數據清洗(非IO,極少IO) # 版本一 from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): # 爬取文件 response = requests.get(url) print(os.getpid(), '正在爬取:', url) time.sleep(random.randint(1, 3)) if response.status_code == 200: return response.text def parse(text): # 對爬取回來的字符串的分析,用len模擬一下 print('分析結果:', len(text)) if __name__ == '__main__': url_list = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/', 'https://www.cnblogs.com/' ] pool = ProcessPoolExecutor(4) obj_list = [] for url in url_list: obj = pool.submit(get, url) obj_list.append(obj) pool.shutdown(wait=True) for obj in obj_list: text = obj.result() parse(text) # 問題出在哪裏? # 1.分析結果的過程是串行,效率低 # 2.將全部的結果所有爬取成功以後,放在一個列表中 ------------------------------------------------------- # 版本二:異步處理,獲取結果的第二種方式 # 完成一個任務,返回一個結果,併發的獲取結果 from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): # 爬取文件 response = requests.get(url) print(os.getpid(), '正在爬取:', url) time.sleep(random.randint(1, 3)) if response.status_code == 200: parse(response.text) # return response.text def parse(text): # 對爬取回來的字符串的分析,用len模擬一下 print('分析結果:', len(text)) if __name__ == '__main__': url_list = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/', 'https://www.cnblogs.com/' ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) pool.shutdown(wait=True) # 問題,加強了耦合性 ------------------------------------------------------ # 版本三: 版本二,兩個任務有耦合性.在上一個基礎上,對其進行解耦 from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): # 爬取文件 response = requests.get(url) print(os.getpid(), '正在爬取:', url) time.sleep(random.randint(1, 3)) if response.status_code == 200: return response.text def parse(obj): # 對爬取回來的字符串的分析,用len模擬一下 print(f'{os.getpid()}分析結果:', len(obj.result())) if __name__ == '__main__': url_list = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/', 'https://www.cnblogs.com/' ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) obj.add_done_callback(parse) # 增長一個回調函數 # 如今的進程完成的仍是網絡爬取的任務,拿到返回值以後,丟給回調函數, # 進程繼續完成下一個任務,回調函數進行分析結果 pool.shutdown(wait=True) # 回調函數是主進程實現的,回調函數幫咱們進行分析任務 # 明確了進程的任務就是網絡爬取,分析任務交給回調函數執行,對函數之間解耦 # 極值狀況: 若是回調函數是IO任務,那麼因爲回調函數是主進程作的,因此有可能影響效率 # 回調不是萬能的,若是回調的任務是IO,那麼異步+回調機制很差,此時若是須要效率,只能再開一個線程或進程池 # 異步就是回調? # 這個論點是錯的,異步,回調是兩個概念 # 若是多個任務,多進程多線程處理的IO任務 # 1. 剩下的任務 非IO阻塞 異步+回調機制 # 2. 剩下的任務有 IO 遠小於 多個任務的IO 異步+回調機制 # 3. 剩下的任務 IO 大於等於 多個任務的IO 第二種解決方式,或者開啓兩個進程/線程池
FIFO: 先進先出github
import queue #不須要經過threading模塊裏面導入,直接import queue就能夠了,這是python自帶的 #用法基本和咱們進程multiprocess中的queue是同樣的 q = queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
LIFO: 後進先出(棧)數組
import queue q = queue.LifoQueue() # 隊列,相似於棧 q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
Priority: 優先級隊列瀏覽器
import queue q = queue.PriorityQueue() # put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((-10, 'a')) q.put((-5, 'a')) #負數也能夠 # q.put((20, 'ws')) #若是兩個值的優先級同樣,那麼按照後面的值的acsii碼順序來排序,若是字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20, 'wd')) # q.put((20, {'a': 11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20, ('w', 1))) #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,能夠是元祖,也是經過元素的ascii碼順序來排序 q.put((20, 'b')) q.put((20, 'a')) q.put((0, 'b')) q.put((30, 'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (-10, 'a') (-5, 'a') (0, 'b') (20, 'a') (20, 'b') (30, 'c') '''
方法服務器
event.isSet(): 返回event的狀態值 event.wait(): 若是 event.isSet() == False將阻塞線程 event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度 event.clear(): 恢復event的狀態值爲False
示例網絡
from threading import Thread from threading import current_thread from threading import Event import time event = Event() # 默認False def task(): print(f'{current_thread().name}檢測服務器是否正常開啓....') time.sleep(3) event.set() # 改爲True def task1(): print(f'{current_thread().name}正在嘗試鏈接服務器') event.wait() # 阻塞,輪詢檢測event是否爲True,當其爲True,繼續下一行代碼 # event.wait(1) # 超時時間,超過期間不管是否爲True都繼續下一行代碼 print(f'{current_thread().name}鏈接成功') if __name__ == '__main__': t1 = Thread(target=task1) t2 = Thread(target=task1) t3 = Thread(target=task1) t = Thread(target=task) t1.start() t2.start() t3.start() t.start()
1. 概念多線程
協程本質就是一條線程,多個任務在一條線程上來回切換 協程是操做系統不可見的 協程的概念自己並 沒有規避I/O操做,可是咱們能夠利用協程這個概念來實現規避I/O操做,進而達到了咱們將一條線程中 的I/O操做降到最低的目的 協程可以實現的大部分I/O操做都在網絡併發
2. 相關模塊概覽和協程的應用app
gevent:利用了greenlet底層模塊(C語言寫的)完成的切換 + 自動規避io的功能
3. gevent模塊
import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('主') 遇到I/O切換
上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,
而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前
或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭
from gevent import monkey;monkey.patch_all() #必須寫在最上面,這句話後面的全部阻塞所有可以識別了 import gevent #直接導入便可 import time def eat(): #print() print('eat food 1') time.sleep(2) #加上mokey就可以識別到time模塊的sleep了 print('eat food 2') def play(): print('play 1') time.sleep(1) #來回切換,直到一個I/O的時間結束,這裏都是咱們個gevent作得,再也不是控制不了的操做系統了。 print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print('主')
咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程,虛擬線程,其實都在一個線程裏面
進程線程的任務切換是由操做系統自行切換的,你本身不能控制
協程是經過本身的程序(代碼)來進行切換的,本身可以控制,只有遇到協程模塊可以識別的IO操做的時候,程序纔會進行任務切換,實現併發效果,若是全部程序都沒有IO操做,那麼就基本屬於串行執行了。
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() #上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。 協程:同步異步對比