from queue import Queue
from threading import Thread, Lock
import time
import requests
import json
from lxml import etree

# Flags telling the crawl/parse threads to exit: True = exit, False = keep running
crawl_exit = False
parse_exit = False


# Crawler thread: fetches the raw pages
class ThreadCrawl(Thread):
    def __init__(self, thread_name, page_queue, data_queue):
        super(ThreadCrawl, self).__init__()
        self.thread_name = thread_name
        self.page_queue = page_queue
        self.data_queue = data_queue
        self.headers = {"User-Agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/6.0)"}

    def run(self):
        while not crawl_exit:
            try:
                page = self.page_queue.get(block=False)
                url = "https://www.qiushibaike.com/8hr/page/" + str(page) + "/"
                print("%s started working, page: %d, url=%s" % (self.thread_name, page, url))
                request = requests.get(url, headers=self.headers)
                html = request.text
                # print(html)
                # Put the raw HTML into data_queue
                self.data_queue.put(html)
                time.sleep(1)
            except Exception:
                # The page queue is empty; keep polling until crawl_exit is set
                pass
                # break


# Parser thread: extracts data from the HTML
class ThreadParse(Thread):
    def __init__(self, thread_name, data_queue, file_name, lock):
        super(ThreadParse, self).__init__()
        self.thread_name = thread_name
        self.file_name = file_name
        self.data_queue = data_queue
        self.lock = lock

    def run(self):
        while not parse_exit:
            try:
                # print("%s started working" % self.thread_name)
                html = self.data_queue.get(block=False)
                print("%s started parsing data: %s" % (self.thread_name, html[:10]))
                self.parse(html)
            except Exception:
                # The data queue is empty; keep polling until parse_exit is set
                pass
                # break

    # Parse one HTML page
    def parse(self, html):
        content = etree.HTML(html)
        # Use XPath to get every div that contains a joke
        node_lists = content.xpath("//div[contains(@id, 'qiushi_tag_')]")
        # print(node_lists)
        # items = []
        for node in node_lists:
            item = {}
            user_image = node.xpath('.//div/a/img[@class="illustration"]/@src')
            user_name = node.xpath(".//div//h2/text()")
            text = node.xpath(".//a/div/span/text()")
            zan = node.xpath(".//div/span/i/text()")
            comments = node.xpath(".//div/span/a/i/text()")
            # print(user_name, user_image, text, zan, comments)
            if len(user_image) > 0:
                item["user_image"] = user_image[0]
            if len(user_name) > 0:
                item["user_name"] = user_name[0]
            if len(text) > 0:
                item["text"] = text[0]
            if len(zan) > 0:
                item["zan"] = zan[0]
            if len(comments) > 0:
                item["comments"] = comments[0]
            print(item)
            # Collect into a list
            # items.append(item)
            # Acquire the lock so no other thread writes to the file at the same time
            with self.lock:
                # Save the item to 糗事百科.json, one JSON object per line
                json.dump(item, self.file_name, ensure_ascii=False)
                self.file_name.write("\n")


# Entry point
def main():
    global crawl_exit
    global parse_exit
    # Create the mutex lock
    lock = Lock()
    # Queue of page numbers; crawl at most ten pages
    page_queue = Queue(10)
    for page in range(1, 11):
        page_queue.put(page)
    # Queue holding the HTML of each page
    data_queue = Queue()
    # Create three crawler threads for fetching data (network requests)
    # Keep references to the crawler threads
    thread_crawls = []
    thread_names = ["CrawlThread1", "CrawlThread2", "CrawlThread3"]
    for thread_name in thread_names:
        crawl = ThreadCrawl(thread_name, page_queue, data_queue)
        # Start the thread
        crawl.start()
        thread_crawls.append(crawl)
    # File that stores the JSON data
    file_name = open("糗事百科.json", "a", encoding="utf-8")
    # Create three parser threads for parsing the HTML pages
    thread_parses = []
    thread_names = ["ParseThread1", "ParseThread2", "ParseThread3"]
    for thread_name in thread_names:
        parse = ThreadParse(thread_name, data_queue, file_name, lock)
        # Start the thread
        parse.start()
        thread_parses.append(parse)

    # Crawler threads ------
    # Wait until every page number has been taken from the queue
    while not page_queue.empty():
        pass
    # All pages have been handed out; tell the crawler threads to exit
    crawl_exit = True
    # Wait for the crawler threads to finish
    for crawl in thread_crawls:
        crawl.join()
        print("%s thread finished" % str(crawl))

    # Parser threads ------
    # Wait until every page's HTML has been taken from the queue
    while not data_queue.empty():
        pass
    # Tell the parser threads to exit
    parse_exit = True
    # Wait for the parser threads to finish
    for parse in thread_parses:
        parse.join()
        print("%s thread finished" % str(parse))

    # Acquire the lock so no other thread can touch the file, then close it --------------
    with lock:
        file_name.close()
    print("Main thread finished ------------")


if __name__ == "__main__":
    main()