Python利用Selenium模擬瀏覽器自動操做

概述

在進行網站爬取數據的時候,會發現不少網站都進行了反爬蟲的處理,如JS加密,Ajax加密,反Debug等方法,經過請求獲取數據和頁面展現的內容徹底不一樣,這時候就用到Selenium技術,來模擬瀏覽器的操做,而後獲取數據。本文以一個簡單的小例子,簡述Python搭配Tkinter和Selenium進行瀏覽器的模擬操做,僅供學習分享使用,若有不足之處,還請指正。css

什麼是Selenium?

Selenium 是一個用於Web應用程序測試的工具,Selenium測試直接運行在瀏覽器中,就像真正的用戶在操做同樣。支持的瀏覽器包括IE(7, 8, 9, 10, 11),Mozilla Firefox,Safari,Google Chrome,Opera等。Selenium支持多種操做系統,如Windows、Linux、IOS等,若是須要支持Android,則須要特殊的selenium,本文主要以IE11瀏覽器爲例。web

安裝Selenium

 

經過pip install selenium 進行安裝便可,若是速度慢,則可使用國內的鏡像進行安裝。數組

涉及知識點

程序雖小,除了須要掌握的Html ,JavaScript,CSS等基礎知識外,本例涉及的Python相關知識點仍是蠻多的,具體以下:瀏覽器

  • Selenium相關:
    • Selenium進行元素定位,主要有ID,Name,ClassName,Css Selector,Partial LinkText,LinkText,XPath,TagName等8種方式。
    • Selenium獲取單一元素(如:find_element_by_xpath)和獲取元素數組(如:find_elements_by_xpath)兩種方式。
    • Selenium元素定位後,能夠給元素進行賦值和取值,或者進行相應的事件操做(如:click)。
  • 線程(Thread)相關:
    • 爲了防止前臺頁面卡主,本文用到了線程進行後臺操做,若是要定義一個新的線程,只須要定義一個類並繼承threading.Thread,而後重寫run方法便可。
    • 在使用線程的過程當中,爲了保證線程的同步,本例用到了線程鎖,如:threading.Lock()。
  • 隊列(queue)相關:
    • 本例將Selenium執行的過程信息,保存到對列中,並經過線程輸出到頁面顯示。queue默認先進先出方式。
    • 對列經過put進行壓棧,經過get進行出棧。經過qsize()用於獲取當前對列元素個數。
  • 日誌(logging.Logger)相關:
    • 爲了保存Selenium執行過程當中的日誌,本例用到了日誌模塊,爲Pyhton自帶的模塊,不須要額外安裝。
    • Python的日誌共六種級別,分別是:NOTSET,DEBUG,INFO,WARN,ERROR,FATAL,CRITICAL。

示例效果圖

本例主要針對某一配置好的商品ID進行輪詢,監控是否有貨,有貨則加入購物車,無貨則繼續輪詢,以下圖所示:socket

核心代碼

