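What is asyncio for?
In the Python 3.0 era, the standard library's asynchronous networking module was select (very low-level).
In the Python 3.0 era, the third-party asynchronous networking library was Tornado.
In the Python 3.4 era, asyncio arrived, supporting TCP and subprocesses.
Today many modules already support asyncio: aiohttp, aiodns, aioredis, and more. https://github.com/aio-libs lists the supported projects and is continually updated.
Of course, asyncio is not the only way to get coroutines: tornado and gevent implement similar functionality.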
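Some key asyncio terms:
event_loop (event loop): the program starts an infinite loop and registers functions on it; when the matching event occurs, the loop calls the corresponding coroutine.
coroutine: a coroutine object. Calling a function defined with the async keyword does not run it immediately; it returns a coroutine object, which must be registered with the event loop and is then run by the loop.
task: a coroutine object is a natively suspendable function; a task wraps a coroutine further and keeps track of its state (pending, done, cancelled).
future: represents the result of work that will run later or has not run yet. Task is a subclass of Future; the difference is that a Task drives a coroutine, while a plain Future is only a placeholder for a result that someone else sets.
async/await: keywords introduced in Python 3.5 for defining coroutines. async defines a coroutine; await suspends the coroutine until an awaitable (an asynchronous, possibly blocking call) completes.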
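To make these terms concrete, here is a small sketch using only the standard library (Python 3.7+; `fetch` is a placeholder name, not from the original). It shows that calling an `async def` function only creates a coroutine object, that a Task wraps it and is itself a Future, and that a bare Future is just a result slot filled by someone else (here a timer callback).

```python
import asyncio
import inspect


async def fetch(n):
    # Placeholder coroutine: pretend to do some I/O, then return a value
    await asyncio.sleep(0.01)
    return n * 2


async def main():
    coro = fetch(21)                              # nothing runs yet: just a coroutine object
    is_coro = inspect.iscoroutine(coro)

    task = asyncio.create_task(coro)              # wrap it in a Task; the loop schedules it
    is_future = isinstance(task, asyncio.Future)  # Task is a subclass of Future

    loop = asyncio.get_running_loop()
    fut = loop.create_future()                    # a plain Future: an empty result slot
    loop.call_later(0.01, fut.set_result, "filled by a callback")

    return is_coro, is_future, await task, await fut


results = asyncio.run(main())
print(results)  # (True, True, 42, 'filled by a callback')
```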
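Looking at the terms above, you may be tempted to turn around and leave. To be honest, when I first looked into asyncio I felt a certain resistance to it without really knowing why, and as a result I barely paid attention to or used the module for a long time. But as I kept running into performance problems with Python at work, I told myself I really had to learn it properly.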
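I. Overview:
1. Event loop + callbacks (driving generators) + epoll (I/O multiplexing)
2. asyncio is Python's complete solution for asynchronous I/O programming
3. Libraries and frameworks built on asynchronous I/O: tornado, gevent, twisted (used by scrapy and Django Channels)
4. tornado implements its own web server, whereas django and flask need a separate server (uwsgi, or gunicorn + nginx)
5. tornado can be deployed directly, or as nginx + tornado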
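Point 1 (event loop + callbacks + I/O multiplexing) can be sketched with the standard `selectors` module, whose `DefaultSelector` uses epoll on Linux. This is a toy illustration, not how asyncio is written internally: a local `socket.socketpair()` stands in for a network connection, and `on_readable` is a made-up callback name.

```python
import selectors
import socket

# DefaultSelector picks the best mechanism available (epoll on Linux, kqueue on BSD/macOS)
sel = selectors.DefaultSelector()
received = []


def on_readable(conn):
    # Callback the loop invokes when conn has data to read
    data = conn.recv(1024)
    received.append(data)
    if data == b"ping":
        writer.send(b"stop")   # trigger the next I/O event
    elif data == b"stop":
        sel.unregister(conn)   # nothing left to watch, so the loop below exits


reader, writer = socket.socketpair()
reader.setblocking(False)
# Register the socket together with the callback to run when it becomes readable
sel.register(reader, selectors.EVENT_READ, on_readable)

writer.send(b"ping")
while sel.get_map():                       # the "event loop": run while something is registered
    for key, _mask in sel.select(timeout=1):
        key.data(key.fileobj)              # dispatch to the registered callback

sel.close()
reader.close()
writer.close()
print(received)  # [b'ping', b'stop']
```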
II. Event loop
Example 1:

```python
# Using asyncio
import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Run a single coroutine
    # loop.run_until_complete(get_html("http://www.imooc.com"))
    # Run tasks in batch: build a task list
    # (since Python 3.11 asyncio.wait() only accepts Tasks/Futures, not bare coroutines)
    tasks = [loop.create_task(get_html("http://www.imooc.com")) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print("elapsed time: {}".format(time.time() - start_time))
```
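The point of Example 1 is that the ten 2-second waits overlap, so the total is about 2 seconds rather than 20. A scaled-down sketch using the `asyncio.run()` entry point (Python 3.7+) and 0.1-second sleeps standing in for the network call:

```python
import asyncio
import time


async def get_html(url):
    # asyncio.sleep stands in for a non-blocking network request
    await asyncio.sleep(0.1)
    return url


async def main():
    # gather schedules all ten coroutines at once and waits for all of them
    return await asyncio.gather(*(get_html("http://www.imooc.com") for _ in range(10)))


start = time.perf_counter()
pages = asyncio.run(main())
elapsed = time.perf_counter() - start
print(len(pages), round(elapsed, 1))  # the waits overlap: elapsed is close to 0.1 s, not 1 s
```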
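1. For a coroutine, asyncio.ensure_future() is equivalent to loop.create_task()
2. Task is a subclass of Future
3. A thread has at most one running event loop at a time
4. Although asyncio.ensure_future() is not passed a loop, its source calls get_event_loop() internally, which associates the task with the loop and registers it in the loop's task queue

```python
import asyncio
import time
from functools import partial


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "bobby"


# [Note] the extra argument (url) must come first: add_done_callback
# passes the finished future as the last positional argument
def callback(url, future):
    print("runs after the task finishes; url={}".format(url))


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Get a future: a single coroutine/task/future can be passed directly;
    # a list has to be wrapped with asyncio.wait
    task = asyncio.ensure_future(get_html("http://www.imooc.com"))
    # task = loop.create_task(get_html("http://www.imooc.com"))
    # Callback to run after the task completes (only valid if it takes just the future)
    # task.add_done_callback(callback)
    # Pass extra arguments to the callback with functools.partial
    task.add_done_callback(partial(callback, "http://www.imooc.com"))
    loop.run_until_complete(task)
    print("elapsed time: {}".format(time.time() - start_time))
    print(task.result())
    loop.close()
```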
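Points 1, 2 and 4 above can be checked directly. A small sketch (Python 3.7+): inside a running loop, `ensure_future()` on a coroutine returns a Task bound to that loop just like `create_task()`, passing an existing Task back returns the very same object, and Task is a Future.

```python
import asyncio


async def get_html(url):
    await asyncio.sleep(0.01)
    return "bobby"


async def main():
    loop = asyncio.get_running_loop()
    t1 = asyncio.ensure_future(get_html("a"))   # no loop argument: the running loop is used
    t2 = loop.create_task(get_html("b"))
    checks = {
        "both_tasks": isinstance(t1, asyncio.Task) and isinstance(t2, asyncio.Task),
        "task_is_future": issubclass(asyncio.Task, asyncio.Future),
        "same_loop": t1.get_loop() is loop and t2.get_loop() is loop,
        # an existing Task/Future is returned unchanged
        "ensure_future_idempotent": asyncio.ensure_future(t1) is t1,
    }
    await asyncio.gather(t1, t2)
    return checks, t1.result(), t2.result()


checks, r1, r2 = asyncio.run(main())
print(checks, r1, r2)
```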
5. The difference between wait and gather:
a) wait waits until all the tasks have finished before the code after it runs [loop.run_until_complete(asyncio.wait(tasks))]; it returns two sets, (done, pending)
b) gather is higher-level and returns the results in order:
   1) tasks can be grouped (and a whole group can be cancelled)

```python
# Using asyncio
import asyncio
import time


async def get_html(url):
    print("start get url={}".format(url))
    await asyncio.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Run a single coroutine
    # loop.run_until_complete(get_html("http://www.imooc.com"))
    # Run tasks in batch: build a list of coroutines
    # tasks = [get_html("http://www.imooc.com") for i in range(10)]
    # asyncio.wait() needs Tasks (not bare coroutines) since Python 3.11:
    # loop.run_until_complete(asyncio.wait([loop.create_task(t) for t in tasks]))
    # gather does the same as wait, but don't forget the * that unpacks the list
    # loop.run_until_complete(asyncio.gather(*tasks))
    # Grouping
    # Option 1
    # group1 = [get_html("http://www.projectedu.com") for i in range(2)]
    # group2 = [get_html("http://www.imooc.com") for i in range(2)]
    # loop.run_until_complete(asyncio.gather(*group1, *group2))
    # Option 2
    group1 = [get_html("http://www.projectedu.com") for i in range(2)]
    group2 = [get_html("http://www.imooc.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    # Cancel a whole group
    # group2.cancel()
    loop.run_until_complete(asyncio.gather(group1, group2))
    loop.close()
    print("elapsed time: {}".format(time.time() - start_time))
```
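The return values make the difference between the two visible. A minimal sketch (`job` is a placeholder coroutine): `gather` returns a list of results in argument order, while `wait` returns `(done, pending)` sets, and with `return_when=asyncio.FIRST_COMPLETED` it returns as soon as one task finishes, leaving the rest for the caller to handle.

```python
import asyncio


async def job(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    # gather: a list of results, in the order the awaitables were passed
    gathered = await asyncio.gather(job("slow", 0.2), job("fast", 0.01))

    # wait: takes Tasks and returns two sets; return_when controls when it returns
    tasks = [asyncio.create_task(job("slow", 0.2)), asyncio.create_task(job("fast", 0.01))]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]
    for t in pending:
        t.cancel()   # wait() does not cancel the unfinished tasks for us
    await asyncio.gather(*pending, return_exceptions=True)   # let the cancellation finish
    return gathered, first, len(pending)


gathered, first, n_pending = asyncio.run(main())
print(gathered, first, n_pending)  # ['slow', 'fast'] ['fast'] 1
```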
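6. loop.run_forever() and cancelling tasks

```python
# 1. The loop is stored in the future (task)
# 2. Cancelling futures (tasks)
# Press Ctrl+C while this runs to trigger the KeyboardInterrupt branch
import asyncio


async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    task1 = loop.create_task(get_html(2))
    task2 = loop.create_task(get_html(3))
    task3 = loop.create_task(get_html(3))
    tasks = [task1, task2, task3]
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks(loop)
        all_tasks = asyncio.all_tasks(loop)
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        # Without this line an exception is raised
        loop.run_forever()
    finally:
        loop.close()
```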
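What `task.cancel()` actually does: it requests cancellation, and the next time the coroutine is suspended at an `await`, a `CancelledError` is raised inside it, which it may catch to clean up (and should then re-raise). A small sketch that cancels programmatically instead of relying on Ctrl+C; the `log` list is added for illustration.

```python
import asyncio

log = []


async def get_html(sleep_times):
    try:
        await asyncio.sleep(sleep_times)
        log.append("done after {}s".format(sleep_times))
    except asyncio.CancelledError:
        log.append("cleanup for {}s task".format(sleep_times))
        raise   # re-raise so the task is really marked as cancelled


async def main():
    tasks = [asyncio.create_task(get_html(t)) for t in (0.01, 5, 5)]
    await asyncio.sleep(0.1)          # give the short task time to finish
    for task in tasks:
        print("cancel task", task.cancel())   # False for a task that has already finished
    await asyncio.gather(*tasks, return_exceptions=True)
    return [task.cancelled() for task in tasks]


status = asyncio.run(main())
print(status, log)
```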
7. Calling a coroutine from within a coroutine:

```python
import asyncio


async def compute(x, y):
    print("Compute %s + %s..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    # await suspends print_sum until compute() returns its result
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.new_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
```
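With `await`, the outer coroutine is suspended until the inner one finishes; the inner return value becomes the value of the `await` expression, and an exception raised in `compute()` propagates up through `print_sum()` just as with ordinary calls. A sketch of both (the negative-input check is a hypothetical addition for illustration, not in the original):

```python
import asyncio


async def compute(x, y):
    await asyncio.sleep(0.01)
    if x < 0:
        raise ValueError("negative input")   # hypothetical check, for illustration only
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)   # suspends print_sum until compute returns
    return "%s + %s = %s" % (x, y, result)


async def main():
    ok = await print_sum(1, 2)
    try:
        await print_sum(-1, 2)
    except ValueError as exc:      # the exception travelled up through print_sum
        err = str(exc)
    return ok, err


ok, err = asyncio.run(main())
print(ok, "|", err)  # 1 + 2 = 3 | negative input
```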
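8. call_soon, call_at, call_later, call_soon_threadsafe

```python
import asyncio
import time


def callback(sleep_times):
    # time.sleep(sleep_times)
    print("sleep {} success".format(sleep_times))


# Stop the current loop
def stoploop(loop):
    loop.stop()


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    # The callbacks below are queued and run as soon as they are due
    # call_later(delay, callback, *args): delay in seconds, then the function, then its arguments
    # call_later is implemented on top of call_at
    # loop.call_later(2, callback, 2)
    # loop.call_later(1, callback, 1)
    # loop.call_later(3, callback, 3)
    # call_at's first argument is a time on the loop's own clock (loop.time() + delay), not system time
    now = loop.time()
    print(now)
    loop.call_at(now + 2, callback, 2)
    loop.call_at(now + 1, callback, 1)
    loop.call_at(now + 3, callback, 3)
    # call_soon runs before call_later/call_at
    loop.call_soon(callback, 4)
    # loop.call_soon(stoploop, loop)  # stopping right away would skip the timed callbacks
    # Stop after the last timed callback; otherwise run_forever() never returns
    loop.call_at(now + 4, stoploop, loop)
    # These are plain callbacks, not coroutines, so run_until_complete() does not apply;
    # run_forever() keeps running whatever is in the queue
    loop.run_forever()
    loop.close()
```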
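The ordering rules can be checked quickly by recording the order in which callbacks fire instead of printing. A scaled-down sketch with delays of a few hundredths of a second: `call_soon` fires on the next loop iteration, and timed callbacks fire in order of their due time on `loop.time()`, regardless of the order they were registered in.

```python
import asyncio

order = []


def callback(tag):
    order.append(tag)


loop = asyncio.new_event_loop()
now = loop.time()                        # the loop's monotonic clock, not wall-clock time
loop.call_at(now + 0.03, callback, "call_at +0.03")
loop.call_later(0.01, callback, "call_later 0.01")
loop.call_soon(callback, "call_soon")    # runs on the next iteration, before any timer
loop.call_at(now + 0.05, loop.stop)      # stop the loop after the last timer
loop.run_forever()
loop.close()
print(order)  # ['call_soon', 'call_later 0.01', 'call_at +0.03']
```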
9. Calling blocking code in coroutine style through a ThreadPoolExecutor (thread pool) [not much different from using the thread pool on its own; it does not improve efficiency much]

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : thread_asyncio.py
# @Author: Liugp
# @Date  : 2019/6/8
# @Desc  :
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse


def get_url(url):
    # Request the HTML over a plain (blocking) socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # Open the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # blocks, but blocking does not consume CPU
    # With a non-blocking socket we would have to keep asking whether the connection
    # is ready, checking its state in a while loop, and could meanwhile do
    # computation or start other connection requests
    client.sendall("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode('utf8')
    # print(data)
    # Everything after the first blank line is the body
    html_data = data.split("\r\n\r\n", 1)[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    # Thread pool
    executor = ThreadPoolExecutor()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # run_in_executor wraps the thread pool's future in an asyncio future,
        # which is what lets us drive it the coroutine way
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    executor.shutdown()
    loop.close()
    print("last time:{}".format(time.time() - start_time))
```
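The essential mechanism, without the network: the blocking function runs in worker threads, and the loop awaits asyncio Futures wrapping the thread pool's Futures. Since Python 3.9, `asyncio.to_thread()` does the same using the loop's default executor. A sketch where `time.sleep` stands in for the blocking socket call (`blocking_io` is a placeholder name):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(n):
    # Stand-in for a blocking call such as socket.recv(); calling it directly would freeze the loop
    time.sleep(0.2)
    return n


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as executor:
        futs = [loop.run_in_executor(executor, blocking_io, i) for i in range(5)]
        results = await asyncio.gather(*futs)
    results2 = await asyncio.to_thread(blocking_io, 99)   # Python 3.9+
    return results, results2


start = time.perf_counter()
results, results2 = asyncio.run(main())
elapsed = time.perf_counter() - start
# the five sleeps overlap in the pool: roughly 0.2 s + 0.2 s in total, not 1.2 s
print(results, results2, round(elapsed, 1))
```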
10. Simulating an HTTP request with asyncio:

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : asyncio_http.py
# @Author: Liugp
# @Date  : 2019/6/8
# @Desc  :
# asyncio does not provide an HTTP interface, only the lower-level TCP and UDP interfaces; for HTTP you can use aiohttp
import time
import asyncio
from urllib.parse import urlparse


async def get_url(url):
    # Request the HTML over a raw TCP connection
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    await writer.drain()
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    writer.close()
    await writer.wait_closed()
    html = "".join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task
        # Print the body: everything after the blank line that ends the headers
        print(result.split("\r\n\r\n", 1)[-1])


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()
    print("last time:{}".format(time.time() - start_time))
```
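`asyncio.open_connection()` returns a (StreamReader, StreamWriter) pair, and `asyncio.start_server()` is its server-side counterpart. A self-contained sketch that talks to a throwaway echo server on 127.0.0.1 instead of the remote site; port 0 lets the OS pick a free port, and the upper-casing `handle` function is made up for illustration.

```python
import asyncio


async def handle(reader, writer):
    # Server side: read one line, send it back upper-cased, then close
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main():
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"hello asyncio\n")
        await writer.drain()            # wait until the write buffer is flushed
        reply = await reader.read()     # read until the server closes the connection
        writer.close()
        await writer.wait_closed()
    return reply


reply = asyncio.run(main())
print(reply)  # b'HELLO ASYNCIO\n'
```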
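11. Future and Task
a) A Task starts the coroutine by calling send(None) on it (the equivalent of next() for a generator)
b) Task is a subclass of Future
c) An asyncio Future is much like a thread pool (concurrent.futures) Future, with one difference: when it completes, it schedules its callbacks with call_soon(). Because coroutines run in a single thread, the callbacks are put into the loop's queue to run, whereas a thread pool future runs the callback code directly
d) Task is the bridge between Future and coroutine
e) When the coroutine finishes and raises StopIteration, the Task also stores the returned value as its result [self.set_result(exc.value)]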
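Points a) and e) can be illustrated with a toy driver. This is a deliberately simplified sketch, not asyncio's real `Task` implementation: `ToyTask` and `suspend` are hypothetical names. It calls `send(None)` to advance the coroutine to its next suspension point, and when the coroutine returns, catches `StopIteration` and stores `exc.value` as the result.

```python
import types


@types.coroutine
def suspend():
    # A bare yield: hands control back to whoever called send()
    yield


async def compute():
    await suspend()          # first suspension point
    await suspend()          # second suspension point
    return 42


class ToyTask:
    """Hypothetical minimal 'task' that drives a coroutine with send(None)."""

    def __init__(self, coro):
        self.coro = coro
        self.steps = 0
        self.result = None
        self.done = False

    def step(self):
        try:
            self.coro.send(None)      # resume the coroutine until its next yield
            self.steps += 1
        except StopIteration as exc:  # the coroutine returned
            self.result = exc.value   # same idea as Task's set_result(exc.value)
            self.done = True


task = ToyTask(compute())
while not task.done:      # a real event loop would schedule step() with call_soon
    task.step()
print(task.steps, task.result)  # 2 42
```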
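12. asyncio synchronization and communication:

```python
# Without any await inside, each coroutine runs to completion before the next one
# starts, so the updates to total cannot interleave.
# (When bare coroutines were passed to asyncio.wait(), which was allowed before
# Python 3.11, they were put into a set, so the start order was arbitrary;
# Tasks created explicitly start in creation order.)
import asyncio

total = 0


async def add():
    global total
    for i in range(5):
        print("add: {}".format(i))
        total += 1


async def desc():
    global total
    for i in range(5):
        print("desc: {}".format(i))
        total -= 1


async def desc2():
    global total
    for i in range(5):
        print("desc2: {}".format(i))
        total -= 1


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(c) for c in (add(), desc(), desc2())]
    loop.run_until_complete(asyncio.wait(tasks))
    print("final result: {}".format(total))
    loop.close()

# Output:
# add: 0
# add: 1
# add: 2
# add: 3
# add: 4
# desc: 0
# desc: 1
# desc: 2
# desc: 3
# desc: 4
# desc2: 0
# desc2: 1
# desc2: 2
# desc2: 3
# desc2: 4
# final result: -5
```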
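The code above is safe only because nothing is awaited between reading and writing `total`. As soon as a coroutine suspends in the middle of a read-modify-write, another one runs in between and updates are lost. A sketch where `asyncio.sleep(0)` is an assumed stand-in for any real await (a database call, a request):

```python
import asyncio

total = 0


async def add():
    global total
    for _ in range(5):
        current = total          # read
        await asyncio.sleep(0)   # suspend: the other coroutine runs here
        total = current + 1      # write back a value that may be stale


async def desc():
    global total
    for _ in range(5):
        current = total
        await asyncio.sleep(0)
        total = current - 1


async def main():
    await asyncio.gather(add(), desc())


asyncio.run(main())
print(total)   # should be 0, but lost updates leave a different value
```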
a) The asyncio lock mechanism (from asyncio import Lock)

```python
import asyncio
from asyncio import Lock, Queue

import aiohttp

cache = {}
lock = Lock()     # on Python 3.10+ locks bind to the running loop lazily, so module level is fine
queue = Queue()


async def get_stuff(url="http://www.baidu.com"):
    # await lock.acquire()
    # with await lock:   (old syntax, removed in Python 3.9)
    # Use the lock to synchronize the coroutines so the same URL is not requested twice
    async with lock:
        if url in cache:
            return cache[url]
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                stuff = await resp.text()
        cache[url] = stuff
        return stuff


async def parse_stuff():
    stuff = await get_stuff()


async def use_stuff():
    stuff = await get_stuff()


async def main():
    await asyncio.gather(parse_stuff(), use_stuff())


loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
```
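The same lock pattern without the network dependency: an `asyncio.Lock` held with `async with` around the read-modify-write keeps a shared counter correct even though each update awaits in the middle. A sketch in which `asyncio.sleep(0)` again stands in for real I/O and `change` is a placeholder name:

```python
import asyncio

total = 0


async def change(delta, lock):
    global total
    for _ in range(5):
        async with lock:             # only one coroutine at a time inside this block
            current = total
            await asyncio.sleep(0)   # suspending while holding the lock is safe
            total = current + delta


async def main():
    lock = asyncio.Lock()            # created inside the running loop
    await asyncio.gather(change(+1, lock), change(-1, lock), change(-1, lock))


asyncio.run(main())
print(total)  # -5
```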
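13. Event loops in different threads
Often the event loop is used to register coroutines, and some coroutines need to be added to it dynamically. A simple way to do this is with multiple threads: the current thread creates an event loop, then starts a new thread and runs the event loop there, so the current thread is not blocked.

```python
import asyncio
from threading import Thread
import time

now = lambda: time.time()


def start_loop(loop):
    # Make the loop current in this (new) thread and keep it running
    asyncio.set_event_loop(loop)
    loop.run_forever()


def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)   # blocking: the two callbacks run one after the other on the loop thread
    print('Finished more work {}'.format(x))


start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

# Thread-safe scheduling of callbacks onto the loop running in the other thread
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
# Stop the loop once the queued callbacks have run, so the program can exit
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
```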
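`call_soon_threadsafe()` only schedules plain callbacks. To hand a coroutine to a loop running in another thread, asyncio provides `run_coroutine_threadsafe()`, which returns a `concurrent.futures.Future` the calling thread can wait on. A sketch reusing the `start_loop` idea above; the async `more_work` here is a variant assumed for illustration, using a non-blocking sleep so submissions overlap.

```python
import asyncio
import time
from threading import Thread


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def more_work(x):
    # Non-blocking sleep, so several submitted coroutines overlap on the loop thread
    await asyncio.sleep(x)
    return "finished more work {}".format(x)


new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,), daemon=True)
t.start()

start = time.perf_counter()
# Submit coroutines from the main thread; each call returns a concurrent.futures.Future
futures = [asyncio.run_coroutine_threadsafe(more_work(x), new_loop) for x in (0.4, 0.3)]
results = [f.result(timeout=5) for f in futures]   # blocks this thread, not the loop
elapsed = time.perf_counter() - start

new_loop.call_soon_threadsafe(new_loop.stop)       # ask the loop to stop from this thread
t.join()
new_loop.close()
print(results, round(elapsed, 1))   # the two waits overlap: about 0.4 s, not 0.7 s
```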
14. High-concurrency crawling with aiohttp:

```python
import asyncio
import re

import aiohttp
import aiomysql
from pyquery import PyQuery

stopping = False
start_url = "http://www.jobbole.com/"
waitting_urls = []
seen_urls = set()

# Limit concurrency (1 request in flight at a time; raise the value to crawl faster)
sem = asyncio.Semaphore(1)


async def fetch(url, session):
    async with sem:
        try:
            async with session.get(url) as resp:
                print("url status:{}".format(resp.status))
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)
    return None


def extract_urls(html):
    urls = []
    if not html:   # fetch() returns None on errors and non-2xx responses
        return urls
    pq = PyQuery(html)
    for link in pq.items("a"):
        url = link.attr("href")
        if url and url.startswith("http") and url not in seen_urls:
            urls.append(url)
            waitting_urls.append(url)
    return urls


async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def article_handler(url, session, pool):
    # Fetch the article page, parse it and store it in the database
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)
    if not html:
        return
    pq = PyQuery(html)
    title = pq("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Parameterized query: the driver escapes the title (no SQL injection)
            insert_sql = "insert into article_test (title) values (%s)"
            await cur.execute(insert_sql, (title,))


async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if 0 == len(waitting_urls):
                await asyncio.sleep(0.5)
                continue

            url = waitting_urls.pop()
            print("start get url:{}".format(url))
            if re.match(r"http://.*?jobbole.com/\d+/", url):
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
            else:
                if url not in seen_urls:
                    asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    # Wait for the MySQL connection pool to be ready
    # Set charset explicitly, otherwise rows containing Chinese text may not be inserted;
    # autocommit=True is also recommended
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='',
                                      db='aiomysql_test', loop=loop,
                                      charset="utf8", autocommit=True)

    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)   # seed the queue with the links found on the start page

    asyncio.ensure_future(consumer(pool))


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(main(loop))
    loop.run_forever()
```
15. Using aiohttp with a priority queue

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : aiohttp_queue.py
# @Author: Liugp
# @Date  : 2019/7/4
# @Desc  :
import asyncio
import random

import aiohttp

NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def collect_result(a):
    # "with (await sema):" was removed in Python 3.9; use "async with"
    async with sema:
        return await fetch_async(a)


async def produce(queue):
    for num in NUMBERS:
        print(f'producing {num}')
        item = (num, num)   # (priority, value): smaller numbers are consumed first
        await queue.put(item)


async def consume(queue):
    while True:
        item = await queue.get()
        num = item[0]
        rs = await collect_result(num)
        print(f'consuming {rs}...')
        queue.task_done()


async def run():
    queue = asyncio.PriorityQueue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue)
    await queue.join()
    consumer.cancel()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(run())
    loop.close()
```
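`asyncio.PriorityQueue` hands items out smallest-first, which is why the code above puts `(num, num)` tuples: the first element is the priority. A network-free sketch of the same produce / consume / `join()` / `cancel()` pattern, with made-up `item-N` payloads:

```python
import asyncio


async def produce(queue, numbers):
    for num in numbers:
        await queue.put((num, "item-{}".format(num)))   # (priority, payload)


async def consume(queue, out):
    while True:
        priority, payload = await queue.get()
        out.append(payload)
        queue.task_done()            # tells queue.join() this item is finished


async def run():
    queue = asyncio.PriorityQueue()
    out = []
    consumer = asyncio.create_task(consume(queue, out))
    await produce(queue, [42, 7, 19, 3])
    await queue.join()               # returns once every item has been marked task_done()
    consumer.cancel()                # the consumer loops forever, so stop it explicitly
    return out


result = asyncio.run(run())
print(result)  # ['item-3', 'item-7', 'item-19', 'item-42']
```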