首先,推薦一篇講解進程與線程關係的漫畫:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.htmlhtml
在平時,咱們若是要執行一個任務,須要排隊執行,可是咱們有了線程和進程就不同了。好比,公司只有我和老闆,有一天,老闆給我派任務了,只有我一我的來作任務,用了一天完成,這就叫單線程;又有一天,老闆又派了一樣的任務,而且招了好幾個技術人員,讓我和他們一塊兒完成任務,結果只用了一個小時,這就叫多線程。python
咱們寫一段代碼,編譯器從上到下讀的過程叫主線程,遇到threading.Thread就叫子線程。程序員
threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。算法
import threading import time def worker(num): time.sleep(1) print("Thread %d" % num) for i in range(10): #args裏面的參數必須是元組 t = threading.Thread(target=worker,args=(i,),name = "t.%d" % i) t.start() print(t.name)#線程名
上述代碼中建立了10個線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行命令。多線程
若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止
若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止併發
setdaemon:不等待子線程執行完就關閉app
import threading import time def f0(): pass def f1(a1,a2): time.sleep(10) f0() t1 = threading.Thread(target=f1,args=(123,111)) t1.setDaemon(True) t1.start() t2 = threading.Thread(target=f1,args=(123,111)) t2.setDaemon(True) t2.start() t3 = threading.Thread(target=f1,args=(123,111)) t3.setDaemon(True) t3.start()
等待子線程執行完畢才關閉async
import threading import time def f0(): print("f0") def f1(a1,a2): time.sleep(10) f0() t1 = threading.Thread(target=f1,args=(123,111)) t1.start() t2 = threading.Thread(target=f1,args=(123,111)) t2.start() t3 = threading.Thread(target=f1,args=(123,111)) t3.start()
join:等當前線程執行結束後再執行下一個線程,可傳參數,參數表示最多等的秒數ide
import threading import time def f0(): print("f0") def f1(a1,a2): print("f1") f0() t1 = threading.Thread(target=f1,args=(123,111)) t1.start() t1.join() t2 = threading.Thread(target=f1,args=(123,111)) t2.start() t2.join()
咱們使用線程對數據進行操做的時候,若是多個線程同時修改某個數據,可能會出現髒數據(不許確的數據),爲了保證數據的準確性,就須要加把鎖。函數
鎖有兩種:RLock和Lock,這裏咱們使用RLock。
import threading import time globals_num = 0 lock = threading.RLock() def func(): lock.acquire() # 得到鎖 global globals_num globals_num +=1 time.sleep(1) print(globals_num) lock.release() #釋放鎖 for i in range(10): r = threading.Thread(target=func)#建立線程鎖 r.start()
Lock:得到鎖以後必須釋放鎖才能再次得到鎖,否則會產生死鎖,就會一直等待釋放鎖。
import threading lock = threading.Lock() lock.acquire() lock.acquire() #產生死鎖 lock.release() lock.release() import threading lock = threading.Lock() lock.acquire() print("1") lock.release() print("ok") lock.acquire() print("2") lock.release()
RLock:得到幾把鎖就要釋放幾把鎖。
import threading rlock = threading.RLock() rlock.acquire() rlock.acquire() rlock.release() rlock.release() print("end") import threading rlock = threading.RLock() rlock.acquire() print("1") rlock.acquire() print("2") rlock.release() print("3") rlock.release() print("4")
Python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法:set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
import threading def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = input('input:') if inp == 'true': event_obj.set()
Queue就是隊列,規則是先進先出。這個模型也叫生產者-消費者模型。
生產者-消費者:建立一個爲10的隊列,生產12個,只消費10個。
import queue import threading message = queue.Queue(10) # 建立隊列 def producer(i): print("put:",i) message.put(i) def consumer(i): print("get:",i) msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
join:等到隊列爲空時,再繼續往下執行
put(item, block=True, timeout=None):放入隊列尾部
get(block=True, timeout=None):獲取隊列第一個值
put_nowait:等效於 put(item,block=False)
get_nowait:等效於 get(item,block=False)
先說一下線程池是什麼鬼!之因此這麼說,是由於我本覺得很簡單的東東,結果好複雜...可是!懂了以後,又以爲還好還好,沒有特別難。其實就是用最少的勞動力,創造出最多的利益!線程池裏面有不少線程,同時還有一個任務隊列。執行任務過程就是,也就是說重複利用線程來執行任務,減小系統資源的開銷。
簡單版線程池是線程執行完任務以後銷燬,若是還有任務,就再建立線程去執行。
高級版線程池就是線程執行完任務以後回來告訴回調函數執行完畢,回調函數不銷燬它,而是把它做爲空閒線程,不用建立新線程,若是任務隊列中還有任務,就讓它再去執行任務。
so easy!媽媽不再用擔憂我會掉進各類池啦......
簡易版線程池(武大神說這是low版本):
import queue import threading import time class ThreadPool(object): def __init__(self,max_num=20): self.queue = queue.Queue(max_num) # 長度爲20的隊列 for i in range(max_num): self.queue.put(threading.Thread) # threading.Thread是類對象,放入隊列中 def get_thread(self): return self.queue.get() #獲取隊列中的類對象 def add_thread(self): self.queue.put(threading.Thread) def func(pool,a1): time.sleep(1) print(a1) pool.add_thread() p = ThreadPool(10) for i in range(50): thread = p.get_thread() #threading.Thread #獲得一個線程 t = thread(target=func,args=(p,i)) # 執行func函數 t.start()
高級版線程池:
import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() # 建立隊列 self.max_num = max_num # 最多建立線程數量 self.terminal = False # 默認爲False self.generate_list = [] # 實際建立的線程 self.free_list = [] # 空閒的線程 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: # 若是空閒列表爲0而且實際建立的線程列表長度小於最多建立的線程數 self.generate_thread() # 建立線程 w = (func, args, callback,) # 將任務元組賦值給w變量 self.q.put(w) # 將任務放入隊列中 def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread # 建立當前線程 self.generate_list.append(current_thread) # 將線程添加到實際建立線程列表中 event = self.q.get() # 等待着去隊列中獲取線程 while event != StopEvent: # 隊列中沒有中止符 func, arguments, callback = event # 得到任務 try: result = func(*arguments) #函數 status = True #賦值True except Exception as e: status = False # 發生錯誤賦值False result = e # 錯誤類型 if callback is not None: # 回調函數不爲空 try: callback(status, result) # 執行回調函數 except Exception as e: pass if self.terminal: # False event = StopEvent else: with self.worker_state(self.free_list,current_thread): #with:上下文管理 event = self.q.get() #before:append;after:remove else: self.generate_list.remove(current_thread) #沒有任務時,就把實際建立線程列表裏的線程刪掉 @contextlib.contextmanager # 能夠實現上下文管理的裝飾器 def worker_state(self, lis, val): lis.append(val) try: yield finally: lis.remove(val) def close(self): num = len(self.generate_list) #時間建立線程列表長度 while num: self.q.put(StopEvent) #中止符放入已建立的線程中 num -= 1 # 終止線程(清空隊列) def terminate(self): self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() def work(i): print(i) pool = ThreadPool(10) for item in range(50): pool.run(func=work, args=(item,)) pool.terminate() # 執行一下就關閉
ps:這裏要補充一個知識點。
實現上下文管理(高級版本線程池中有出現)。須要導入模塊,而後加上裝飾器就ok。
示例:
import contextlib @contextlib.contextmanager def myopen(file_path,mode): f = open(file_path,mode,encoding="utf-8") try: yield f finally: f.close()
with myopen("index.html","r") as file_obj: print(file_obj.readline())
注:因爲進程之間的數據須要各自持有一份,因此建立進程須要很是大的開銷,也就是佔有很大的內存。同一個進程中,線程跟線程之間的內存是共享的。
from multiprocessing import Process def f(name): print("hello",name) if __name__ == "__main__": p = Process(target=f,args=("bob",)) p.start() print("start") p.join() # 等待 print("end")
#默認數據沒有共享 from multiprocessing import Process,Manager def Foo(i,dic): dic[i] = 100 + i print(dic) for k,v in dic.items(): print(k,v) if __name__ == "__main__": manage = Manager() dic = {} for i in range(2): p = Process(target=Foo,args=(i,dic,)) p.start() p.join() #打印結果: {0: 100} 0 100 {1: 101} 1 101
#實現數據共享 from multiprocessing import Process,Manager def foo(i,dic): dic[i] = 100 + i print(dic) print(len(dic)) if __name__ == "__main__": manage = Manager() dic = manage.dict() for i in range(2): p = Process(target=foo,args=(i,dic,)) p.start() p.join() #打印結果: {0: 100} 1 {0: 100, 1: 101} 2
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池程序中沒有可供使用的進程,那麼進程就會等待,直到進程池中有可用進程爲止。
進程池中有兩個方法:apply_async、apply
apply_async:
一次建立多個進程,併發執行,執行完一個將返回值給回調函數而後執行回調函數
from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a) return 1000 def f2(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(10): pool.apply_async(func=f1,args=(i,),callback=f2) print("111111") pool.close() pool.join()
join:進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
apply:
#排隊執行,apply裏面有join from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a) if __name__ == "__main__": pool = Pool(5) for i in range(10): pool.apply(func=f1,args=(i,)) print("111111")
線程是屬於進程的,線程運行在進程空間內,而且同一進程所產生的線程共享同一內存空間,當進程退出時該進程所產生的線程都會被強制退出並清除。不論是進程仍是線程,都是爲了實現一個併發操做。
io密集型:使用線程;計算密集型:使用進程。根本的緣由是Python的線程裏面有個鎖(只有Python裏有這個鎖:GIL):全局解釋器鎖,規定進程裏面只有一個線程能出來被CPU調用。
線程在執行任務時,發現要等一段時間才能獲得結果,就不等結果,再去執行其餘任務。
線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。
協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;
gevent
手動版協程:爲了看出執行任務時須要等待,因此咱們就睡一會咯(gevent.sleep)
import gevent def foo(): print("1") gevent.sleep() #表示暫停,去執行下一個 print("3") def bar(): print("2") gevent.sleep() print("4") gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ]) #打印結果: 1 2 3 4
終極版協程:終極版就是不用手動讓它睡一會就能看出效果的版本!
from gevent import monkey; monkey.patch_all() import gevent import requests def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print(url,len(data)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://baidu.com/'), ]) #執行結果:須要你們本身去執行才能感同身受呢!!!