假設有3個url須要發請求。html
import requests urls_list = [ 'https://www.cnblogs.com/longyunfeigu/p/9491496.html', 'https://dig.chouti.com/', 'https://dig.chouti.com/r/pic/hot/1' ] for url in urls_list: requests.get(url)
串行確定是最慢的,怎麼改進?第一反應:開多個線程。好吧,lowB 第一反應都是這個方法python
import time from concurrent.futures import ThreadPoolExecutor import requests urls_list = [ 'https://www.cnblogs.com/longyunfeigu/p/9491496.html', 'https://dig.chouti.com/', 'https://dig.chouti.com/r/pic/hot/1' ] start_time = time.time() pool = ThreadPoolExecutor(3) def task(url): requests.get(url) print('ending') for i in urls_list: pool.submit(task, i) pool.shutdown() end_time = time.time() print(end_time - start_time)
開多個線程雖然能夠實現併發,可是開線程總歸是要耗費資源的,那能不能利用一個線程幫忙提升效率呢,這就須要異步非阻塞登場了react
什麼是異步非阻塞?異步非阻塞就是 異步+非阻塞。異步就是回調,非阻塞指的是單個任務不等待。
在網絡IO中,有兩個階段會出現"浪費時間"的狀況,一個階段是connect的時候,請求發出去等待信息回來通知這個客戶端socket就緒能夠發http報文信息。
另外一個階段是recv接受數據的時候(發送數據不須要等待,直接發就行)須要等待,由於服務端需經過網絡把數據發送過來。非阻塞就是再也不等待,connect原先須要等待是吧,ok如今我不等了,
recv須要等待是吧,ok我也不等了。不等報錯咋辦?報錯就報錯唄,大不了異常捕獲就行。若是是非阻塞socket,那麼3個url的請求在connect的時候都發出去了,注意,即便報錯,請求也是如弦上之箭射出去了。
那麼等connect成功後就應該發送數據了呀,這時候就體現出回調了,成功就回來調用一段代碼。全部的抽象的話語均可以結合大體的代碼來理解。之後若是遇到一些抽象話語很差理解,那麼就應該用代碼去理解這個抽象話語。
有了異步非阻塞的概念是,那麼咱們就能夠利用一個線程把全部的鏈接都發送出去,等鏈接都發送出去並且任意一個connect的信息都沒返回的時候,線程就只好等待了,
等有connect的信息返回表示這個客戶端socket準備就緒能夠發http報文了,這時候再去執行發送數據的代碼。這個過程提及來簡單,但有一個問題還不清晰:程序怎麼知道哪一個socket是就緒的?這個大體有2種解決方式:要麼是通知(叫醒服務,這個更偏向底層,咱們寫的應用程序代碼能夠利用底層已經實現好的技術);要麼是用while死循環不斷去檢測 。python中有一些模塊已經幫咱們實現了異步非阻塞的功能。git
from twisted.web.client import getPage, defer from twisted.internet import reactor def all_done(arg): reactor.stop() def callback(contents): print(contents) deferred_list = [] url_list = ['http://www.bing.com', 'http://www.baidu.com', ] for url in url_list: deferred = getPage(bytes(url, encoding='utf8')) # 次數使用回調函數,猜想用到了異步功能 deferred.addCallback(callback) deferred_list.append(deferred) dlist = defer.DeferredList(deferred_list) dlist.addBoth(all_done) # 死循環,不斷地處理deferred_list裏的socket對象,按理說處理完成就應該自動中止了,但多是這個框架架構設計問題,不能作到自動中止,須要手動停 reactor.run()
gevent 是基於greenlet作的,greenlet實現了協程。協程和線程不同,協程不是真實存在的,是程序員僞造出來的具備相似於線程的切換效果。單純的協程不能完成提升效率的方式,如greenlet須要手動切換,因此這出現了gevent。
gevent 能夠在遇到IO阻塞的時候自動切換協程的運行,這樣就能夠提高效率。而協程切來切去的功能正好能夠利用過來實現異步非阻塞程序員
import gevent from gevent import monkey monkey.patch_all() import requests def fetch_async(method, url, req_kwargs): print(method, url, req_kwargs) response = requests.request(method=method, url=url, **req_kwargs) print(response.url) # ##### 發送請求 ##### gevent.joinall([ gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/longyunfeigu/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}), ])
requests模塊發請求,內部也是用socket,先去connect,而後發送請求,最後收到響應github
asyncio默認只支持發tcp層的報文,想要發http請求,就須要本身封裝http報文web
import asyncio @asyncio.coroutine def fetch_async(host, url='/'): print(host, url) reader, writer = yield from asyncio.open_connection(host, 80) request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,) request_header_content = bytes(request_header_content, encoding='utf-8') writer.write(request_header_content) yield from writer.drain() text = yield from reader.read() print(host, url, text) writer.close() tasks = [ fetch_async('www.cnblogs.com', '/longyunfeigu/'), fetch_async('baidu.com', '/') ] loop = asyncio.get_event_loop() results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close()
運用知識:非阻塞socket + IO多路複用網絡
from socket import * import select import time class HttpContext(object): def __init__(self, sock, url_dict): import time self._start_time = time.time() self.sock = sock self.port = '' self.host = '' self.method = '' self.data = '' self.path = '' self.content = b'' self.text = '' self.timeout = 0 self.callback = None self.initial(url_dict) def initial(self, url_dict): self.port = url_dict.get('port') self.host = url_dict.get('host') self.method = url_dict.get('method') self.data = url_dict.get('data') self.path = url_dict.get('path') self.timeout = url_dict.get('timeout', 5) self.callback = url_dict.get('callback') def connect(self): self.sock.connect((self.host, self.port)) def fileno(self): return self.sock.fileno() def send_get(self): content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n"""%( self.method.upper(), self.path, self.host) self.sock.sendall(bytes(content, encoding='utf8')) def sendall(self): if self.method.upper() == 'GET': self.send_get() elif self.method.upper() == 'POST': self.send_post() else: pass def send_post(self): content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s"""%( self.method.upper(), self.path, self.host, self.data) self.sock.sendall(bytes(content, encoding='utf8')) def recv(self): import time while 1: data = self.sock.recv(8096) if not data: break self.content += data self.text += str(data, encoding='utf8') time.sleep(0.1) self.finish() def finish(self, msg=''): if msg: self.text = msg self.content = bytes(msg, encoding='utf8') if self.callback: self.callback(self.text) class AsynchRequest(object): def __init__(self): self.conn_socket_list = [] self.recv_socket_list = [] def add_request(self, **url_dict): # 當即發起connect鏈接 soc = socket(AF_INET,SOCK_STREAM) soc.setblocking(0) ctx = HttpContext(soc, url_dict) self.conn_socket_list.append(ctx) self.recv_socket_list.append(ctx) try: ctx.connect() except BlockingIOError as e: pass # print('request is sended') def check_timeout(self): # 檢驗是否超時 ctime = time.time() for ctx in self.recv_socket_list: if ctx._start_time + ctx.timeout <= ctime: self.recv_socket_list.remove(ctx) self.conn_socket_list.remove(ctx) ctx.finish('connect超時') def run(self): while 1: # r_list 表明socket對象是否有數據能夠讀, w_list表明socket是否能夠寫,也就是是否能夠發送數據,寫服務端程序通常只須要用到 r_list # IO多路複用監聽的對象不必定是socket對象,只要對象有fileno方法都能監聽,內部也是拿對象的fileno的返回值來監聽 r_list, w_list, e_list = select.select(self.conn_socket_list, self.recv_socket_list, [], 0.05) for w in w_list: w.sendall() self.recv_socket_list.remove(w) for r in r_list: r.recv() self.conn_socket_list.remove(r) if not self.conn_socket_list: break self.check_timeout() def callback(response): print(response) url_list = [ {'host': 'www.baidu.com','port': 80, 'path':'/', 'name':'baidu', 'method':'GET', 'callback': callback}, {'host': 'cn.bing.com','port': 80,'path':'/', 'name':'chouti', 'method':'GET'}, ] if __name__ == '__main__': obj = AsynchRequest() for i in url_list: obj.add_request(**i) obj.run()
這裏的自定義IO模塊是站在客戶端的角度來定義的,藉助底層的IO多路複用來幫咱們檢測socket是否已經準備好(不用咱們手動寫死循環去檢測socket對象是否已經準備好)多線程