異步回調python
### 什麼是異步回調mysql
異步回調指的是:在發起一個異步任務的同時指定一個函數,在異步任務完成時會自動的調用這個函數sql
### 爲何須要異步回調編程
以前在使用線程池或進程池提交任務時,若是想要處理任務的執行結果則必須調用result函數或是shutdown函數,而它們都是是阻塞的,會等到任務執行完畢後才能繼續執行,這樣一來在這個等待過程當中就沒法執行其餘任務,下降了效率,因此須要一種方案,即保證解析結果的線程不用等待,又能保證數據可以及時被解析,該方案就是異步回調服務器
### 異步回調的使用併發
先來看一個案例:app
在編寫爬蟲程序時,一般都是兩個步驟:dom
1.從服務器下載一個網頁文件異步
2.讀取而且解析文件內容,提取有用的數據socket
按照以上流程能夠編寫一個簡單的爬蟲程序
要請求網頁數據則須要使用到第三方的請求庫requests能夠經過pip或是pycharm來安裝,在pycharm中點擊settings->解釋器->點擊+號->搜索requests->安裝
```python
import requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor
def get_data(url):
print("%s 正在請求%s" % (os.getpid(),url))
time.sleep(random.randint(1,2))
response = requests.get(url)
print(os.getpid(),"請求成功 數據長度",len(response.content))
#parser(response) # 3.直接調用解析方法 哪一個進程請求完成就那個進程解析數據 強行使兩個操做耦合到一塊兒了
return response
def parser(obj):
data = obj.result()
htm = data.content.decode("utf-8")
ls = re.findall("href=.*?com",htm)
print(os.getpid(),"解析成功",len(ls),"個連接")
if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
urls = ["https://www.baidu.com",
"https://www.sina.com",
"https://www.python.org",
"https://www.tmall.com",
"https://www.mysql.com",
"https://www.apple.com.cn"]
# objs = []
for url in urls:
# res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將致使全部請求任務不能併發
# parser(res)
obj = pool.submit(get_data,url) #
obj.add_done_callback(parser) # 4.使用異步回調,保證了數據能夠被及時處理,而且請求和解析解開了耦合
# objs.append(obj)
# pool.shutdown() # 2.等待全部任務執行結束在統一的解析
# for obj in objs:
# res = obj.result()
# parser(res)
# 1.請求任務能夠併發 可是結果不能被及時解析 必須等全部請求完成才能解析
# 2.解析任務變成了串行,
```
總結:異步回調使用方法就是在提交任務後獲得一個Futures對象,調用對象的add_done_callback來指定一個回調函數,
若是把任務比喻爲燒水,沒有回調時就只能守着水壺等待水開,有了回調至關於換了一個會響的水壺,燒水期間可用做其餘的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回調。
注意:
1. 使用進程池時,回調函數都是主進程中執行執行
2. 使用線程池時,回調函數的執行線程是不肯定的,哪一個線程空閒就交給哪一個線程
3. 回調函數默認接收一個參數就是這個任務對象本身,再經過對象的result函數來獲取任務的處理結果
線程隊列
1.Queue 先進先出隊列
與多進程中的Queue使用方式徹底相同,區別僅僅是不能被多進程共享。
```python
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get(timeout=1))
print(q.get(timeout=1))
print(q.get(timeout=1))
```
2.LifoQueue 後進先出隊列
該隊列能夠模擬堆棧,實現先進後出,後進先出
```python
lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put(3)
print(lq.get())
print(lq.get())
print(lq.get())
```
3.PriorityQueue 優先級隊列
該隊列能夠爲每一個元素指定一個優先級,這個優先級能夠是數字,字符串或其餘類型,可是必須是能夠比較大小的類型,取出數據時會按照從小到大的順序取出
```python
pq = PriorityQueue()
# 數字優先級
pq.put((10,"a"))
pq.put((11,"a"))
pq.put((-11111,"a"))
print(pq.get())
print(pq.get())
print(pq.get())
# 字符串優先級
pq.put(("b","a"))
pq.put(("c","a"))
pq.put(("a","a"))
print(pq.get())
print(pq.get())
print(pq.get())
```
.線程事件Event
### 什麼是事件
事件表示在某個時間發生了某個事情的通知信號,用於線程間協同工做。
由於不一樣線程之間是獨立運行的狀態不可預測,因此一個線程與另外一個線程間的數據是不一樣步的,當一個線程須要利用另外一個線程的狀態來肯定本身的下一步操做時,就必須保持線程間數據的同步,Event就能夠實現線程間同步
### Event介紹
Event象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
可用方法:
```python
event.isSet():返回event的狀態值;
event.wait():將阻塞線程;知道event的狀態爲True
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。
```
使用案例:
```python
# 在連接mysql服務器前必須保證mysql已經啓動,而啓動須要花費一些時間,因此客戶端不能當即發起連接 須要等待msyql啓動完成後當即發起連接
from threading import Event,Thread
import time
boot = False
def start():
global boot
print("正正在啓動服務器.....")
time.sleep(5)
print("服務器啓動完成!")
boot = True
def connect():
while True:
if boot:
print("連接成功")
break
else:
print("連接失敗")
time.sleep(1)
Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()
```
使用Event改造後:
```python
from threading import Event,Thread
import time
e = Event()
def start():
global boot
print("正正在啓動服務器.....")
time.sleep(3)
print("服務器啓動完成!")
e.set()
def connect():
e.wait()
print("連接成功")
Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()
```
增長需求,每次嘗試連接等待1秒,嘗試次數爲3次
```python
from threading import Event,Thread
import time
e = Event()
def start():
global boot
print("正正在啓動服務器.....")
time.sleep(5)
print("服務器啓動完成!")
e.set()
def connect():
for i in range(1,4):
print("第%s次嘗試連接" % i)
e.wait(1)
if e.isSet():
print("連接成功")
break
else:
print("第%s次連接失敗" % i)
else:
print("服務器未啓動!")
Thread(target=start).start()
Thread(target=connect).start()
# Thread(target=connect).start()
```
單線程實現併發
併發:指的是多個任務同時發生,看起來好像是同時都在進行
並行:指的是多個任務真正的同時進行
若是一個線程可以檢測IO操做而且將其設置爲非阻塞,並自動切換到其餘任務就能夠提升CPU的利用率,指的就是在單線程下實現併發。
併發 = 切換任務+保存狀態,只要找到一種方案,可以在兩個任務之間切換執行而且保存狀態,那就能夠實現單線程併發
python中的生成器就具有這樣一個特色,每次調用next都會回到生成器函數中執行代碼,這意味着任務之間能夠切換,而且是基於上一次運行的結果,這意味着生成器會自動保存執行狀態!
利用生成器來實現併發執行:
def task1():
while True:
yield
print("task1 run")
def task2():
g = task1()
while True:
next(g)
print("task2 run")
task2()
兩個計算任務一個採用生成器切換併發執行 一個直接串行調用
import time
def task1():
a = 0
for i in range(10000000):
a += i
yield
def task2():
g = task1()
b = 0
for i in range(10000000):
b += 1
next(g)
s = time.time()
task2()
print("併發執行時間",time.time()-s)
# 單線程下串行執行兩個計算任務 效率反而比並發高 由於併發須要切換和保存
def task1():
a = 0
for i in range(10000000):
a += i
def task2():
b = 0
for i in range(10000000):
b += 1
s = time.time()
task1()
task2()
print("串行執行時間",time.time()-s)
```
能夠看到對於純計算任務而言,單線程併發反而使執行效率降低了一半左右,因此這樣的方案對於純計算任務而言是沒有必要的
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。
協程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
協程的本質就是在單線程下,由用戶本身控制一個任務遇到io阻塞了就切換另一個任務去執行,以此來提高效率。爲了實現它,咱們須要找尋一種能夠同時知足如下條件的解決方案:
1.能夠控制多個任務之間的切換,切換以前將任務的狀態保存下來,以便從新運行時,能夠基於暫停的位置繼續執行。 2. 做爲1的補充:能夠檢測io操做,在遇到io操做的狀況下才發生切換
優勢以下:
1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級 2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
缺點以下:
1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程 2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
總結協程特色:
能夠看到對於純計算任務而言,單線程併發反而使執行效率降低了一半左右,因此這樣的方案對於純計算任務而言是沒有必要的
greenlet模塊實現併發
```python
def task1(name):
print("%s task1 run1" % name)
g2.switch(name) # 切換至任務2
print("task1 run2")
g2.switch() # 切換至任務2
def task2(name):
print("%s task2 run1" % name)
g1.switch() # 切換至任務1
print("task2 run2")
g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch("jerry") # 爲任務傳參數
```
如今咱們須要一種方案 便可檢測IO 又可以實現單線程併發,因而gevent閃亮登場
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
#用法 g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合做一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
遇到IO阻塞時會自動切換任務
# gevent 不具有檢測IO的能力 須要爲它打補丁 打上補丁以後就能檢測IO
# 注意補丁必定打在最上面 必須保證導入模塊前就打好補丁
from gevent import monkey
monkey.patch_all()
from threading import current_thread
import gevent,time
def task1():
print(current_thread(),1)
print("task1 run")
# gevent.sleep(3)
time.sleep(3)
print("task1 over")
def task2():
print(current_thread(),2)
print("task2 run")
print("task2 over")
# spawn 用於建立一個協程任務
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
# 任務要執行,必須保證主線程沒掛 由於全部協程任務都是主線在執行 ,必須調用join來等待協程任務
# g1.join()
# g2.join()
# 理論上等待執行時間最長的任務就行 , 可是不清楚誰的時間長 能夠所有join
gevent.joinall([g1,g2])
print("over")
gevent.sleep(3)模擬的是gevent能夠識別的io阻塞,
而time.sleep(3)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前