本例最核心的代碼,就是利用Selenium進行網站的模擬操做,以下所示:ide

  1 class Smoking:
  2     """定義Smoking類"""
  3 
  4     # 瀏覽器驅動
  5     __driver: webdriver = None
  6     # 配置幫助類
  7     __cfg_info: dict = {}
  8     # 日誌幫助類
  9     __log_helper: LogHelper = None
 10     # 主程序目錄
 11     __work_path: str = ''
 12     # 是否正在運行
 13     __running: bool = False
 14     # 無貨
 15     __no_stock = 'Currently Out of Stock'
 16     # 線程等待秒數
 17     __wait_sec = 2
 18 
 19     def __init__(self, work_path, cfg_info, log_helper: LogHelper):
 20         """初始化"""
 21         self.__cfg_info = cfg_info
 22         self.__log_helper = log_helper
 23         self.__work_path = work_path
 24         self.__wait_sec = int(cfg_info['wait_sec'])
 25         # 若是小於2,則等於2
 26         self.__wait_sec = (2 if self.__wait_sec < 2 else self.__wait_sec)
 27 
 28     def checkIsExistsById(self, id):
 29         """經過ID判斷是否存在"""
 30         try:
 31             i = 0
 32             while self.__running and i < 3:
 33                 if len(self.__driver.find_elements_by_id(id)) > 0:
 34                     break
 35                 else:
 36                     time.sleep(self.__wait_sec)
 37                     i = i + 1
 38             return len(self.__driver.find_elements_by_id(id)) > 0
 39         except BaseException as e:
 40             return False
 41 
 42     def checkIsExistsByName(self, name):
 43         """經過名稱判斷是否存在"""
 44         try:
 45             i = 0
 46             while self.__running and i < 3:
 47                 if len(self.__driver.find_elements_by_name(name)) > 0:
 48                     break
 49                 else:
 50                     time.sleep(self.__wait_sec)
 51                     i = i + 1
 52             return len(self.__driver.find_elements_by_name(name)) > 0
 53         except BaseException as e:
 54             return False
 55 
 56     def checkIsExistsByPath(self, path):
 57         """經過xpath判斷是否存在"""
 58         try:
 59             i = 0
 60             while self.__running and i < 3:
 61                 if len(self.__driver.find_elements_by_xpath(path)) > 0:
 62                     break
 63                 else:
 64                     time.sleep(self.__wait_sec)
 65                     i = i + 1
 66             return len(self.__driver.find_elements_by_xpath(path)) > 0
 67         except BaseException as e:
 68             return False
 69 
 70     def checkIsExistsByClass(self, cls):
 71         """經過class名稱判斷是否存在"""
 72         try:
 73             i = 0
 74             while self.__running and i < 3:
 75                 if len(self.__driver.find_elements_by_class_name(cls)) > 0:
 76                     break
 77                 else:
 78                     time.sleep(self.__wait_sec)
 79                     i = i + 1
 80             return len(self.__driver.find_elements_by_class_name(cls)) > 0
 81         except BaseException as e:
 82             return False
 83 
 84     def checkIsExistsByLinkText(self, link_text):
 85         """判斷LinkText是否存在"""
 86         try:
 87             i = 0
 88             while self.__running and i < 3:
 89                 if len(self.__driver.find_elements_by_link_text(link_text)) > 0:
 90                     break
 91                 else:
 92                     time.sleep(self.__wait_sec)
 93                     i = i + 1
 94             return len(self.__driver.find_elements_by_link_text(link_text)) > 0
 95         except BaseException as e:
 96             return False
 97 
 98     def checkIsExistsByPartialLinkText(self, link_text):
 99         """判斷包含LinkText是否存在"""
