異步IO

參考: 同步/異步 阻塞/非阻塞 iterator / generator asyio/cortine asy_io
# -------------------------------->同步/異步 阻塞/非阻塞------------------------------------

#
# 在一個線程中,CPU執行代碼的速度極快,然而,一旦遇到IO操做,如讀寫文件、發送網絡數據時,就須要等待IO操做完成,才能繼續進行下一步操做。這種狀況稱爲同步IO。
#
# 使用多線程或者多進程來併發執行代碼,爲多個用戶服務。每一個用戶都會分配一個線程,若是遇到IO致使線程被掛起,其餘用戶的線程不受影響,可是系統不能無上限地增長線程。因爲系統切換線程的開銷也很大,一旦線程數量過多,CPU的時間就花在線程切換上了,結果致使性能嚴重降低。
#
# loop = get_event_loop()
# while True:
#     event = loop.get_event()
#     process_event(event)
# 當代碼須要執行一個耗時的IO操做時,它只發出IO指令,並不等待IO結果,而後就去執行其餘代碼了。一段時間後,當IO返回結果時,再通知CPU進行處理。
#
# 當遇到IO操做時,代碼只負責發出IO請求,不等待IO結果,而後直接結束本輪消息處理,進入下一輪消息處理過程。當IO操做完成後,將收到一條「IO完成」的消息,處理該消息時就能夠直接獲取IO操做結果。
#
# 在「發出IO請求」到收到「IO完成」的這段時間裏,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程並無休息,而是在消息循環中繼續處理其餘消息。這樣,在異步IO模型下,一個線程就能夠同時處理多個IO請求,而且沒有切換線程的操做。
#
# 燒水:
#  1 老張把水壺放到火上,立等水開。(同步阻塞)
#  2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞)
#  3 老張把水開了會響的水壺放到火上,立等水開。(異步阻塞)
#  4 老張把水開了會響的水壺放到火上,去客廳看電視,水壺響以前再也不去看它了,響了再去拿壺。(異步非阻塞)
#
#  所謂同步異步,只是對於水壺而言:普通水壺,同步,能沒有結束前,一直等結果;響水壺,異步,不須要知道該功能結果,該功能有結果後通知(回調通知)。 響水壺能夠在本身完工以後,提示老張水開了。 普通水壺同步只能讓調用者去輪詢本身。
#  所謂阻塞非阻塞,僅僅對於老張而言。 立等的老張,阻塞,(函數)沒有接收完數據或者沒有獲得結果以前,不會返回;看電視的老張,非阻塞,(函數)當即返回,經過select通知調用者。

# -------------------------------->COROUTINE-------------------------------------------

# 子程序,或者稱爲函數,在全部語言中都是層級調用,好比A調用B,B在執行過程當中又調用了C,C執行完畢返回,B執行完畢返回,最後是A執行完畢。
# 因此子程序調用是經過棧實現的,一個線程就是執行一個子程序。
# 子程序調用老是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不一樣。
# 協程看上去也是子程序,但執行過程當中,在子程序內部可中斷,而後轉而執行別的子程序,在適當的時候再返回來接着執行。有點相似CPU的中斷.

# 執行有點像多線程,但協程的特色在因而一個線程執行
# 協程極高的執行效率。子程序切換不是線程切換,而是由程序自身控制,沒有線程切換的開銷。
# 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態。
# 協程是一個線程執行,利用多核CPU:多進程+協程,既充分利用多核,又充分發揮協程的高效率。

# yield 與yield
# from:
# yield list:返回整個列表,yield from list:一次返回一個元素。
# yield from:會自動處理大量錯誤。
# yield from後面必須是子生成器函數
#
# # 子生成器
# def ger_0():
#   loop:
#     x = yield
#     do
#     somthing
#   return result
#
# # 委託生成器
# def ger_1():
#     result = yield from ger_0()
#
# # 調用方
# g = ger_1()
# g.send(0) -->ger_0: x = 0
# g.send(1) -->ger_0: x = 1
# g.send(2) -->ger_0: x = 2
# ger_1: result = ger_0:result

