自定義異步IO模塊

高性能爬蟲

假設有3個url須要發請求。html

串行

import requests

urls_list = [
    'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
    'https://dig.chouti.com/',
    'https://dig.chouti.com/r/pic/hot/1'
]

for url in urls_list:
    requests.get(url)

串行確定是最慢的,怎麼改進?第一反應:開多個線程。好吧,lowB 第一反應都是這個方法python

多線程

import time
from concurrent.futures import ThreadPoolExecutor
import requests

urls_list = [
    'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
    'https://dig.chouti.com/',
    'https://dig.chouti.com/r/pic/hot/1'
]

start_time = time.time()
pool = ThreadPoolExecutor(3)
def task(url):
    requests.get(url)
    print('ending')

for i in urls_list:
    pool.submit(task, i)
pool.shutdown()
end_time = time.time()
print(end_time - start_time)

開多個線程雖然能夠實現併發,可是開線程總歸是要耗費資源的,那能不能利用一個線程幫忙提升效率呢,這就須要異步非阻塞登場了react

異步非阻塞

什麼是異步非阻塞?異步非阻塞就是 異步+非阻塞。異步就是回調,非阻塞指的是單個任務不等待。
在網絡IO中,有兩個階段會出現"浪費時間"的狀況,一個階段是connect的時候,請求發出去等待信息回來通知這個客戶端socket就緒能夠發http報文信息。
另外一個階段是recv接受數據的時候(發送數據不須要等待,直接發就行)須要等待,由於服務端需經過網絡把數據發送過來。非阻塞就是再也不等待,connect原先須要等待是吧,ok如今我不等了,
recv須要等待是吧,ok我也不等了。不等報錯咋辦?報錯就報錯唄,大不了異常捕獲就行。若是是非阻塞socket,那麼3個url的請求在connect的時候都發出去了,注意,即便報錯,請求也是如弦上之箭射出去了。
那麼等connect成功後就應該發送數據了呀,這時候就體現出回調了,成功就回來調用一段代碼。全部的抽象的話語均可以結合大體的代碼來理解。之後若是遇到一些抽象話語很差理解,那麼就應該用代碼去理解這個抽象話語。
有了異步非阻塞的概念是,那麼咱們就能夠利用一個線程把全部的鏈接都發送出去,等鏈接都發送出去並且任意一個connect的信息都沒返回的時候,線程就只好等待了,
等有connect的信息返回表示這個客戶端socket準備就緒能夠發http報文了,這時候再去執行發送數據的代碼。這個過程提及來簡單,但有一個問題還不清晰:程序怎麼知道哪一個socket是就緒的?這個大體有2種解決方式:要麼是通知(叫醒服務,這個更偏向底層,咱們寫的應用程序代碼能夠利用底層已經實現好的技術);要麼是用while死循環不斷去檢測 。python中有一些模塊已經幫咱們實現了異步非阻塞的功能。git

twisted

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    # 次數使用回調函數,猜想用到了異步功能
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)
# 死循環,不斷地處理deferred_list裏的socket對象,按理說處理完成就應該自動中止了,但多是這個框架架構設計問題,不能作到自動中止,須要手動停
reactor.run()

gevent

gevent 是基於greenlet作的,greenlet實現了協程。協程和線程不同,協程不是真實存在的,是程序員僞造出來的具備相似於線程的切換效果。單純的協程不能完成提升效率的方式,如greenlet須要手動切換,因此這出現了gevent。
gevent 能夠在遇到IO阻塞的時候自動切換協程的運行,這樣就能夠提高效率。而協程切來切去的功能正好能夠利用過來實現異步非阻塞程序員

import gevent
from gevent import monkey

monkey.patch_all()
import requests

def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url)

# ##### 發送請求 #####
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/longyunfeigu/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])

requests模塊發請求,內部也是用socket,先去connect,而後發送請求,最後收到響應github

asyncio

asyncio默認只支持發tcp層的報文,想要發http請求,就須要本身封裝http報文web

import asyncio

@asyncio.coroutine
def fetch_async(host, url='/'):
    print(host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)

    request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)
    yield from writer.drain()
    text = yield from reader.read()
    print(host, url, text)
    writer.close()