100         try:
101             i = 0
102             while self.__running and i < 3:
103                 if len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0:
104                     break
105                 else:
106                     time.sleep(self.__wait_sec)
107                     i = i + 1
108             return len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0
109         except BaseException as e:
110             return False
111 
112     # def waiting(self, *locator):
113     #     """等待完成"""
114     #     # self.__driver.switch_to.window(self.__driver.window_handles[1])
115     #     Wait(self.__driver, 60).until(EC.visibility_of_element_located(locator))
116 
117     def login(self, username, password):
118         """登陸"""
119         # 5. 點擊連接跳轉到登陸頁面
120         self.__driver.find_element_by_link_text('帳戶登陸').click()
121         # 6. 輸入帳號密碼
122         # 判斷是否加載完成
123         # self.waiting((By.ID, "email"))
124         if self.checkIsExistsById('email'):
125             self.__driver.find_element_by_id('email').send_keys(username)
126             self.__driver.find_element_by_id('password').send_keys(password)
127             # 7. 點擊登陸按鈕
128             self.__driver.find_element_by_id('sign-in').click()
129 
130     def working(self, item_id):
131         """工做狀態"""
132         while self.__running:
133             try:
134                 # 正常獲取信息
135                 if self.checkIsExistsById('string'):
136                     self.__driver.find_element_by_id('string').clear()
137                     self.__driver.find_element_by_id('string').send_keys(item_id)
138                     self.__driver.find_element_by_id('string').send_keys(Keys.ENTER)
139                 # 判斷是否查詢到商品
140                 xpath = "//div[@class='specialty-header search']/div[@class='specialty-description']/div[" \
141                         "@class='gt-450']/span[2] "
142                 if self.checkIsExistsByPath(xpath):
143                     count = int(self.__driver.find_element_by_xpath(xpath).text)
144                     if count < 1:
145                         time.sleep(self.__wait_sec)
146                         self.__log_helper.put('沒有查詢到item id =' + item_id + '對應的信息')
147                         continue
148                 else:
149                     time.sleep(self.__wait_sec)
150                     self.__log_helper.put('沒有查詢到item id2 =' + item_id + '對應的信息')
151                     continue
152                 # 判斷當前庫存是否有貨
153 
154                 xpath1 = "//div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[" \
155                          "@class='price']/span[@class='noStock'] "
156                 if self.checkIsExistsByPath(xpath1):
157                     txt = self.__driver.find_element_by_xpath(xpath1).text
158                     if txt == self.__no_stock:
159                         # 當前無貨
160                         time.sleep(self.__wait_sec)
161                         self.__log_helper.put('查詢一次' + item_id + ',無貨')
162                         continue
163 
164                 # 連接path1
165                 xpath2 = "//div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a"
166                 # 判斷是否加載完畢
167                 # self.waiting((By.CLASS_NAME, "imgDiv"))
168                 if self.checkIsExistsByPath(xpath2):
169                     self.__driver.find_element_by_xpath(xpath2).click()
170                     time.sleep(self.__wait_sec)
171                     # 加入購物車
172                     if self.checkIsExistsByClass('add-to-cart'):
173                         self.__driver.find_element_by_class_name('add-to-cart').click()
174                         self.__log_helper.put('加入購物車成功,商品item-id:' + item_id)
175                         break
176                     else:
177                         self.__log_helper.put('未找到加入購物車按鈕')
178                 else:
179                     self.__log_helper.put('沒有查詢到,多是商品編碼不對,或者已下架')
180             except BaseException as e:
181                 self.__log_helper.put(e)
182 
183     def startRun(self):
184         """運行起來"""
185         try:
186             self.__running = True
187             url: str = self.__cfg_info['url']
188             username = self.__cfg_info['username']
189             password = self.__cfg_info['password']
190             item_id = self.__cfg_info['item_id']
191             if url is None or len(url) == 0 or username is None or len(username) == 0 or password is None or len(
192                     password) == 0 or item_id is None or len(item_id) == 0:
193                 self.__log_helper.put('配置信息不全,請檢查config.cfg文件是否爲空,而後再重啓')
194                 return
195             if self.__driver is None:
196                 options = webdriver.IeOptions()
197                 options.add_argument('encoding=UTF-8')
198                 options.add_argument('Accept= text / css, * / *')
199                 options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5')
200                 options.add_argument('Accept - Encoding= gzip, deflate')
201                 options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko')
202                 # 2. 定義瀏覽器驅動對象
203                 self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe', options=options)
204             self.run(url, username, password, item_id)
205         except BaseException as e:
206             self.__log_helper.put('運行過程當中出錯,請從新打開再試')
207 
208     def run(self, url, username, password, item_id):
209         """運行起來"""
210         # 3. 訪問網站
211         self.__driver.get(url)
212         # 4. 最大化窗口
213         self.__driver.maximize_window()
214         if self.checkIsExistsByLinkText('帳戶登陸'):
215             # 判斷是否登陸:未登陸
216             self.login(username, password)
217         if self.checkIsExistsByPartialLinkText('歡迎回來'):
218             # 判斷是否登陸:已登陸
219             self.__log_helper.put('登陸成功,下一步開始工做了')
220             self.working(item_id)
221         else:
222             self.__log_helper.put('登陸失敗,請設置帳號密碼')
223 
224     def stop(self):
225         """中止"""
226         try:
227             self.__running = False
228             # 若是驅動不爲空,則關閉
229             self.close_browser_nicely(self.__driver)
230             if self.__driver is not None:
231                 self.__driver.quit()
232                 # 關閉後切要爲None,不然啓動報錯
233                 self.__driver = None
234         except BaseException as e:
235             print('Stop Failure')
236         finally:
237             self.__driver = None
238 
239     def close_browser_nicely(self, browser):
240         try:
241             browser.execute_script("window.onunload=null; window.onbeforeunload=null")
242         except Exception as err:
243             print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'")
244 
245         socket.setdefaulttimeout(10)
246         try:
247             browser.quit()
248             print("Close browser and firefox by calling quit()")
249         except Exception as err:
250             print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err)))
251         socket.setdefaulttimeout(30)
View Code

其餘輔助類工具

