aiohttp筆記

簡介

aiohttp需要 Python 3.5.3 或更高的版本。它不但能做客戶端爬蟲,也能做服務器端;利用 asyncio 與協程,十分高效。
官方文檔

採集模板

一批,一次性採集

import asyncio
import logging
import time
from aiohttp import ClientSession, ClientTimeout


logging.basicConfig(level=logging.INFO, format='[%(asctime)s] - %(levelname)s in %(filename)s.%(funcName)s: %(message)s')

# 默認請求頭
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/69.0.3497.100 Safari/537.36',
}

# 默認超時時間
TIMEOUT = 15


class AioCrawl:
    """One-shot batch crawler: fetch a list of URLs concurrently with aiohttp."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None):
        """Fetch a single URL and return the raw response body.

        :param url: target URL
        :param method: 'GET' or 'POST'; anything else falls back to 'GET'
        :param headers: optional header dict; defaults to module-level HEADERS
        :param timeout: total timeout in seconds (wrapped in ClientTimeout)
        :param cookies: optional cookies for the session
        :param data: POST form data; ignored unless it is a dict
        :return: response body as bytes
        :raises: any aiohttp/timeout exception from the request (propagated
                 to the caller; the old ``except Exception as e: raise e``
                 wrapper was a no-op and has been removed)
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers if headers else HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies if cookies else None
        data = data if isinstance(data, dict) else {}

        async with ClientSession(headers=headers, timeout=timeout, cookies=cookies) as session:
            if method == 'GET':
                async with session.get(url) as response:
                    return await response.read()
            async with session.post(url, data=data) as response:
                return await response.read()

    def prepare_fetch(self, urls):
        """Wrap each URL's fetch coroutine in a Future and return the list."""
        return [asyncio.ensure_future(self.fetch(url)) for url in urls]

    def crawl_batch_urls(self, urls):
        """Run one batch of fetches to completion and return the future list.

        ``asyncio.wait`` never raises the tasks' exceptions; each failure
        stays on its future — inspect ``future.exception()`` afterwards.
        """
        future_list = self.prepare_fetch(urls)

        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(future_list))

        self.logger.info('採集完一批: {}'.format(len(urls)))
        return future_list


if __name__ == '__main__':
    a = AioCrawl()
    # Time one batch of 5 identical fetches (typically finishes in 2-4 seconds,
    # since all requests run concurrently on the event loop).
    t0 = time.time()
    future_list = a.crawl_batch_urls(['https://www.sina.com.cn' for _ in range(5)])
    print(time.time() - t0)

    # Each future either carries an exception or the response body bytes.
    for future in future_list:
        if future.exception():
            print(future.exception())
        else:
            print(len(future.result()))

動態添加任務

import asyncio
from threading import Thread

from aiohttp import ClientSession, ClientTimeout


# 默認請求頭
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/69.0.3497.100 Safari/537.36',
}

# 默認超時時間
TIMEOUT = 15


def start_loop(loop):
    """Thread target: bind *loop* to this thread and run it until stopped.

    Intended to be handed to ``threading.Thread`` so the event loop runs
    in the background while the main thread submits coroutines to it.
    """
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def fetch(url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None):
    """Fetch a single URL and return the raw response body.

    :param url: target URL
    :param method: 'GET' or 'POST'; anything else falls back to 'GET'
    :param headers: optional header dict; defaults to module-level HEADERS
    :param timeout: total timeout in seconds (wrapped in ClientTimeout)
    :param cookies: optional cookies for the session
    :param data: POST form data; ignored unless it is a dict
    :return: response body as bytes
    :raises: any aiohttp/timeout exception from the request (propagated to
             the caller; the old ``except Exception as e: raise e`` wrapper
             added nothing and has been removed)
    """
    method = 'POST' if method.upper() == 'POST' else 'GET'
    headers = headers if headers else HEADERS
    timeout = ClientTimeout(total=timeout)
    cookies = cookies if cookies else None
    data = data if isinstance(data, dict) else {}

    async with ClientSession(headers=headers, timeout=timeout, cookies=cookies) as session:
        if method == 'GET':
            async with session.get(url) as response:
                return await response.read()
        async with session.post(url, data=data) as response:
            return await response.read()


def callback(future):
    """Done-callback for a finished fetch: print the future's repr.

    Attached via ``future.add_done_callback``; receives the completed
    future as its only argument.
    """
    print(future)


