在進行網站爬取數據的時候,會發現不少網站都進行了反爬蟲的處理,如JS加密,Ajax加密,反Debug等方法,經過請求獲取數據和頁面展現的內容徹底不一樣,這時候就用到Selenium技術,來模擬瀏覽器的操做,而後獲取數據。本文以一個簡單的小例子,簡述Python搭配Tkinter和Selenium進行瀏覽器的模擬操做,僅供學習分享使用,若有不足之處,還請指正。css
Selenium 是一個用於Web應用程序測試的工具,Selenium測試直接運行在瀏覽器中,就像真正的用戶在操做同樣。支持的瀏覽器包括IE(7, 8, 9, 10, 11),Mozilla Firefox,Safari,Google Chrome,Opera等。Selenium支持多種操做系統,如Windows、Linux、IOS等,若是須要支持Android,則須要特殊的selenium,本文主要以IE11瀏覽器爲例。web
經過pip install selenium 進行安裝便可,若是速度慢,則可使用國內的鏡像進行安裝。數組
程序雖小,除了須要掌握的Html ,JavaScript,CSS等基礎知識外,本例涉及的Python相關知識點仍是蠻多的,具體以下:瀏覽器
本例主要針對某一配置好的商品ID進行輪詢,監控是否有貨,有貨則加入購物車,無貨則繼續輪詢,以下圖所示:socket
本例最核心的代碼,就是利用Selenium進行網站的模擬操做,以下所示:ide
1 class Smoking: 2 """定義Smoking類""" 3 4 # 瀏覽器驅動 5 __driver: webdriver = None 6 # 配置幫助類 7 __cfg_info: dict = {} 8 # 日誌幫助類 9 __log_helper: LogHelper = None 10 # 主程序目錄 11 __work_path: str = '' 12 # 是否正在運行 13 __running: bool = False 14 # 無貨 15 __no_stock = 'Currently Out of Stock' 16 # 線程等待秒數 17 __wait_sec = 2 18 19 def __init__(self, work_path, cfg_info, log_helper: LogHelper): 20 """初始化""" 21 self.__cfg_info = cfg_info 22 self.__log_helper = log_helper 23 self.__work_path = work_path 24 self.__wait_sec = int(cfg_info['wait_sec']) 25 # 若是小於2,則等於2 26 self.__wait_sec = (2 if self.__wait_sec < 2 else self.__wait_sec) 27 28 def checkIsExistsById(self, id): 29 """經過ID判斷是否存在""" 30 try: 31 i = 0 32 while self.__running and i < 3: 33 if len(self.__driver.find_elements_by_id(id)) > 0: 34 break 35 else: 36 time.sleep(self.__wait_sec) 37 i = i + 1 38 return len(self.__driver.find_elements_by_id(id)) > 0 39 except BaseException as e: 40 return False 41 42 def checkIsExistsByName(self, name): 43 """經過名稱判斷是否存在""" 44 try: 45 i = 0 46 while self.__running and i < 3: 47 if len(self.__driver.find_elements_by_name(name)) > 0: 48 break 49 else: 50 time.sleep(self.__wait_sec) 51 i = i + 1 52 return len(self.__driver.find_elements_by_name(name)) > 0 53 except BaseException as e: 54 return False 55 56 def checkIsExistsByPath(self, path): 57 """經過xpath判斷是否存在""" 58 try: 59 i = 0 60 while self.__running and i < 3: 61 if len(self.__driver.find_elements_by_xpath(path)) > 0: 62 break 63 else: 64 time.sleep(self.__wait_sec) 65 i = i + 1 66 return len(self.__driver.find_elements_by_xpath(path)) > 0 67 except BaseException as e: 68 return False 69 70 def checkIsExistsByClass(self, cls): 71 """經過class名稱判斷是否存在""" 72 try: 73 i = 0 74 while self.__running and i < 3: 75 if len(self.__driver.find_elements_by_class_name(cls)) > 0: 76 break 77 else: 78 time.sleep(self.__wait_sec) 79 i = i + 1 80 return len(self.__driver.find_elements_by_class_name(cls)) > 0 81 except BaseException as e: 82 return False 83 84 def checkIsExistsByLinkText(self, link_text): 85 """判斷LinkText是否存在""" 86 try: 87 i = 0 88 while self.__running and i < 3: 89 if len(self.__driver.find_elements_by_link_text(link_text)) > 0: 90 break 91 else: 92 time.sleep(self.__wait_sec) 93 i = i + 1 94 return len(self.__driver.find_elements_by_link_text(link_text)) > 0 95 except BaseException as e: 96 return False 97 98 def checkIsExistsByPartialLinkText(self, link_text): 99 """判斷包含LinkText是否存在""" 100 try: 101 i = 0 102 while self.__running and i < 3: 103 if len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0: 104 break 105 else: 106 time.sleep(self.__wait_sec) 107 i = i + 1 108 return len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0 109 except BaseException as e: 110 return False 111 112 # def waiting(self, *locator): 113 # """等待完成""" 114 # # self.__driver.switch_to.window(self.__driver.window_handles[1]) 115 # Wait(self.__driver, 60).until(EC.visibility_of_element_located(locator)) 116 117 def login(self, username, password): 118 """登陸""" 119 # 5. 點擊連接跳轉到登陸頁面 120 self.__driver.find_element_by_link_text('帳戶登陸').click() 121 # 6. 輸入帳號密碼 122 # 判斷是否加載完成 123 # self.waiting((By.ID, "email")) 124 if self.checkIsExistsById('email'): 125 self.__driver.find_element_by_id('email').send_keys(username) 126 self.__driver.find_element_by_id('password').send_keys(password) 127 # 7. 點擊登陸按鈕 128 self.__driver.find_element_by_id('sign-in').click() 129 130 def working(self, item_id): 131 """工做狀態""" 132 while self.__running: 133 try: 134 # 正常獲取信息 135 if self.checkIsExistsById('string'): 136 self.__driver.find_element_by_id('string').clear() 137 self.__driver.find_element_by_id('string').send_keys(item_id) 138 self.__driver.find_element_by_id('string').send_keys(Keys.ENTER) 139 # 判斷是否查詢到商品 140 xpath = "//div[@class='specialty-header search']/div[@class='specialty-description']/div[" \ 141 "@class='gt-450']/span[2] " 142 if self.checkIsExistsByPath(xpath): 143 count = int(self.__driver.find_element_by_xpath(xpath).text) 144 if count < 1: 145 time.sleep(self.__wait_sec) 146 self.__log_helper.put('沒有查詢到item id =' + item_id + '對應的信息') 147 continue 148 else: 149 time.sleep(self.__wait_sec) 150 self.__log_helper.put('沒有查詢到item id2 =' + item_id + '對應的信息') 151 continue 152 # 判斷當前庫存是否有貨 153 154 xpath1 = "//div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[" \ 155 "@class='price']/span[@class='noStock'] " 156 if self.checkIsExistsByPath(xpath1): 157 txt = self.__driver.find_element_by_xpath(xpath1).text 158 if txt == self.__no_stock: 159 # 當前無貨 160 time.sleep(self.__wait_sec) 161 self.__log_helper.put('查詢一次' + item_id + ',無貨') 162 continue 163 164 # 連接path1 165 xpath2 = "//div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a" 166 # 判斷是否加載完畢 167 # self.waiting((By.CLASS_NAME, "imgDiv")) 168 if self.checkIsExistsByPath(xpath2): 169 self.__driver.find_element_by_xpath(xpath2).click() 170 time.sleep(self.__wait_sec) 171 # 加入購物車 172 if self.checkIsExistsByClass('add-to-cart'): 173 self.__driver.find_element_by_class_name('add-to-cart').click() 174 self.__log_helper.put('加入購物車成功,商品item-id:' + item_id) 175 break 176 else: 177 self.__log_helper.put('未找到加入購物車按鈕') 178 else: 179 self.__log_helper.put('沒有查詢到,多是商品編碼不對,或者已下架') 180 except BaseException as e: 181 self.__log_helper.put(e) 182 183 def startRun(self): 184 """運行起來""" 185 try: 186 self.__running = True 187 url: str = self.__cfg_info['url'] 188 username = self.__cfg_info['username'] 189 password = self.__cfg_info['password'] 190 item_id = self.__cfg_info['item_id'] 191 if url is None or len(url) == 0 or username is None or len(username) == 0 or password is None or len( 192 password) == 0 or item_id is None or len(item_id) == 0: 193 self.__log_helper.put('配置信息不全,請檢查config.cfg文件是否爲空,而後再重啓') 194 return 195 if self.__driver is None: 196 options = webdriver.IeOptions() 197 options.add_argument('encoding=UTF-8') 198 options.add_argument('Accept= text / css, * / *') 199 options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5') 200 options.add_argument('Accept - Encoding= gzip, deflate') 201 options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko') 202 # 2. 定義瀏覽器驅動對象 203 self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe', options=options) 204 self.run(url, username, password, item_id) 205 except BaseException as e: 206 self.__log_helper.put('運行過程當中出錯,請從新打開再試') 207 208 def run(self, url, username, password, item_id): 209 """運行起來""" 210 # 3. 訪問網站 211 self.__driver.get(url) 212 # 4. 最大化窗口 213 self.__driver.maximize_window() 214 if self.checkIsExistsByLinkText('帳戶登陸'): 215 # 判斷是否登陸:未登陸 216 self.login(username, password) 217 if self.checkIsExistsByPartialLinkText('歡迎回來'): 218 # 判斷是否登陸:已登陸 219 self.__log_helper.put('登陸成功,下一步開始工做了') 220 self.working(item_id) 221 else: 222 self.__log_helper.put('登陸失敗,請設置帳號密碼') 223 224 def stop(self): 225 """中止""" 226 try: 227 self.__running = False 228 # 若是驅動不爲空,則關閉 229 self.close_browser_nicely(self.__driver) 230 if self.__driver is not None: 231 self.__driver.quit() 232 # 關閉後切要爲None,不然啓動報錯 233 self.__driver = None 234 except BaseException as e: 235 print('Stop Failure') 236 finally: 237 self.__driver = None 238 239 def close_browser_nicely(self, browser): 240 try: 241 browser.execute_script("window.onunload=null; window.onbeforeunload=null") 242 except Exception as err: 243 print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'") 244 245 socket.setdefaulttimeout(10) 246 try: 247 browser.quit() 248 print("Close browser and firefox by calling quit()") 249 except Exception as err: 250 print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err))) 251 socket.setdefaulttimeout(30)
其餘輔助類工具
日誌類(LogHelper),代碼以下:學習
1 class LogHelper: 2 """日誌幫助類""" 3 __queue: queue.Queue = None # 隊列 4 __logging: logging.Logger = None # 日誌 5 __running: bool = False # 是否記錄日誌 6 7 def __init__(self, log_path): 8 """初始化類""" 9 self.__queue = queue.Queue(1000) 10 self.init_log(log_path) 11 12 def put(self, value): 13 """添加數據""" 14 # 記錄日誌 15 self.__logging.info(value) 16 # 添加到隊列 17 if self.__queue.qsize() < self.__queue.maxsize: 18 self.__queue.put(value) 19 20 def get(self): 21 """獲取數據""" 22 if self.__queue.qsize() > 0: 23 try: 24 return self.__queue.get(block=False) 25 except BaseException as e: 26 return None 27 else: 28 return None 29 30 def init_log(self, log_path): 31 """初始化日誌""" 32 self.__logging = logging.getLogger() 33 self.__logging.setLevel(logging.INFO) 34 # 日誌 35 rq = time.strftime('%Y%m%d%H%M', time.localtime(time.time())) 36 log_name = log_path + rq + '.log' 37 logfile = log_name 38 # if not os.path.exists(logfile): 39 # # 建立空文件 40 # open(logfile, mode='r') 41 fh = logging.FileHandler(logfile, mode='a', encoding='UTF-8') 42 fh.setLevel(logging.DEBUG) # 輸出到file的log等級的開關 43 # 第三步,定義handler的輸出格式 44 formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s") 45 fh.setFormatter(formatter) 46 # 第四步,將logger添加到handler裏面 47 self.__logging.addHandler(fh) 48 49 def get_running(self): 50 # 獲取當前記錄日誌的狀態 51 return self.__running 52 53 def set_running(self, v: bool): 54 # 設置當前記錄日誌的狀態 55 56 self.__running = v
配置類(ConfigHelper)測試
1 class ConfigHelper: 2 """初始化數據類""" 3 4 __config_dir = None 5 __dic_cfg = {} 6 7 def __init__(self, config_dir): 8 """初始化""" 9 self.__config_dir = config_dir 10 11 def ReadConfigInfo(self): 12 """獲得配置項""" 13 parser = ConfigParser() 14 parser.read(self.__config_dir + r"\config.cfg") 15 section = parser.sections()[0] 16 items = parser.items(section) 17 self.__dic_cfg.clear() 18 for item in items: 19 self.__dic_cfg.__setitem__(item[0], item[1]) 20 21 def getConfigInfo(self): 22 """獲取配置信息""" 23 if len(self.__dic_cfg) == 0: 24 self.ReadConfigInfo() 25 return self.__dic_cfg
線程類(MyThread)網站
1 class MyThread(threading.Thread): 2 """後臺監控線程""" 3 4 def __init__(self, tid, name, smoking: Smoking, log_helper: LogHelper): 5 """線程初始化""" 6 threading.Thread.__init__(self) 7 self.threadID = tid 8 self.name = name 9 self.smoking = smoking 10 self.log_helper = log_helper 11 12 def run(self): 13 print("開啓線程: " + self.name) 14 self.log_helper.put("開啓線程: " + self.name) 15 # 獲取鎖,用於線程同步 16 # lock = threading.Lock() 17 # lock.acquire() 18 self.smoking.startRun() 19 # 釋放鎖,開啓下一個線程 20 # lock.release() 21 print("結束線程: " + self.name) 22 self.log_helper.put("結束線程: " + self.name)