Normally, after logging in, a user can keep acting under that identity for a period of time without logging in again. Behind this is usually cookie technology.
After login, the user receives a cookie value that the browser keeps for the current session; as long as it has not expired, it can even be kept for a long time.
Each time the user sends a request to the server, these cookies are submitted along with it. The server analyzes the information in the cookies to confirm the user's identity; if the identity is trusted, the user can continue using the site's features.
Cookie: invented by Netscape. A cookie is generally a name=value pair, but it can also carry information such as expires (expiration), path, domain, and secure.
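As a quick illustration (the header and values below are made up, not taken from the capture in this article), Python's standard http.cookies module can parse a Set-Cookie header that carries these attributes:

from http.cookies import SimpleCookie

# Hypothetical Set-Cookie header, for illustration only
raw = 'oscid=abc123; Domain=.oschina.net; Path=/; Expires=Wed, 01 Jan 2025 00:00:00 GMT; Secure'

cookie = SimpleCookie()
cookie.load(raw)

morsel = cookie['oscid']
print(morsel.value)                                         # abc123
print(morsel['domain'], morsel['path'], morsel['expires'])  # .oschina.net / Wed, 01 Jan 2025 00:00:00 GMT
print(morsel['secure'])                                     # True when the Secure flag is present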
Clear all cookies for oschina.net, log in again, and tick "remember password".
The request headers after logging in are as follows:
GET /?nocache=1544444524642 HTTP/1.1
Host: www.oschina.net
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3486.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8
Referer: https://www.oschina.net/home/login?goto_page=https%3A%2F%2Fwww.oschina.net%2F
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: _user_behavior_=d2104a4f-2484-4f85-8a31-4fe2a86accb8; aliyungf_tc=AQAAAAR/MWXo0QAAV8CVPSF2shLDVU11; Hm_lvt_a411c4d1664dd70048ee98afe7b28f0b=1544444408; _reg_key_=foI49279hton2EYg1ZJz; socialauth_id=n6SsxSVbY6yycMzklFO7; oscid=ZV2oveUqo28xv80qumQtfRqukWzpKq2brNqjn0Y0a5kFTeUQUUbcPj2dwLIiVt%2FuqEFRQShwYl7DjeTX5ZGViddJVodYy0RwW38eexYn%2FPq9afSRNy7SJarEKkqVYfw%2BdNYj1bbHQEhDiqhDeFBZbsf7ouMp1Msoa4cH6mU1ZtM%3D; Hm_lpvt_a411c4d1664dd70048ee98afe7b28f0b=1544444525
Comparing the cookie values before and after login, an oscid cookie appears after login.
So put this post-login HTTP request header into the code:
Tip: use the Postman tool:
The code is as follows (after modification):
import requests

url = "https://www.oschina.net"

headers = {
    'Host': "www.oschina.net",
    'Connection': "keep-alive",
    'Cache-Control': "max-age=0",
    'Upgrade-Insecure-Requests': "1",
    'User-Agent': "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3486.0 Safari/537.36",
    'Accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    'Referer': "https://www.oschina.net/home/login?goto_page=https%3A%2F%2Fwww.oschina.net%2F",
    'Accept-Encoding': "gzip, deflate, br",
    'Accept-Language': "zh-CN,zh;q=0.9,en;q=0.8",
    'Cookie': "_user_behavior_=d2104a4f-2484-4f85-8a31-4fe2a86accb8; aliyungf_tc=AQAAAAR/MWXo0QAAV8CVPSF2shLDVU11; Hm_lvt_a411c4d1664dd70048ee98afe7b28f0b=1544444408; _reg_key_=foI49279hton2EYg1ZJz; socialauth_id=n6SsxSVbY6yycMzklFO7; oscid=ZV2oveUqo28xv80qumQtfRqukWzpKq2brNqjn0Y0a5kFTeUQUUbcPj2dwLIiVt%2FuqEFRQShwYl7DjeTX5ZGViddJVodYy0RwW38eexYn%2FPq9afSRNy7SJarEKkqVYfw%2BdNYj1bbHQEhDiqhDeFBZbsf7ouMp1Msoa4cH6mU1ZtM%3D; Hm_lpvt_a411c4d1664dd70048ee98afe7b28f0b=1544444525",
    'cache-control': "no-cache",
    'Postman-Token': "7d3714a6-c3d7-45ef-9b14-815ffb022535"
}

response = requests.request("GET", url, headers=headers)

with response:
    with open('f://text.html', 'w', encoding='utf-8') as f:
        text = response.text
        f.write(text)
        print(text)
        print(response.status_code, '==========')
Output file:
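As a side note before moving on, here is an alternative sketch (not from the original article): instead of pasting the whole Cookie header, the cookies can be passed as a dict through the cookies parameter of requests. The oscid value below is a placeholder for your own captured cookie.

import requests

# Placeholder cookie value for illustration only; use the value captured from your own browser
cookies = {
    'oscid': '<your-oscid-value>',
}

headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3486.0 Safari/537.36",
}

response = requests.get('https://www.oschina.net', headers=headers, cookies=cookies)
print(response.status_code)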
The cnblogs news pagination address is https://news.cnblogs.com/n/page/10/ . Use multiple threads to crawl news titles and links in batches.
In the URL https://news.cnblogs.com/n/page/2/ , only the trailing number changes; it is the page number.
import requests
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from bs4 import BeautifulSoup
import threading
import time
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

BASE_URL = "https://news.cnblogs.com"
NEW_PAGE = '/n/page/'

headers = {
    'User-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36 Maxthon/5.2.4.3000'
}

# Use in-process queues; these can later be replaced with a third-party message queue
urls = Queue()     # queue of urls
htmls = Queue()    # queue of response bodies
outputs = Queue()  # queue of parsed results

# Build the cnblogs news urls; 30 news items per page
def create_url(start, end, step=1):
    for i in range(start, end + 1, step):
        url = '{}{}{}/'.format(BASE_URL, NEW_PAGE, i)
        print(url)
        urls.put(url)
    print('urls created')

event = threading.Event()

# Page-crawling thread function
def crawler():
    while not event.is_set():
        try:
            url = urls.get(True, 1)
            with requests.request('GET', url, headers=headers) as response:
                html = response.text
                htmls.put(html)
        except:
            pass

# Parsing thread function
def parse():
    while not event.is_set():
        try:
            html = htmls.get(True, 1)
            soup = BeautifulSoup(html, 'lxml')
            titles = soup.select('h2.news_entry a')
            for title in titles:
                # <a href='/n/60287/' target='_blank'> 特斯拉</a>
                val = (BASE_URL + title.attrs['href'], title.text)
                outputs.put(val)
                print(val)
        except:
            pass

# Persistence thread function
def persist(path):
    with open(path, 'a+', encoding='utf-8') as f:
        while not event.is_set():
            try:
                url, text = outputs.get(True, 1)
                print(url, text)
                f.write('{}\x01{}'.format(url, text))
                f.flush()
            except:
                pass

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(create_url, 1, 10)    # simulate url collection; the thread is freed when it finishes
executor.submit(persist, 'f:/new.txt')

# Crawl and parse the pages
for i in range(5):
    executor.submit(crawler)
for i in range(4):
    executor.submit(parse)


while True:
    cmd = input('>>>>>>')
    if cmd.strip() == 'quit':
        event.set()
        time.sleep(4)
        break
    print(threading.enumerate())
Parsing the content is fairly time-consuming, so it should not be done synchronously inside crawler; a queue is likewise used to decouple it.
parse is the HTML analysis function. After parsing, the results need to be persisted; do not persist them directly inside parse, but put them into a queue and persist them in one place.
With that, a practical parallel crawler is basically complete.
It can easily be extended into a multi-process version, as sketched below.
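A minimal sketch of such an extension (my own illustration, not the article's code): swap the thread pool for processes and replace queue.Queue with multiprocessing.Manager().Queue() so the queues can be shared across processes. The worker function and the fake page body here are placeholders for the real crawler/parser logic.

import multiprocessing as mp

def worker(in_q, out_q):
    # Take a url from the input queue and put a fake "page" on the output queue
    while True:
        url = in_q.get()
        if url is None:          # sentinel: stop this worker
            break
        out_q.put('<html>' + url + '</html>')

if __name__ == '__main__':
    manager = mp.Manager()
    urls = manager.Queue()       # shareable across processes, unlike queue.Queue
    htmls = manager.Queue()

    for i in range(1, 11):
        urls.put('https://news.cnblogs.com/n/page/{}/'.format(i))

    procs = [mp.Process(target=worker, args=(urls, htmls)) for _ in range(3)]
    for p in procs:
        p.start()
    for _ in procs:
        urls.put(None)           # one sentinel per worker
    for p in procs:
        p.join()

    while not htmls.empty():
        print(htmls.get())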
Replace the in-process queues with a third-party service; this time the widely used RabbitMQ is chosen.
Set up the RabbitMQ service.
Choosing the queue working mode:
Take the crawler's htmls queue as an example: it is written to by many producers (the crawl functions) and read by multiple consumers (the parse functions), and each message only needs to be handled by a single consumer, so RabbitMQ's work queue mode is used.
How are messages dispatched from the queue?
Ultimately it comes down to routing: RabbitMQ's simple queue and work queue are really both the routing (direct) mode, just using the default exchange.
Should the queue be deleted on disconnect?
Every piece of data must be processed; the queue must not be deleted just because one end disconnects, as that would cause data loss.
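To make these two points concrete, here is a minimal pika sketch (my own illustration, reusing the placeholder broker URL from the test code below): the queue is declared with exclusive=False so it is not removed when the declaring client disconnects, and publishing through the default (nameless) exchange routes a message to the queue whose name matches the routing key.

import pika

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# exclusive=False: the queue is not tied to this connection and survives a disconnect
channel.queue_declare('urls', exclusive=False)

with connection:
    channel.basic_publish(
        exchange='',          # '' is the default direct exchange
        routing_key='urls',   # routing key equals the queue name
        body='data00'
    )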
Test code:
send.py
import pika
import time

exchange = 'news'
queue = 'urls'

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(
    exchange=exchange,
    exchange_type='direct'
)

channel.queue_declare(queue, exclusive=False)  # declare the queue
# Bind the queue to the exchange; when no routing_key is given, the queue name is used
channel.queue_bind(queue, exchange)

with connection:
    for i in range(10):
        msg = 'data{:02}'.format(i)  # number the messages so they are easy to observe
        pub = channel.basic_publish(
            exchange=exchange,
            routing_key=queue,  # routing_key; if omitted, the queue name is used for matching
            body=msg            # the message body
        )
        print(msg, '==================')

print('===== send ok ===========')
receive.py
import pika
import time

exchange = 'news'
queue = 'urls'

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(
    exchange=exchange,
    exchange_type='direct'
)

channel.queue_declare(queue, exclusive=False)  # declare the queue
# Bind the queue to the exchange; when no routing_key is given, the queue name is used
channel.queue_bind(queue, exchange)

time.sleep(2)
with connection:
    msg = channel.basic_get(queue, True)  # get a single message from the given queue
    method, props, body = msg
    if body:
        print(body)
    else:
        print('empty')
Receiving messages repeatedly:
import pika
import time

exchange = 'news'
queue = 'urls'

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(
    exchange=exchange,
    exchange_type='direct'
)

channel.queue_declare(queue, exclusive=False)  # declare the queue
# Bind the queue to the exchange; when no routing_key is given, the queue name is used
channel.queue_bind(queue, exchange)


def callback(channel, method, properties, body):
    print(body)


tag = None
def cancel(tag):
    print(tag)
    channel.basic_cancel(tag)  # cancel the basic_consume


import threading

time.sleep(10)

def start():
    with connection:
        tag = channel.basic_consume(
            callback,
            queue,
            True
        )  # register a callback on the given queue; returns the consumer tag
        threading.Timer(10, cancel, args=(tag,)).start()
        channel.start_consuming()  # block until all basic_consume consumers are cancelled

threading.Thread(target=start).start()  # channel.start_consuming() blocks, so run it in a separate thread
print('======== end ===========')
Note: the multi-threaded code above is not well written; it uses a lot of global variables and is only meant to illustrate the point.
Refactoring the message queue:
import pika
import time
import threading

class MessageQueue:
    def __init__(self, host, port, user, password, vhost, exchange, queue):
        url = 'amqp://{}:{}@{}:{}/{}'.format(
            user, password, host, port, vhost
        )
        params = pika.URLParameters(url)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.exchange = self.channel.exchange_declare(exchange, 'direct')
        self.exchange_name = exchange
        self.channel.queue_declare(queue, exclusive=False)  # declare the queue
        self.queue = queue  # queue name, also used as the routing_key
        self.channel.queue_bind(queue, exchange)

    def __enter__(self):
        return self.channel

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.connection.close()  # close the connection


# Producer
class Producter(MessageQueue):
    def sendmsg(self, msg):
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=self.queue,
            body=msg
        )


# Consumer
class Consumer(MessageQueue):
    def recvmsg(self):
        return self.channel.basic_get(self.queue, True)[2]  # the message body
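A brief usage sketch for these two classes (my own example; the host, credentials, vhost, exchange and queue names are the same placeholders used elsewhere in this article). The with block relies on MessageQueue.__exit__ to close the connection.

from messagequeue import Producter, Consumer

# Send a few test messages, then read one back
p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')
with p:
    for i in range(3):
        p.sendmsg('data{:02}'.format(i))

c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')
with c:
    print(c.recvmsg())  # bytes body of one message, or None if the queue is empty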
Refactoring the crawler code:
import requests
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from bs4 import BeautifulSoup
import threading
import time
import logging
import pika
import simplejson
from messagequeue import Producter, Consumer

FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

BASE_URL = "https://news.cnblogs.com"
NEW_PAGE = '/n/page/'

headers = {
    'User-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36 Maxthon/5.2.4.3000'
}

# Build the cnblogs news urls; 30 news items per page
def create_url(start, end, step=1):
    try:
        p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')
        for i in range(start, end + 1, step):
            url = '{}{}{}/'.format(BASE_URL, NEW_PAGE, i)
            print(url)
            p.sendmsg(url)
        print('urls created')
    except Exception as e:
        print(e)

event = threading.Event()

# Page-crawling thread function
def crawler():
    try:
        p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'htmls')
        c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')
        while not event.wait(1):
            try:
                # url = urls.get(True, 1)
                url = c.recvmsg()
                with requests.request('GET', url, headers=headers) as response:
                    html = response.text
                    p.sendmsg(html)
            except:
                raise
    except Exception as e:
        print(e)

# Parsing thread function
def parse():
    try:
        p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'outputs')
        c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'htmls')
        while not event.wait(1):
            try:
                # html = htmls.get(True, 1)
                html = c.recvmsg()
                if html:
                    soup = BeautifulSoup(html, 'lxml')
                    titles = soup.select('h2.news_entry a')
                    for title in titles:
                        # <a href='/n/60287/' target='_blank'> 特斯拉</a>
                        # val = (BASE_URL + title.attrs['href'], title.text)
                        # outputs.put(val)
                        val = simplejson.dumps({
                            'title': title.text,
                            'url': BASE_URL + title.attrs['href']
                        })
                        p.sendmsg(val)
                        print(val)
            except:
                raise
    except Exception as e:
        print(e)

# Persistence thread function
def persist(path):
    try:
        c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'outputs')
        with open(path, 'a+', encoding='utf-8') as f:
            while not event.is_set():
                try:
                    # url, text = outputs.get(True, 1)
                    data = c.recvmsg()
                    print(data, '==========================================')
                    print(type(data))
                    if data:
                        d = simplejson.loads(data)
                        print(d, '------------------------------------------')
                        print(type(d))
                        # print(url, text)
                        f.write('{}\x01{}'.format(d['url'], d['title']))
                        f.flush()
                except:
                    pass
    except Exception as e:
        print(e)

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(create_url, 1, 10)    # simulate url collection; the thread is freed when it finishes
executor.submit(persist, 'f:/new.txt')

# Crawl and parse the pages
for i in range(5):
    executor.submit(crawler)
for i in range(4):
    executor.submit(parse)


while True:
    cmd = input('>>>>>>')
    if cmd.strip() == 'quit':
        event.set()
        time.sleep(4)
        break
    print(threading.enumerate())