if __name__ == '__main__':
    # Start a dedicated event loop on a daemon background thread so the
    # main thread can submit coroutines to it at any time.
    event_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(event_loop,))
    t.daemon = True  # setDaemon() is deprecated since Python 3.10
    t.start()

    # Submit a coroutine to the background loop from this (foreign) thread.
    future = asyncio.run_coroutine_threadsafe(fetch('https://www.sina.com.cn'), event_loop)
    future.add_done_callback(callback)  # fires when the fetch completes
    # NOTE(review): the script may exit (killing the daemon loop) before the
    # fetch finishes; call future.result() here if the demo must complete.

動態添加任務,封裝成類

import asyncio
import logging
import time
from threading import Thread
from aiohttp import ClientSession, ClientTimeout, TCPConnector


# 默認請求頭
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36',
}


# 默認超時時間
TIMEOUT = 15


def start_loop(loop):
    """Make *loop* the current thread's event loop and spin it forever.

    Used as a ``Thread`` target so the loop serves tasks in the background.
    """
    asyncio.set_event_loop(loop)
    loop.run_forever()


class AioCrawl:
    """Crawler that accepts tasks dynamically.

    Owns a private event loop running on a daemon thread; ``add_tasks``
    may be called from any thread at any time to schedule more fetches.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

        # Run a dedicated event loop on a background daemon thread.
        self.event_loop = asyncio.new_event_loop()
        self.t = Thread(target=start_loop, args=(self.event_loop,))
        self.t.daemon = True  # setDaemon() is deprecated since Python 3.10
        self.t.start()

        self.concurrent = 0  # number of in-flight tasks

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None, proxy=None):
        """Fetch one URL.

        :param url: str
        :param method: 'GET' or 'POST'; anything else falls back to 'GET'
        :param headers: dict; defaults to module-level HEADERS
        :param timeout: total timeout in seconds
        :param cookies: cookies for the session
        :param data: dict of POST form data (ignored unless a dict)
        :param proxy: str proxy URL
        :return: (status, content) tuple
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers if headers else HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies if cookies else None
        data = data if isinstance(data, dict) else {}

        # ssl=False disables certificate verification; the old
        # verify_ssl= keyword is deprecated in aiohttp 3.x.
        tcp_connector = TCPConnector(ssl=False)
        async with ClientSession(headers=headers, timeout=timeout, cookies=cookies,
                                 connector=tcp_connector) as session:
            if method == 'GET':
                async with session.get(url, proxy=proxy) as response:
                    return response.status, await response.read()
            async with session.post(url, data=data, proxy=proxy) as response:
                return response.status, await response.read()

    def callback(self, future):
        """Done-callback for a finished fetch.

        Logs failures / non-200 responses and decrements the concurrency
        counter. ``future.result()`` is only touched on success, since it
        would re-raise the stored exception otherwise.
        """
        msg = str(future.exception()) if future.exception() else 'success'
        code = 1 if msg == 'success' else 0
        status = future.result()[0] if code == 1 else None
        data = future.result()[1] if code == 1 else b''  # empty body on failure

        data_len = len(data) if data else 0
        if code == 0 or (status is not None and status != 200):
            # BUGFIX: a bare concurrent.futures.Future has no .url attribute;
            # add_tasks() now attaches it, and getattr keeps this safe anyway.
            self.logger.warning('<url="{}", code={}, msg="{}", status={}, data(len):{}>'.format(
                getattr(future, 'url', None), code, msg, status, data_len))

        self.concurrent -= 1  # one fewer task in flight

        print(len(data))

    def add_tasks(self, tasks):
        """Schedule a batch of URLs on the background event loop.

        :param tasks: list of URL strings
        """
        for task in tasks:
            # run_coroutine_threadsafe is the thread-safe way to submit a
            # coroutine to a loop owned by another thread.
            future = asyncio.run_coroutine_threadsafe(self.fetch(task), self.event_loop)
            future.url = task  # remembered for error logging in callback()
            future.add_done_callback(self.callback)
            self.concurrent += 1  # one more task in flight


if __name__ == '__main__':
    a = AioCrawl()

    # Simulate tasks arriving dynamically: submit two fetches per second
    # while earlier ones are still running on the background loop.
    for _ in range(5):
        a.add_tasks(['https://www.sina.com.cn' for _ in range(2)])  # simulate dynamically added tasks
        time.sleep(1)
相關文章
相關標籤/搜索