tasks = [
    fetch_async('www.cnblogs.com', '/longyunfeigu/'),
    fetch_async('baidu.com', '/')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

自定義IO模塊

運用知識:非阻塞socket + IO多路複用網絡

from socket import *
import select
import time

class HttpContext(object):
    def __init__(self, sock, url_dict):
        import time
        self._start_time = time.time()
        self.sock = sock
        self.port = ''
        self.host = ''
        self.method = ''
        self.data = ''
        self.path = ''
        self.content = b''
        self.text = ''
        self.timeout = 0
        self.callback = None
        self.initial(url_dict)

    def initial(self, url_dict):
        self.port = url_dict.get('port')
        self.host = url_dict.get('host')
        self.method = url_dict.get('method')
        self.data = url_dict.get('data')
        self.path = url_dict.get('path')
        self.timeout = url_dict.get('timeout', 5)
        self.callback = url_dict.get('callback')

    def connect(self):
        self.sock.connect((self.host, self.port))

    def fileno(self):
        return self.sock.fileno()

    def send_get(self):
        content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n"""%(
            self.method.upper(), self.path, self.host)
        self.sock.sendall(bytes(content, encoding='utf8'))

    def sendall(self):
        if self.method.upper() == 'GET':
            self.send_get()
        elif self.method.upper() == 'POST':
            self.send_post()
        else:
            pass

    def send_post(self):
        content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s"""%(
            self.method.upper(), self.path, self.host, self.data)
        self.sock.sendall(bytes(content, encoding='utf8'))

    def recv(self):
        import time
        while 1:
            data = self.sock.recv(8096)
            if not data:
                break
            self.content += data
            self.text += str(data, encoding='utf8')
            time.sleep(0.1)
        self.finish()

    def finish(self, msg=''):
        if msg:
            self.text = msg
            self.content = bytes(msg, encoding='utf8')
        if self.callback:
            self.callback(self.text)


class AsynchRequest(object):
    def __init__(self):
        self.conn_socket_list = []
        self.recv_socket_list = []

    def add_request(self, **url_dict):
        # 當即發起connect鏈接
        soc = socket(AF_INET,SOCK_STREAM)
        soc.setblocking(0)
        ctx = HttpContext(soc, url_dict)
        self.conn_socket_list.append(ctx)
        self.recv_socket_list.append(ctx)
        try:
            ctx.connect()
        except BlockingIOError as e:
            pass
            # print('request is sended')

    def check_timeout(self):
        # 檢驗是否超時
        ctime = time.time()
        for ctx in self.recv_socket_list:
            if ctx._start_time + ctx.timeout <= ctime:
                self.recv_socket_list.remove(ctx)
                self.conn_socket_list.remove(ctx)
                ctx.finish('connect超時')

    def run(self):
        while 1:
            #  r_list 表明socket對象是否有數據能夠讀, w_list表明socket是否能夠寫,也就是是否能夠發送數據,寫服務端程序通常只須要用到 r_list
            # IO多路複用監聽的對象不必定是socket對象,只要對象有fileno方法都能監聽,內部也是拿對象的fileno的返回值來監聽
            r_list, w_list, e_list = select.select(self.conn_socket_list, self.recv_socket_list, [], 0.05)
            for w in w_list:
                w.sendall()
                self.recv_socket_list.remove(w)
            for r in r_list:
                r.recv()
                self.conn_socket_list.remove(r)
            if not self.conn_socket_list:
                break
            self.check_timeout()

def callback(response):
    print(response)

url_list = [
    {'host': 'www.baidu.com','port': 80, 'path':'/', 'name':'baidu', 'method':'GET', 'callback': callback},
    {'host': 'cn.bing.com','port': 80,'path':'/', 'name':'chouti', 'method':'GET'},
]
if __name__ == '__main__':    
    obj = AsynchRequest()
    for i in url_list:
        obj.add_request(**i)
    obj.run()

這裏的自定義IO模塊是站在客戶端的角度來定義的,藉助底層的IO多路複用來幫咱們檢測socket是否已經準備好(不用咱們手動寫死循環去檢測socket對象是否已經準備好)多線程

相關文章
相關標籤/搜索