瞭解相關概念以前,咱們先來看一張圖python
進程:git
線程:程序員
經過對比,咱們能夠得出:github
Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元算法
import threading import time def show(arg): time.sleep(1) print('thread'+str(arg)) for i in range(10): t = threading.Thread(target=show,args=(i,)) t.start() print('main thread stop')
上述代碼建立了10個前臺線程,而後控制器就交給了cpu,cpu根據指定算法進行調度,分片執行指令編程
更多方法:數組
若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止安全
若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止數據結構
自定義線程類:多線程
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self): #定義每一個線程要運行的函數 print('running on number:%s'%self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
不過咱們有個疑問啦,在定義的這個類中,根本沒涉及調用run函數,這是怎麼實現的呢??
那咱們去看下源碼就明白了,實際上是start方法再起做用
因此start方法在底層是調用了run方法
因爲線程之間是進行隨機調度,因此不可避免的存在多個線程同時修改同一條數據,從而可能會出現髒數據,因此出現了線程鎖,同一時刻只容許一個線程執行操做。
未上鎖的:
import threading import time num = 0 def show(arg): global num time.sleep(1) num += 1 print(num) for i in range(10): t = threading.Thread(target=show,args=(i,)) t.start() print('main thread stop')
上鎖的:
import threading import time num = 0 lock = threading.RLock() def func(): lock.acquire() global num num += 1 time.sleep(1) print(num) lock.release() for i in range(10): t = threading.Thread(target=func) t.start()
咱們會發現,這兩段代碼輸出的結果都同樣,並無產生髒數據,可是細心你會發現:打印結果的過程是不一樣的,未加鎖的--能夠說是幾乎同時打印結果,而加了鎖的,則是一個一個打印,這就是鎖在起做用,對同一資源,在一個點上只能執行一個線程。
Python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法:set、wait、clear
事件處理的機制:全局定義了一個‘flag’,若是‘flag’值爲False,那麼當程序執行event.wait方法時就會阻塞,若是‘Flag’值爲True,那麼event.wait方法時便再也不阻塞
import threading import time def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do,args=(event_obj,)) t.start() event_obj.clear() # time.sleep(2) inp = input('input:') if inp == 'true': event_obj.set()
import threading import time event = threading.Event() def func(): print('%s wait for event...'%threading.currentThread().getName()) #等待--阻塞 event.wait() #收到事件後進入運行狀態 print('%s recv event.'%threading.currentThread().getName()) t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) #發出事件通知 print('MainThread set event.') event.set()
Semaphore是同時容許必定數量的線程更改數據,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。
import threading,time def run(n): semaphore.acquire() time.sleep(1) print('run the thread:%s'%n) semaphore.release() if __name__ == '__main__': num = 0 #最多容許5個線程同時運行 semaphore = threading.BoundedSemaphore(5) for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start()
使用線程等待,只有知足某條件時,才釋放n個線程
import threading import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() time.sleep(2) c2.start() time.sleep(2) p.start() # consumer()線程要等待producer()設置了Condition以後才能繼續。
import threading def run(n): con.acquire() con.wait() print('run the thread:%s'%n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run,args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()
from multiprocessing import Process import time def foo(i): print('say hi',i) if __name__ == '__main__': for i in range(10): p = Process(target=foo,args=(i,)) p.start()
咱們能夠看到,進程和線程代碼實現幾乎是相同的,對於進程而言,模塊是multiprocessing,另外,在建立進程前,加了一個__name__的驗證,這是因爲操做系統的緣由,反正你只要加上了就能夠了。
另外,咱們已經提到過,建立進程就等同搭建了一個進程環境,消耗內存是不小的(相對線程)。
因爲進程建立時,數據是各持有一份的,默認狀況下進程間是沒法共享數據的。
from multiprocessing import Process import time li = [] def foo(i): li.append(i) print('say hi',li) if __name__ == '__main__': for i in range(10): p = Process(target=foo,args=(i,)) p.start() print('ending',li) 結果爲: say hi [1] say hi [0] say hi [2] say hi [3] say hi [4] say hi [5] say hi [6] say hi [7] ending [] say hi [8] say hi [9]
從結果裏,咱們也知道,進程間數據是不共享的,列表元素沒有實現累加。
不過,若是你硬要實現共享的話,辦法仍是有的,請往下看:
方法一:引用數組Array
from multiprocessing import Process,Array def Foo(temp,i): temp[i] = 100+i for item in temp: print(i,'----->',item) if __name__ == '__main__': temp = Array('i', [11, 22, 33, 44]) for i in range(2): p = Process(target=Foo,args=(temp,i,)) p.start()
方法二:manage.dict()
from multiprocessing import Process,Manager def Foo(dic,i): dic[i] = 100 + i print(dic.values()) if __name__ == '__main__': manage = Manager() dic = manage.dict() for i in range(2): p = Process(target=Foo,args=(dic,i,)) p.start() p.join()
方法三:multiprocessing.Queue
from multiprocessing import Process, Queue def f(i,q): print(i,q.get()) if __name__ == '__main__': q = Queue() q.put("h1") q.put("h2") q.put("h3") for i in range(10): p = Process(target=f, args=(i,q,)) p.start()
當建立進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢後,再賦值給原值,另外涉及數據共享就一定存在同一份數據被多個進程同時修改,因此在multiprocessing模塊裏也也提供了RLock類。
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可以使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。
apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時 會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。
close() : 阻止更多的任務提交到pool,待任務完成後,工做進程會退出。
terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。
join() : wait工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait),不然進程會成爲殭屍進程
from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print(arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo,args=(i,),callback=Bar) print('end') pool.close() pool.join() #進程池中進程執行完畢後再關閉 print('really end')
適用於多線程編程的先進先出數據結構,能夠用來安全的傳遞多線程信息。
import threading import queue que = queue.Queue(10) def s(i): que.put(i) def x(i): g = que.get(i) print('get',g) for i in range(1,13): t = threading.Thread(target=s,args=(i,)) t.start() for i in range(1,11): t = threading.Thread(target=x,args=(i,)) t.start() print('size',que.qsize()) 結果爲: get 1 get 2 get 3 get 4 get 5 get 6 get 7 get 8 get 9 get 10 size
線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;而協程的操做則是程序員
協程存在的意義:對於多線程應用,cpu經過切片來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼執行順序。
協程的適用場景:當程序中存在大量不須要cpu的操做時(IO),適用於協程。例如:爬蟲
from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 gr2.switch() def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
1 from gevent import monkey; monkey.patch_all() 2 import gevent 3 import urllib2 4 5 def f(url): 6 print('GET: %s' % url) 7 resp = urllib2.urlopen(url) 8 data = resp.read() 9 print('%d bytes received from %s.' % (len(data), url)) 10 11 gevent.joinall([ 12 gevent.spawn(f, 'https://www.python.org/'), 13 gevent.spawn(f, 'https://www.yahoo.com/'), 14 gevent.spawn(f, 'https://github.com/'), 15 ])
歡迎你們對個人博客內容提出質疑和提問!謝謝
筆者:拍省先生