Crawler in Practice: Simulating Login to oschina

1. Simulating Login to oschina

  After logging in, a user can usually act under that identity for some time without having to log in again. Behind the scenes this is typically done with cookies.

  On login, the user receives a cookie value. The browser keeps it for the current session, and as long as it has not expired it can even be stored for a long time.

  Every time the user sends a request to the server, these cookies are sent along. The server inspects the information in the cookies to confirm the user's identity; if the identity is trusted, the user can keep using the site's features.

  Cookies were invented at Netscape. A cookie is basically a name=value pair, but it can also carry attributes such as expires, path, domain, and secure.
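
  For illustration, here is a minimal sketch (not from the original post) that parses a hypothetical Set-Cookie value with Python's standard http.cookies module and reads those attributes:

from http.cookies import SimpleCookie

# Hypothetical Set-Cookie value, just to show the name=value pair plus the
# expires / path / domain / secure attributes mentioned above.
raw = 'oscid=abc123; expires=Wed, 01 Jan 2020 00:00:00 GMT; Path=/; Domain=.oschina.net; Secure'

cookie = SimpleCookie()
cookie.load(raw)

print(cookie['oscid'].value)        # abc123
print(cookie['oscid']['expires'])   # Wed, 01 Jan 2020 00:00:00 GMT
print(cookie['oscid']['path'])      # /
print(cookie['oscid']['domain'])    # .oschina.net
print(cookie['oscid']['secure'])    # True when the Secure flag is present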

    

  Clear all cookies for oschina.net, log in again, and tick "Remember password".

    

  The request headers after login look like this:

GET /?nocache=1544444524642 HTTP/1.1
Host: www.oschina.net
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3486.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8
Referer: https://www.oschina.net/home/login?goto_page=https%3A%2F%2Fwww.oschina.net%2F
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: _user_behavior_=d2104a4f-2484-4f85-8a31-4fe2a86accb8; aliyungf_tc=AQAAAAR/MWXo0QAAV8CVPSF2shLDVU11; Hm_lvt_a411c4d1664dd70048ee98afe7b28f0b=1544444408; _reg_key_=foI49279hton2EYg1ZJz; socialauth_id=n6SsxSVbY6yycMzklFO7; oscid=ZV2oveUqo28xv80qumQtfRqukWzpKq2brNqjn0Y0a5kFTeUQUUbcPj2dwLIiVt%2FuqEFRQShwYl7DjeTX5ZGViddJVodYy0RwW38eexYn%2FPq9afSRNy7SJarEKkqVYfw%2BdNYj1bbHQEhDiqhDeFBZbsf7ouMp1Msoa4cH6mU1ZtM%3D; Hm_lpvt_a411c4d1664dd70048ee98afe7b28f0b=1544444525

 

  Comparing the cookie values before and after login shows that an oscid cookie appears after login.

  So put this post-login HTTP request header into the code:

  Tip: use the Postman tool:

    

    

      The code is as follows (after some modification):

import requests

url = "https://www.oschina.net"

# Request headers copied from the logged-in browser session (via Postman);
# the Cookie header carries the oscid value that identifies the session.
headers = {
    'Host': "www.oschina.net",
    'Connection': "keep-alive",
    'Cache-Control': "max-age=0",
    'Upgrade-Insecure-Requests': "1",
    'User-Agent': "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3486.0 Safari/537.36",
    'Accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    'Referer': "https://www.oschina.net/home/login?goto_page=https%3A%2F%2Fwww.oschina.net%2F",
    'Accept-Encoding': "gzip, deflate, br",
    'Accept-Language': "zh-CN,zh;q=0.9,en;q=0.8",
    'Cookie': "_user_behavior_=d2104a4f-2484-4f85-8a31-4fe2a86accb8; aliyungf_tc=AQAAAAR/MWXo0QAAV8CVPSF2shLDVU11; Hm_lvt_a411c4d1664dd70048ee98afe7b28f0b=1544444408; _reg_key_=foI49279hton2EYg1ZJz; socialauth_id=n6SsxSVbY6yycMzklFO7; oscid=ZV2oveUqo28xv80qumQtfRqukWzpKq2brNqjn0Y0a5kFTeUQUUbcPj2dwLIiVt%2FuqEFRQShwYl7DjeTX5ZGViddJVodYy0RwW38eexYn%2FPq9afSRNy7SJarEKkqVYfw%2BdNYj1bbHQEhDiqhDeFBZbsf7ouMp1Msoa4cH6mU1ZtM%3D; Hm_lpvt_a411c4d1664dd70048ee98afe7b28f0b=1544444525",
    'cache-control': "no-cache",
    'Postman-Token': "7d3714a6-c3d7-45ef-9b14-815ffb022535"
    }

response = requests.request("GET", url, headers=headers)

with response:
    # Save the returned page so we can check whether it shows the logged-in state
    with open('f:/text.html', 'w', encoding='utf-8') as f:
        text = response.text
        f.write(text)
        print(text)
        print(response.status_code, '==========')

 

      The output file (f:/text.html) shows the page returned for the logged-in session.
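
      As a variation on the same idea, here is a minimal sketch (not from the original post) that loads the captured cookie into a requests.Session instead of sending a raw Cookie header; the oscid value is a placeholder to be replaced with the one captured from your own logged-in browser:

import requests

session = requests.Session()
session.headers['User-Agent'] = ('Mozilla/5.0 (Windows NT 6.1; Win64; x64) '
                                 'AppleWebKit/537.36 (KHTML, like Gecko) '
                                 'Chrome/69.0.3486.0 Safari/537.36')
# Placeholder value: substitute the oscid captured after logging in.
session.cookies.set('oscid', 'PASTE_CAPTURED_VALUE_HERE', domain='www.oschina.net')

response = session.get('https://www.oschina.net')
print(response.status_code)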

  

 

2. Crawling cnblogs News with Multiple Threads

  The cnblogs news list is paginated at addresses like https://news.cnblogs.com/n/page/10/; we crawl the news titles and links in batches with multiple threads.

  In a URL such as https://news.cnblogs.com/n/page/2/ the only part that changes is the trailing number, which is the page number.

import requests
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from bs4 import BeautifulSoup
import threading
import time
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

BASE_URL = "https://news.cnblogs.com"
NEW_PAGE = '/n/page/'

headers = {
    'User-agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36 Maxthon/5.2.4.3000'
}

# In-process queues; later these can be replaced by a third-party message queue
urls = Queue()    # queue of page URLs to crawl
htmls = Queue()   # queue of fetched HTML responses
outputs = Queue() # queue of parsed results

# Build the cnblogs news page URLs (30 news items per page)
def create_url(start, end, step=1):
    for i in range(start, end + 1, step):
        url = '{}{}{}/'.format(BASE_URL, NEW_PAGE, i)
        print(url)
        urls.put(url)
    print('URL creation finished')

event = threading.Event()

# Crawler thread function: fetch a page and hand the HTML to the parsers
def crawler():
    while not event.is_set():
        try:
            url = urls.get(True, 1)
            with requests.request('GET', url, headers=headers) as response:
                html = response.text
                htmls.put(html)
        except:
            pass

# Parser thread function: extract news titles and links from the HTML
def parse():
    while not event.is_set():
        try:
            html = htmls.get(True, 1)
            soup = BeautifulSoup(html, 'lxml')
            titles = soup.select('h2.news_entry a')
            for title in titles:
                # e.g. <a href='/n/60287/' target='_blank'>Tesla</a>
                val = (BASE_URL + title.attrs['href'], title.text)
                outputs.put(val)
                print(val)
        except:
            pass

# Persistence thread function: append results to a file
def persist(path):
    with open(path, 'a+', encoding='utf-8') as f:
        while not event.is_set():
            try:
                url, text = outputs.get(True, 1)
                print(url, text)
                f.write('{}\x01{}\n'.format(url, text))
                f.flush()
            except:
                pass

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(create_url, 1, 10) # simulate URL collection; the thread yields once it finishes
executor.submit(persist, 'f:/new.txt')

# Fetch and parse the pages
for i in range(5):
    executor.submit(crawler)
for i in range(4):
    executor.submit(parse)


while True:
    cmd = input('>>>>>>')
    if cmd.strip() == 'quit':
        event.set()
        time.sleep(4)
        break
    print(threading.enumerate())

 

     Parsing the content is relatively time-consuming and should not be done synchronously inside crawler; a queue is used here as well to decouple the two stages.

     parse is the HTML analysis function. Once parsing is done the results need to be persisted, but not directly inside parse: the results are put on a queue and persisted in one place.

    With that, a practical parallel crawler is basically complete.

    It can also be extended quite easily into a multi-process version; a sketch follows below.
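
    A minimal sketch of that extension (not part of the original code): the queue.Queue instances become multiprocessing.Queue so data can cross process boundaries, and a None sentinel replaces the Event used above for shutdown. The function and variable names here are illustrative only.

import multiprocessing as mp
import requests

BASE_URL = 'https://news.cnblogs.com'

def crawl(urls, results):
    # Worker process: pull URLs until the None sentinel arrives
    while True:
        url = urls.get()
        if url is None:
            results.put(None)        # tell the main process this worker is done
            break
        try:
            resp = requests.get(url, timeout=10)
            results.put((url, len(resp.text)))
        except requests.RequestException as e:
            results.put((url, repr(e)))

if __name__ == '__main__':
    urls, results = mp.Queue(), mp.Queue()
    for i in range(1, 11):           # the same paginated news URLs as above
        urls.put('{}/n/page/{}/'.format(BASE_URL, i))

    workers = [mp.Process(target=crawl, args=(urls, results)) for _ in range(3)]
    for _ in workers:
        urls.put(None)               # one sentinel per worker
    for w in workers:
        w.start()

    finished = 0
    while finished < len(workers):
        item = results.get()
        if item is None:
            finished += 1
        else:
            print(item)              # (url, length of the fetched HTML)
    for w in workers:
        w.join()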

3. Going Further (Using a Message Queue)

   Replace the in-process queues with a third-party service; here we use the widely used RabbitMQ.

  Set up the RabbitMQ service.

   Choosing the queue's working mode:

     Take the crawler's htmls queue as an example: many producers (the crawl functions) write to it and multiple consumers (the parse functions) read from it, but each message only needs to be handled by one consumer, so RabbitMQ's work queue mode is used.

  How are messages dispatched within a queue?

      It ultimately comes down to routing: RabbitMQ's simple queue and work queue are really both routing modes, they just use the default exchange.
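
      To make that concrete, a small sketch (not from the original post, assuming the same broker URL as the test code below): publishing to the default exchange, whose name is the empty string, routes a message straight to the queue whose name equals the routing key.

import pika

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.queue_declare('urls', exclusive=False)

with connection:
    # No exchange is declared here: '' is the default exchange, and the
    # routing key 'urls' delivers the message directly to the 'urls' queue.
    channel.basic_publish(exchange='', routing_key='urls', body='data00')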

  Should a queue be deleted when a client disconnects?

       Every piece of data has to be processed; a queue must not be deleted just because one end disconnects, as that would lose data.
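
       A minimal sketch of the pika declaration flags involved; the flags mirror the declarations in the test code below, while the durable/persistent remarks are an extra note, not something the original code enables.

import pika

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# exclusive=False:   the queue is not tied to this connection, so it is not
#                    deleted when this client disconnects.
# auto_delete=False: the queue is not removed when its last consumer goes away.
# durable=True would additionally let the queue survive a broker restart;
# messages then also need delivery_mode=2 to be stored persistently.
channel.queue_declare('urls', durable=False, exclusive=False, auto_delete=False)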

  Test code:

     send.py   

import pika
import time

exchange = 'news'
queue = 'urls'

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(
    exchange=exchange,
    exchange_type='direct'
)

channel.queue_declare(queue, exclusive=False)  # declare the queue
# Bind the queue to the exchange; with no routing_key given, the queue name is used
channel.queue_bind(queue, exchange)

with connection:
    for i in range(10):
        msg = 'data{:02}'.format(i)  # number the message bodies so they are easy to follow
        pub = channel.basic_publish(
            exchange=exchange,
            routing_key=queue,  # routing key; the queue name is used so it matches the binding
            body=msg  # message body
        )
        print(msg, '==================')

    print('===== send ok ===========')

 

       receive.py

import pika
import time

exchange = 'news'
queue = 'urls'

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(
    exchange=exchange,
    exchange_type='direct'
)

channel.queue_declare(queue, exclusive=False)  # declare the queue
# Bind the queue to the exchange; with no routing_key given, the queue name is used
channel.queue_bind(queue, exchange)

time.sleep(2)
with connection:
    msg = channel.basic_get(queue, True)  # fetch a single message from the queue (auto-ack)
    method, props, body = msg
    if body:
        print(body)
    else:
        print('empty')

 

       Fetching messages repeatedly (with a consumer):

import pika
import time
import threading

exchange = 'news'
queue = 'urls'

params = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(
    exchange=exchange,
    exchange_type='direct'
)

channel.queue_declare(queue, exclusive=False)  # declare the queue
# Bind the queue to the exchange; with no routing_key given, the queue name is used
channel.queue_bind(queue, exchange)


def callback(channel, method, properties, body):
    print(body)


tag = None
def cancel(tag):
    print(tag)
    channel.basic_cancel(tag)  # cancel the basic_consume subscription

time.sleep(10)

def start():
    with connection:
        tag = channel.basic_consume(
            callback,
            queue,
            True
        )  # register the callback as a consumer on the queue (auto-ack); returns a consumer tag
        threading.Timer(10, cancel, args=(tag,)).start()
        channel.start_consuming()  # blocks until all basic_consume consumers are cancelled

threading.Thread(target=start).start()  # start_consuming() blocks, so run it in a separate thread
print('======== end ===========')

 

     Note: the threaded code above is not well written; it relies heavily on global variables and is only meant to illustrate the idea.

Refactoring the message queue:

import pika
import time
import threading

class MessageQueue:
    def __init__(self, host, port, user, password, vhost, exchange, queue):
        url = 'amqp://{}:{}@{}:{}/{}'.format(
            user, password, host, port, vhost
        )
        params = pika.URLParameters(url)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.exchange = self.channel.exchange_declare(exchange, 'direct')
        self.exchange_name = exchange
        self.channel.queue_declare(queue, exclusive=False)  # declare the queue
        self.queue = queue  # queue name, also used as the routing key
        self.channel.queue_bind(queue, exchange)

    def __enter__(self):
        return self.channel

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.connection.close()  # close the connection

# Producer
class Producter(MessageQueue):
    def sendmsg(self, msg):
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=self.queue,
            body=msg
        )

# Consumer
class Consumer(MessageQueue):
    def recvmsg(self):
        return self.channel.basic_get(self.queue, True)[2]  # return only the body
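
A brief usage sketch of these classes (assuming the same broker credentials used throughout this post, and that the module is saved as messagequeue.py, which is what the refactored crawler imports below):

from messagequeue import Producter, Consumer

p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')
c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')

p.sendmsg('hello')
print(c.recvmsg())   # b'hello', or None if the queue is empty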

 

Refactoring the crawler code:

import requests
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from bs4 import BeautifulSoup
import threading
import time
import logging
import pika
import simplejson
from messagequeue import Producter, Consumer

FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

BASE_URL = "https://news.cnblogs.com"
NEW_PAGE = '/n/page/'

headers = {
    'User-agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36 Maxthon/5.2.4.3000'
}

# Build the cnblogs news page URLs (30 news items per page)
def create_url(start, end, step=1):
    try:
        p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')
        for i in range(start, end + 1, step):
            url = '{}{}{}/'.format(BASE_URL, NEW_PAGE, i)
            print(url)
            p.sendmsg(url)
        print('URL creation finished')
    except Exception as e:
        print(e)

event = threading.Event()

# Crawler thread function: consume URLs, fetch the pages, publish the HTML
def crawler():
    try:
        p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'htmls')
        c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'urls')
        while not event.wait(1):
            try:
                # url = urls.get(True, 1)
                url = c.recvmsg()
                if url:
                    with requests.request('GET', url, headers=headers) as response:
                        html = response.text
                        p.sendmsg(html)
            except:
                raise
    except Exception as e:
        print(e)

# Parser thread function: consume HTML, extract titles and links, publish the results
def parse():
    try:
        p = Producter('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'outputs')
        c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'htmls')
        while not event.wait(1):
            try:
                # html = htmls.get(True, 1)
                html = c.recvmsg()
                if html:
                    soup = BeautifulSoup(html, 'lxml')
                    titles = soup.select('h2.news_entry a')
                    for title in titles:
                        # e.g. <a href='/n/60287/' target='_blank'>Tesla</a>
                        # val = (BASE_URL + title.attrs['href'], title.text)
                        # outputs.put(val)
                        val = simplejson.dumps({
                            'title': title.text,
                            'url': BASE_URL + title.attrs['href']
                        })
                        p.sendmsg(val)
                        print(val)
            except:
                raise
    except Exception as e:
        print(e)

# Persistence thread function: consume results and append them to a file
def persist(path):
    try:
        c = Consumer('192.168.112.111', 5672, 'rab', '123456', 'test', 'news', 'outputs')
        with open(path, 'a+', encoding='utf-8') as f:
            while not event.is_set():
                try:
                    # url, text = outputs.get(True, 1)
                    data = c.recvmsg()
                    print(data, '==========================================')
                    print(type(data))
                    if data:
                        d = simplejson.loads(data)
                        print(d, '------------------------------------------')
                        print(type(d))
                        # print(url, text)
                        f.write('{}\x01{}\n'.format(d['url'], d['title']))
                        f.flush()
                except:
                    pass
    except Exception as e:
        print(e)

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(create_url, 1, 10) # simulate URL collection; the thread yields once it finishes
executor.submit(persist, 'f:/new.txt')

# Fetch and parse the pages
for i in range(5):
    executor.submit(crawler)
for i in range(4):
    executor.submit(parse)


while True:
    cmd = input('>>>>>>')
    if cmd.strip() == 'quit':
        event.set()
        time.sleep(4)
        break
    print(threading.enumerate())