"""Asyncio crawler: fetch pages, de-duplicate URLs, store article titles in MySQL.

Pages are downloaded with aiohttp, links are extracted with PyQuery and
queued in ``waiting_urls``; pages whose URL matches the article pattern have
their ``<title>`` inserted into the ``aiomysql_test`` table through aiomysql.
"""
import asyncio
import aiohttp
import aiomysql
import re
from pyquery import PyQuery

stoping = False  # set to True to make the consumer loop exit
start_url = 'http://www.jobbole.com/'
waiting_urls = []  # URLs discovered but not yet scheduled
seen_urls = set()  # URL de-duplication (a bloom filter would scale better)
sem = asyncio.Semaphore(3)  # limit the number of concurrent requests

# Article detail pages; compiled once instead of on every consumer iteration.
ARTICLE_URL_RE = re.compile(r'http://.*?jobbole.com/\d+/')

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_background_tasks = set()


def _spawn(coro):
    """Schedule *coro* as a task and keep it referenced until it finishes."""
    task = asyncio.ensure_future(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def fetch(url, session):
    """Download *url* with *session* and return the response body as text.

    Returns ``None`` when the status is not 200/201 or when the request
    raises; the error is printed rather than propagated so one bad URL does
    not stop the crawl.
    """
    async with sem:
        try:
            async with session.get(url) as resp:
                print(resp.status)
                if resp.status in (200, 201):
                    return await resp.text()
        except Exception as e:  # best-effort crawl: report and move on
            print(e)
    return None


def extract_urls(html):
    """Queue every unseen absolute link found in *html* and return them.

    Plain (non-async) function because parsing does no I/O. Returns an empty
    list when *html* is empty or ``None`` (i.e. the fetch failed).
    """
    if not html:
        return []
    urls = []
    for link in PyQuery(html).items('a'):
        url = link.attr('href')
        if url and url.startswith('http') and url not in seen_urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls


async def init_urls(url, session):
    """Fetch a non-article page and queue the links it contains."""
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def article_handeler(url, session, pool):
    """Fetch an article page, queue its links and store its title in MySQL."""
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        # Fetch failed or returned nothing: there is nothing to parse or store.
        return
    extract_urls(html)
    title = PyQuery(html)('title').text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Parameterized query: the title comes from an untrusted web page
            # and must never be interpolated into the SQL text.
            await cur.execute(
                'insert into aiomysql_test(title) VALUES (%s)', (title,)
            )


async def consumer(pool):
    """Pop queued URLs and schedule a handler task for each unseen one."""
    async with aiohttp.ClientSession() as session:
        while not stoping:
            if not waiting_urls:
                await asyncio.sleep(0.5)
                continue
            url = waiting_urls.pop()
            print('start get url:{}'.format(url))
            if url in seen_urls:
                continue
            # Mark as seen *before* the fetch starts; otherwise the same URL
            # can be scheduled repeatedly while its first fetch is in flight.
            seen_urls.add(url)
            if ARTICLE_URL_RE.match(url):
                _spawn(article_handeler(url, session, pool))
                await asyncio.sleep(0.5)
            else:
                _spawn(init_urls(url, session))


async def main(loop):
    """Create the MySQL pool, seed the queue from ``start_url``, start the consumer."""
    # Wait until the MySQL connection pool is ready.
    pool = await aiomysql.create_pool(
        host='', port=3306,
        user='root', password='123456',
        db='aiomysql_test', loop=loop,
        charset='utf8', autocommit=True,
    )
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    _spawn(consumer(pool))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    main_task = loop.create_task(main(loop))  # keep a reference to the task
    loop.run_forever()