asyncio
A pluggable event loop with various system-specific implementations
Transport and protocol abstractions
Concrete support for TCP, UDP, SSL, subprocesses, delayed calls, and more
A Future class that mimics the one in the futures module, but adapted for use with the event loop
Coroutines and tasks based on yield from, which let you write concurrent code in a sequential style
When a call that performs blocking IO cannot be avoided, an interface for handing that work off to a thread pool
Synchronization primitives modeled on those in the threading module, usable between coroutines within a single thread
Event loop + callbacks (driving generators) + epoll (IO multiplexing); see the selectors sketch below these notes
asyncio is Python's complete solution for asynchronous IO programming
Frameworks and libraries built on asynchronous IO: tornado, gevent, twisted (scrapy, django channels)
tornado (implements its own web server), django/flask (served via uwsgi or gunicorn behind nginx)
tornado can be deployed directly, or behind nginx (nginx + tornado)
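To make the "event loop + callbacks + epoll" line above concrete, here is a minimal sketch of my own (not part of the original notes) built only on the standard selectors module, which uses epoll on Linux: a non-blocking socket is registered together with a callback, and a small loop fires the callbacks as the socket becomes ready. The host www.baidu.com is just a placeholder matching the later examples.

```python
import selectors
import socket

selector = selectors.DefaultSelector()


def on_connected(sock):
    # the socket became writable: the connection is up, send the request
    selector.unregister(sock.fileno())
    sock.send(b"GET / HTTP/1.1\r\nHost:www.baidu.com\r\nConnection:close\r\n\r\n")
    selector.register(sock.fileno(), selectors.EVENT_READ, on_readable)


def on_readable(sock):
    # the socket became readable: pull one chunk of the response
    data = sock.recv(1024)
    if data:
        print(data[:60])
    else:
        selector.unregister(sock.fileno())
        sock.close()


if __name__ == "__main__":
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect(("www.baidu.com", 80))
    except BlockingIOError:
        pass  # expected for a non-blocking connect
    selector.register(sock.fileno(), selectors.EVENT_WRITE, on_connected)

    # the "event loop": wait for readiness events and fire the registered callbacks
    while selector.get_map():
        for key, mask in selector.select():
            callback = key.data  # the callback stored at register() time
            callback(sock)
```

asyncio wraps exactly this pattern (plus coroutine scheduling) so that you never write the select loop yourself, as the following examples show.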
```python
import asyncio
import time  # only used to measure elapsed time, not for blocking sleeps


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)  # never use the blocking time.sleep(2) here
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://www.imooc.com") for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)

"""
start get url    (printed 10 times)
end get url      (printed 10 times)
2.001918077468872
"""
```
```python
import asyncio
import time
from functools import partial  # partial function application


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "lewen"


def callback(url, future):
    print(url)
    print("send callback email to lewen")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()  # the event loop
    # two equivalent ways to wrap a coroutine in a task:
    # task = asyncio.ensure_future(get_html("http://www.imooc.com"))
    task = loop.create_task(get_html("http://www.imooc.com"))
    task.add_done_callback(partial(callback, "http://www.imooc.com"))
    loop.run_until_complete(task)
    print(task.result())

"""
start get url
http://www.imooc.com
send callback email to lewen
lewen
"""
```
```python
import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()

    tasks = [get_html("http://www.imooc.com") for i in range(10)]
    # loop.run_until_complete(asyncio.gather(*tasks))
    loop.run_until_complete(asyncio.wait(tasks))
    # print(time.time() - start_time)

    # difference between gather and wait:
    # gather is more high-level and lets you treat tasks as groups
    group1 = [get_html("http://projectsedu.com") for i in range(2)]
    group2 = [get_html("http://www.imooc.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    # group2.cancel()  # a whole group can be cancelled at once
    loop.run_until_complete(asyncio.gather(group1, group2))
    print(time.time() - start_time)
```
```python
import asyncio


def callback(sleep_times, loop):
    print("success time {}".format(loop.time()))


def stoploop(loop):
    loop.stop()


# call_soon, call_later, call_at
if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    # call_soon(): run the callback on the next iteration of the loop
    # loop.call_soon(callback, 4, loop)
    # loop.call_soon(stoploop, loop)

    # call_later(): run the callback after a delay in seconds
    # loop.call_later(2, callback, 2, loop)
    # loop.call_later(1, callback, 1, loop)
    # loop.call_later(3, callback, 3, loop)

    # call_at(): run the callback at an absolute time on the loop's clock
    now = loop.time()
    loop.call_at(now + 2, callback, 2, loop)
    loop.call_at(now + 1, callback, 1, loop)
    loop.call_at(now + 3, callback, 3, loop)

    loop.run_forever()
    # loop.call_soon_threadsafe(): like call_soon, but safe to call from another thread
```
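call_soon_threadsafe, only mentioned in the comment above, is the variant that may be called from a different thread. A minimal sketch of my own (not from the original notes):

```python
import asyncio
import threading


def callback(name, loop):
    print("callback from {} at {}".format(name, loop.time()))


def worker(loop):
    # schedule callbacks onto the loop from another thread
    loop.call_soon_threadsafe(callback, "worker thread", loop)
    loop.call_soon_threadsafe(loop.stop)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    threading.Thread(target=worker, args=(loop,)).start()
    loop.run_forever()  # runs the scheduled callbacks, then stops
```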
```python
# Using threads: integrating blocking IO into coroutines
# (e.g. database drivers and other blocking IO)
import asyncio
import socket
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse


def get_url(url):
    # fetch the html with a plain (blocking) socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # open the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # blocking, but does not burn CPU while waiting
    # with a non-blocking socket we would have to keep polling the connection
    # state in a while loop, doing computation or issuing other requests meanwhile

    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)  # thread pool
    tasks = []
    for url in range(20):
        url = "http://www.baidu.com/s?wd={}/".format(url)
        task = loop.run_in_executor(executor, get_url, url)  # hand the blocking call to the executor
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time:{}".format(time.time() - start_time))
    # this plugs the thread pool directly into the coroutine workflow
```
```python
# coding=utf-8
# asyncio does not provide an HTTP-level interface; use aiohttp for that
import asyncio
import time
from urllib.parse import urlparse


async def get_url(url):
    # fetch the html over a raw socket connection
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # open the connection
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "\n".join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://www.baidu.com/s?wd={}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('last time:{}'.format(time.time() - start_time))
```
future: a container for a result
task: a subclass of future, the bridge between a coroutine and a future; creating the task is what starts the coroutine running
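To make that relationship concrete, a small sketch of my own (not from the original notes): the task wrapping the coroutine is itself a Future, so the usual future interface applies to it.

```python
import asyncio


async def compute():
    await asyncio.sleep(1)
    return 42


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(compute())  # wraps the coroutine and schedules it

    # a Task is a Future, so the future API works on it
    print(isinstance(task, asyncio.Future))  # True
    task.add_done_callback(lambda fut: print("done:", fut.result()))

    loop.run_until_complete(task)
    print(task.result())  # 42
```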
```python
total = 0


async def add():
    # 1. do something
    # 2. an io operation (an await point where another coroutine could run)
    # 3. do something else
    global total
    for i in range(100000):
        total += 1


async def desc():
    global total
    for i in range(100000):
        total -= 1


if __name__ == "__main__":
    import asyncio
    tasks = [add(), desc()]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print(total)
```
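In the example above each coroutine runs to completion before the other starts, because there is no await inside the loops, so total reliably ends at 0. Once a coroutine suspends between reading and writing the shared value, the threading-style primitives listed at the top (asyncio.Lock here) become necessary. A minimal sketch of my own, not from the original notes:

```python
import asyncio

total = 0
lock = asyncio.Lock()  # same idea as threading.Lock, but for coroutines


async def add():
    global total
    for i in range(100):
        async with lock:  # hold the lock across the suspension point
            current = total
            await asyncio.sleep(0)  # another coroutine may run here
            total = current + 1


async def desc():
    global total
    for i in range(100):
        async with lock:
            current = total
            await asyncio.sleep(0)
            total = current - 1


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(add(), desc()))
    print(total)  # 0; without the lock the read/modify/write steps could interleave
```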
13.8 A high-concurrency crawler with aiohttp
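A minimal sketch of the idea, assuming aiohttp is installed; the URLs simply mirror the placeholder pattern from the earlier examples.

```python
import asyncio
import time

import aiohttp


async def get_url(session, url):
    # aiohttp provides the HTTP layer that asyncio itself lacks
    async with session.get(url) as resp:
        return await resp.text()


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(get_url(session, "http://www.baidu.com/s?wd={}/".format(i)))
                 for i in range(20)]
        for task in asyncio.as_completed(tasks):
            html = await task
            print(len(html))


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("last time:{}".format(time.time() - start_time))
```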