目錄
aiohttp 需要 Python 3.5.3 及更高的版本,它不但能做客戶端爬蟲,也能做服務器端,利用 asyncio 協程,十分高效
官方文檔
import asyncio
import logging
import time

from aiohttp import ClientSession, ClientTimeout

logging.basicConfig(level=logging.INFO,
                    format='[%(asctime)s] - %(levelname)s in %(filename)s.%(funcName)s: %(message)s')

# Default request headers.
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/69.0.3497.100 Safari/537.36',
}

# Default total timeout, in seconds.
TIMEOUT = 15


class AioCrawl:
    """Batch URL fetcher: schedules one coroutine per URL and blocks
    until the whole batch has settled."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT,
                    cookies=None, data=None):
        """Fetch one URL and return the raw response body (bytes).

        :param url: target URL
        :param method: 'GET' or 'POST' (anything else falls back to GET)
        :param headers: optional headers dict; defaults to HEADERS
        :param timeout: total timeout in seconds
        :param cookies: optional cookies dict
        :param data: optional POST form data (dict)
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers or HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies or None
        data = data if isinstance(data, dict) else {}

        async with ClientSession(headers=headers, timeout=timeout, cookies=cookies) as session:
            # No try/except here: the original `except Exception as e: raise e`
            # added nothing and damaged the traceback. Errors simply propagate
            # and are captured on the individual futures.
            if method == 'GET':
                async with session.get(url) as response:
                    return await response.read()
            async with session.post(url, data=data) as response:
                return await response.read()

    def prepare_fetch(self, urls):
        """Wrap each URL's fetch coroutine in a future; return the future list."""
        return [asyncio.ensure_future(self.fetch(url)) for url in urls]

    def crawl_batch_urls(self, urls):
        """Fetch all URLs concurrently, blocking until every future settles.

        Returns the list of futures so the caller can inspect
        .exception() / .result() per URL.
        """
        future_list = self.prepare_fetch(urls)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(future_list))
        self.logger.info('採集完一批: {}'.format(len(urls)))
        return future_list


if __name__ == '__main__':
    a = AioCrawl()

    # Typically takes 2-4 seconds for this batch.
    t0 = time.time()
    future_list = a.crawl_batch_urls(['https://www.sina.com.cn' for _ in range(5)])
    print(time.time() - t0)

    for future in future_list:
        if future.exception():
            print(future.exception())
        else:
            print(len(future.result()))
import asyncio
from threading import Thread

from aiohttp import ClientSession, ClientTimeout

# Default request headers.
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/69.0.3497.100 Safari/537.36',
}

# Default total timeout, in seconds.
TIMEOUT = 15


def start_loop(loop):
    """Drive *loop* forever; intended as the target of a background thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def fetch(url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None):
    """Fetch one URL and return the raw response body (bytes).

    :param url: target URL
    :param method: 'GET' or 'POST' (anything else falls back to GET)
    :param headers: optional headers dict; defaults to HEADERS
    :param timeout: total timeout in seconds
    :param cookies: optional cookies dict
    :param data: optional POST form data (dict)
    """
    method = 'POST' if method.upper() == 'POST' else 'GET'
    headers = headers or HEADERS
    timeout = ClientTimeout(total=timeout)
    cookies = cookies or None
    data = data if isinstance(data, dict) else {}

    async with ClientSession(headers=headers, timeout=timeout, cookies=cookies) as session:
        # Let exceptions propagate: the original `except Exception as e: raise e`
        # handler added nothing and mangled the traceback.
        if method == 'GET':
            async with session.get(url) as response:
                return await response.read()
        async with session.post(url, data=data) as response:
            return await response.read()


def callback(future):
    """Done-callback for the demo: print the finished future."""
    print(future)


if __name__ == '__main__':
    # Run an event loop in a background daemon thread.
    event_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(event_loop,))
    t.daemon = True  # Thread.setDaemon() is deprecated since Python 3.10
    t.start()

    # Submit a coroutine to the background loop from the main thread.
    future = asyncio.run_coroutine_threadsafe(fetch('https://www.sina.com.cn'), event_loop)
    future.add_done_callback(callback)  # attach a done-callback to the future
    # NOTE(review): the main thread exits right after scheduling, so the
    # daemon loop thread may be torn down before the fetch completes.
    # Call future.result() here if the demo should wait for the response.
import asyncio
import functools
import logging
import time
from threading import Thread

from aiohttp import ClientSession, ClientTimeout, TCPConnector

# Default request headers.
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36',
}

# Default total timeout, in seconds.
TIMEOUT = 15


def start_loop(loop):
    """Drive *loop* forever; intended as the target of a background thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


