協調生產者消費者協程.html
from tornado import gen from tornado.ioloop import IOLoop from tornado.queues import Queue q = Queue(maxsize=2) @gen.coroutine def consumer(): while True: item = yield q.get() try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done() @gen.coroutine def producer(): for item in range(5): yield q.put(item) print('Put %s' % item) @gen.coroutine def main(): # Start consumer without waiting (since it never finishes). IOLoop.current().spawn_callback(consumer) yield producer() # Wait for producer to put all tasks. yield q.join() # Wait for consumer to finish all tasks. print('Done') IOLoop.current().run_sync(main) # Put 0 # Put 1 # Doing work on 0 # Put 2 # Doing work on 1 # Put 3 # Doing work on 2 # Put 4 # Doing work on 3 # Doing work on 4 # Done
在Python 3.5, Queue
實現了異步迭代器協議, 因此 consumer()
能夠被重寫爲:python
async def consumer(): async for item in q: try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done()
在 4.3 版更改: 爲Python 3.5添加 async for
支持 in Python 3.5.異步
maxsize
隊列中容許的最大項目數.async
qsize
()[源代碼]
當前隊列中的項目數.tornado
put
(item, timeout=None)[源代碼]
將一個項目放入隊列中, 可能須要等待直到隊列中有空間.oop
返回一個Future對象, 若是超時會拋出 tornado.gen.TimeoutError
.spa
get
(timeout=None)[源代碼]
從隊列中刪除並返回一個項目.htm
返回一個Future對象, 當項目可用時resolve, 或者在超時後拋出 tornado.gen.TimeoutError
.
get_nowait
()[源代碼]
非阻塞的從隊列中刪除並返回一個項目.
若是有項目是當即可用的則返回該項目, 不然拋出 QueueEmpty
.
task_done
()[源代碼]
代表前面排隊的任務已經完成.
被消費者隊列使用. 每一個 get
用來獲取一個任務, 隨後(subsequent) 調用 task_done
告訴隊列正在處理的任務已經完成.
若是 join
正在阻塞, 它會在全部項目都被處理完後調起; 即當每一個 put
都被一個 task_done
匹配.
若是調用次數超過 put
將會拋出 ValueError
.
join
(timeout=None)[源代碼]
阻塞(block)直到隊列中的全部項目都處理完.
返回一個Future對象, 超時後會拋出 tornado.gen.TimeoutError
異常.
QueueEmpty
tornado.queues.
QueueEmpty
[源代碼]
當隊列中沒有項目時, 由 Queue.get_nowait
拋出.
QueueFull
tornado.queues.
QueueFull
[源代碼]
當隊列爲最大size時, 由 Queue.put_nowait
拋出.