日誌類(LogHelper),代碼以下:學習

 1 class LogHelper:
 2     """日誌幫助類"""
 3     __queue: queue.Queue = None  # 隊列
 4     __logging: logging.Logger = None  # 日誌
 5     __running: bool = False  # 是否記錄日誌
 6 
 7     def __init__(self, log_path):
 8         """初始化類"""
 9         self.__queue = queue.Queue(1000)
10         self.init_log(log_path)
11 
12     def put(self, value):
13         """添加數據"""
14         # 記錄日誌
15         self.__logging.info(value)
16         # 添加到隊列
17         if self.__queue.qsize() < self.__queue.maxsize:
18             self.__queue.put(value)
19 
20     def get(self):
21         """獲取數據"""
22         if self.__queue.qsize() > 0:
23             try:
24                 return self.__queue.get(block=False)
25             except BaseException as e:
26                 return None
27         else:
28             return None
29 
30     def init_log(self, log_path):
31         """初始化日誌"""
32         self.__logging = logging.getLogger()
33         self.__logging.setLevel(logging.INFO)
34         # 日誌
35         rq = time.strftime('%Y%m%d%H%M', time.localtime(time.time()))
36         log_name = log_path + rq + '.log'
37         logfile = log_name
38         # if not os.path.exists(logfile):
39         #     # 建立空文件
40         #     open(logfile, mode='r')
41         fh = logging.FileHandler(logfile, mode='a', encoding='UTF-8')
42         fh.setLevel(logging.DEBUG)  # 輸出到file的log等級的開關
43         # 第三步,定義handler的輸出格式
44         formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
45         fh.setFormatter(formatter)
46         # 第四步,將logger添加到handler裏面
47         self.__logging.addHandler(fh)
48 
49     def get_running(self):
50         # 獲取當前記錄日誌的狀態
51         return self.__running
52 
53     def set_running(self, v: bool):
54         # 設置當前記錄日誌的狀態
55 
56         self.__running = v
View Code

配置類(ConfigHelper)測試

 1 class ConfigHelper:
 2     """初始化數據類"""
 3 
 4     __config_dir = None
 5     __dic_cfg = {}
 6 
 7     def __init__(self, config_dir):
 8         """初始化"""
 9         self.__config_dir = config_dir
10 
11     def ReadConfigInfo(self):
12         """獲得配置項"""
13         parser = ConfigParser()
14         parser.read(self.__config_dir + r"\config.cfg")
15         section = parser.sections()[0]
16         items = parser.items(section)
17         self.__dic_cfg.clear()
18         for item in items:
19             self.__dic_cfg.__setitem__(item[0], item[1])
20 
21     def getConfigInfo(self):
22         """獲取配置信息"""
23         if len(self.__dic_cfg) == 0:
24             self.ReadConfigInfo()
25         return self.__dic_cfg
View Code

線程類(MyThread)網站

 1 class MyThread(threading.Thread):
 2     """後臺監控線程"""
 3 
 4     def __init__(self, tid, name, smoking: Smoking, log_helper: LogHelper):
 5         """線程初始化"""
 6         threading.Thread.__init__(self)
 7         self.threadID = tid
 8         self.name = name
 9         self.smoking = smoking
10         self.log_helper = log_helper
11 
12     def run(self):
13         print("開啓線程: " + self.name)
14         self.log_helper.put("開啓線程: " + self.name)
15         # 獲取鎖,用於線程同步
16         # lock = threading.Lock()
17         # lock.acquire()
18         self.smoking.startRun()
19         # 釋放鎖,開啓下一個線程
20         # lock.release()
21         print("結束線程: " + self.name)
22         self.log_helper.put("結束線程: " + self.name)
View Code

備註

 

俠客行  [唐:李白]
趙客縵胡纓,吳鉤霜雪明。銀鞍照白馬,颯沓如流星。 十步殺一人,千里不留行。事了拂衣去,深藏身與名。 閒過信陵飲,脫劍膝前橫。將炙啖朱亥,持觴勸侯嬴。 三杯吐然諾,五嶽倒爲輕。眼花耳熱後,意氣素霓生。 救趙揮金槌,邯鄲先震驚。千秋二壯士,烜赫大梁城。 縱死俠骨香,不慚世上英。誰能書閣下,白首太玄經。
相關文章
相關標籤/搜索