asyncio Concurrent Programming

What is asyncio for?

  • Asynchronous network operations
  • Concurrency
  • Coroutines

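In the Python 3.0 era, the standard library's asynchronous networking module was select (very low-level), and the main third-party asynchronous networking library was Tornado. Python 3.4 introduced asyncio, with support for TCP and subprocesses.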

Today many libraries support asyncio, such as aiohttp, aiodns and aioredis. https://github.com/aio-libs lists the supported projects and is continuously updated.

Of course, asyncio is not the only way to implement coroutines: tornado and gevent provide similar functionality.

Some key asyncio terms:

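  • event_loop (event loop): the program starts an infinite loop and registers functions on it; when the corresponding event occurs, the loop calls the associated coroutine.

  • coroutine: a coroutine object. Calling a function defined with async def does not execute it immediately; it returns a coroutine object, which must be registered with the event loop and is then run by the loop.

  • task: a coroutine object is a natively suspendable function; a task wraps a coroutine further and keeps track of its various states.

  • future: represents the result of work that will run, or has not yet run. It is not fundamentally different from a task; Task is in fact a subclass of Future.

  • async/await keywords: introduced in Python 3.5 for defining coroutines. async def defines a coroutine; await suspends the coroutine on an awaitable, i.e. an asynchronous call that would otherwise block.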
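To make these terms concrete, here is a minimal sketch. It uses asyncio.run() and asyncio.create_task() (available since Python 3.7) instead of managing the loop by hand, and greet() is just an illustrative coroutine: calling it only creates a coroutine object, and wrapping it in a task gives a Future subclass that the event loop runs.

```python
import asyncio
import inspect

async def greet(name):
    return "hello " + name

# Calling an async def function does not run its body; it returns a coroutine object
coro = greet("asyncio")
is_coro = inspect.iscoroutine(coro)

async def main():
    # Wrap the coroutine in a Task so the event loop schedules it
    task = asyncio.create_task(coro)
    is_future = isinstance(task, asyncio.Future)  # Task is a Future subclass
    return await task, is_future

# asyncio.run() creates an event loop, runs main() to completion, then closes the loop
result, is_future = asyncio.run(main())
print(is_coro, is_future, result)  # True True hello asyncio
```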

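After reading these terms you may want to turn away. When I first started looking into asyncio I felt the same resistance, without really knowing why, and for a long time I barely paid attention to or used this module. But as I ran into all kinds of performance problems using Python at work, I told myself I really should study it properly.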

I. Overview:

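  1. Event loop + callbacks (which drive generators) + epoll (I/O multiplexing)

  2. asyncio is Python's toolkit for asynchronous I/O programming

  3. Libraries/frameworks built on asynchronous I/O: tornado, gevent, twisted (scrapy, django channels)

  4. tornado implements its own web server, whereas django and flask are deployed behind a separate server (uwsgi, or gunicorn + nginx)

  5. tornado can be deployed directly, or behind nginx (nginx + tornado)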
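Item 1 (event loop + callbacks + epoll) can be illustrated with the standard selectors module, whose DefaultSelector uses epoll on Linux. This is a toy sketch of a loop that waits for ready sockets and runs their callbacks, not how asyncio is written internally; the socketpair and the on_readable callback exist only for the demo.

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # epoll on Linux, kqueue on macOS, select elsewhere
received = []

def on_readable(sock):
    # Callback: runs only once the selector reports the socket as readable
    received.append(sock.recv(1024))
    sel.unregister(sock)

a, b = socket.socketpair()
sel.register(b, selectors.EVENT_READ, on_readable)  # the callback is stored as key.data
a.sendall(b"ping")

# Toy event loop: wait for ready file objects, then dispatch their callbacks
while sel.get_map():
    for key, _events in sel.select(timeout=1):
        key.data(key.fileobj)

a.close()
b.close()
sel.close()
print(received)  # [b'ping']
```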

II. The event loop

Example 1:

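# Using asyncio
import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    # Create this thread's event loop explicitly: calling get_event_loop()
    # with no current loop is deprecated in recent Python versions
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Run a single coroutine
    # loop.run_until_complete(get_html("http://www.imooc.com"))
    # Run tasks in batch: build a task list
    # (since Python 3.11 asyncio.wait() only accepts tasks/futures, not bare coroutines)
    tasks = [loop.create_task(get_html("http://www.imooc.com")) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print("Elapsed time: {}".format(time.time() - start_time))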

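1. For a coroutine, asyncio.ensure_future() is equivalent to loop.create_task()

2. Task is a subclass of Future

3. A thread has only one (current) event loop

4. Although no loop is passed to asyncio.ensure_future(), its source calls get_event_loop() internally, which associates the task with that loop and schedules it in the loop's queue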

import asyncio
import time
from functools import partial

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "bobby"

# Note: the extra argument url must be the first parameter;
# add_done_callback always passes the finished future as the last argument
def callback(url, future):
    print("runs after the task finishes; url={}".format(url))

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # run_until_complete accepts a single task or future directly;
    # for a list of them, wrap it with asyncio.wait

    task = asyncio.ensure_future(get_html("http://www.imooc.com"))
    # task = loop.create_task(get_html("http://www.imooc.com"))

    # Callback run after the task finishes (only works for a callback taking just the future)
    # task.add_done_callback(callback)

    # Pass extra arguments to the callback with functools.partial
    task.add_done_callback(partial(callback, "http://www.imooc.com"))

    loop.run_until_complete(task)
    print("Elapsed time: {}".format(time.time() - start_time))
    print(task.result())
    loop.close()
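The callback in the example above only prints a message. A callback can also read the task's result through the future argument it receives; here is a small sketch of that, with a shortened sleep and a module-level seen list added purely for illustration.

```python
import asyncio
from functools import partial

async def get_html(url):
    await asyncio.sleep(0.01)
    return "bobby"

seen = []

def callback(url, future):
    # partial() fixed url; the loop passes the finished future as the last argument
    seen.append((url, future.result()))

async def main():
    task = asyncio.ensure_future(get_html("http://www.imooc.com"))
    task.add_done_callback(partial(callback, "http://www.imooc.com"))
    result = await task
    # Done callbacks are scheduled with call_soon; yield once so they have surely run
    await asyncio.sleep(0)
    return result, isinstance(task, asyncio.Future)

result, is_future = asyncio.run(main())
print(seen)  # [('http://www.imooc.com', 'bobby')]
```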

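5. Differences between wait and gather:

  a) wait waits until all the tasks are done before the following code runs [loop.run_until_complete(asyncio.wait(tasks))], and returns two sets, (done, pending)

  b) gather is higher-level (high-level) and returns the results in the order the awaitables were passed in:

    1. it can group tasks, and a whole group can be cancelled at once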

  

# Using asyncio
import asyncio
import time

async def get_html(url):
    print("start get url={}".format(url))
    await asyncio.sleep(2)
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Run a single coroutine
    # loop.run_until_complete(get_html("http://www.imooc.com"))

    # Run tasks in batch: build a task list
    # tasks = [loop.create_task(get_html("http://www.imooc.com")) for i in range(10)]
    # loop.run_until_complete(asyncio.wait(tasks))

    # gather does the same job as wait, but don't forget the * before the list
    # loop.run_until_complete(asyncio.gather(*tasks))

    # Grouping
    # Option 1
    # group1 = [get_html("http://www.projectedu.com") for i in range(2)]
    # group2 = [get_html("http://www.imooc.com") for i in range(2)]
    # loop.run_until_complete(asyncio.gather(*group1, *group2))

    # Option 2
    group1 = [get_html("http://www.projectedu.com") for i in range(2)]
    group2 = [get_html("http://www.imooc.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)

    # Cancel a whole group (run_until_complete below then raises CancelledError)
    # group2.cancel()

    loop.run_until_complete(asyncio.gather(group1, group2))

    loop.close()
    print("Elapsed time: {}".format(time.time() - start_time))
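The difference in return values between gather and wait can be seen directly. In this sketch (work() and its delays are made up for the demo), gather returns results in argument order even though the second coroutine finishes first, while wait returns unordered (done, pending) sets of tasks.

```python
import asyncio

async def work(n, delay):
    await asyncio.sleep(delay)
    return n

async def main():
    # gather accepts coroutines and returns results in argument order,
    # even though work(2, ...) finishes first here
    ordered = await asyncio.gather(work(1, 0.03), work(2, 0.01), work(3, 0.02))
    # wait takes tasks and returns two sets, (done, pending); sets carry no order
    tasks = [asyncio.create_task(work(n, 0.01)) for n in (1, 2, 3)]
    done, pending = await asyncio.wait(tasks)
    return ordered, sorted(t.result() for t in done), len(pending)

ordered, done_results, n_pending = asyncio.run(main())
print(ordered, done_results, n_pending)  # [1, 2, 3] [1, 2, 3] 0
```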

6. loop.run_forever()

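# 1. The loop is stored in each future (task)
# 2. Cancelling futures (tasks)
import asyncio

async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    task1 = loop.create_task(get_html(2))
    task2 = loop.create_task(get_html(3))
    task3 = loop.create_task(get_html(3))

    tasks = [task1, task2, task3]

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        # Press Ctrl+C while the tasks are sleeping.
        # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks(loop)
        all_tasks = asyncio.all_tasks(loop)
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        # stop() + run_forever() runs the loop one more iteration so the cancellations
        # are processed; without it, closing the loop with pending tasks reports
        # errors such as "Task was destroyed but it is pending!"
        loop.run_forever()
    finally:
        loop.close()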
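What task.cancel() does inside the coroutine is easy to miss: the pending await raises asyncio.CancelledError, so the coroutine can clean up before it stops. A minimal sketch (the log list and timings are illustrative only):

```python
import asyncio

log = []

async def get_html(sleep_times):
    try:
        await asyncio.sleep(sleep_times)
    except asyncio.CancelledError:
        # task.cancel() makes the pending await raise CancelledError here
        log.append("cleanup after cancel")
        raise  # re-raise so the task ends up in the cancelled state

async def main():
    task = asyncio.create_task(get_html(10))
    await asyncio.sleep(0.01)  # let the task start and block in sleep()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

cancelled = asyncio.run(main())
print(cancelled, log)  # True ['cleanup after cancel']
```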

7. Calling a coroutine from a coroutine:

import asyncio

async def compute(x, y):
    print("Compute %s + %s..." %(x, y))
    await asyncio.sleep(1.0)
    return x+y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(print_sum(1, 2))
loop.close()
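Awaiting a coroutine directly, as print_sum does with compute, runs it inside the caller's own task; no new task is scheduled (unlike create_task or ensure_future). The sketch below checks this with asyncio.current_task(); returning the inner task from compute is only for the demonstration.

```python
import asyncio

async def compute(x, y):
    inner = asyncio.current_task()  # the task running this coroutine
    await asyncio.sleep(0.01)
    return x + y, inner

async def print_sum(x, y):
    result, inner = await compute(x, y)
    # A plain await does not create a task: compute ran inside print_sum's task
    same_task = inner is asyncio.current_task()
    return result, same_task

print(asyncio.run(print_sum(1, 2)))  # (3, True)
```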

  

8. call_soon, call_at, call_later, call_soon_threadsafe

 

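import asyncio
import time

def callback(sleep_times):
    # time.sleep(sleep_times)
    print("sleep {} success".format(sleep_times))

# Stop the current loop
def stoploop(loop):
    loop.stop()

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Each call below only puts a callback on the loop's queue; it runs when due

    # call_later(delay, callback, *args): delay in seconds, then the callback, then its arguments
    # call_later is implemented on top of call_at
    # loop.call_later(2, callback, 2)
    # loop.call_later(1, callback, 1)
    # loop.call_later(3, callback, 3)

    # call_at takes an absolute time on the loop's own clock (loop.time() + delay), not system time
    now = loop.time()
    print(now)
    loop.call_at(now+2, callback, 2)
    loop.call_at(now+1, callback, 1)
    loop.call_at(now+3, callback, 3)

    # call_soon callbacks run before the call_later/call_at ones
    loop.call_soon(callback, 4)

    # Scheduling stoploop with call_soon would stop the loop before the timed callbacks run,
    # so schedule it after the last one instead, letting the program exit
    # loop.call_soon(stoploop, loop)
    loop.call_at(now+4, stoploop, loop)
    # These are plain callbacks, not coroutines, so run_until_complete() doesn't apply;
    # run_forever() keeps processing the queue until stop() is called
    loop.run_forever()
    loop.close()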
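The ordering rules above can be checked by recording the order in which callbacks fire instead of printing. This sketch uses short delays (assumed values, for speed) and stops the loop with a timed loop.stop:

```python
import asyncio

order = []

def callback(tag):
    order.append(tag)

loop = asyncio.new_event_loop()
now = loop.time()
loop.call_at(now + 0.03, callback, "call_at +0.03")  # absolute time on the loop clock
loop.call_later(0.01, callback, "call_later 0.01")   # relative delay
loop.call_soon(callback, "call_soon")                # next loop iteration
loop.call_later(0.05, loop.stop)                     # stop once everything has run
loop.run_forever()
loop.close()
print(order)  # ['call_soon', 'call_later 0.01', 'call_at +0.03']
```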

 

9. Calling blocking code from coroutines through a ThreadPoolExecutor (thread pool) [about as efficient as using the thread pool on its own; it doesn't gain much]

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : thread_asyncio.py
# @Author: Liugp
# @Date  : 2019/6/8
# @Desc  :
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse

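def get_url(url):
    # Request the HTML over a raw socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # Open a socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # blocks, but waiting does not consume CPU

    # In non-blocking mode we would have to keep polling in a while loop to check
    # whether the connection is established, doing computation or starting
    # other connections in the meantime

    client.sendall("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))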

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode('utf8')
    # print(data)
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Thread pool
    executor = ThreadPoolExecutor()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # run_in_executor wraps the thread pool's concurrent.futures.Future in an
        # asyncio Future, which is what lets it be awaited like a coroutine
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print("last time:{}".format(time.time()-start_time))
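The same run_in_executor pattern can be tried without network access. In this sketch blocking_io() is a hypothetical stand-in for get_url, using time.sleep to simulate a blocking call; four calls run in four pool threads, so the total time is close to a single call rather than four.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    time.sleep(0.1)  # stands in for a blocking socket call
    return n * n

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Each call returns an asyncio Future wrapping the pool's future
        futures = [loop.run_in_executor(executor, blocking_io, n) for n in range(4)]
        return await asyncio.gather(*futures)

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # [0, 1, 4, 9]
```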

10. Simulating HTTP requests with asyncio:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : asyncio_http.py
# @Author: Liugp
# @Date  : 2019/6/8
# @Desc  :
# asyncio has no HTTP-level API, only lower-level TCP and UDP interfaces; for HTTP you can use aiohttp

import time
import asyncio
from urllib.parse import urlparse

async def get_url(url):
    # Request the HTML over an asyncio stream connection
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    await writer.drain()
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    writer.close()
    await writer.wait_closed()

    # Each line already ends with "\r\n", so join without a separator
    html = "".join(all_lines)
    return html

async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    # as_completed yields the results in the order the requests finish
    for task in asyncio.as_completed(tasks):
        result = await task
        # Print the body: everything after the blank line that ends the headers
        print(result.split("\r\n\r\n", 1)[-1])

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())
    print("last time:{}".format(time.time()-start_time))
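Because the site in the example may not be reachable, here is a self-contained sketch of the same open_connection pattern against a local server started with asyncio.start_server. The handle() responder is a hypothetical, minimal HTTP reply for testing only, and port 0 lets the OS pick a free port.

```python
import asyncio

async def handle(reader, writer):
    # Minimal hypothetical HTTP responder: read the request headers, send a fixed reply
    await reader.readuntil(b"\r\n\r\n")
    writer.write(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\nConnection: close\r\n\r\nhello")
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def get_url(host, port, path="/"):
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n".encode())
    await writer.drain()
    raw = await reader.read()  # read until the server closes the connection
    writer.close()
    await writer.wait_closed()
    headers, _, body = raw.decode().partition("\r\n\r\n")
    return headers.split("\r\n")[0], body

async def main():
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Three concurrent requests on one event loop
        return await asyncio.gather(*(get_url("127.0.0.1", port) for _ in range(3)))

results = asyncio.run(main())
print(results[0])  # ('HTTP/1.1 200 OK', 'hello')
```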

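11. future and task

  a) A task starts (drives) a coroutine by calling send(None) on it, the equivalent of next() for a generator

  b) Task is a subclass of Future

  c) An asyncio future is much like the future returned by a thread pool, with one difference: asyncio schedules the done callbacks with call_soon(), because asyncio is single-threaded and the callbacks are simply queued on the loop, whereas a thread-pool future runs them directly

  d) Task is the bridge between futures and coroutines

  e) When the coroutine raises StopIteration, the task stores the exception's value as its result [self.set_result(exc.value)]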
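Points a) and e) can be illustrated with a toy stepper. ToyTask below is hypothetical and far simpler than asyncio.Task (no loop, no callbacks): it drives a coroutine with send(None) and, when StopIteration is raised, keeps exc.value as the result. suspend() is a generator-based awaitable that yields control once, much like awaiting a pending future.

```python
import types

@types.coroutine
def suspend():
    # Yield control back to whoever is driving the coroutine, once
    yield

async def compute(x, y):
    await suspend()  # the first send(None) stops here
    return x + y     # the next send(None) raises StopIteration(x + y)

class ToyTask:
    """Hypothetical toy, not asyncio.Task: drives a coroutine step by step."""

    def __init__(self, coro):
        self._coro = coro
        self.done = False
        self.result = None
        self.steps = 0

    def step(self):
        try:
            self._coro.send(None)  # resume the coroutine until its next suspension
            self.steps += 1
        except StopIteration as exc:
            self.done = True
            self.result = exc.value  # mirrors self.set_result(exc.value)

task = ToyTask(compute(1, 2))
while not task.done:
    task.step()
print(task.result, task.steps)  # 3 1
```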

12. Synchronization and communication in asyncio:

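# Without any await, a coroutine runs to completion before the next one starts,
# so the tasks never interleave. (Older Python versions let asyncio.wait() take bare
# coroutines and wrapped them in a set, so the start order was unpredictable; tasks
# created explicitly with create_task start in creation order.)
import asyncio

total = 0

async def add():
    global total
    for i in range(5):
        print("running add:{}".format(i))
        total += 1

async def desc():
    global total
    for i in range(5):
        print("running desc:{}".format(i))
        total -= 1

async def desc2():
    global total
    for i in range(5):
        print("running desc2:{}".format(i))
        total -= 1


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [loop.create_task(coro) for coro in (desc(), add(), desc2())]
    loop.run_until_complete(asyncio.wait(tasks))
    print("final result:{}".format(total))

# Output:
"""
running desc:0
running desc:1
running desc:2
running desc:3
running desc:4
running add:0
running add:1
running add:2
running add:3
running add:4
running desc2:0
running desc2:1
running desc2:2
running desc2:3
running desc2:4
final result:-5
"""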

  a) asyncio locks (from asyncio import Lock)

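import asyncio
from asyncio import Lock, Queue
import aiohttp

# Creating Lock/Queue at module level assumes Python 3.10+,
# where these primitives bind to a loop only on first use
cache = {}
lock = Lock()
queue = Queue()  # asyncio.Queue is the communication primitive (see section 15)

async def get_stuff(url="http://www.baidu.com"):
    # Older spellings: await lock.acquire() ... lock.release(),
    # or `with await lock:` (removed in Python 3.9)
    # The lock serializes access so the same URL is not requested twice
    async with lock:
        if url in cache:
            return cache[url]
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                stuff = await resp.text()
        cache[url] = stuff
        return stuff

async def parse_stuff():
    stuff = await get_stuff()

async def use_stuff():
    stuff = await get_stuff()

tasks = [parse_stuff(), use_stuff()]

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()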
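The effect of the lock can be checked without network access. In this sketch fake_fetch() is a hypothetical stand-in for the HTTP request that counts how often it is called: with the lock, five concurrent callers trigger a single fetch, whereas without it every caller would miss the cache during the await and fetch on its own.

```python
import asyncio

cache = {}
fetch_count = 0

async def fake_fetch(url):
    # Hypothetical stand-in for an HTTP request
    global fetch_count
    fetch_count += 1
    await asyncio.sleep(0.01)
    return f"content of {url}"

async def get_stuff(lock, url):
    async with lock:  # only one coroutine at a time checks and fills the cache
        if url not in cache:
            cache[url] = await fake_fetch(url)
        return cache[url]

async def main():
    lock = asyncio.Lock()  # created inside the running loop
    return await asyncio.gather(*(get_stuff(lock, "http://example.com") for _ in range(5)))

results = asyncio.run(main())
print(fetch_count)  # 1
```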

13. Event loops in different threads

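Often the event loop is used to register coroutines, and some coroutines need to be added to the loop dynamically. A simple way to do this is with multiple threads: the current thread creates an event loop, then starts a new thread and runs the event loop in it, so the current thread is not blocked.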

import asyncio
from threading import Thread
import time

now = lambda :time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

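def more_work(x):
    print('More work {}'.format(x))
    # time.sleep blocks the loop's thread, so these callbacks run one after another
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
# Printed almost immediately: running the loop in another thread does not block this one
print('TIME: {}'.format(time.time() - start))

# call_soon_threadsafe is the thread-safe way to schedule callbacks on a loop in another thread
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
# Stop the loop after the queued callbacks so the program can exit
new_loop.call_soon_threadsafe(new_loop.stop)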
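The example above submits plain functions. To add coroutines to a loop running in another thread, asyncio provides run_coroutine_threadsafe, which returns a concurrent.futures.Future the calling thread can wait on. A sketch, where do_some_work() and its delays are hypothetical:

```python
import asyncio
import threading

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(name, delay):
    # Hypothetical coroutine: a non-blocking sleep, so several can overlap
    await asyncio.sleep(delay)
    return name

new_loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
t.start()

# Submit coroutines from the main thread; each call returns a concurrent.futures.Future
f1 = asyncio.run_coroutine_threadsafe(do_some_work("slow", 0.2), new_loop)
f2 = asyncio.run_coroutine_threadsafe(do_some_work("fast", 0.1), new_loop)
results = [f1.result(timeout=5), f2.result(timeout=5)]  # blocks only this thread

# Shut down: stop the loop from this thread, wait for the thread, then close the loop
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
print(results)  # ['slow', 'fast']
```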

14. High-concurrency programming with aiohttp:

import asyncio
import re

import aiohttp
import aiomysql
from pyquery import PyQuery

stopping = False
start_url = "http://www.jobbole.com/"
waitting_urls = []
seen_urls = set()

# Limit concurrency: at most one request in flight at a time
sem = asyncio.Semaphore(1)

async def fetch(url, session):
    async with sem:
        try:
            async with session.get(url) as resp:
                print("url status:{}".format(resp.status))
                if resp.status in [200,201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)

def extract_urls(html):
    urls = []
    pq = PyQuery(html)
    for link in pq.items("a"):
        url = link.attr("href")
        if url and url.startswith("http") and url not in seen_urls:
            urls.append(url)
            waitting_urls.append(url)
    return urls

async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    # fetch returns None on errors or non-200/201 responses
    if html:
        extract_urls(html)

async def article_handler(url, session, pool):
    # Fetch the article page, parse it, and store it in the database
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        return
    extract_urls(html)
    pq = PyQuery(html)
    title = pq("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Parameterized query: the driver escapes the title (no SQL injection)
            insert_sql = "insert into article_test (title) values (%s)"
            await cur.execute(insert_sql, (title,))

async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if 0 == len(waitting_urls):
                await asyncio.sleep(0.5)
                continue

            url = waitting_urls.pop()
            print("start get url:{}".format(url))
            if re.match(r"http://.*?jobbole.com/\d+/", url):
                # Article detail page: parse it and store it
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
            else:
                # Any other page: just collect its links
                if url not in seen_urls:
                    asyncio.ensure_future(init_urls(url, session))

async def main(loop):
    # Wait for the MySQL connection pool to be created
    # Set charset, otherwise rows containing Chinese text may fail to insert; autocommit=True is also recommended
    pool = await aiomysql.create_pool(host='127.0.0.1',port=3306,
                                      user='root',password='',
                                      db='aiomysql_test',loop=loop,
                                      charset="utf8",autocommit=True
                                      )

    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        # Seed the waiting list with the links found on the start page
        if html:
            extract_urls(html)

    asyncio.ensure_future(consumer(pool))


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(main(loop))
    loop.run_forever()
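The crawler caps concurrency with asyncio.Semaphore. This sketch checks that cap by counting requests in flight; fetch() here is a hypothetical stand-in in which asyncio.sleep replaces session.get(url), and the limit of 3 is an arbitrary choice.

```python
import asyncio

in_flight = 0
max_in_flight = 0

async def fetch(sem, i):
    global in_flight, max_in_flight
    async with sem:  # waits while 3 other coroutines hold the semaphore
        in_flight += 1
        max_in_flight = max(max_in_flight, in_flight)
        await asyncio.sleep(0.01)  # stands in for session.get(url)
        in_flight -= 1
        return i

async def main():
    sem = asyncio.Semaphore(3)
    return await asyncio.gather(*(fetch(sem, i) for i in range(10)))

results = asyncio.run(main())
print(max_in_flight)  # 3
```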

15. Using aiohttp with a priority queue

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : aiohttp_queue.py
# @Author: Liugp
# @Date  : 2019/7/4
# @Desc  :
import asyncio
import random
import aiohttp


NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def collect_result(a):
    # `with (await sema):` was removed in Python 3.9; use async with
    async with sema:
        return await fetch_async(a)

async def produce(queue):
    for num in NUMBERS:
        print(f'producing {num}')
        item = (num, num)
        await queue.put(item)


async def consume(queue):
    while 1:
        item = await queue.get()
        num = item[0]
        rs = await collect_result(num)
        print(f'consuming {rs}...')
        queue.task_done()


async def run():
    queue = asyncio.PriorityQueue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue)
    await queue.join()
    consumer.cancel()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run())
    loop.close()
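In the example above each item is a (priority, value) tuple, and asyncio.PriorityQueue always hands out the smallest entry first. A minimal sketch of that ordering, with made-up numbers and payloads:

```python
import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    for num in [42, 7, 19]:
        await queue.put((num, f"item-{num}"))  # (priority, payload): lowest number first
    out = []
    while not queue.empty():
        priority, payload = await queue.get()
        out.append(payload)
        queue.task_done()
    return out

print(asyncio.run(main()))  # ['item-7', 'item-19', 'item-42']
```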