gevent 一種異步的方式,基於事件循環.. 跟 asyncio 裏的東西運做的差很少windows
官方手冊說的太不清楚 . 瀏覽器
本身寫了個入門教程.app
一個最簡單的例子:dom
spawn 將把你的函數封裝成一個個協程對象異步
# 注意. gevent.sleep 不是 time.sleep . 下一個例子說明 def fuck1(arg): print('我在這: ',fuck1.__code__.co_firstlineno) gevent.sleep(1) return arg g1 = gevent.spawn(fuck1, 123) #產生一個GreenLet 協程 . print(g1 , type(g1)) #看看是殺 g1.join() #等待咯 # 與上面一種徹底同樣的方式 . gevent.spawn 至關於建立一個GreenLet ,而後start() g2 = gevent.Greenlet(fuck1,456) g2.start() #啓動協成 print(g2, type(g2)) g2.join()
下面的例子中. 我再也不使用gevent.Greenlet 來本身建立了,比較麻煩.直接spawn了 。socket
下面的例子裏,我也再也不使用繼承的Greenlet啦.
若是對於GreenLet須要,也能夠本身繼承Greenlet . 重寫 _run (有個下劃線) 函數 便可:async
class fuckme( gevent.Greenlet): def __init__(self ,*args): gevent.Greenlet.__init__(self) #本身能夠弄點屬性啥的。我就不弄了 def _run(self): #主要是這個 run前面有個線 i = 0 while i <3 : print(' 我是弱智') i+=1 gevent.sleep(0.3) g = fuckme() g.start() g.join()
用gevent.joinall 等待多個協成對象:函數
def fuck1(arg): print('我在這: ',fuck1.__code__.co_firstlineno) gevent.sleep(1) #你會發現2個函數幾乎同時睡眠. 再也不像time.sleep return arg # 將返回一個 list . 裏面存放一個個GreenLet , 使用 value 獲取返回值 res = gevent.joinall([ gevent.spawn(fuck1 , 123) , #產生一個GreenLet 協成 gevent.spawn(fuck1 , 456) , ]) print(res , type(res)) for v in res: print(v.value) #修改一下,更明顯 def fuck1(arg): print('參數 < %s > 我在這: '%arg,fuck1.__code__.co_firstlineno) gevent.sleep(1) print('參數 < %s > 我醒來了 我在這: '%arg, fuck1.__code__.co_firstlineno) return arg cor_list = [gevent.spawn(fuck1 , arg ) for arg in range(5)] res = gevent.joinall(cor_list)
再來一些例子:測試
交互的運行着.fetch
def fuck1(arg): print('參數 < %s > 我在這: '%arg,fuck1.__code__.co_firstlineno) gevent.sleep(1) print('參數 < %s > 我醒來了 我在這: '%arg, fuck1.__code__.co_firstlineno) return arg def fuck2( arg ): print(fuck2.__code__.co_name , fuck2.__code__.co_firstlineno) gevent.sleep(1) print(fuck2.__code__.co_name + " done") return arg cor_list = [gevent.spawn(fuck1 , arg ) for arg in range(3)] cor_list1 = [gevent.spawn(fuck2 , arg) for arg in range(3)] cor_list.extend(cor_list1) gevent.joinall(cor_list)
看一下同步 和異步的比較:
#用與測試的函數 def job(arg): import random print(' job start :' ,arg) gevent.sleep(random.randint(0,5) * 0.5) #這個時間能夠本身修改看看 print(' im done') def sync(): for i in range(3): job(i) def async(): alist = [ gevent.spawn(job , arg) for arg in range(3)] gevent.joinall(alist) print("先來同步:") sync() print('再來異步:') async()
還有一些類死於線程的同步對象 . event啦, semaphore啦 ,queue啦. 這些都用於協程之間交互的, 畢竟單線程
event:
# 我看了下, 在windows中是個 CreateEvent 的手動事件 ,即一旦 set , 全部wait的將所有繼續運行. # 附註: windows中有2個事件,一個自動一個手動. 自動的在 WaitForSingleObject後將原子的ResetEvent, 手動的不會. # 至關於 py中的 Event.wait, Event.clear # 那個啥, 這行別看了.py中沒那麼麻煩 from gevent.event import Event ev = Event() def request(): print(' fetching pages' * 10) gevent.sleep(2) print(' fetching done' * 10) ev.set() #全部wait的將被所有激活 def response(): print('response 已啓動') ev.wait() #等待 ev.set 後將運行 print('response 完成') res_list = [gevent.spawn(response) for i in range(5)] #先建立了5個,他們運行到 ev.wait的時候將所有等待 res_list.append(gevent.spawn_later(2, request)) #這裏用了 spawn_later .能夠預約幾秒後 開始運行 gevent.joinall(res_list)
queue: 多生產多消費
我一開始使用queue 的時候經常會碰到一個異常. LockUp.Exit (forever 之類的) 好像是這個.
主要緣由是要麼在生產者要麼在消費者中必定有一個地方,沒讓協程退出. 因此在joinall 的時候會產生異常
queue.put / get 都是阻塞操做
from gevent.queue import Queue q = Queue(3) # 最多存放3個 def producer(): for i in range(20): print('->>>>>>>> producer put %d'%(i)) q.put(i) print('->' * 20 + ' producer done') def consumer(arg): while True: try: item = q.get(timeout=0.5) #設置了timeout ,用於過了0.5秒一旦queue爲空則拋異常.結束此循環 print('consumer %d get %d , queue:%d ' %(arg,item,q.qsize()) ) except Exception as e: break print('consumer %d done' % arg) pro_list = [gevent.spawn(producer) for i in range(5)] #多個生產者 con_list = [gevent.spawn(consumer,i) for i in range(3)] #多個消費者 con_list.extend(pro_list) gevent.joinall(con_list) print(q.empty())
一個失敗的例子: 用協程讀取文件 . 測試下來速度很慢:
import os from functools import partial EACH_SIZE = 1024 #每次讀1024 #eachpart : 每塊大小, pos : 從哪裏開始讀取 def pro_readfile(filepath,eachPart,pos): with open(filepath,'rt') as fd: fd.seek(pos) iterbale = iter(partial(fd.read,EACH_SIZE),'') for text in iterbale: print(text) path = 'D:/360極速瀏覽器下載/msdn.txt' co_size = 5 #協程數量 filesize = os.path.getsize(path) #文件大小 eachPart = int(filesize/5) +1 #每一個協程讀多少 be = time.clock() #開始時間 pro_list = [gevent.spawn(pro_readfile,path,eachPart, i*eachPart) for i in range(co_size)] gevent.joinall(pro_list) end = time.clock() #結束 print(end-be)
JoinableQueue:
q = JoinableQueue(50) def doing(arg): print('im doing %d' %arg) gevent.sleep(1) print('im done %d'%arg) q.task_done() def to_do(): while True: func , args= q.get() gevent.spawn(func,args) for i in range(5): gevent.spawn(to_do) for i in range(10): q.put((doing,i)) q.join()
最後介紹一下Pool , 會用Pool ,也就會用Group了 . 附:class Pool(Group)
import gevent.monkey; gevent.monkey.patch_all() #注意導入這個. 若是你的程序涉及了socket pool = Pool(10) #限制在10個協程 def read_from(url): r = requests.get(url) r.encoding = r.apparent_encoding print(r.url, r.headers) return r.status_code urls = ["https://www.qq.com","https://www.baidu.com","http://www.sina.com.cn"] pool.spawn(read_from,urls[0]) #產生一個協程 pool.spawn(read_from,urls[1]) #也能夠這樣 res = pool.imap(read_from, urls) #跟map 函數相似, 返回一個可迭代 for r in res: print(r) #或者 print('- ' * 50) for r in pool.imap_unordered(read_from,urls): #更好的選擇.哪一個先完成就返回 print(r) pool.join()