An Asynchronous Crawler Based on asyncio, aiohttp, and XPath

This post walks through writing an asynchronous crawler based on asyncio and aiohttp, using XPath to parse the HTML.

The crawler implements the following features:
1. Read the crawling rules from a CSV file and crawl data according to those rules; the code currently includes extraction rules for three different websites, and more can be added if needed;
2. Save the crawled data to a MySQL database.

Given a question as input, the crawler fetches health-related data.

The full code is as follows:

# coding:utf-8


"""
async-apiser xpath
"""


from lxml import etree
import csv
import re
import os
import asyncio
import aiohttp
import aiomysql
from datetime import datetime

from config import Config


class HealthSpider(object):

    def __init__(self, user_id, keyword, url, hrule, drule, count, trule):
        self.user_id = user_id
        self.keyword = keyword
        self.url = url
        self.hrule = hrule
        self.drule = drule
        self.count = count
        self.trule = trule
        self.headers = {}
        self.urls_done = []
        self.urls_will = []
        self.spider_data = {}

    @staticmethod
    def handle_flag(html):
        """
        Strip inline style attributes from an HTML string.
        :param html: HTML fragment to clean
        :return: the fragment with style attributes removed
        """
        pattern = re.compile(r' style=".*?;"', re.S)
        return pattern.sub('', html)

    async def get_html(self, url, session):
        """
        Fetch a URL and return its HTML.
        :param url: the page URL to fetch
        :return: the response body as text
        """
        try:
            async with session.get(url, headers=self.headers, timeout=5) as resp:
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception:
            raise Exception("Error while fetching data")

    def get_url(self, resp):
        """
        Extract the URL of each item from the listing HTML.
        :param resp: HTML of the listing page
        :return:
        """
        # parse the listing page
        root = etree.HTML(str(resp))
        items = root.xpath(self.hrule)
        # the HTML structure differs between sites, so the URLs are assembled differently
        if 5 == self.count:
            self.urls_will = ['https://dxy.com' + i for i in items[:5]]
        elif 3 == self.count:
            self.urls_will = [i for i in items[:3]]
        elif 2 == self.count:
            self.urls_will = [i for i in items[:2]]

    async def get_data(self, url, session, pool):
        """
        Fetch and extract the actual data for one item URL.
        :return:
        """
        # fetch the HTML for this URL
        html = await self.get_html(url, session)
        # parse the page
        root = etree.HTML(str(html))
        html_data = ''
        try:
            title = root.xpath(self.trule)
            title = ''.join(title)
        except Exception:
            title = ''
        try:
            data = root.xpath(self.drule)
            if data:
                # the HTML structure differs between sites, so the data is extracted differently
                if 3 == self.count:
                    html_data = ''.join(map(etree.tounicode, data))
                    # strip the style attributes from the result
                    html_data = HealthSpider.handle_flag(html_data)
                else:
                    html_data = etree.tounicode(data[0])
                    html_data = HealthSpider.handle_flag(html_data)
        except Exception:
            html_data = ''

        self.urls_done.append(url)
        # save to the database: user id, keyword, date, main url, sub url, title, html data
        if html_data:
            self.spider_data["data"].append({"title": title, "html_data": html_data})
            spide_date = datetime.now()
            data = (self.user_id, self.keyword, spide_date, self.url, url, title, html_data)
            stmt = "INSERT INTO spider_data (user_id, keyword, spide_date, main_url, sub_url, title, html_data) " \
                   "VALUES (%s, %s, %s, %s, %s, %s, %s)"
            try:
                async with pool.acquire() as conn:
                    async with conn.cursor() as cur:
                        await cur.execute(stmt, data)
            except Exception:
                pass

    async def start_spider(self, pool):
        """
        Start crawling the data.
        :return:
        """
        async with aiohttp.ClientSession() as session:
            self.spider_data["user_id"] = self.user_id
            self.spider_data["keyword"] = self.keyword
            self.spider_data["data"] = []
            while True:
                # stop when the pending URL queue is empty or `count` items have been crawled
                if (len(self.urls_will) == 0) or len(self.spider_data["data"]) == self.count:
                    break
                # take the next pending URL
                url = self.urls_will.pop()
                # crawl it if it has not been crawled yet
                if url not in self.urls_done:
                    await self.get_data(url, session, pool)
            return self.spider_data

    async def main(self, loop):
        # request headers
        self.headers = {'Accept': 'text/html, application/xhtml+xml, application/xml;q=0.9,*/*;q=0.8',
                        'Accept-Encoding': 'gzip, deflate',
                        'Accept-Language': 'zh-Hans-CN, zh-Hans; q=0.5',
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                                      '(KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063'
                        }

        # connect to the MySQL database
        pool = await aiomysql.create_pool(host=Config.DB_HOST, port=Config.DB_PORT,
                                          user=Config.DB_USER, password=Config.DB_PASSWORD,
                                          db=Config.DB_NAME, loop=loop, charset="utf8", autocommit=True)
        async with aiohttp.ClientSession() as session:
            # fetch the listing page first
            html = await self.get_html(self.url, session)
            # extract the item URLs
            self.get_url(html)
        data = await self.start_spider(pool)
        return data
        # asyncio.ensure_future(self.start_spider(pool))


def get_rules(keyword):
    """
    Load the XPath crawling rules from the CSV file.
    :return:
    """
    csv_dict = []
    path = os.path.join(os.path.dirname(__file__), 'rules.csv')
    with open(path, 'r', newline='') as f:
        reader = csv.DictReader(f)
        for line in reader:
            url = line['url'].format(keyword)
            hrule = line['hrule']
            drule = line['drule']
            count = int(line['count'])
            title = line['trule']
            csv_dict.append({"url": url, "hrule": hrule, "drule": drule, "count": count, "trule": title})
    return csv_dict


def start_spider(keyword):
    """
    Crawl data for the given keyword.
    :param keyword:
    :return:
    """
    try:
        data_list = get_rules(keyword)
    except Exception:
        raise Exception("Failed to load the crawling rules")
    spider_data = []
    tasks = []
    loop = asyncio.get_event_loop()
    for i in data_list:
        spider = HealthSpider(1, keyword, i['url'], i['hrule'], i['drule'], i['count'], i['trule'])
        # build the task list
        tasks.append(asyncio.ensure_future(spider.main(loop)))
    # run all crawler tasks concurrently on the event loop
    loop.run_until_complete(asyncio.wait(tasks))

    try:
        for task in tasks:
            for item in task.result()["data"]:
                spider_data.append(item)
    except Exception:
        pass
    # sleep briefly so the underlying connections can close
    loop.run_until_complete(asyncio.sleep(0.250))
    loop.close()
    return spider_data


if __name__ == '__main__':
    # crawl content related to "感冒了怎麼辦" (what to do about a cold)
    start_spider("感冒了怎麼辦")

Below are a few notes on what some of the methods in the code do:
1. handle_flag() removes the inline style attributes from an HTML string while keeping the other tags, which makes the HTML easier to display on the front end (a short example follows below);
2. get_data() crawls the actual data and uses aiomysql to save the crawled data to the database;
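
As a quick illustration, handle_flag() strips inline style attributes that end with a semicolon while keeping the surrounding tags:

# handle_flag() removes the inline style attribute and keeps the tag itself
html = '<p style="color: red;">Drink plenty of water.</p>'
print(HealthSpider.handle_flag(html))
# output: <p>Drink plenty of water.</p>
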
The database configuration file config.py:

# coding=utf-8


class Config(object):
    DB_ENGINE = 'mysql'
    DB_HOST = '127.0.0.1'
    DB_PORT = 3306
    DB_USER = 'root'
    DB_PASSWORD = 'wyzane'
    DB_NAME = 'db_tornado'
    DB_OPTIONS = {
        'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
        'charset': 'utf8mb4',
    }

3. get_rules() reads the crawling rules from the rules.csv file. Because three different websites are crawled at the same time, and each site needs different XPath rules to parse its HTML and yields a different number of items, these rules are kept in rules.csv (a plain CSV file that can be edited in a spreadsheet program such as Excel). The rules are read first, and then the data is crawled. A hypothetical sample row is shown below.
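
For reference, a rules.csv row could look like the following; the site URL and XPath expressions are only placeholders rather than the real rules, and the {} in the url column is filled in with the search keyword by get_rules():

url,hrule,drule,count,trule
https://example.com/search?q={},//div[@class='result']//a/@href,//div[@class='article'],3,//h1/text()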

That's the code for the asyncio-based asynchronous crawler. If you spot any mistakes, corrections and feedback are welcome!
