Today I'd like to walk through writing an asynchronous crawler based on asyncio and aiohttp, using XPath to parse the HTML.
The crawler implements the following features:
1. It reads the crawling rules from a CSV file and crawls data according to those rules; the code includes different extraction rules for three websites, and more can be added as needed;
2. It saves the crawled data to a MySQL database.
Given a question as input, the crawler can fetch health-related data about it.
The full code is as follows:
```python
# coding:utf-8
"""
async-apiser xpath
"""
from lxml import etree
import csv
import re
import os
import asyncio
import aiohttp
import aiomysql
from datetime import datetime

from config import Config


class HealthSpider(object):

    def __init__(self, user_id, keyword, url, hrule, drule, count, trule):
        self.user_id = user_id
        self.keyword = keyword
        self.url = url
        self.hrule = hrule
        self.drule = drule
        self.count = count
        self.trule = trule
        self.headers = ''
        self.urls_done = []
        self.urls_will = []
        self.spider_data = {}

    @staticmethod
    def handle_flag(html):
        """
        Strip inline style attributes from an HTML string
        :param html:
        :return:
        """
        pattern = re.compile(r' style=".*?;"', re.S)
        return pattern.sub('', html)

    async def get_html(self, url, session):
        """
        Fetch the HTML for the given url
        :param url:
        :return:
        """
        try:
            async with session.get(url, headers=self.headers, timeout=5) as resp:
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            raise Exception("數據搜索錯誤")

    def get_url(self, resp):
        """
        Extract the url of each item from the listing html
        :param resp:
        :return:
        """
        root = etree.HTML(str(resp))
        items = root.xpath(self.hrule)
        # The html structure differs per site, so the item urls are built differently
        if 5 == self.count:
            self.urls_will = ['https://dxy.com' + i for i in items[:5]]
        elif 3 == self.count:
            self.urls_will = [i for i in items[:3]]
        elif 2 == self.count:
            self.urls_will = [i for i in items[:2]]

    async def get_data(self, url, session, pool):
        """
        Fetch and parse the detail data for one url
        :return:
        """
        # Fetch the html for this url
        html = await self.get_html(url, session)
        # Parse the crawled page
        root = etree.HTML(str(html))
        html_data = ''
        try:
            title = root.xpath(self.trule)
            title = ''.join(title)
        except Exception as e:
            title = ''
        try:
            data = root.xpath(self.drule)
            if data:
                # The html structure differs per site, so the data is extracted differently
                if 3 == self.count:
                    html_data = ''.join(map(etree.tounicode, data))
                    # Strip inline style attributes from the result
                    html_data = HealthSpider.handle_flag(html_data)
                else:
                    html_data = etree.tounicode(data[0])
                    html_data = HealthSpider.handle_flag(html_data)
        except Exception as e:
            html_data = ''
        self.urls_done.append(url)
        # Save to the database: user id, keyword, date, main url, sub url, html data
        if html_data:
            self.spider_data["data"].append({"title": title, "html_data": html_data})
            spide_date = datetime.now()
            data = (self.user_id, self.keyword, spide_date, self.url, url, title, html_data)
            stmt = "INSERT INTO spider_data (user_id, keyword, spide_date, main_url, sub_url, title, html_data) " \
                   "VALUES (%s, %s, %s, %s, %s, %s, %s)"
            try:
                async with pool.acquire() as conn:
                    async with conn.cursor() as cur:
                        await cur.execute(stmt, data)
            except Exception as e:
                pass

    async def start_spider(self, pool):
        """
        Start crawling the item urls
        :return:
        """
        async with aiohttp.ClientSession() as session:
            self.spider_data["user_id"] = self.user_id
            self.spider_data["keyword"] = self.keyword
            self.spider_data["data"] = []
            while True:
                # Stop when the url queue is empty or enough items have been collected
                if (len(self.urls_will) == 0) or len(self.spider_data["data"]) == self.count:
                    break
                # Take the next url to crawl
                url = self.urls_will.pop()
                # Crawl it
                if url not in self.urls_done:
                    await self.get_data(url, session, pool)
            return self.spider_data

    async def main(self, loop):
        # Request headers
        self.headers = {
            'Accept': 'text/html, application/xhtml+xml, application/xml;q=0.9,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-Hans-CN, zh-Hans; q=0.5',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063'
        }
        # Connect to the MySQL database
        pool = await aiomysql.create_pool(host=Config.DB_HOST, port=Config.DB_PORT,
                                          user=Config.DB_USER, password=Config.DB_PASSWORD,
                                          db=Config.DB_NAME, loop=loop, charset="utf8",
                                          autocommit=True)
        async with aiohttp.ClientSession() as session:
            # Fetch the listing page first
            html = await self.get_html(self.url, session)
            # Collect the item urls from it
            self.get_url(html)
        data = await self.start_spider(pool)
        return data
        # asyncio.ensure_future(self.start_spider(pool))


def get_rules(keyword):
    """
    Read the xpath rules from the csv file
    :return:
    """
    csv_dict = []
    path = os.path.join(os.path.dirname(__file__), 'rules.csv')
    with open(path, 'r', newline='') as f:
        reader = csv.DictReader(f)
        for line in reader:
            url = line['url'].format(keyword)
            hrule = line['hrule']
            drule = line['drule']
            count = int(line['count'])
            title = line['trule']
            csv_dict.append({"url": url, "hrule": hrule, "drule": drule,
                             "count": count, "trule": title})
    return csv_dict


def start_spider(keyword):
    """
    Crawl data for a keyword
    :param keyword:
    :return:
    """
    try:
        data_list = get_rules(keyword)
    except Exception as e:
        raise Exception("搜索規則獲取失敗")
    spider_data = []
    tasks = []
    loop = asyncio.get_event_loop()
    for i in data_list:
        spider = HealthSpider(1, keyword, i['url'], i['hrule'], i['drule'], i['count'], i['trule'])
        # Build the task list
        tasks.append(asyncio.ensure_future(spider.main(loop)))
    # Run all tasks on the loop
    loop.run_until_complete(asyncio.wait(tasks))
    try:
        for task in tasks:
            for i in range(len(task.result()["data"])):
                spider_data.append(task.result()["data"][i])
    except Exception as e:
        pass
    # Give the underlying connections a moment to close
    loop.run_until_complete(asyncio.sleep(0.250))
    loop.close()
    return spider_data


if __name__ == '__main__':
    # Crawl content about "感冒了怎麼辦" (what to do about a cold)
    start_spider("感冒了怎麼辦")
```
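The `if __name__ == '__main__':` block above simply kicks off a crawl and discards the return value. As a quick usage sketch (my own addition, not from the original project), this shows the shape of what `start_spider()` returns: a list of `{"title": ..., "html_data": ...}` dicts, as assembled in `get_data()`:

```python
# Usage sketch: consume the list of dicts returned by start_spider()
if __name__ == '__main__':
    results = start_spider("感冒了怎麼辦")
    for item in results:
        print(item["title"])
        print(len(item["html_data"]), "characters of HTML kept")
```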
Here is what some of the methods in the code do:
1. The handle_flag() method strips the inline style attributes from an HTML string while keeping all other tags, which makes the data easier to display on the front end (see the quick sketch below);
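As a quick standalone sketch, this is the effect of that regex on a made-up fragment (the sample markup is purely illustrative):

```python
import re

def handle_flag(html):
    # Same regex as HealthSpider.handle_flag: drop inline style="...;" attributes
    pattern = re.compile(r' style=".*?;"', re.S)
    return pattern.sub('', html)

print(handle_flag('<p style="color:red;">Drink more water and rest.</p>'))
# prints: <p>Drink more water and rest.</p>
```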
2. The get_data() method crawls the detail data and uses aiomysql to save the crawled data to the database (a hypothetical sketch of a matching table schema follows the config file below);
The database configuration file, config.py:
```python
# coding=utf-8


class Config(object):
    DB_ENGINE = 'mysql'
    DB_HOST = '127.0.0.1'
    DB_PORT = 3306
    DB_USER = 'root'
    DB_PASSWORD = 'wyzane'
    DB_NAME = 'db_tornado'
    DB_OPTIONS = {
        'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
        'charset': 'utf8mb4',
    }
```
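The INSERT statement in get_data() assumes a spider_data table already exists. The original post does not show that table, so the following is only a hypothetical sketch inferred from the INSERT columns (the column types and lengths are my assumptions), reusing the same aiomysql pool pattern as the crawler:

```python
# coding=utf-8
import asyncio
import aiomysql

from config import Config

# Hypothetical schema inferred from the INSERT in get_data();
# column types/lengths are assumptions, not taken from the original project.
CREATE_STMT = """
CREATE TABLE IF NOT EXISTS spider_data (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT,
    keyword VARCHAR(255),
    spide_date DATETIME,
    main_url VARCHAR(1024),
    sub_url VARCHAR(1024),
    title VARCHAR(512),
    html_data MEDIUMTEXT
) DEFAULT CHARSET=utf8
"""


async def create_table(loop):
    pool = await aiomysql.create_pool(host=Config.DB_HOST, port=Config.DB_PORT,
                                      user=Config.DB_USER, password=Config.DB_PASSWORD,
                                      db=Config.DB_NAME, loop=loop, charset="utf8",
                                      autocommit=True)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(CREATE_STMT)
    pool.close()
    await pool.wait_closed()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(create_table(loop))
```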
3. The get_rules() method reads the crawling rules from the rules.csv file. Since three different websites are crawled at the same time, and each site needs its own XPath rules for parsing its HTML as well as a different number of items to extract, these rules are kept in rules.csv (a plain CSV file that can be edited in Excel). The rules are read first, and then the data is crawled. A hypothetical example of the file layout is sketched below.
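Since the actual rules.csv is not included in the post, here is a hypothetical sketch of a layout that matches what get_rules() expects: columns url (with a {} placeholder that .format(keyword) fills in), hrule, drule, count, and trule. The URL and XPath values below are placeholders, not the real rules:

```python
import csv

# Hypothetical rules.csv content; only the column names and the "{}" keyword
# placeholder come from get_rules() above, the values are made up.
rows = [
    {
        "url": "https://example-health-site.com/search?q={}",  # placeholder search URL
        "hrule": "//a[@class='result-title']/@href",           # placeholder XPath for item links
        "drule": "//div[@class='article-body']",               # placeholder XPath for the body
        "count": "3",
        "trule": "//h1/text()",                                 # placeholder XPath for the title
    },
]

with open("rules.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["url", "hrule", "drule", "count", "trule"])
    writer.writeheader()
    writer.writerows(rows)
```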
That's the code for an asyncio-based asynchronous crawler. If you spot any mistakes, feedback and corrections are welcome!