1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # asyncio爬蟲、去重、入庫 4 5 import asyncio 6 import re 7 8 import aiohttp 9 import aiomysql 10 from pyquery import PyQuery 11 from aiohttp import TCPConnector 12 13 14 start_url = 'http://www.xbiquge.la/paihangbang/' 15 waitting_urls = [] 16 seen_urls = set() 17 stopping = False 18 19 sem = asyncio.Semaphore(3) 20 21 22 async def fetch(url, session): 23 async with sem: 24 try: 25 async with session.get(url) as resp: 26 print('url status: {}'.format(resp.status)) 27 if resp.status in [200, 201]: 28 data = await resp.text() 29 return data 30 except Exception as e: 31 print(e) 32 33 34 def extract_urls(html): 35 pq = PyQuery(html) 36 for link in pq.items('a'): 37 url = link.attr('href') 38 if url and url.startswith('http') and url not in seen_urls: 39 global waitting_urls 40 waitting_urls.append(url) 41 42 43 async def init_urls(url, session): 44 html = await fetch(url, session) 45 seen_urls.add(url) 46 extract_urls(html) 47 48 49 async def article_handle(url, session, pool): 50 # 獲取文章詳情並解析入庫 51 html = await fetch(url, session) 52 seen_urls.add(url) 53 extract_urls(html) 54 pq = PyQuery(html) 55 title = pq("title").text() 56 # title = title + '\n' 57 async with pool.acquire() as conn: 58 async with conn.cursor() as cur: 59 insert_sql = "INSERT INTO article_test VALUES('{}')".format(title) 60 print(insert_sql) 61 await cur.execute(insert_sql) 62 # 文件操做 63 # with open('aiohttp_spider.txt', mode='a', encoding='utf-8') as file_object: 64 # file_object.write(title) 65 66 67 async def consumer(pool, session): 68 while not stopping: 69 if len(waitting_urls) == 0: 70 await asyncio.sleep(0.5) 71 continue 72 73 url = waitting_urls.pop() 74 print('start get url: {}'.format(url)) 75 if re.match('http://www.xbiquge.la/\d+/\d+/', url): 76 if url not in seen_urls: 77 asyncio.ensure_future(article_handle(url, session, pool)) 78 else: 79 if url not in seen_urls: 80 asyncio.ensure_future(init_urls(url, session)) 81 82 83 async def main(loop): 84 # 等待mysql鏈接創建好 85 pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, 86 user='root', password='123456', 87 db='aiomysql_test', loop=loop, 88 charset='utf8', autocommit=True) 89 90 session = aiohttp.ClientSession() 91 html = await fetch(start_url, session) 92 seen_urls.add(start_url) 93 extract_urls(html) 94 95 asyncio.ensure_future(consumer(pool, session)) 96 97 98 if __name__ == '__main__': 99 loop = asyncio.get_event_loop() 100 asyncio.ensure_future(main(loop)) 101 loop.run_forever()