# -*- coding: utf-8 -*-
"""
Asynchronous crawler script.

Reads documents whose ``status`` is ``'False'`` from the ``company``
collection, downloads each document's ``url`` through a proxy, and stores the
parsed result in the ``company_info`` collection.

# @Time    : 19-8-6 3:55 PM
# @Author  : fifa
"""
import time
import aiohttp
import asyncio
from lxml import etree
import motor.motor_asyncio
from aiostream import stream

# Limit the number of concurrent fetches.
sem = asyncio.Semaphore(5)

client = motor.motor_asyncio.AsyncIOMotorClient('mongo鏈接url')
db = client['google']
collection_insert = db['company_info']
collection_update = db['company']


async def insert_data(data):
    """Store one parsed record and flag its source document accordingly.

    A record with a truthy ``name`` is inserted into ``collection_insert`` and
    the ``collection_update`` document with the same ``number`` (compared as a
    string) gets ``status`` set to ``'True'``. If either step fails, every
    ``collection_insert`` document with that ``number`` is removed and
    ``status`` is set to ``'False'``. A record without a ``name`` is only
    printed.

    :param data: mapping holding at least the ``name`` and ``number`` keys.
    """
    if not data.get('name'):
        print(data)
        return

    number_filter = {'number': '{}'.format(data.get('number'))}
    try:
        await collection_insert.insert_one(data)
        await collection_update.update_one(
            number_filter, {'$set': {'status': 'True'}})
    except Exception:
        # Roll back: drop whatever was inserted for this number and leave the
        # source document flagged 'False', which is the status run() selects.
        # ``except Exception`` (not a bare ``except``) lets task cancellation
        # and KeyboardInterrupt propagate.
        await collection_insert.delete_many(number_filter)
        await collection_update.update_one(
            number_filter, {'$set': {'status': 'False'}})


async def analysis_data(response, old_data):
    """Parse a fetched page and hand the result to :func:`insert_data`.

    Nothing happens when ``response`` is empty (a failed fetch).

    :param response: page body returned by :func:`get_html`, ``''`` on failure.
    :param old_data: the source document the page was fetched for.
    """
    if response:
        # Parsing code goes here.
        # NOTE(review): the parsing step that builds ``data`` from
        # ``response`` / ``old_data`` is not present in this file, so ``data``
        # is undefined at this point and this call raises NameError until the
        # parsing code is restored.
        await insert_data(data)


async def get_html(url):
    """Download ``url`` through a freshly requested proxy.

    The proxy address is read from the proxy API as plain text and used as
    ``http://<address>``. Both requests use a 3 second timeout.

    :param url: address of the page to download.
    :return: the response body as text, or ``''`` if anything goes wrong.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        try:
            # ``async with`` releases each response even when reading fails.
            async with session.get(url='代理ip接口', timeout=3) as resp:
                proxy = await resp.text()
            async with session.get(url, proxy='http://' + proxy,
                                   timeout=3) as response:
                return await response.text()
        except Exception:
            # Best effort: any failure is reported as an empty page so the
            # caller skips the record. Cancellation is not swallowed.
            return ''


async def bound_fetch(data):
    """Fetch and process one document while holding the global semaphore.

    :param data: source document; its ``url`` key is the page to download.
    """
    async with sem:
        response = await get_html(data.get("url"))
        await analysis_data(response, data)


async def run():
    """Process every document whose ``status`` is ``'False'``, 20 at a time."""
    cursor = collection_update.find({'status': 'False'})
    limit = 20
    # Use aiostream to split the crawl queue into batches. preserve() is used
    # so that slicing does not close the cursor; each pass is then expected to
    # take the next ``limit`` documents rather than restarting from the top.
    async_items = stream.preserve(cursor)
    while True:
        batch_stream = async_items[0: 0 + limit]
        batch = await stream.list(batch_stream)
        if not batch:
            break
        tasks = [asyncio.ensure_future(bound_fetch(item)) for item in batch]
        await asyncio.wait(tasks)
    client.close()


start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
print(time.time() - start)