Concurrent Programming with asyncio

What is asyncio for?

  • Asynchronous network operations
  • Concurrency
  • Coroutines

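In the Python 3.0 era, the standard library's asynchronous networking module was select (very low-level). In the same era, the third-party asynchronous networking library was Tornado. In the Python 3.4 era came asyncio, which supports TCP and subprocesses.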

Today many libraries support asyncio: aiohttp, aiodns, aioredis, and so on. https://github.com/aio-libs lists what is already supported and is continuously updated.

Of course, asyncio is not the only coroutine implementation so far; tornado and gevent both provide similar functionality.

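Notes on some asyncio key terms:

  • event_loop (event loop): the program starts an infinite loop and registers functions on it; when the corresponding event occurs, the matching coroutine function is called.

  • coroutine: a coroutine object. It refers to a function defined with the async keyword; calling it does not execute the function immediately but returns a coroutine object. The coroutine object must be registered with the event loop, which then calls it.

  • task: a coroutine object is a function that can natively be suspended; a task wraps the coroutine further and holds the task's various states.

  • future: represents the result of a task that will run or has not yet run. It is not essentially different from a task (Task is a subclass of Future).

  • async/await keywords: introduced in Python 3.5 for defining coroutines; async defines a coroutine, and await suspends on a blocking asynchronous call.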
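
The terms above can be seen together in a minimal sketch. The coroutine name `fetch_answer` and the value 42 are illustrative assumptions and do not come from the article; the sketch only shows that calling an async function creates a coroutine object, and that a task wraps it and carries state.

```python
import asyncio


async def fetch_answer():
    # Hypothetical coroutine function, used only for illustration.
    await asyncio.sleep(0.01)
    return 42


async def main():
    coro = fetch_answer()                     # only creates a coroutine object
    is_coroutine = asyncio.iscoroutine(coro)
    task = asyncio.ensure_future(coro)        # wrap it in a Task on the running loop
    done_at_start = task.done()               # the task has not run yet
    result = await task                       # suspend main() until the task finishes
    return is_coroutine, done_at_start, task.done(), result


print(asyncio.run(main()))
```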

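After reading these terms you might be tempted to turn around and leave. When I first started looking into asyncio I felt some resistance to it, without really knowing why, and as a result I paid almost no attention to the module and barely used it for a long time. But as I ran into all kinds of performance problems using Python at work, I told myself that I really should study this module properly.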

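I. Overview:

  1. Event loop + callbacks (driving generators) + epoll (I/O multiplexing)

  2. asyncio is Python's solution for asynchronous I/O programming

  3. Libraries (or frameworks) built on asynchronous I/O: tornado, gevent, twisted (used by scrapy and django channels)

  4. tornado (implements its own web server), django + flask (uwsgi, gunicorn + nginx)

  5. tornado can be deployed directly, or as nginx + tornado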
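
The "event loop driving generators" idea in item 1 can be sketched with a toy scheduler. This is an illustration under simplifying assumptions, not asyncio's implementation: there is no epoll or I/O multiplexing here, and the names `countdown` and `run_all` are hypothetical.

```python
from collections import deque


def countdown(name, n, log):
    # A generator standing in for a coroutine: each `yield` hands control back to the loop.
    while n > 0:
        log.append((name, n))
        yield
        n -= 1


def run_all(generators):
    # Toy event loop: keep resuming ready generators until all are exhausted.
    ready = deque(generators)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue           # finished, do not reschedule
        ready.append(gen)      # still has work, put it back in the queue


log = []
run_all([countdown("a", 2, log), countdown("b", 1, log)])
print(log)
```

A real event loop replaces the plain queue with a selector (epoll on Linux) so that a generator is resumed only when the socket it is waiting on becomes ready.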

II. The event loop

Example 1:

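# Using asyncio
import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    # Note: newer Python versions deprecate get_event_loop() when no loop is running
    loop = asyncio.get_event_loop()
    # Run a single coroutine
    # loop.run_until_complete(get_html("http://www.imooc.com"))
    # Run a batch of tasks
    # Build the task list; asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap each one in a task
    tasks = [loop.create_task(get_html("http://www.imooc.com")) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print("elapsed time: {}".format(time.time() - start_time))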

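1. asyncio.ensure_future() is equivalent to loop.create_task() when it is given a coroutine

2. A task is a subclass of future

3. A thread has only one event loop

4. Although asyncio.ensure_future() is not passed a loop, its source code calls get_event_loop() internally, which ties the task to the loop and registers it in the loop's task queue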
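
Points 1, 2 and 4 can be checked directly. This is a small sketch that runs inside an already running loop (via `asyncio.run`), which is an assumption that differs from the article's `get_event_loop()` style; the short sleep and the URLs "a" and "b" are placeholders.

```python
import asyncio


async def get_html(url):
    await asyncio.sleep(0.01)
    return url


async def main():
    loop = asyncio.get_running_loop()
    t1 = asyncio.ensure_future(get_html("a"))   # no loop argument passed
    t2 = loop.create_task(get_html("b"))
    checks = (
        isinstance(t1, asyncio.Task),                 # ensure_future returned a Task
        isinstance(t2, asyncio.Task),
        issubclass(asyncio.Task, asyncio.Future),     # a task is a future
        t1.get_loop() is loop,                        # the task is bound to the current loop
    )
    await asyncio.gather(t1, t2)
    return checks


print(asyncio.run(main()))
```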

import asyncio
import time
from functools import partial


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "bobby"


# Note: the extra argument url must come first (the first parameter); the future is passed last
def callback(url, future):
    print("runs after the task finishes; url={}".format(url))


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # Get a future: a single task or future is passed directly as the argument;
    # a list has to be wrapped in asyncio.wait
    task = asyncio.ensure_future(get_html("http://www.imooc.com"))
    # task = loop.create_task(get_html("http://www.imooc.com"))

    # Callback that runs after the task has finished
    # task.add_done_callback(callback)

    # Pass an argument to the callback
    task.add_done_callback(partial(callback, "http://www.imooc.com"))
    loop.run_until_complete(task)
    print("elapsed time: {}".format(time.time() - start_time))
    print(task.result())
    loop.close()
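
The argument order in the callback can be verified with a shortened variant of the example. The `calls` list and the name `on_done` are additions for illustration; the sleep is reduced so the sketch finishes quickly.

```python
import asyncio
from functools import partial

calls = []


def on_done(url, future):
    # `url` is bound by partial; the finished future is always passed last.
    calls.append((url, future.result()))


async def get_html(url):
    await asyncio.sleep(0.01)
    return "bobby"


async def main():
    task = asyncio.ensure_future(get_html("http://www.imooc.com"))
    task.add_done_callback(partial(on_done, "http://www.imooc.com"))
    await task
    return task.result()


print(asyncio.run(main()), calls)
```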

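5. Differences between wait and gather:

  a) By default, wait waits until all tasks have finished before the code after it runs [loop.run_until_complete(asyncio.wait(tasks))]

  b) gather is more high-level

    1. It supports grouping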

  

# Using asyncio
import asyncio
import time


async def get_html(url):
    print("start get url={}".format(url))
    await asyncio.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # Run a single coroutine
    # loop.run_until_complete(get_html("http://www.imooc.com"))

    # Run a batch of tasks: build the task list
    # tasks = [loop.create_task(get_html("http://www.imooc.com")) for i in range(10)]
    # loop.run_until_complete(asyncio.wait(tasks))

    # gather does the same job as wait, but remember the * in front
    # loop.run_until_complete(asyncio.gather(*tasks))

    # Grouping
    # First approach
    # group1 = [get_html("http://www.projectedu.com") for i in range(2)]
    # group2 = [get_html("http://www.imooc.com") for i in range(2)]
    # loop.run_until_complete(asyncio.gather(*group1,*group2))

    # Second approach
    group1 = [get_html("http://www.projectedu.com") for i in range(2)]
    group2 = [get_html("http://www.imooc.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    # Cancel a group
    # group2.cancel()
    loop.run_until_complete(asyncio.gather(group1, group2))
    loop.close()
    print("elapsed time: {}".format(time.time() - start_time))
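
A further practical difference is the return value. The sketch below is an illustration with placeholder URLs and an added `delay` parameter: gather returns results in argument order, while wait returns two sets of tasks, (done, pending).

```python
import asyncio


async def get_html(url, delay):
    await asyncio.sleep(delay)
    return url


async def main():
    # gather: accepts awaitables directly and returns results in argument order.
    gathered = await asyncio.gather(get_html("slow", 0.02), get_html("fast", 0.01))

    # wait: takes Task/Future objects and returns two sets, (done, pending).
    tasks = [asyncio.ensure_future(get_html(u, 0.01)) for u in ("a", "b")]
    done, pending = await asyncio.wait(tasks)
    waited = sorted(t.result() for t in done)
    return gathered, waited, len(pending)


print(asyncio.run(main()))
```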

6. loop.run_forever()

# 1. The loop is stored on the future
# 2. Cancelling a future (task)
import asyncio
import time


async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so create tasks first
    task1 = loop.create_task(get_html(2))
    task2 = loop.create_task(get_html(3))
    task3 = loop.create_task(get_html(3))
    tasks = [task1, task2, task3]
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks(loop)
        all_tasks = asyncio.all_tasks(loop)
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        # Removing the next line raises an exception
        loop.run_forever()
    finally:
        loop.close()
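
Cancellation can also be observed without pressing Ctrl+C. This is a minimal sketch, assuming a task that would otherwise sleep for 10 seconds: `cancel()` only requests cancellation, and the task ends when `CancelledError` is raised at its current await.

```python
import asyncio


async def get_html(sleep_times):
    await asyncio.sleep(sleep_times)
    return "done after {}s".format(sleep_times)


async def main():
    task = asyncio.ensure_future(get_html(10))
    await asyncio.sleep(0)        # let the task start and reach its first await
    requested = task.cancel()     # requests cancellation; the task is not finished yet
    try:
        await task
    except asyncio.CancelledError:
        pass                      # raised here once the task has actually been cancelled
    return requested, task.cancelled()


print(asyncio.run(main()))
```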

7. Calling a coroutine from within a coroutine:

import asyncio


async def compute(x, y):
    print("Compute %s + %s..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  

8. call_soon, call_at, call_later, call_soon_threadsafe

 

import asyncio
import time


def callback(sleep_times):
    # time.sleep(sleep_times)
    print("sleep {} success".format(sleep_times))


# Stop the current loop
def stoploop(loop):
    loop.stop()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # A callback placed in the task queue runs right away

    # First argument: delay in seconds; second: the function; third: its argument
    # call_later also calls call_at internally
    # loop.call_later(2, callback, 2)
    # loop.call_later(1, callback, 1)
    # loop.call_later(3, callback, 3)

    # call_at: the first argument is the loop's current time plus the delay in seconds, not the system time
    now = loop.time()
    print(now)
    loop.call_at(now+2, callback, 2)
    loop.call_at(now+1, callback, 1)
    loop.call_at(now+3, callback, 3)
    # call_soon runs before call_later
    loop.call_soon(callback, 4)
    # loop.call_soon(stoploop, loop)
    # These are not coroutines, so loop.run_until_complete() cannot be used;
    # run_forever keeps executing whatever is in the queue
    loop.run_forever()
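
The ordering described in the comments can be recorded in a list instead of printed. This is a sketch with much shorter delays than the article's; the labels and the final `loop.stop` timer are additions so that the loop terminates on its own.

```python
import asyncio


def demo():
    order = []
    loop = asyncio.new_event_loop()
    try:
        loop.call_later(0.02, order.append, "later-0.02")
        loop.call_later(0.01, order.append, "later-0.01")
        loop.call_at(loop.time() + 0.03, order.append, "at-0.03")
        loop.call_soon(order.append, "soon")      # runs first, on the next loop iteration
        loop.call_later(0.05, loop.stop)          # stop the loop after all timers have fired
        loop.run_forever()
    finally:
        loop.close()
    return order


print(demo())
```

Timers fire in order of their deadline, not in the order they were registered, and `call_soon` callbacks run before any of them.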

 

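9. Using ThreadPoolExecutor (a thread pool) to call blocking code in coroutine style [about the same as using the thread pool on its own; efficiency improves very little]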

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : thread_asyncio.py
# @Author: Liugp
# @Date : 2019/6/8
# @Desc :
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse


def get_url(url):
    # Request the HTML through a socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # Open the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # blocking here does not consume CPU

    # In non-blocking mode a while loop would have to keep checking whether the connection is ready,
    # doing computation or opening other connections in the meantime
    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode('utf8')
    # print(data)
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # Thread pool
    executor = ThreadPoolExecutor()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # Wrap the thread's future as an asyncio future, so it can be used in coroutine style
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time:{}".format(time.time()-start_time))
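
The same technique can be tried without any network access. In this sketch `blocking_io` is a hypothetical stand-in for the blocking socket code above, and `time.sleep` plays the role of the blocking call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(n):
    # Stand-in for a blocking call such as socket.recv(); not from the article.
    time.sleep(0.05)
    return n * 2


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Each call returns an asyncio future backed by a thread-pool future.
        futures = [loop.run_in_executor(executor, blocking_io, n) for n in range(4)]
        return await asyncio.gather(*futures)


print(asyncio.run(main()))
```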

10. Simulating an HTTP request with asyncio:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : asyncio_http.py
# @Author: Liugp
# @Date : 2019/6/8
# @Desc : asyncio provides no HTTP interface, only the lower-level TCP and UDP interfaces; aiohttp can be used instead

import time
import asyncio
from urllib.parse import urlparse


async def get_url(url):
    # Request the HTML through a socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "\n".join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    # Handle each result as soon as its task finishes
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result.split("\r\n\n")[10])


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("last time:{}".format(time.time()-start_time))
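
The behaviour of `asyncio.as_completed` used in `main()` can be shown in isolation. This is a sketch with simulated delays instead of real requests; the `name` and `delay` parameters are assumptions for illustration.

```python
import asyncio


async def get_url(name, delay):
    await asyncio.sleep(delay)    # simulated network latency
    return name


async def main():
    tasks = [
        asyncio.ensure_future(get_url("slow", 0.05)),
        asyncio.ensure_future(get_url("fast", 0.01)),
    ]
    finished = []
    # Results arrive in completion order, not in the order the tasks were created.
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)
    return finished


print(asyncio.run(main()))
```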

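11. future and task

  a) A task starts a coroutine: it calls send(None) or next()

  b) task is a subclass of future

  c) A future in coroutines is much like a future in a thread pool, with one difference: it uses call_soon(). Because coroutines run in a single thread, the callback is only placed on the loop's queue to be run, whereas a thread executes the code directly

  d) task is the bridge between future and coroutine

  e) When StopIteration is raised, the task also stores its value as the result [self.set_result(exc.value)]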
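
Point c) can be observed on a bare future. In this minimal sketch, the `seen` list and the string "value" are placeholders: the done callback does not run inside `set_result()`; it is queued on the loop and runs once the current coroutine yields.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    seen = []
    future.add_done_callback(lambda fut: seen.append(fut.result()))

    future.set_result("value")
    ran_immediately = bool(seen)   # still empty: the callback was only queued with call_soon
    await asyncio.sleep(0)         # yield to the loop so the queued callback can run
    return ran_immediately, seen


print(asyncio.run(main()))
```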

12. Synchronization and communication in asyncio:

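# Without any await, the coroutines run sequentially: one task finishes before the next one starts.
# In the author's original run (bare coroutines passed to asyncio.wait) the start order was not the list order.
import asyncio
import time

total = 0


async def add():
    global total
    for i in range(5):
        print("running add:{}".format(i))
        total += 1


async def desc():
    global total
    for i in range(5):
        print("running desc:{}".format(i))
        total -= 1


async def desc2():
    global total
    for i in range(5):
        print("running desc2:{}".format(i))
        total -= 1


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so create tasks first
    tasks = [loop.create_task(c) for c in (desc(), add(), desc2())]
    loop.run_until_complete(asyncio.wait(tasks))
    print("final result:{}".format(total))

# Output from the author's run
"""
running add:0
running add:1
running add:2
running add:3
running add:4
running desc:0
running desc:1
running desc:2
running desc:3
running desc:4
running desc2:0
running desc2:1
running desc2:2
running desc2:3
running desc2:4
final result:-5
"""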
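
For contrast, adding a suspension point lets the tasks interleave. This is a sketch in which the `worker` helper and the shortened loop are assumptions; `await asyncio.sleep(0)` is simply the smallest possible await.

```python
import asyncio


async def worker(name, log):
    for i in range(2):
        log.append("{}:{}".format(name, i))
        await asyncio.sleep(0)   # explicit suspension point: lets other tasks run


async def main():
    log = []
    await asyncio.gather(worker("add", log), worker("desc", log))
    return log


print(asyncio.run(main()))
```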

  a) The asyncio lock mechanism (from asyncio import Lock)

import asyncio
from asyncio import Lock, Queue

import aiohttp

cache = {}
lock = Lock()
queue = Queue()


async def get_stuff(url="http://www.baidu.com"):
    # await lock.acquire()
    # with await lock:
    # Use the lock for synchronization, so the same request is not sent twice
    async with lock:
        if url in cache:
            return cache[url]
        # aiohttp.request() is used as an async context manager here
        async with aiohttp.request('GET', url) as resp:
            stuff = await resp.text()
        cache[url] = stuff
        return stuff


async def parse_stuff():
    stuff = await get_stuff()


async def use_stuff():
    stuff = await get_stuff()


loop = asyncio.get_event_loop()
# asyncio.wait() rejects bare coroutines on Python 3.11+, so create tasks first
tasks = [loop.create_task(parse_stuff()), loop.create_task(use_stuff())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
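
The effect of the lock can be checked without aiohttp. In this sketch `fake_request` is a hypothetical stand-in for the HTTP call, and the `calls` list counts how many requests were actually made.

```python
import asyncio


async def main():
    lock = asyncio.Lock()
    cache = {}
    calls = []

    async def fake_request(url):
        # Hypothetical stand-in for a real HTTP request.
        calls.append(url)
        await asyncio.sleep(0.01)
        return "body of " + url

    async def get_stuff(url="http://www.baidu.com"):
        async with lock:              # the second caller waits here until the cache is filled
            if url in cache:
                return cache[url]
            cache[url] = await fake_request(url)
            return cache[url]

    results = await asyncio.gather(get_stuff(), get_stuff())
    return results, len(calls)


print(asyncio.run(main()))
```

Without the lock, both callers would find the cache empty before the first request returns, and the request would be sent twice.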

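13. Event loops in different threads

Often the event loop is used to register coroutines, and some coroutines need to be added to it dynamically. A simple way to do this is with multiple threads: the current thread creates an event loop, then starts a new thread and runs the event loop inside it, so the current thread is not blocked.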

import asyncio
from threading import Thread
import time

now = lambda: time.time()


def start_loop(loop):
    # Runs in the new thread: bind the loop to this thread and run it forever
    asyncio.set_event_loop(loop)
    loop.run_forever()


def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))


start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

# Register callbacks on the loop running in the other thread
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
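
To add a coroutine (rather than a plain function) to a loop running in another thread, `asyncio.run_coroutine_threadsafe` can be used. This is a sketch in which `more_work` is rewritten as a coroutine and the `demo` helper is an addition that also shuts the loop down cleanly.

```python
import asyncio
from threading import Thread


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def more_work(x):
    await asyncio.sleep(0.01)     # non-blocking, unlike time.sleep
    return x * 2


def demo():
    new_loop = asyncio.new_event_loop()
    thread = Thread(target=start_loop, args=(new_loop,), daemon=True)
    thread.start()
    try:
        # Returns a concurrent.futures.Future that the calling thread can wait on.
        future = asyncio.run_coroutine_threadsafe(more_work(3), new_loop)
        return future.result(timeout=5)
    finally:
        new_loop.call_soon_threadsafe(new_loop.stop)
        thread.join(timeout=5)
        new_loop.close()


print(demo())
```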

14. High-concurrency programming with aiohttp:

import asyncio
import re

import aiohttp
import aiomysql
from pyquery import PyQuery

stopping = False
start_url = "http://www.jobbole.com/"
waitting_urls = []
seen_urls = set()
# Limit the number of concurrent requests
sem = asyncio.Semaphore(1)


async def fetch(url, session):
    async with sem:
        try:
            async with session.get(url) as resp:
                print("url status:{}".format(resp.status))
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)


def extract_urls(html):
    urls = []
    if not html:  # fetch() returns None on failure
        return urls
    pq = PyQuery(html)
    for link in pq.items("a"):
        url = link.attr("href")
        if url and url.startswith("http") and url not in seen_urls:
            urls.append(url)
            waitting_urls.append(url)
    return urls


async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def article_handler(url, session, pool):
    # Fetch the article page, parse it, and store it in the database
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        return
    extract_urls(html)
    pq = PyQuery(html)
    title = pq("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            # Parameterized query instead of string formatting, to avoid SQL injection
            insert_sql = "insert into article_test (title) values (%s)"
            await cur.execute(insert_sql, (title,))


async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if 0 == len(waitting_urls):
                await asyncio.sleep(0.5)
                continue
            url = waitting_urls.pop()
            print("start get url:{}".format(url))
            if re.match(r"http://.*?jobbole.com/\d+/", url):
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
            else:
                if url not in seen_urls:
                    asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    # Wait for the MySQL connection pool to be ready
    # Note: it is best to set charset, otherwise rows containing Chinese may not be inserted;
    # setting autocommit=True is also advisable
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='',
                                      db='aiomysql_test', loop=loop,
                                      charset="utf8", autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        # The original called seen_urls.add(html); seeding the queue from the start page appears to be the intent
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()
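
How the semaphore caps concurrency can be measured in isolation. This sketch uses a simulated request; the `state` dictionary and the `limit` and `jobs` parameters are assumptions added to record the peak number of tasks inside the critical section.

```python
import asyncio


async def fetch(sem, state):
    async with sem:                       # at most `limit` tasks get past this line at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)         # simulated request
        state["active"] -= 1


async def main(limit=2, jobs=6):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(fetch(sem, state) for _ in range(jobs)))
    return state["peak"]


print(asyncio.run(main()))
```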

15. Using aiohttp with a priority queue

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : aiohttp_queue.py
# @Author: Liugp
# @Date : 2019/7/4
# @Desc :
import asyncio
import random

import aiohttp

NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def collect_result(a):
    # "with (await sema)" was removed in Python 3.9; use "async with" instead
    async with sema:
        return await fetch_async(a)


async def produce(queue):
    for num in NUMBERS:
        print(f'producing {num}')
        item = (num, num)  # (priority, payload)
        await queue.put(item)


async def consume(queue):
    while 1:
        item = await queue.get()
        num = item[0]
        rs = await collect_result(num)
        print(f'consuming {rs}...')
        queue.task_done()


async def run():
    queue = asyncio.PriorityQueue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue)
    await queue.join()
    consumer.cancel()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()
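
The queue logic can be exercised without httpbin. In this sketch the HTTP call is dropped, the "job-N" payloads are placeholders, and the consumer simply records what it receives, which shows that `asyncio.PriorityQueue` hands out the smallest priority first.

```python
import asyncio


async def produce(queue, numbers):
    for num in numbers:
        await queue.put((num, "job-{}".format(num)))   # (priority, payload)


async def consume(queue, handled):
    while True:
        priority, payload = await queue.get()
        handled.append(payload)
        queue.task_done()


async def run(numbers):
    queue = asyncio.PriorityQueue()
    handled = []
    await produce(queue, numbers)          # fill the queue before the consumer starts
    consumer = asyncio.ensure_future(consume(queue, handled))
    await queue.join()                     # returns once every item has been marked done
    consumer.cancel()
    return handled


print(asyncio.run(run([5, 1, 3])))
```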