Python Coroutines

Iterators

  • Iterable: can be consumed directly by a for loop
  • Iterator: can be consumed directly by a for loop, and can also be advanced with next()
  • To tell the two apart, use isinstance()
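A minimal sketch of that check, using the abstract base classes in `collections.abc`:

```python
from collections.abc import Iterable, Iterator

lst = [1, 2, 3]                    # a list is iterable but not an iterator
gen = (x * x for x in lst)         # a generator is both

print(isinstance(lst, Iterable))   # True
print(isinstance(lst, Iterator))   # False
print(isinstance(gen, Iterator))   # True
print(next(gen))                   # 1: an iterator supports next()
```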

Generators

  • Small memory footprint: values are computed on the fly as you iterate (trading time for space)

  • Advance with next(); past the last value, a StopIteration exception is raised

  • Ways to create one:

    1. Generator expression
    g = (x * x for x in range(10))  # parentheses give a generator; square brackets would give a list comprehension
    2. Generator function: any function containing yield; calling next() runs it until the next yield and returns that value
    def odd():
        print('step 1')
        yield 1
        print('step 2')
        yield(3)
        print('step 3')
        yield(5)
    
    
    g = odd()
    one = next(g)
    print(one)
    two = next(g)
    print(two)
    three = next(g)
    print(three)

    step 1
    1
    step 2
    3
    step 3
    5

    Note: here g is a generator object; each next() call resumes execution right after the previous yield.

    3. Driving a generator with a for loop
    def fib(max):
        n, a, b = 0, 0, 1
        while n < max:
            yield b
            a, b = b, a+b
            n += 1
        return 'Done'
    
    
    g = fib(5)
    
    for i in g:
        print(i)

    1
    1
    2
    3
    5
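Note that the for loop swallows StopIteration, so the 'Done' return value above is never seen; to retrieve it you must catch the exception yourself. A small sketch, reusing the same fib:

```python
def fib(max):
    n, a, b = 0, 0, 1
    while n < max:
        yield b
        a, b = b, a + b
        n += 1
    return 'Done'

g = fib(5)
while True:
    try:
        print(next(g))
    except StopIteration as e:
        # the generator's return value rides in the exception's value attribute
        print('return value:', e.value)   # return value: Done
        break
```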

Coroutines

  • Definition: a subroutine for non-preemptive multitasking; like a generator, it can suspend execution

  • Key operations: yield and send()

    def simple_coroutine(a):
        print('-> start')
    
        b = yield a
        print('-> received', a, b)
    
        c = yield a + b
        print('-> received', a, b, c)
    
    
    # run
    sc = simple_coroutine(5)
    
    aa = next(sc)  # prime the coroutine
    print(aa)
    bb = sc.send(6)  # a=5, b=6
    print(bb)
    cc = sc.send(7)  # a=5, b=6, c=7: the body then ends, raising StopIteration
    print(cc)        # never reached

    -> start
    5
    -> received 5 6
    11
    -> received 5 6 7

    Analysis: next(sc) runs the body to the first yield and returns a (5). sc.send(6) binds b = 6, prints, and yields a + b (11). sc.send(7) binds c = 7, prints, and then the body ends, so StopIteration is raised and print(cc) never executes.

  • Terminating a coroutine: exceptions bubble up to the caller; send a sentinel value (or call close()) to make the coroutine exit
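A small sketch of sentinel-based termination (names hypothetical): sending None makes the coroutine return, and its return value travels up inside StopIteration:

```python
def summer():
    total = 0
    while True:
        x = yield
        if x is None:        # sentinel value: exit the loop
            break
        total += x
    return total             # delivered via StopIteration.value

s = summer()
next(s)                      # prime the coroutine
s.send(1)
s.send(2)
try:
    s.send(None)             # sentinel: the coroutine returns
except StopIteration as e:
    result = e.value
print(result)                # 3
```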

  • yield from: effectively adds a transparent channel (between the coroutine and the calling code)

    def gen():
        for c in 'AB':
            yield c
    
    
    print(list(gen()))
    
    
    def gen_new():
        yield from 'AB'
    
    
    print(list(gen_new()))
  • Delegating generator: a generator function that contains yield from

    from collections import namedtuple
    
    ResClass = namedtuple('Res', 'count average')
    
    
    # subgenerator
    def averager():
        total = 0.0
        count = 0
        average = None
    
        while True:
            term = yield
            if term is None:
                break
            total += term
            count += 1
            average = total / count
    
        return ResClass(count, average)
    
    
    # delegating generator
    def grouper(storages, key):
        while True:
            # capture the value returned by averager()
            storages[key] = yield from averager()
    
    
    # client code
    def client():
        process_data = {
            'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
            'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
        }
    
        storages = {}
        for k, v in process_data.items():
            # create the coroutine
            coroutine = grouper(storages, k)
    
            # prime the coroutine
            next(coroutine)
    
            # send data to the coroutine
            for dt in v:
                coroutine.send(dt)
    
            # terminate the coroutine
            coroutine.send(None)
        print(storages)
    
    # run
    client()

    {'boys_2': Res(count=9, average=40.422222222222224), 'boys_1': Res(count=9, average=1.3888888888888888)}

    Explanation:

    1. client() starts; each pass of the for k, v loop creates a new grouper instance, coroutine
    2. next(coroutine) primes the coroutine: it enters the while True loop, calls averager(), and pauses at yield from
    3. After the inner for dt in v loop finishes, the grouper instance is still suspended, so the assignment to storages[key] has not happened yet
    4. coroutine.send(None) makes term None; the averager subgenerator breaks out of its loop and returns, raising StopIteration with the return value stored in the exception's value attribute; yield from catches the StopIteration and assigns that value to storages[key]

asyncio

  • Steps: create an event loop (it mediates async I/O through an intermediary, like a mailbox or message queue) -> run coroutines on it -> close the loop

    import threading
    import asyncio
    
    
    @asyncio.coroutine
    def hello():
        print('Hello world! (%s)' % threading.currentThread())
        print('Starting......(%s)' % threading.currentThread())
        yield from asyncio.sleep(3)
        print('Done......(%s)' % threading.currentThread())
        print('Hello again! (%s)' % threading.currentThread())
    
    
    loop = asyncio.get_event_loop()
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

async & await

  • More concise: no decorator needed (@asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11)

    import threading
    import asyncio
    
    
    async def hello():
        print('Hello world! (%s)' % threading.currentThread())
        print('Starting......(%s)' % threading.currentThread())
        await asyncio.sleep(3)
        print('Done......(%s)' % threading.currentThread())
        print('Hello again! (%s)' % threading.currentThread())
    
    
    loop = asyncio.get_event_loop()
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
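On Python 3.7+ the explicit loop management above can be replaced with asyncio.run() and asyncio.gather(); a minimal sketch (function names hypothetical):

```python
import asyncio

async def work(n):
    await asyncio.sleep(0.1)     # the two sleeps overlap: total wait is ~0.1s, not 0.2s
    return n * n

async def main():
    # gather schedules both coroutines concurrently and collects their results in order
    return await asyncio.gather(work(2), work(3))

results = asyncio.run(main())
print(results)                   # [4, 9]
```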

aiohttp

  • Overview:

    • pairs asyncio with coroutines, since HTTP is an I/O operation
  • Example:

    import asyncio
    from aiohttp import web
    
    
    async def index(request):
        await asyncio.sleep(0.5)
        return web.Response(body=b'<h1>Index</h1>')
    
    
    async def hello(request):
        await asyncio.sleep(0.5)
        text = '<h1>hello, %s!</h1>' % request.match_info['name']
        return web.Response(body=text.encode('utf-8'))
    
    
    async def init(loop):
        app = web.Application(loop=loop)
        app.router.add_route('GET', '/', index)
        app.router.add_route('GET', '/hello/{name}', hello)
        srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
        print('Server started at http://127.0.0.1:8000...')
        return srv
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()

    Note: look up these APIs and make sure you understand them

concurrent.futures

  • A thread-pool-like interface

  • Uses multiprocessing for true parallel computation by running multiple interpreter processes

  • concurrent.futures.Executor

    • ThreadPoolExecutor
    • ProcessPoolExecutor
  • Example:

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def return_future(msg):
        time.sleep(3)
        return msg
    
    
    # create a thread pool
    pool = ThreadPoolExecutor(max_workers=2)
    
    # submit 2 tasks to the pool
    f1 = pool.submit(return_future, 'hello')
    f2 = pool.submit(return_future, 'world')
    
    print(f1.done())
    time.sleep(3)
    print(f2.done())
    
    print(f1.result())
    print(f2.result())
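For CPU-bound work, ProcessPoolExecutor offers the same interface but sidesteps the GIL by using separate interpreter processes; a minimal sketch (function name hypothetical):

```python
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    # CPU-bound work; each worker runs it in its own interpreter process
    return sum(i * i for i in range(n))

if __name__ == '__main__':       # guard required on platforms that spawn worker processes
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(cpu_task, [10, 100])))
```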
  • map(fn, *iterables, timeout=None):

    Use either map or submit; one of the two is enough

    import time
    from concurrent import futures
    
    data = ['1', '2']
    
    
    def wait_on(argument):
        print(argument)
        time.sleep(2)
        return "ok"
    
    
    ex = futures.ThreadPoolExecutor(max_workers=2)
    for i in ex.map(wait_on, data):
        print(i)
  • Future

    • Future instances are created by Executor.submit()
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import as_completed
    import requests
    
    URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]
    
    
    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)
    
    
    with Pool(max_workers=3) as executor:
        future_tasks = [executor.submit(task, url) for url in URLS]
    
        for f in future_tasks:
            if f.running():
                print('%s is running' % str(f))
    
        for f in as_completed(future_tasks):
            try:
                ret = f.done()
                if ret:
                    f_ret = f.result()
                    print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
            except Exception as e:
                f.cancel()
                print(str(e))