運行多個線程同時運行幾個不一樣的程序相似,但具備如下優勢:
進程內共享多線程與主線程相同的數據空間,若是他們是獨立的進程,能夠共享信息或互相溝通更容易.
線程有時稱爲輕量級進程,他們並不須要多大的內存開銷,他們關心的不是過程便宜.
一個線程都有一個開始,執行順序,並得出結論。它有一個指令指針,保持它的上下文內正在運行的跟蹤.
(1)、它能夠是搶佔(中斷)
(2)、它能夠暫時擱置(又稱睡眠),而其餘線程正在運行
看一下如下的小案例:python
import thread from time import sleep, ctime def loop0(): print "loop 0開始時間:",ctime() #第一個函數loop0開始時間 sleep(4) # 休眠4秒 print "loop 0 結束時間:_’,ctime() def loopl(): print "loop 1 開始時間:",ctime() sleep(2) print "loop 1 結束時間:_’,ctime() def main(): print "程序開始時間:",ctime() thread.start_new_thread(loop0,()) # 第二個參數是必不可少的,即便loope沒有傳遞參數,仍然要寫一個空元組 thread.stant_new_thnead(loopl,()) sleep(6) #這裏休眠6秒的緣由是確保兩個線程己經執行完畢,主線程才接着執行下面的語句 print "程序結束時間:",ctime() if __name__ == '__main__': main()
在web測試中,不可避免的一個測試就是瀏覽器兼容性測試,在沒有自動化測試前,咱們老是苦逼的在一臺或多臺機器上安裝N種瀏覽器,而後手工在不一樣的瀏覽器上驗證主業務流程和關鍵功能模塊功能,以檢測不一樣瀏覽器或不一樣版本瀏覽器上,咱們的web應用是否能夠正常工做。若是咱們使用selenium webdriver,那咱們就可以自動的在IE、firefox、chrome、等不一樣瀏覽器上運行測試用例。爲了能在同一臺機器上不一樣瀏覽器上同時執行測試用例,咱們須要多線程技術。下面咱們基於python的多線程技術來嘗試同時啓動多個瀏覽器進行selenium自動化測試。web
#-*- coding:utf-8 from selenium import webdriver import sys from time import sleep from threading import Thread reload(sys) sys.setdefaultencoding("utf-8") def test_baidu_seanch(browsen, url): driver = None #可添加更多瀏覽器支持進來 if browser == "ie": driver = webdriver.Ie() elif browser == "finefox": driver = webdriver.Firefox() elif browser == "chrome": driver = webdriver.Chnome() if driver == None: exit() driver.get(url) sleep(3) driver.find_element_by_id("xxx").send_keys(u"xxxx") sleep(3) driver.find_element_by_id("xxx").click() sleep(3) driver.quit() if __name__ == "__main__": #瀏覽器和首頁url data = { "ie":"http://www.xxx.com", "firefox": "http: //www.xxx.com", "chrome":"http://www.xxxx.com" } #構建線程 threads =[] for b, url in data.items(): t = Thread(target=test_baidu_search,angs=(b, url)) threads.append(t) #啓動全部線程 for thr in threads: thr.start()
threading 高級線程接口chrome
import threading class MyThnead(threading.Thread):
def __init__(self, name=None): threading.Thread.__init__(self) self.name = name def run(self): print self.name def test(): for i in range(0, 100): t = MyThread("thread_" + str(i)) t.start() if __name__ == '__main__': test()
Lock 線程鎖
這裏建立實現了一個計數器 count 這個全局變量會被多個線程同時操做,使其可以被順序相加,須要靠線程鎖的幫助。編程
#-*- encoding: utf-8 import threading import time class Test(threading.Thread): def __init__(self, num): threading.Thread.—init—(self) self._run_num = num def run(self): global count, mutex threadname = threading.currentThnead().getName() for x in nange(int(self._run_num)): mutex.acquire() count = count + 1 mutex.release() print (thneadname, x, count) time.sleep(l) if __name__ == '__main__': global count^ mutex threads =[] num = 5 count =0 #建立鎖 mutex = threading.Lock() #建立線程對象 for x in nange(num): threads.append(Test(10)) #啓動線程 for t in threads: t. start() #等待子線程結束 for t in threads: t.join()
Queue隊列設計模式
#!/usr/bin/env python import Queue import threading import urllib2 import time hosts = ["http://xxxx.com", "http://xxxxx.com","http://xxxxxx.com","http://xxxxx.com", "http://xxxxx.com"] queue = Queue.Queue() class ThreadUrl(thneading.Thread): ""」Threaded Uni Grab def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #gnabs host from queue host = self.queue.get() url = urllib2.urlopen(host) #gnabs urls of hosts and prints first 1024 bytes of page uni = urllib2.urlopen(host) #signals to queue job is done self.queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in nange(5): t = ThreadUrl(queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
當線程須要共享數據或資源時,線程可能會變得複雜。線程模塊提供許多同步原語,包括信號量,條件變量,事件和鎖。雖然存在這些選項,但它被認爲是最佳作法,而是專一於使用隊列。隊列更容易處理,而且使線程編程更安全,由於它們有效地將資源訪問單個線程,並容許更清晰和更可讀的設計模式。
首先建立一個程序,該程序將按順序或一個接一個地獲取網站的URL,並打印頁面的前1024個字節。這是一個經典的例子,能夠使用線程更快地完成任務。首先,讓咱們一塊兒使用這個urllib2 模塊來抓住這些頁面,而後再使用代碼:瀏覽器
import urllib2 import time hosts = ["http://xxxx.com", "http://xxxxx.com","http://xxxxxx.com","http://xxxxx.com", "http://xxxxx.com"] start = time.time() for host in hosts: url = urllib2.urlopen(host) print url.read(1024) print "Elapsed Time: %s" % (time.time() - start)
導入兩個模塊首先, urllib2模塊是什麼是繁重的抓住網頁。其次,經過調用建立開始時間值 time.time(),而後再次調用它,並減去初始值以肯定程序執行多長時間。最後,在查看程序的速度時,「兩個半秒」的結果是不可怕的,可是若是您有數百個網頁來檢索,則考慮到目前的平均值,大概須要50秒。看看如何建立一個線程版本加快速度:安全
import Queue import threading import urllib2 import time hosts = ["http://xxxx.com", "http://xxxxx.com","http://xxxxxx.com","http://xxxxx.com", "http://xxxxx.com"] queue = Queue.Queue() class ThreadUrl(thneading.Thread): ""」Threaded Uni Grab def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #gnabs host from queue host = self.queue.get() url = urllib2.urlopen(host) #gnabs urls of hosts and prints first 1024 bytes of page print url.read(1024) #signals to queue job is done self.queue.task_done() def main(): #spawn a pool of threads, and pass them queue instance for i in nange(5): t = ThreadUrl(queue) t.setDaemon(True) t.start() for host in hosts: queue.put(host) queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
上面的案例並不比第一個線程示例複雜得多,這要歸功於使用排隊模塊。這種模式是使用Python的線程的一種很是常見的推薦方式。步驟描述以下:
1. 建立一個實例,Queue.Queue()而後用數據填充它。 多線程
2.將填充數據的實例傳遞到您從繼承中建立的線程類threading.Thread。app
3.產生一個守護進程池線程。函數
4. 一次將一個項目拉出隊列,並使用線程內的數據,運行方法來完成工做。
5.完成工做後,向queue.task_done()任務完成發送一個信號到隊列。
6. 加入隊列,這意味着等到隊列爲空,而後退出主程序。
只是一個關於這種模式的註釋:經過將守護進程線程設置爲true,它容許主線程或程序退出,若是隻有守護進程線程存活。這將建立一種控制程序流程的簡單方法,由於您能夠在退出以前鏈接隊列,或等到隊列爲空。具體過程最好在隊列模塊的文檔中描述,如相關主題所示:join()「塊直到隊列中的全部項目已經被處理完畢,每當一個項目被添加到隊列中時,未完成任務的計數就會上升,當消費者線程調用task_done()來指示項目被檢索時,全部的工做都是完成的,當未完成任務的計數降低到零時,join()解除阻塞。