# consumer函數是一個generator
def consumer():
    print('2---------')
    r = ''
    while True:
        print('3---------')
        # 經過yield拿到消息n處理,又經過yield把結果r傳回
        # 賦值語句先計算= 右邊,因爲右邊是 yield 語句,
        # 因此yield語句執行完之後,進入暫停,而賦值語句在下一次啓動生成器的時候首先被執行;
        # P:sned(None)->C:yield r=''->P:send(1)->C:n=1->C:yield r='200 OK'->P:send(2)
        n = yield r
        if not n:
            print('6---------')
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    print('1---------')
    # 啓動生成器
    # 在一個生成器函數未啓動以前,是不能傳遞值進去。
    # 也就是說在使用c.send(n)以前,必須先使用c.send(None)或者next(c)來返回生成器的第一個值
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('4---------')
        print('[PRODUCER] Producing %s...' % n)
        # 切換到consumer執行
        r = c.send(n)
        print('5---------')
        # 拿到consumer處理的結果,繼續生產下一條消息
        print('[PRODUCER] Consumer return: %s' % r)
    # 關閉consumer,整個過程結束。
    c.close()

#   1->2->3 ->  4->3->5  ->  4->3->5  ->   4->3->5
c = consumer()
produce(c)


# -------------------------------->ASYNCIO-------------------------------------------

# https://blog.csdn.net/SL_World/article/details/86597738
# asyncio的編程模型就是一個消息循環。咱們從asyncio模塊中直接獲取一個EventLoop的引用,而後把須要執行的協程扔到EventLoop中執行,就實現了異步IO。
# 異步操做須要在coroutine中經過yield from完成;
# 多個coroutine能夠封裝成一組Task而後併發執行。
import threading
import asyncio
# 把一個generator標記爲coroutine類型
@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    # yield from語法可讓咱們方便地調用另外一個generator
    # asyncio.sleep()也是一個coroutine,因此線程不會等待asyncio.sleep(),而是直接中斷並執行下一個消息循環。
    # 當asyncio.sleep()返回時,線程就能夠從yield from拿到返回值(此處是None),而後接着執行下一行語句。
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
# 兩個coroutine是由同一個線程併發執行的。
# 直到循環事件的全部事件都處理完才能完整結束。
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


# 引入了新的語法async和await,可讓coroutine的代碼更簡潔易讀
# @asyncio.coroutine替換爲async;
# 把yield from替換爲await。

import threading
import asyncio

async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    await asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


# -------------------------------->調用方-子生成器-委託生成器<-------------------------------


import time
import asyncio

async def taskIO_1():
    print('開始運行IO任務1...')
    await asyncio.sleep(3)
    print('IO任務1已完成,耗時3s')
    return taskIO_1.__name__

async def taskIO_2():
    print('開始運行IO任務2...')
    await asyncio.sleep(2)
    print('IO任務2已完成,耗時2s')
    return taskIO_2.__name__

# 調用方
async def main():
    # 把全部任務添加到task中
    tasks = [taskIO_1(), taskIO_2()]

    # 返回已經完成的任務,完成一個返回一個
    for completed_task in asyncio.as_completed(tasks):
        # 子生成器
        resualt = await completed_task
        print('協程無序返回值:'+resualt)

    # # done:已經完成的任務,pending:未完成任務
    # # 等待任務所有完成才返回
    # done, pending = await asyncio.wait(tasks)
    # for r in done:
    #     print('協程無序返回值:' + r.result())

if __name__ == '__main__':
    start = time.time()
    # 建立一個事件循環對象loop
    loop = asyncio.get_event_loop()
    try:
        # 完成事件循環,直到最後一個任務結束
        loop.run_until_complete(main())
    finally:
        # 結束事件循環
        loop.close()
    print('全部IO任務總耗時%.5f秒' % float(time.time()-start))

# -------------------------------->AIOHTTP-------------------------------------------

# 把asyncio用在服務器端,例如Web服務器,因爲HTTP鏈接就是IO操做,所以能夠用單線程+coroutine實現多用戶的高併發支持。
# asyncio實現了TCP、UDP、SSL等協議

from aiohttp import web

routes = web.RouteTableDef()

@routes.get('/')
async def index(request):
    await asyncio.sleep(2)
    return web.json_response({
        'name': 'index'
    })

@routes.get('/about')
async def about(request):
    await asyncio.sleep(0.5)
    return web.Response(text="<h1>about us</h1>")

def init():
    app = web.Application()
    app.add_routes(routes)
    web.run_app(app)

init()
相關文章
相關標籤/搜索