同步:不一樣程序單元爲了完成某個任務,在執行過程當中需靠某種通訊方式以協調一致,稱這些程序單元是同步執行的。html
例如購物系統中更新商品庫存,須要用「行鎖」做爲通訊信號,讓不一樣的更新請求強制排隊順序執行,那更新庫存的操做是同步的。python
簡言之,同步意味着有序。git
阻塞:程序未獲得所需計算資源時被掛起的狀態。程序員
程序在等待某個操做完成期間,自身沒法繼續幹別的事情,則稱該程序在該操做上是阻塞的。github
常見的阻塞形式有:網絡I/O阻塞、磁盤I/O阻塞、用戶輸入阻塞等。web
在一個程序內,依次執行10次太耗時,那開10個同樣的程序同時執行不就好了。因而咱們想到了多進程編程。爲何會先想到多進程呢?發展脈絡如此。在更早的操做系統(Linux 2.4)及其之前,進程是 OS 調度任務的實體,是面向進程設計的OS.算法
改善效果立竿見影。但仍然有問題。整體耗時並無縮減到原來的十分之一,而是九分之一左右,還有一些時間耗到哪裏去了?進程切換開銷。編程
進程切換開銷不止像「CPU的時間觀」所列的「上下文切換」那麼低。CPU從一個進程切換到另外一個進程,須要把舊進程運行時的寄存器狀態、內存狀態所有保存好,再將另外一個進程以前保存的數據恢復。對CPU來說,幾個小時就乾等着。當進程數量大於CPU核心數量時,進程切換是必然須要的。
除了切換開銷,多進程還有另外的缺點。通常的服務器在可以穩定運行的前提下,能夠同時處理的進程數在數十個到數百個規模。若是進程數量規模更大,系統運行將不穩定,並且可用內存資源每每也會不足。
多進程解決方案在面臨天天須要成百上千萬次下載任務的爬蟲系統,或者須要同時搞定數萬併發的電商系統來講,並不適合。
除了切換開銷大,以及可支持的任務規模小以外,多進程還有其餘缺點,如狀態共享等問題,後文會有說起,此處再也不細究。服務器
因爲線程的數據結構比進程更輕量級,同一個進程能夠容納多個線程,從進程到線程的優化由此展開。後來的OS也把調度單位由進程轉爲線程,進程只做爲線程的容器,用於管理進程所需的資源。並且OS級別的線程是能夠被分配到不一樣的CPU核心同時運行的。網絡
結果符合預期,比多進程耗時要少些。從運行時間上看,多線程彷佛已經解決了切換開銷大的問題。並且可支持的任務數量規模,也變成了數百個到數千個。
可是,多線程仍有問題,特別是Python裏的多線程。首先,Python中的多線程由於GIL的存在,它們並不能利用CPU多核優點,一個Python進程中,只容許有一個線程處於運行狀態。那爲何結果仍是如預期,耗時縮減到了十分之一?
由於在作阻塞的系統調用時,例如sock.connect(),sock.recv()時,當前線程會釋放GIL,讓別的線程有執行機會。可是單個線程內,在阻塞調用上仍是阻塞的。
小提示:Python中 time.sleep 是阻塞的,都知道使用它要謹慎,但在多線程編程中,time.sleep 並不會阻塞其餘線程。
除了GIL以外,全部的多線程還有通病。它們是被OS調度,調度策略是搶佔式的,以保證同等優先級的線程都有均等的執行機會,那帶來的問題是:並不知道下一時刻是哪一個線程被運行,也不知道它正要執行的代碼是什麼。因此就可能存在競態條件。
例如爬蟲工做線程從任務隊列拿待抓取URL的時候,若是多個爬蟲線程同時來取,那這個任務到底該給誰?那就須要用到「鎖」或「同步隊列」來保證下載任務不會被重複執行。
並且線程支持的多任務規模,在數百到數千的數量規模。在大規模的高頻網絡交互系統中,仍然有些吃力。固然,多線程最主要的問題仍是競態條件。
def nonblocking_way(): sock = socket.socket() sock.setblocking(False) try: sock.connect(('example.com', 80)) except BlockingIOError: # 非阻塞鏈接過程當中也會拋出異常 pass request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n' data = request.encode('ascii') # 不知道socket什麼時候就緒,因此不斷嘗試發送 while True: try: sock.send(data) # 直到send不拋異常,則發送完成 break except OSError: pass response = b'' while True: try: chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) break except OSError: pass return response
首先注意到兩點,就感受被騙了。一是耗時與同步阻塞至關,二是代碼更復雜。要非阻塞何用?且慢。
上第9行代碼sock.setblocking(False)告訴OS,讓socket上阻塞調用都改成非阻塞的方式。以前咱們說到,非阻塞就是在作一件事的時候,不阻礙調用它的程序作別的事情。上述代碼在執行完 sock.connect() 和 sock.recv() 後的確再也不阻塞,能夠繼續往下執行請求準備的代碼或者是執行下一次讀取。
代碼變得更復雜也是上述緣由所致。第11行要放在try語句內,是由於socket在發送非阻塞鏈接請求過程當中,系統底層也會拋出異常。connect()被調用以後,當即能夠往下執行第15和16行的代碼。
須要while循環不斷嘗試 send(),是由於connect()已經非阻塞,在send()之時並不知道 socket 的鏈接是否就緒,只有不斷嘗試,嘗試成功爲止,即發送數據成功了。recv()調用也是同理。
雖然 connect() 和 recv() 再也不阻塞主程序,空出來的時間段CPU沒有空閒着,但並無利用好這空閒去作其餘有意義的事情,而是在循環嘗試讀寫 socket (不停判斷非阻塞調用的狀態是否就緒)。還得處理來自底層的可忽略的異常。也不能同時處理多個 socket 。
2 . 回調((Callback))
把I/O事件的等待和監放任務交給了 OS,那 OS 在知道I/O狀態發生改變後(例如socket鏈接已創建成功可發送數據),它又怎麼知道接下來該幹嗎呢?只能回調。
須要咱們將發送數據與讀取數據封裝成獨立的函數,讓epoll代替應用程序監聽socket狀態時,得告訴epoll:「若是socket狀態變爲能夠往裏寫數據(鏈接創建成功了),請調用HTTP請求發送函數。若是socket 變爲能夠讀數據了(客戶端已收到響應),請調用響應處理函數。」
首先,不斷嘗試send() 和 recv() 的兩個循環被消滅掉了。
其次,導入了selectors模塊,並建立了一個DefaultSelector 實例。Python標準庫提供的selectors模塊是對底層select/poll/epoll/kqueue的封裝。DefaultSelector類會根據 OS 環境自動選擇最佳的模塊,那在 Linux 2.5.44 及更新的版本上都是epoll了。
而後,在第25行和第31行分別註冊了socket可寫事件(EVENT_WRITE)和可讀事件(EVENT_READ)發生後應該採起的回調函數。
雖然代碼結構清晰了,阻塞操做也交給OS去等待和通知了,可是,咱們要抓取10個不一樣頁面,就得建立10個Crawler實例,就有20個事件將要發生,那如何從selector裏獲取當前正發生的事件,而且獲得對應的回調函數去執行呢?
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
demo解析:
注意到consumer函數是一個generator,把一個consumer傳入produce後:
首先調用c.send(None)啓動生成器;
而後,一旦生產了東西,經過c.send(n)切換到consumer執行;
consumer經過yield拿到消息,處理,又經過yield把結果傳回;
produce拿到consumer處理的結果,繼續生產下一條消息;
produce決定不生產了,經過c.close()關閉consumer,整個過程結束
在 Python 中調用協程對象1的 send() 方法時,第一次調用必須使用參數 None, 這使得協程的使用變得十分麻煩
解決此問題:
藉助 Python 自身的特性來避免這一問題,好比,建立一個裝飾器
def routine(func): def start(*args, **kwargs): cr = func(*args, **kwargs) cr.send(None) return cr return start @routine def product(): pass
yield from 是Python 3.3 新引入的語法(PEP 380)。它主要解決的就是在生成器裏玩生成器不方便的問題。它有兩大主要功能。
第一個功能是:讓嵌套生成器沒必要經過循環迭代yield,而是直接yield from。如下兩種在生成器裏玩子生成器的方式是等價的。
def gen_one(): subgen = range(10) yield from subgendef gen_two(): subgen = range(10) for item in subgen: yield item
第二個功能就是在子生成器和原生成器的調用者之間打開雙向通道,二者能夠直接通訊。
def gen(): yield from subgen()def subgen(): while True: x = yield yield x+1def main(): g = gen() next(g) # 驅動生成器g開始執行到第一個 yield retval = g.send(1) # 看似向生成器 gen() 發送數據 print(retval) # 返回2 g.throw(StopIteration) # 看似向gen()拋入異常
用yield from改進基於生成器的協程,代碼抽象程度更高。使業務邏輯相關的代碼更精簡。因爲其雙向通道功能可讓協程之間爲所欲爲傳遞數據,使Python異步編程的協程解決方案大大向前邁進了一步。
因而Python語言開發者們充分利用yield from,使 Guido 主導的Python異步編程框架Tulip迅速脫胎換骨,併火燒眉毛得讓它在 Python 3.4 中換了個名字asyncio以「實習生」角色出如今標準庫中。
asyncio是Python 3.4 試驗性引入的異步I/O框架(PEP 3156),提供了基於協程作異步I/O編寫單線程併發代碼的基礎設施。其核心組件有事件循環(Event Loop)、協程(Coroutine)、任務(Task)、將來對象(Future)以及其餘一些擴充和輔助性質的模塊。
在引入asyncio的時候,還提供了一個裝飾器@asyncio.coroutine用於裝飾使用了yield from的函數,以標記其爲協程。但並不強制使用這個裝飾器。
import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
@asyncio.coroutine把一個generator標記爲coroutine類型,而後,咱們就把這個coroutine扔到EventLoop中執行。
hello()會首先打印出Hello world!,而後,yield from語法可讓咱們方便地調用另外一個generator。因爲asyncio.sleep()也是一個coroutine,因此線程不會等待asyncio.sleep(),而是直接中斷並執行下一個消息循環。當asyncio.sleep()返回時,線程就能夠從yield from拿到返回值(此處是None),而後接着執行下一行語句。
把asyncio.sleep(1)當作是一個耗時1秒的IO操做,在此期間,主線程並未等待,而是去執行EventLoop中其餘能夠執行的coroutine了,所以能夠實現併發執行。
import asyncio import time now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() # task = asyncio.ensure_future(coroutine) # 方式一 task = loop.create_task(coroutine) # 方式二 print(task) loop.run_until_complete(task) print(task) print('TIME: ', now() - start)
建立task後,task在加入事件循環以前是pending狀態,加入loop後運行中是running狀態,loop調用完是Done,運行完是finished狀態,雖然說本質上協程函數和task指的東西都同樣,可是task有了協程函數的狀態。
其中loop.run_until_complete()接受一個future參數,futurn具體指代一個協程函數,而task是future的子類,因此咱們不聲明一個task直接傳入協程函數也能執行。
import asyncio async def test(x): return x+3 def callback(y): print(y.result()) coroutine = test(5) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) task <Task pending coro=<test() running at <ipython-input-4-61142fef17d8>:1>> task.add_done_callback(callback) loop.run_until_complete(task)
import asyncio import time async def test(1): time.sleep(1) print(time.time()) tasks = [asyncio.ensure_future(test()) for _ in range(3)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 1547187398.7611663 1547187399.7611988 1547187400.7632194
上面執行並非異步執行,而是順序執行,可是改爲下面形式那就是異步執行:
import asyncio import time async def test(t): await asyncio.sleep(1) print(time.time()) tasks = [asyncio.ensure_future(test()) for _ in range(3)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 1547187398.7611663 1547187399.7611988 1547187400.7632194
用asyncio提供的@asyncio.coroutine能夠把一個generator標記爲coroutine類型,而後在coroutine內部用yield from調用另外一個coroutine實現異步操做。
爲了簡化並更好地標識異步IO,從Python 3.5開始引入了新的語法async和await,可讓coroutine的代碼更簡潔易讀。
請注意,async和await是針對coroutine的新語法,要使用新的語法,只須要作兩步簡單的替換:
把@asyncio.coroutine替換爲async;
把yield from替換爲await。
async def hello(): print("Hello world!") r = await asyncio.sleep(1) print("Hello again!")
asyncio能夠實現單線程併發IO操做。若是僅用在客戶端,發揮的威力不大。若是把asyncio用在服務器端,例如Web服務器,因爲HTTP鏈接就是IO操做,所以能夠用單線程+coroutine實現多用戶的高併發支持。
asyncio實現了TCP、UDP、SSL等協議,aiohttp則是基於asyncio實現的HTTP框架。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello) srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv loop = asyncio.get_event_loop() # 建立一個事件循環(池) loop.run_until_complete(init(loop)) # 將協程對象包裝並註冊協程對象 loop.run_forever()
方法1:
asyncio、aiohttp須要配合aiomultiprocess
方法2:
gevent.pool import Pool
multiprocessing import Process
核心代碼
def main(): file_list = ["7001", "7002", "7003"] p_lst = [] # 線程列表 for i in file_list: # self.run(i) p = Process(target=read_file, args=(i,)) # 子進程調用函數 p.start() # 啓動子進程 p_lst.append(p) # 將全部進程寫入列表中 def read_file(self, number): """ 讀取文件 :param number: 文件標記 :return: """ file_name = os.path.join(self.BASE_DIR, "data", "%s.txt" % number) # print(file_name) self.write_log(number, "開始讀取文件 {}".format(file_name),"green") with open(file_name, encoding='utf-8') as f: # 使用協程池,執行任務。語法: pool.map(func,iterator) # partial使用偏函數傳遞參數 # 注意:has_null第一個參數,必須是迭代器遍歷的值 pool.map(partial(self.has_null, number=number), f)
使用loop.run_until_complete(syncio.wait(tasks)) 也可使用 loop.run_until_complete(asyncio.gather(*tasks)) ,前者傳入task列表,會對task進行解包操做。
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) import time async def request(): url = "http://www.baidu.com" resulit = await get(url) tasks = [asyncio.ensure_future(request()) for _ in range(10000)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) async def request(): url = "http://www.baidu.com" tasks = [asyncio.ensure_future(url) for _ in range(1000)] 方式一: dones, pendings = await asyncio.wait(tasks) # 返回future對象,不返回直接結果 for task in dones: print('Task ret: ', task.result()) 方式二: results = await asyncio.gather(*tasks) # 直接返回結果 方式三: for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) # 迭代方式返回結果 tasks = asyncio.ensure_future(request()) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
實現結束task有兩種方式:關閉單個task、關閉loop,涉及主要函數: asyncio.Task.all_tasks()獲取事件循環任務列表 KeyboardInterrupt捕獲中止異常(Ctrl+C) loop.stop()中止任務循環 task.cancel()取消單個任務 loop.run_forever() loop.close()關閉事件循環,否則會重啓
python程序實現的一種單線程下的多任務執行調度器,簡單來講在一個線程裏,前後執行AB兩個任務,可是當A遇到耗時操做(網絡等待、文件讀寫等),這個時候gevent會讓A繼續執行,可是同時也會開始執行B任務,若是B在遇到耗時操做同時A又執行完了耗時操做,gevent又繼續執行A。
import gevent def test(time): print(1) gevent.sleep(time) print(2) def test2(time): print(3) gevent.sleep(time) print(4) if __name__ == '__main__': gevent.joinall([ gevent.spawn(test, 2), gevent.spawn(test2, 3) ])
借鑑文章:
https://mp.weixin.qq.com/s/GgamzHPyZuSg45LoJKsofA
https://rgb-24bit.github.io/blog/2019/python-coroutine-event-loop.html
https://zhuanlan.zhihu.com/p/54657754
https://cloud.tencent.com/developer/article/1590280