import asyncio import re import aiohttp import aiomysql from pyquery import PyQuery from lxml import etree start_url = 'http://news.baidu.com/' waitting_urs = [] seen_uels = set() stoppint = False sem = asyncio.Semaphore(10) # 如今併發爲3個 headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"} async def fetch(url, session): async with sem: # await asyncio.sleep(1) try: async with session.get(url, headers=headers, timeout=1) as resp: print('url status:{}'.format(resp.status)) # if resp.status in [200, 201]: data = etree.HTML(await resp.read()) return data except Exception as e: print('錯誤爲:{} url:{}'.format(e, url)) def extract_urls(html): try: for url in html.xpath('//a/@href'): if url and url.startswith("http") and url not in seen_uels: if re.findall(r'baidu', url): waitting_urs.append(url) except: pass async def init_urls(url, session): html = await fetch(url, session) seen_uels.add(url) extract_urls(html) async def article_handler(url, session, pool): # 獲取文章詳情 html = await fetch(url, session) seen_uels.add(url) extract_urls(html) try: title = html.xpath('//title/text()')[0].strip() print('title:{}'.format(title)) async with pool.acquire() as conn: async with conn.cursor() as cursor: try: # 插入 await cursor.execute('insert into async_test_async(title) values("{}")'.format(title)) # 插入數據 await cursor.execute("insert into async_test_async(title) values('{}')".format(title)) # 查詢數據 await cursor.execute("select * from async_test_async") data = await cursor.fetchall() print("data:", data) # 更新數據 await cursor.execute("update async_test_async set title='{}' where id={}".format('update', 10168)) # 刪除數據 await cursor.execute("delete from async_test_async where id={}".format(10174)) except: pass except: pass async def consumer(pool): async with aiohttp.ClientSession() as session: while not stoppint: if len(waitting_urs) < 10: if url not in seen_uels: asyncio.ensure_future(init_urls(url, session)) url = waitting_urs.pop() print('start get url:{}'.format(url)) if re.findall(r'baidu', url): if url not in seen_uels: print('waitting_urs:{}'.format(waitting_urs[0: 3])) asyncio.ensure_future(article_handler(url, session, pool)) await asyncio.sleep(0.1) async def main(loop): pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='root', db='cfda', loop=loop, charset='utf8', autocommit=True) async with aiohttp.ClientSession() as session: html = await fetch(start_url, session) seen_uels.add(start_url) extract_urls(html) asyncio.ensure_future(consumer(pool)) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.run_forever()
import asyncio import re import aiohttp import aiomysql from pyquery import PyQuery from lxml import etree start_url = 'http://news.baidu.com/' waitting_urs = [] seen_uels = set() stoppint = False sem = asyncio.Semaphore(10) # 如今併發爲3個 headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"} class async_text(object): async def fetch(self, url, session): print("self:", self) async with sem: # await asyncio.sleep(1) try: async with session.get(url, headers=headers, timeout=1) as resp: print('url status:{}'.format(resp.status)) # if resp.status in [200, 201]: data = etree.HTML(await resp.read()) return data except Exception as e: print('錯誤爲:{} url:{}'.format(e, url)) def extract_urls(self, html): try: for url in html.xpath('//a/@href'): if url and url.startswith("http") and url not in seen_uels: if re.findall(r'baidu', url): waitting_urs.append(url) except: pass async def init_urls(self, url, session): html = await self.fetch(self, url, session) seen_uels.add(url) self.extract_urls(self, html) async def article_handler(self, url, session, pool): # 獲取文章詳情 html = await self.fetch(self, url, session) seen_uels.add(url) self.extract_urls(self, html) try: title = html.xpath('//title/text()')[0].strip() print('title:{}'.format(title)) async with pool.acquire() as conn: async with conn.cursor() as cur: try: # 插入 await cur.execute('insert into async_test_async(title) values("{}")'.format(title)) except: pass except: pass async def consumer(self, pool): async with aiohttp.ClientSession() as session: while not stoppint: if len(waitting_urs) < 10: if url not in seen_uels: asyncio.ensure_future(self.init_urls(self, url, session)) url = waitting_urs.pop() print('start get url:{}'.format(url)) if re.findall(r'baidu', url): if url not in seen_uels: print('waitting_urs:{}'.format(waitting_urs[0: 3])) asyncio.ensure_future(self.article_handler(self, url, session, pool)) await asyncio.sleep(0.1) @classmethod async def main(self, loop): pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='root', db='cfda', loop=loop, charset='utf8', autocommit=True) async with aiohttp.ClientSession() as session: html = await self.fetch(self, start_url, session) seen_uels.add(start_url) self.extract_urls(self, html) asyncio.ensure_future(self.consumer(self, pool)) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(async_text.main(loop)) loop.run_forever()