gevent gevent.queue gevent讀取文件

gevent 一種異步的方式,基於事件循環..   跟 asyncio 裏的東西運做的差很少windows

官方手冊說的太不清楚 . 瀏覽器

本身寫了個入門教程.app

一個最簡單的例子:dom

spawn 將把你的函數封裝成一個個協程對象異步

# 注意. gevent.sleep 不是 time.sleep . 下一個例子說明
def fuck1(arg):
    print('我在這: ',fuck1.__code__.co_firstlineno)
    gevent.sleep(1)                                       
    return arg


g1 = gevent.spawn(fuck1, 123)  #產生一個GreenLet 協程 .
print(g1 , type(g1))            #看看是殺
g1.join()       #等待咯



# 與上面一種徹底同樣的方式 .  gevent.spawn 至關於建立一個GreenLet ,而後start()
g2 = gevent.Greenlet(fuck1,456)
g2.start()                    #啓動協成
print(g2, type(g2))
g2.join()

下面的例子中. 我再也不使用gevent.Greenlet 來本身建立了,比較麻煩.直接spawn了 。socket

下面的例子裏,我也再也不使用繼承的Greenlet啦.
若是對於GreenLet須要,也能夠本身繼承Greenlet . 重寫 _run (有個下劃線) 函數 便可:async

class fuckme( gevent.Greenlet):
    def __init__(self ,*args):
        gevent.Greenlet.__init__(self)

        #本身能夠弄點屬性啥的。我就不弄了

    def _run(self): #主要是這個  run前面有個線
        i = 0
        while i <3 :
            print(' 我是弱智')
            i+=1
            gevent.sleep(0.3)

g = fuckme()
g.start()
g.join()

用gevent.joinall 等待多個協成對象:函數

def fuck1(arg):
    print('我在這: ',fuck1.__code__.co_firstlineno)
    gevent.sleep(1)                                       #你會發現2個函數幾乎同時睡眠. 再也不像time.sleep
    return arg



# 將返回一個 list . 裏面存放一個個GreenLet , 使用 value 獲取返回值
res  = gevent.joinall([
                    gevent.spawn(fuck1 , 123) ,  #產生一個GreenLet 協成
                    gevent.spawn(fuck1 , 456) ,
                ])

print(res ,  type(res))

for v in res:
    print(v.value)
    
#修改一下,更明顯

def fuck1(arg):
    print('參數 < %s > 我在這: '%arg,fuck1.__code__.co_firstlineno)
    gevent.sleep(1)
    print('參數 < %s > 我醒來了 我在這: '%arg, fuck1.__code__.co_firstlineno)
    return arg


cor_list = [gevent.spawn(fuck1  ,  arg )  for arg in range(5)]
res  = gevent.joinall(cor_list)

再來一些例子:測試

交互的運行着.fetch

def fuck1(arg):
    print('參數 < %s > 我在這: '%arg,fuck1.__code__.co_firstlineno)
    gevent.sleep(1)
    print('參數 < %s > 我醒來了 我在這: '%arg, fuck1.__code__.co_firstlineno)
    return arg

def fuck2( arg ):
    print(fuck2.__code__.co_name , fuck2.__code__.co_firstlineno)
    gevent.sleep(1)
    print(fuck2.__code__.co_name + " done")
    return arg


cor_list = [gevent.spawn(fuck1  ,  arg )  for arg in range(3)]
cor_list1 = [gevent.spawn(fuck2 , arg) for arg in range(3)]
cor_list.extend(cor_list1)
gevent.joinall(cor_list)

看一下同步 和異步的比較:

#用與測試的函數
def job(arg):
    import random
    print(' job start :' ,arg)
    gevent.sleep(random.randint(0,5) * 0.5)        #這個時間能夠本身修改看看
    print(' im done')


def sync():
    for i in range(3):
        job(i)


def async():
    alist = [ gevent.spawn(job , arg) for arg in range(3)]
    gevent.joinall(alist)



print("先來同步:")
sync()

print('再來異步:')
async()

還有一些類死於線程的同步對象 . event啦, semaphore啦 ,queue啦. 這些都用於協程之間交互的, 畢竟單線程

event:

