【12.8】asyncio高併發爬蟲

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 # asyncio爬蟲、去重、入庫
  4 
  5 import asyncio
  6 import re
  7 
  8 import aiohttp
  9 import aiomysql
 10 from pyquery import PyQuery
 11 from aiohttp import TCPConnector
 12 
 13 
 14 start_url = 'http://www.xbiquge.la/paihangbang/'
 15 waitting_urls = []
 16 seen_urls = set()
 17 stopping = False
 18 
 19 sem = asyncio.Semaphore(3)
 20 
 21 
 22 async def fetch(url, session):
 23     async with sem:
 24         try:
 25             async with session.get(url) as resp:
 26                 print('url status: {}'.format(resp.status))
 27                 if resp.status in [200, 201]:
 28                     data = await resp.text()
 29                     return data
 30         except Exception as e:
 31             print(e)
 32 
 33 
 34 def extract_urls(html):
 35     pq = PyQuery(html)
 36     for link in pq.items('a'):
 37         url = link.attr('href')
 38         if url and url.startswith('http') and url not in seen_urls:
 39             global waitting_urls
 40             waitting_urls.append(url)
 41 
 42 
 43 async def init_urls(url, session):
 44     html = await fetch(url, session)
 45     seen_urls.add(url)
 46     extract_urls(html)
 47 
 48 
 49 async def article_handle(url, session, pool):
 50     # 獲取文章詳情並解析入庫
 51     html = await fetch(url, session)
 52     seen_urls.add(url)
 53     extract_urls(html)
 54     pq = PyQuery(html)
 55     title = pq("title").text()
 56     # title = title + '\n'
 57     async with pool.acquire() as conn:
 58         async with conn.cursor() as cur:
 59             insert_sql = "INSERT INTO article_test VALUES('{}')".format(title)
 60             print(insert_sql)
 61             await cur.execute(insert_sql)
 62     # 文件操做
 63     # with open('aiohttp_spider.txt', mode='a', encoding='utf-8') as file_object:
 64     #     file_object.write(title)
 65 
 66 
 67 async def consumer(pool, session):
 68     while not stopping:
 69         if len(waitting_urls) == 0:
 70                 await asyncio.sleep(0.5)
 71                 continue
 72 
 73         url = waitting_urls.pop()
 74         print('start get url: {}'.format(url))
 75         if re.match('http://www.xbiquge.la/\d+/\d+/', url):
 76             if url not in seen_urls:
 77                 asyncio.ensure_future(article_handle(url, session, pool))
 78         else:
 79             if url not in seen_urls:
 80                 asyncio.ensure_future(init_urls(url, session))
 81 
 82 
 83 async def main(loop):
 84     # 等待mysql鏈接創建好
 85     pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
 86                                       user='root', password='123456',
 87                                       db='aiomysql_test', loop=loop,
 88                                       charset='utf8', autocommit=True)
 89 
 90     session = aiohttp.ClientSession()
 91     html = await fetch(start_url, session)
 92     seen_urls.add(start_url)
 93     extract_urls(html)
 94 
 95     asyncio.ensure_future(consumer(pool, session))
 96 
 97 
 98 if __name__ == '__main__':
 99     loop = asyncio.get_event_loop()
100     asyncio.ensure_future(main(loop))
101     loop.run_forever()
相關文章
相關標籤/搜索