一個進程必有一個線程,進程也可由多個線程組成,但有一個線程爲主線程。
若一個任務須要花10Mins,當只有一個線程時,花費10Mins,當有十個線程時,可能就花費1Mins,因此多線程能夠提高任務執行時間,提升工做效率。
python裏與線程有關的模塊:php
查看當前運行的線程個數:threading.current_thread()
查看當前線程信息:threading.active_count()python
import _thread import threading def job(): print("當前線程個數:",threading.active_count()) print("當前線程信息",threading.current_thread()) if __name__=='__main__': job()
調用thread模塊中的start_new_thread()函數來產生新線程。
thread.start_new_thread(function,args = ())git
#_thread建立多線程 import _thread import time def job(name): print("name:%s,time:%s" %(name,time.ctime())) if __name__=="__main__": # 建立多個線程, 可是沒有開始執行任務 _thread.start_new_thread(job,('thread1',)) _thread.start_new_thread(job,('thread2',)) while True: #盲等待 pass
_thread模塊提供了低級別的、原始的線程以及一個簡單的鎖。
threading模塊是對_thread再封裝,對使用者更友好
經過實例化Thread對象建立線程,Thread的方法有:github
import threading def job(name): print("當前執行的任務名:",name) print("當前線程個數:",threading.active_count()) print("當前線程信息:",threading.current_thread()) if __name__=="__main__": t1 = threading.Thread(target=job,name='thread1',args=('job1',)) t2 = threading.Thread(target=job,name='thread2',args=('job2',)) t1.start() #Start the thread's activity. t2.start()
不使用多線程執行任務,程序會一直等待sleep時間過去,在執行下一條命令。json
#不使用多線程 import time def music(name): for i in range(2): print("i am listening :",name) time.sleep(2) def read(book): for i in range(2): print("i am reading :",book) time.sleep(1) if __name__ == '__main__': start_time = time.time() music("空空如也") read('面紗') print("花費時間: %s" %(time.time()-start_time))
使用多線程執行任務,在遇到某一線程須要等待時,會執行其餘線程
Thread.join()會等待當前線程執行結束,再執行主線程。api
import threading import time def music(name): for i in range(2): print("i am listening :",name) time.sleep(2) def read(book): for i in range(2): print("i am reading :",book) time.sleep(1) if __name__=="__main__": start_time = time.time() t1 = threading.Thread(target=music,args=('空空如也',)) t2 = threading.Thread(target=read,args=('面紗',)) t1.start() t2.start() t1.join() #等待線程執行結束,才執行主程序,防止主線程阻塞子線程 t2.join() end_time = time.time() print("任務執行時間:",end_time-start_time)
當申明一個子線程爲守護線程時,主線程結束時,子線程也結束。
申明守護線程須要在開啓線程以前。網絡
import threading import time def music(name): for i in range(2): print("listening music :",name) time.sleep(4) def code(pro): for i in range(2): print('i am coding :',pro) time.sleep(5) if __name__=='__main__': st_time = time.time() t1 = threading.Thread(target=music,args=('hello',)) t2 = threading.Thread(target=code,args=('mydiff',)) #將線程申明爲守護線程,若是設置爲True,當主線程結束,子線程也結束 #必須在啓動線程以前進行設置 t1.setDaemon(True) t2.setDaemon(True) #主線程執行結束以後,子線程還沒來得及執行結束,整個程序就退出了 t1.start() t2.start() end_time = time.time() print('運行時間:',end_time-st_time)
若是多個線程共同對某個數據修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要對多個線程進行同步。
使用Thread對象的Lock和Rlock能夠實現簡單的線程同步,這兩個對象都有acquire方法和release方法,對於那些須要每次只容許一個線程操做的數據,能夠將其操做放到acquire和release方法之間。多線程
import threading def add(lock): #操做變量以前加鎖 lock.acquire() global money for i in range(1389993): money+=1 #變量操做完成以後,解鎖 lock.release() def reduce(lock): #操做變量以前加鎖 lock.acquire() global money for i in range(4728937): money-=1 #變量操做完成以後,解鎖 lock.release() if __name__=="__main__": money = 0 lock = threading.Lock() #示例化一個鎖對象 t1 = threading.Thread(target=add,args=(lock,)) t2 = threading.Thread(target=reduce,args=(lock,)) t1.start() t2.start() t1.join() t2.join() print('最終金額爲:',money)
Python 代碼的執行由 Python 虛擬機(也叫解釋器主循環)來控制。Python 在設計之初就考慮到要在主循環中,同時只有一個線程在執行,就像單 CPU 的系統中運行多個進程那樣,內存中能夠存放多個程序,但任意時刻,只有一個程序在 CPU 中運行。一樣地,雖然 Python 解釋器中能夠「運行」,多個線程,但在任意時刻,只有一個線程在解釋器中運行。app
對 Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。函數
執行過程:
1). 設置GIL 2). 切換到線程去運行對應的任務; 3). 運行 - 執行完了 - time.sleep() - 獲取其餘信息才能繼續執行, eg: 從網絡上獲取網頁信息等; 3. 把線程設置爲睡眠狀態 4. 解鎖GIL 5.再次重複執行上述內容;
在工做中,某些模塊生成一些數據,由另外一些模塊負責處理。產生數據的模塊,就形象地稱爲生產者;而處理數據的模塊,就稱爲消費者。在生產者與消費者之間在加個緩衝區,咱們形象的稱之爲倉庫,生產者負責往倉庫了進商品,而消費者負責從倉庫裏拿商品,這就構成了生產者消費者模式。
這裏,咱們用生產者消費者模型來實現多線程的網址訪問,節省時間。
#多線程實現生產者消費者模型 #實現不一樣的網址或ip訪問 import threading from urllib.request import urlopen def create_data(): with open('ips.txt','w') as f: f.write("www.baidu.com\n") f.write("www.163.com\n") for i in range(100): f.write('172.25.254.%s\n' %(i+1)) def creat_url(filename='ips.txt'): ports=[80,443] with open(filename) as f: ips = [url_info.strip() for url_info in f.readlines()] urls = ['http://%s:%s' %(ip,port) for ip in ips for port in ports] return urls def job(url): try: urlObj = urlopen(url) except Exception as e : print('Warnning!!! %s不可訪問' %(url)) else: print("%s能夠訪問" %(url)) if __name__=="__main__": urls = creat_url() threads = [] for url in urls: t = threading.Thread(target=job,args=(url,)) threads.append(t) t.start() [thread.join() for thread in threads] print("任務執行結束")
對threading.Thread類的再封裝,執行時無需傳遞參數
from threading import Thread class IpThread(Thread): def __init__(self): super(IpThread, self).__init__() # 將多線程須要執行的任務重寫到run方法中; def run(self): print("this is a JOB") print(type(self)) t = IpThread() t.start()
實現訪問Ip地址
import json from threading import Thread from urllib.request import urlopen class IpThread(Thread): #重寫構造方法,若是執行的任務須要傳遞參數,那將參數與self綁定 def __init__(self,jobname,ip): super(IpThread, self).__init__() self.jobname = jobname self.ip = ip #將多線程須要執行的任務重寫到run方法中 def run(self): print('this is a %s job' %(self.jobname)) #須要有一個參數,傳遞ip url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (self.ip) try : # 根據url獲取網頁的內容, 而且解碼爲utf-8格式, 識別中文; result = urlopen(url).read().decode('utf-8') except Exception as e: print("訪問%s失敗" %(self.ip)) else: # 將獲取的字符串類型轉換爲字典, 方便處理 d = json.loads(result)['data'] country = d['country'] city = d['city'] print("%s位於%s,城市爲%s" %(self.ip,country,city)) if __name__=="__main__": ips = ['172.25.254.22','8.8.8.8','89.31.136.0'] threads = [] for ip in ips : t = IpThread(jobname='Clawer',ip=ip) threads.append(t) t.start() [thread.join() for thread in threads] print("程序執行結束")
線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。線程池線程都是後臺線程。每一個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。
from concurrent.futures import ThreadPoolExecutor import time #須要執行的任務 def job(): print("morning sheen") return 'new day' if __name__=='__main__': #示例化對象,線程池裏最多有10個線程 pool = ThreadPoolExecutor(max_workers=10) #往線程池裏扔須要執行的任務,返回一個對象 _base.Future()示例化出來的 f1 = pool.submit(job) f2 = pool.submit(job) #判斷任務是否執行結束 print(f1.done()) time.sleep(1) print(f2.done()) #判斷是否釋放了線程 #獲取執行任務的結果 print(f1.result()) print(f2.result())
略
concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數,另外一種是map()函數,二者的主要區別在於:
from urllib.error import HTTPError from urllib.request import urlopen from concurrent.futures import ThreadPoolExecutor,as_completed import time URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']*3 def get_page(url,timeout = 0.3): #爬取網頁信息 try: content = urlopen(url).read() return {'url':url, 'len':len(content)} except HTTPError as e: return {'url':url, 'len':0} # 方法1: submit提交任務 start_time = time.time() pool = ThreadPoolExecutor(max_workers=20) #submit返回的是Future對象,對於Future對象能夠簡單地理解爲一個在將來完成的操做 futuresObj = [pool.submit(get_page, url) for url in URLS] # # 注意: 傳遞的是包含futures對象的序列, as_complete返回已經執行完任務的future對象, # # 直到全部的future對應的任務執行完成, 循環結束; for finish_fs in as_completed(futuresObj): print(finish_fs.result() ) #submit返回值Future的方法result(self, timeout=None) """Return the result of the call that the future represents. Args: timeout: The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time. Returns: The result of the call that the future represents.""" print("執行時間:%s" %(time.time()-start_time)) # 方法2:經過map方式執行 start2_time = time.time() pool2 = ThreadPoolExecutor(max_workers=20) for res in pool2.map(get_page, URLS): print(res) print("執行時間:%s" %(time.time()-start2_time))