# 我看了下, 在windows中是個 CreateEvent 的手動事件 ,即一旦 set , 全部wait的將所有繼續運行.
# 附註: windows中有2個事件,一個自動一個手動. 自動的在 WaitForSingleObject後將原子的ResetEvent, 手動的不會.
# 至關於 py中的 Event.wait, Event.clear
# 那個啥, 這行別看了.py中沒那麼麻煩

from gevent.event import Event
ev = Event()

def request():
    print(' fetching pages' * 10)
    gevent.sleep(2)
    print(' fetching done' * 10)
    ev.set()                    #全部wait的將被所有激活

def response():
    print('response 已啓動')
    ev.wait()               #等待 ev.set 後將運行
    print('response 完成')

res_list = [gevent.spawn(response) for i in range(5)] #先建立了5個,他們運行到 ev.wait的時候將所有等待

res_list.append(gevent.spawn_later(2, request))     #這裏用了 spawn_later .能夠預約幾秒後 開始運行

gevent.joinall(res_list)

queue: 多生產多消費
我一開始使用queue 的時候經常會碰到一個異常. LockUp.Exit (forever 之類的) 好像是這個.
主要緣由是要麼在生產者要麼在消費者中必定有一個地方,沒讓協程退出. 因此在joinall 的時候會產生異常
queue.put / get 都是阻塞操做

from gevent.queue import Queue
q = Queue(3)  # 最多存放3個
def producer():
    for i in range(20):
        print('->>>>>>>> producer put %d'%(i))
        q.put(i)

    print('->' * 20 + ' producer done')



def consumer(arg):
    while True:
        try:
            item = q.get(timeout=0.5)    #設置了timeout ,用於過了0.5秒一旦queue爲空則拋異常.結束此循環
            print('consumer %d get %d , queue:%d ' %(arg,item,q.qsize()) )
        except Exception as e:
            break

    print('consumer %d done' % arg)

pro_list = [gevent.spawn(producer) for i in range(5)]  #多個生產者
con_list = [gevent.spawn(consumer,i) for i in range(3)] #多個消費者
con_list.extend(pro_list)
gevent.joinall(con_list)
print(q.empty())

一個失敗的例子: 用協程讀取文件 . 測試下來速度很慢:

import os
from functools import partial

EACH_SIZE = 1024    #每次讀1024

#eachpart : 每塊大小, pos : 從哪裏開始讀取
def pro_readfile(filepath,eachPart,pos):
    with open(filepath,'rt') as fd:
        fd.seek(pos)
        iterbale = iter(partial(fd.read,EACH_SIZE),'')
        for text in iterbale:
            print(text)


path = 'D:/360極速瀏覽器下載/msdn.txt'
co_size = 5                #協程數量
filesize = os.path.getsize(path)    #文件大小
eachPart = int(filesize/5) +1       #每一個協程讀多少

be = time.clock()        #開始時間
pro_list = [gevent.spawn(pro_readfile,path,eachPart, i*eachPart) for i in range(co_size)]    
gevent.joinall(pro_list)
end = time.clock()    #結束
print(end-be)

JoinableQueue:

q = JoinableQueue(50)
def doing(arg):
    print('im doing %d' %arg)
    gevent.sleep(1)
    print('im done %d'%arg)
    q.task_done()

def to_do():
    while True:
        func , args= q.get()
        gevent.spawn(func,args)


for i in range(5):
    gevent.spawn(to_do)

for i in range(10):
    q.put((doing,i))


q.join()

最後介紹一下Pool , 會用Pool ,也就會用Group了 . 附:class Pool(Group)

import gevent.monkey; gevent.monkey.patch_all()  #注意導入這個. 若是你的程序涉及了socket


pool = Pool(10) #限制在10個協程
def read_from(url):
    r = requests.get(url)
    r.encoding = r.apparent_encoding
    print(r.url, r.headers)

    return r.status_code
urls = ["https://www.qq.com","https://www.baidu.com","http://www.sina.com.cn"]

pool.spawn(read_from,urls[0])  #產生一個協程
pool.spawn(read_from,urls[1])

#也能夠這樣

res = pool.imap(read_from, urls)        #跟map 函數相似, 返回一個可迭代
for r in res:
    print(r)

#或者
print('- ' * 50)
for r in pool.imap_unordered(read_from,urls):   #更好的選擇.哪一個先完成就返回
    print(r)

pool.join()
相關文章
相關標籤/搜索