Python併發實踐_02_經過yield實現協程

python中實現併發的方式有不少種,經過多進程併發能夠真正利用多核資源,而多線程併發則實現了進程內資源的共享,然而Python中因爲GIL的存在,多線程是沒有辦法真正實現多核資源的。python

對於計算密集型程序,應該使用多進程併發充分利用多核資源,而在IO密集型程序中,多核優點並不明顯,甚至因爲大多數時間都是在IO堵塞狀態,多進程的切換消耗反而讓程序效率更加低下。多線程

而當須要併發處理IO密集型任務時,就須要用到協程(Coroutine)。協程並無系統級的調度,而是用戶級的調度方式,避免了系統調用的開銷,雖然協程最終是串行工做,可是卻能夠實現很是大的併發量。經過多進程+協程的方式,能夠有效均衡多核計算和請求等待。併發

參考文章:函數

https://blog.tonyseek.com/post/event-manage-with-greenlet/post

producer-consumer

利用yield生成器,能夠簡單展示協程的工做方式:spa

import time
def consumer():
    print "Ready to receive"
    while True:
        y = (yield )
        time.sleep(1)
        print "Receive %s from producer」%y
def producer():
    c = consumer()
    c.next()
    i = 1
    while i > 0 and i < 11:
        time.sleep(1)
        print "Send %s to consumer"%i
        c.send(i)
        i += 1
if __name__ == '__main__':
    producer()線程

上述過程展現了基本的生產者-消費者模型,消費者consumer是一個生成器;code

當第一次在producer中調用c.next()時,激活consumer,而且運行到yield時協程(consumer)被掛起,等待生成器被調用next或者send。協程

producer進行後續操做,並進入一個循環,每次暫停1s後,向生成器send一個消息,消費者yield獲取到該消息,並進行後續的工做。blog

能夠看到,每次yield都須要等待send傳入的消息以後纔會繼續執行以後的任務。

經過yield實現協程

如今要來用yield真正建立一個協程了。

能夠想象這樣一個模型,一個工地裏有不少類似的任務(jobs),而且會源源不斷產生這些任務,工地裏有一個工頭(foreman)負責,工頭爲了分配任務給工人(worker),會制定一套流程(pipeline)來方便管理:分配工人,驗收工做(accept),因爲工人工做(work)的時間遠遠大於分配任務的時間,將這些工人的工做(簡單枯燥的重複勞動)當作IO操做的話,這就是一個IO密集型的任務。下面看看python是如何經過yield來實現協程完成真個工做的:

 

 1 def main():
 2     foreman(args_of_overall,worker_num)
 3 
 4 def foreman(args_of_overall,worker_num):
 5     pipeline = create_pipeline(args_of_pipeline,worker_num)
 6     for i,job in enumerate(get_jobs(args_of_ceate_jobs)):
 7         worker_id  = i % worker_num
 8         pipeline.send((job,worker_id))
 9 
10 @coroutine
11 def worker(pipeline,accepting,job,my_id):
12     while True:
13         args_of_job, worker_id = (yield )
14         if worker_id == my_id:
15             result = work(args_of_job)
16             accepting.send(result)
17         elif pipeline is not None:
18             pipeline.send((job,worker_id))
19 
20 @coroutine
21 def accept():
22     while True:
23         result = (yield )
24         #do_some_accepting
25 
26 def create_pipeline(args_of_pipeline,worker_num):
27     pipeline = None
28     accepting = accept()
29     for work_id in range(work_num):
30         pipeline = worker(pipeline,accepting,job,work_id)
31     return pipeline
32 
33 def get_jobs(args_of_ceate_jobs):
34     for job in job_source:
35         yield job
36 
37 def coroutine(func):
38     def warper(*args):
39         f = func(*args)
40         f.next()
41         return f
42     return warper
43 
44 def work(args_of_job):
45     pass
46     #do_some_work
47 
48 if __name__ == '__main__':
49     main()

 上述過程當中,工人和驗收工做都是協程,而get_jobs()函數是一個生成器,當job是動態添加時,就能夠改寫成一個協程。

上述全部的工做都是串行完成,雖然有不少工人,工人之間的工做是併發的(IO等待時間),可是工做一直是從第一個開始一個一個分配任務。

相關文章
相關標籤/搜索