多任務能夠由多進程完成,也能夠由一個進程內的多線程完成,一個進程內的全部線程,共享同一塊內存python中建立線程比較簡單,導入threading模塊,下面來看一下代碼中如何建立多線程。html
def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() print('start') # 主線程等待子線程完成,子線程併發執行 >>start >>2 >>1 >>3 >>0 >>4
主線程從上到下執行,建立5個子線程,打印出'start',而後等待子線程執行完結束,若是想讓線程要一個個依次執行完,而不是併發操做,那麼就要使用join方法。下面來看一下代碼前端
import threading import time def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() t.join() print('start') # 線程從上到下依次執行,最後打印出start >>0 >>1 >>2 >>3 >>4 >>start
上面的代碼不適用join的話,主線程會默認等待子線程結束,纔會結束,若是不想讓主線程等待子線程的話,能夠子線程啓動以前設置將其設置爲後臺線程,若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止,前臺線程則相反,若果不加指定的話,默認爲前臺線程,下面從代碼來看一下,如何設置爲後臺線程。例以下面的例子,主線程直接打印start,執行完後就結束,而不會去等待子線程,子線程中的數據也就不會打印出來python
import threading import time def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.setDaemon(True) t.start() print('start') # 主線程不等待子線程 >> start
除此以外,本身還能夠爲線程自定義名字,經過 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name參數,除此以外,Thread還有一下一些方法git
因爲線程是共享同一分內存的,因此若是操做同一份數據,很容易形成衝突,這時候就能夠爲線程加上一個鎖了,這裏咱們使用Rlock,而不使用Lock,由於Lock若是屢次獲取鎖的時候會出錯,而RLock容許在同一線程中被屢次acquire,可是須要用n次的release才能真正釋放所佔用的瑣,一個線程獲取了鎖在釋放以前,其餘線程只有等待。 github
import threading G = 1 lock = threading.RLock() def fun(): lock.acquire() # 獲取鎖 global G G += 2 print(G, threading.current_thread().name) lock.release() # 釋放鎖 return for i in range(10): t = threading.Thread(target=fun, name='t-{}'.format(i)) t.start() 3 t-0 5 t-1 7 t-2 9 t-3 11 t-4 13 t-5 15 t-6 17 t-7 19 t-8 21 t-9
Event是線程間通訊最間的機制之一,主要用於主線程控制其餘線程的執行,主要用過wait,clear,set,這三個方法來實現的的,下面來看一個簡單的例子,windows
import threading import time def f1(event): print('start:') event.wait() # 阻塞在,等待 set print('end:') if __name__ == '__main__': event_obj = threading.Event() for i in range(5): t = threading.Thread(target=f1, args=(event_obj,)) t.start() event_obj.clear() # 清除標誌位 inp = input('>>>>:') if inp == 'true': event_obj.set() # 設置標誌位
能夠簡單的理解爲一種先進先出的數據結構,好比用於生產者消費者模型,或者用於寫線程池,以及前面寫select的時候,讀寫分離時候可用隊列存儲數據等等,之後用到隊列的地方不少,所以對於隊列的用法要熟練掌握。下面首先來看一下隊列提供了哪些用法數組
q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0時,表示隊列長度無限制。 q.join() # 等到隊列爲kong的時候,在執行別的操做 q.qsize() # 返回隊列的大小 (不可靠) q.empty() # 當隊列爲空的時候,返回True 不然返回False (不可靠) q.full() # 當隊列滿的時候,返回True,不然返回False (不可靠) q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,參數block默認爲True,表示當隊列滿時,會等待 # 爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示會阻塞設置的時間, # 若是在阻塞時間裏 隊列仍是沒法放入,則引起 queue.Full 異常 q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞 # 阻塞的話若此時隊列爲空,則引起queue.Empty異常。 可選參數timeout,表示會阻塞設置的時間, q.get_nowait() # 等效於 get(item,block=False)
下面用代碼來簡單的演示下,消費者生成者模型,只是簡單的演示下。數據結構
message = queue.Queue(10) def product(num): for i in range(num): message.put(i) print('將{}添加到隊列中'.format(i)) time.sleep(random.randrange(0, 1)) def consume(num): count = 0 while count<num: i = message.get() print('將{}從隊列取出'.format(i)) time.sleep(random.randrange(1, 2)) count += 1 t1 = threading.Thread(target=product, args=(10, )) t1.start() t2 = threading.Thread(target=consume, args=(10, )) t2.start()
線程的上一級就是進程,進程可包含不少線程,進程和線程的區別是進程間的數據不共享,多進程也能夠用來處理多任務,不過多進程很消耗資源,計算型的任務最好交給多進程來處理,IO密集型最好交給多線程來處理,此外進程的數量應該和cpu的核心說保持一致。多線程
在windows中不能用fork來建立多進程,所以只能導入multiprocessing,來模擬多進程,下面首先來看一下怎麼建立進程,你們能夠先猜一下下面的結果是什麼併發
l = [] def f(i): l.append(i) print('hi', l) if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=f, args=(i,)) # 數據不共享,建立10份 l列表 p.start()
進程間的數據是不共享的,可是我若是非要數據共享了,那麼就須要用其餘方式了
def f(a, b): a.value = 3.111 for i in range(len(b)): b[i] += 100 if __name__ == '__main__': num = Value('f', 3.333) # 相似C語言中的 浮點型數 l = Array('i', range(10)) # 相似C語言中的整形數組,長度爲10 print(num.value) print(l[:]) p = Process(target=f, args=(num, l)) p.start() p.join() print(num.value) # 你們本身運行一下,看下兩次打印結果是否同樣 print(l[:])
方式一,使用的都是C語言中的數據結構,若是你們對c不熟悉的話,用起來比較麻煩,方式2就能夠支持python自帶的數據,下面來看一下
from multiprocessing import Process,Manager def Foo(dic, i): dic[i] = 100 + i print(dic.values()) if __name__ == '__main__': manage = Manager() dic = manage.dict() for i in range(2): p = Process(target=Foo, args=(dic, i)) p.start() p.join()
實際應用中,並非每次執行任務的時候,都去建立多進程,而是維護了一個進程池,每次執行的時候,都去進程池取一個,若是進程池裏面的進程取光了,就會阻塞在那裏,直到進程池中有可用進程爲止。首先來看一下進程池提供了哪些方法
apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。
close() : 等待任務完成後在中止工做進程,阻止更多的任務提交到pool,待任務完成後,工做進程會退出。
terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。
join() : 等待工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait,不然進程會成爲殭屍進程。
下面來簡單的看一下代碼怎麼用的
from multiprocessing import Pool import time def f1(i): time.sleep(1) # print(i) return i def cb(i): print(i) if __name__ == '__main__': poo = Pool(5) for i in range(20): # poo.apply(func=f1, args=(i,)) # 串行執行,排隊執行 有join poo.apply_async(func=f1, args=(i,), callback=cb) # 併發執行 主進程不等子進程,無join print('**********') poo.close() poo.join()
對於前面的進程池,python自帶了一個模塊Pool供咱們使用,可是對於線程池,則沒有提供,所以須要咱們本身寫,本身寫的話,就須要用到隊列,下面咱們來看一下本身怎麼實現一個線程池,首先寫一個最簡單的版本。
import threading import time import queue class ThreadPool: def __init__(self, max_num=20): self.queue = queue.Queue(max_num) for i in range(max_num): self.add() def add(self): self.queue.put(threading.Thread) def get(self): return self.queue.get() def f(tp, i): time.sleep(1) print(i) tp.add() p = ThreadPool(10) for i in range(20): thread = p.get() t = thread(target=f, args=(p, i)) t.start()
上述代碼寫了一個線程池類,基本實現了線程池的功能,可是有不少缺點,沒有實現回掉函數,每次執行任務的時候,任務處理函數每次執行完都須要自動執行對象的add方法,將線程對象添加到隊列中去,並且類初始化的時候,一次性將全部的線程類都添加到隊列中去了,總之上面的線程池雖然實現簡單,可是實際上卻有不少問題,下面來看一個真正意義上的線程池。
在寫代碼以前,咱們先來看一下該怎麼設計這樣一個線程池,上面的線程池,咱們的隊列中,存的是線程類,咱們每處理一個任務都實例化一個線程,而後執行完了以後,該線程就被丟棄了,這樣有點不合適。咱們此次設計的時候,
下面來一下代碼是怎麼實現的
import threading import queue import time import contextlib class ThreadingPool: def __init__(self, num): self.max = num self.terminal = False self.q = queue.Queue() self.generate_list = [] # 保存已經生成的線程 self.free_list = [] # 保存那些已經完成任務的線程 def run(self, func, args=None, callbk=None): self.q.put((func, args, callbk)) # 將任務信息做爲一個元祖放到隊列中去 if len(self.free_list) == 0 and len(self.generate_list) < self.max: self.threadstart() def threadstart(self): t = threading.Thread(target=self.handel) t.start() def handel(self): current_thread = threading.current_thread() self.generate_list.append(current_thread) event = self.q.get() while event != 'stop': func, args, callbk = event flag = True try: ret = func(*args) except Exception as e: flag = False ret = e if callbk is not None: try: callbk(ret) except Exception as e: pass if not self.terminal: with self.auto_append_remove(current_thread): event = self.q.get() else: event = 'stop' else: self.generate_list.remove(current_thread) def terminate(self): self.terminal = True while self.generate_list: self.q.put('stop') self.q.empty() def close(self): num = len(self.generate_list) while num: self.q.put('stop') num -= 1 @contextlib.contextmanager def auto_append_remove(self, thread): self.free_list.append(thread) try: yield finally: self.free_list.remove(thread) def f(i): # time.sleep(1) return i def f1(i): print(i) p = ThreadingPool(5) for i in range(20): p.run(func=f, args=(i,), callbk=f1) p.close()
協程,又稱微線程,協程執行看起來有點像多線程,可是事實上協程就是隻有一個線程,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯,此外由於只有一個線程,不須要多線程的鎖機制,也不存在同時寫變量衝突。協程的適用場景:當程序中存在大量不須要CPU的操做時(IO)下面來看一個利用協程例子
from gevent import monkey import gevent import requests # 把標準庫中的thread/socket等給替換掉 # 這樣咱們在後面使用socket的時候能夠跟日常同樣使用,無需修改任何代碼,可是它變成非阻塞的了. monkey.patch_all() # 猴子補丁 def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
上面的例子,利用協程,一個線程完成全部的請求,發出請求的時候,不會等待回覆,而是一次性將全部的請求都發出求,收到一個回覆就處理一個回覆,這樣一個線程就解決了全部的事情,效率極高。
這篇博文是pyton基礎知識的最後一篇,後面會講的博文會講開始講前端的知識,這裏附上目錄http://www.cnblogs.com/Wxtrkbc/p/5606048.html,之後會繼續更新的,