class AioCrawl:
    """Crawler that accepts tasks dynamically: a daemon thread drives a
    private event loop, and add_tasks() submits coroutines to it from
    the caller's thread."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

        # Start the event loop in a background daemon thread.
        self.event_loop = asyncio.new_event_loop()
        self.t = Thread(target=start_loop, args=(self.event_loop,))
        self.t.daemon = True  # Thread.setDaemon() is deprecated since Python 3.10
        self.t.start()

        self.concurrent = 0  # number of in-flight fetches

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT,
                    cookies=None, data=None, proxy=None):
        """Fetch one URL.

        :param url: str
        :param method: 'GET' or 'POST' (anything else falls back to GET)
        :param headers: dict()
        :param timeout: int, total timeout in seconds
        :param cookies: optional cookies dict
        :param data: dict(), POST form data
        :param proxy: str, proxy URL
        :return: (status, content)
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers or HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies or None
        data = data if isinstance(data, dict) else {}

        # Disable certificate verification; verify_ssl= is deprecated in
        # favour of ssl=False.
        tcp_connector = TCPConnector(ssl=False)

        async with ClientSession(headers=headers, timeout=timeout, cookies=cookies,
                                 connector=tcp_connector) as session:
            # Let exceptions propagate: the original `except ... raise e`
            # added nothing. Failures surface via future.exception().
            if method == 'GET':
                async with session.get(url, proxy=proxy) as response:
                    content = await response.read()
                    return response.status, content
            async with session.post(url, data=data, proxy=proxy) as response:
                content = await response.read()
                return response.status, content

    def callback(self, future, url=None):
        """Done-callback: classify the finished future and log failures.

        BUG FIX: the original logged ``future.url``, but
        concurrent.futures.Future has no ``url`` attribute, so every
        warning raised AttributeError. The URL is now bound explicitly
        by add_tasks() via functools.partial.
        """
        msg = str(future.exception()) if future.exception() else 'success'
        code = 1 if msg == 'success' else 0
        # Only touch future.result() on success; it would re-raise otherwise.
        status = future.result()[0] if code == 1 else None
        data = future.result()[1] if code == 1 else b''  # empty bytes on failure
        data_len = len(data) if data else 0

        if code == 0 or (status is not None and status != 200):  # log minor anomalies
            self.logger.warning('<url="{}", code={}, msg="{}", status={}, data(len):{}>'.format(
                url, code, msg, status, data_len))

        self.concurrent -= 1  # one fewer in-flight fetch
        print(len(data))

    def add_tasks(self, tasks):
        """Submit a batch of URLs to the background event loop.

        :param tasks: list of URL strings
        """
        for task in tasks:
            # run_coroutine_threadsafe takes a coroutine object and the
            # target event loop, and returns a concurrent.futures.Future.
            future = asyncio.run_coroutine_threadsafe(self.fetch(task), self.event_loop)
            # Bind the URL so the callback can log it (the Future itself
            # carries no URL).
            future.add_done_callback(functools.partial(self.callback, url=task))
            self.concurrent += 1  # one more in-flight fetch


if __name__ == '__main__':
    a = AioCrawl()

    for _ in range(5):
        a.add_tasks(['https://www.sina.com.cn' for _ in range(2)])  # simulate dynamically added tasks
        time.sleep(1)