#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import time
import threading


def addNum():
    global num              # every thread reads this shared global variable
    lock.acquire()          # only one thread at a time may run the locked block
    temp = num
    time.sleep(0.01)
    num = temp - 1          # decrement the shared variable
    lock.release()

num = 100                   # the shared variable
lock = threading.Lock()     # the synchronization lock

thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:       # wait for all threads to finish
    t.join()

print('Result: ', num)
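With the lock in place, only one thread at a time can run the read-sleep-write sequence, so the final result is 0. Without the lock, the time.sleep(0.01) keeps almost all 100 threads inside the critical section at once: they all read num while it is still 100 and write back 99, so the result is typically 99 instead of 0.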
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()


class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        mutexA.acquire()    # if the lock is already held, block here until it is released
        print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
        mutexB.acquire()
        print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
        mutexB.release()
        mutexA.release()

    def fun2(self):
        mutexB.acquire()
        print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
        time.sleep(0.2)
        mutexA.acquire()
        print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
        mutexA.release()
        mutexB.release()


if __name__ == "__main__":
    print("start---------------------------%s" % time.time())
    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()
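Run this and it will usually hang. A thread that has finished fun1 moves on to fun2, acquires mutexB and sleeps; meanwhile another thread enters fun1 and acquires mutexA. The first thread now needs mutexA and the second needs mutexB, so neither can proceed: a classic deadlock. The recursive lock in the next example is one way to avoid it.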
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import threading
import time

rlock = threading.RLock()   # a recursive lock: the same thread may acquire it multiple times


class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        rlock.acquire()     # acquire count +1
        print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
        rlock.acquire()     # acquire count +1
        print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
        rlock.release()     # acquire count -1
        rlock.release()     # acquire count -1; once the count reaches 0, other threads may compete for the lock

    def fun2(self):
        rlock.acquire()
        print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
        time.sleep(0.2)
        rlock.acquire()
        print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
        rlock.release()
        rlock.release()


if __name__ == "__main__":
    print("start---------------------------%s" % time.time())
    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()
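Here both resources are protected by a single recursive lock. threading.RLock tracks its owner and an acquire counter, so the owning thread can acquire it again without blocking itself, and other threads only get a chance once the counter drops back to 0. With one lock there is no lock ordering to get wrong, so the circular wait from the previous example cannot happen.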
A key property of threads is that each one runs independently and its state is unpredictable. If other threads in a program need to decide their next step by checking the state of some thread, synchronization becomes very tricky. To solve this kind of problem we can use the Event object from the threading module. An Event object holds a flag that threads can set, and it lets threads wait for an event to happen. Initially the flag is False. If a thread waits on an Event whose flag is False, it blocks until the flag becomes True. When a thread sets the flag to True, it wakes up every thread waiting on that Event. A thread that waits on an Event whose flag is already True simply ignores the wait and keeps running.
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',)


def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)


def main():
    redis_ready = threading.Event()     # used for communication between threads; essentially a flag
    t1 = threading.Thread(target=worker, args=(redis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(redis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3)                       # simulate the check progress
    redis_ready.set()


if __name__ == "__main__":
    main()

# event.is_set(): return the current value of the flag;
# event.wait():   block the calling thread while the flag is False;
# event.set():    set the flag to True; all blocked threads wake up and become ready, waiting for the OS to schedule them;
# event.clear():  reset the flag to False.
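Both worker threads block in event.wait() until the main thread calls redis_ready.set() about three seconds later; at that point the two workers wake up together and continue.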
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from multiprocessing import Process
import os
import time


def info(name):
    print("name:", name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)


def foo(name):
    info(name)


if __name__ == '__main__':
    info('main process line')

    p1 = Process(target=info, args=('alvin',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")
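The getppid()/getpid() output shows that p1 and p2 are children of the main process, and the two join() calls make the parent wait for both of them, so "ending" is printed last.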
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import time

"""
In the traditional producer-consumer model, one thread writes messages and another takes them,
with locks controlling the queue and the waiting, and one careless mistake can lead to deadlock.
With coroutines instead, the producer jumps straight to the consumer via yield after producing a message,
and once the consumer is done, control switches back to the producer, which is very efficient.
"""

# Note that consumer() is a generator:
# any function containing the yield keyword automatically becomes a generator object.
def consumer():
    r = ''
    while True:
        # 3. consumer receives the message through yield, processes it, and sends the result back through yield;
        #    yield acts like a return statement, and the function's frame is frozen at this line.
        #    The next time the caller resumes the generator with next(), generator.send() or a for-in loop,
        #    execution continues from the line after the yield and returns the next result.
        #    This is how iterators can implement infinite sequences and lazy evaluation.
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'


def produce(c):
    # 1. first call next(c) to start the generator;
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 2. once something is produced, switch to consumer with c.send(n);
        cr = c.send(n)
        # 4. produce receives the consumer's result and goes on to produce the next message;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 5. when produce decides to stop producing, it closes consumer with c.close() and the whole flow ends.
    c.close()


if __name__ == '__main__':
    # 6. the whole flow is lock-free and runs in a single thread; produce and consumer cooperate to finish the task,
    #    which is why this is called a "coroutine": cooperative multitasking rather than the preemptive multitasking of threads.
    c = consumer()
    produce(c)

'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''
The main idea behind the greenlet mechanism is that a yield statement in a generator or coroutine function suspends execution until it is later resumed with next() or send(). A scheduler loop can use this to cooperatively run multiple tasks across a set of generator functions. greenlet is a foundational library for implementing what we call "coroutines" in Python.
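To make the scheduler idea concrete, here is a minimal sketch of such a loop using plain generators; the task and round_robin names are illustrative and not part of greenlet:

from collections import deque

def task(name, steps):
    # a task written as a generator: each yield hands control back to the scheduler
    for i in range(steps):
        print('%s: step %d' % (name, i))
        yield

def round_robin(*tasks):
    # a tiny cooperative scheduler: resume each generator in turn until all are finished
    queue = deque(tasks)
    while queue:
        gen = queue.popleft()
        try:
            next(gen)            # run the task up to its next yield
        except StopIteration:
            continue             # this task is done, drop it
        queue.append(gen)        # otherwise requeue it for another turn

round_robin(task('A', 2), task('B', 3))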
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
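gr1.switch() starts test1, which prints 12 and explicitly hands control to gr2; the two greenlets then pass control back and forth, so the output is 12, 56, 34, 78. Unlike threads, the switch points are chosen explicitly in the code, not by a scheduler.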
Python offers basic support for coroutines through yield, but it is incomplete. The third-party gevent library provides fairly complete coroutine support for Python.
gevent is a third-party library that implements coroutines on top of greenlet. Its basic idea is:
when a greenlet hits an IO operation, such as a network access, it automatically switches to another greenlet, and switches back at a suitable time once the IO has completed. Because IO operations are slow and often leave the program waiting, having gevent switch coroutines for us automatically guarantees that some greenlet is always running instead of waiting on IO.
Since the switching happens automatically at IO operations, gevent needs to patch some of Python's standard library modules; this is done at startup with a monkey patch.
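The idiom, which also appears in the full network example at the end of this section, is just:

from gevent import monkey
monkey.patch_all()   # replace blocking standard-library calls (socket, time.sleep, ...) with gevent-aware versions

The first example below drives the switching explicitly with gevent.sleep(), so it does not need the patch: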
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import gevent
import time


def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")


def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")


start = time.time()
gevent.joinall(
    [gevent.spawn(foo),
     gevent.spawn(bar)]
)
print(time.time() - start)
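Because the two greenlets sleep concurrently, the elapsed time printed at the end is roughly 5 seconds, the longer of the two sleeps, rather than 2 + 5 = 7.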
Of course, in real code we do not call gevent.sleep() to switch coroutines; instead, gevent switches automatically whenever an IO operation is executed, as in the following code:
#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time
import ssl

ssl._create_default_https_context = ssl._create_unverified_context  # work around SSL certificate errors on macOS


def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))


start = time.time()
gevent.joinall([
    gevent.spawn(f, 'https://itk.org/'),
    gevent.spawn(f, 'https://www.github.com/'),
    gevent.spawn(f, 'https://zhihu.com/'),
])
# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')
print(time.time() - start)
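With urllib's sockets monkey-patched, the three downloads run concurrently: whenever one request blocks on the network, gevent switches to another greenlet, so the total time is close to that of the slowest request rather than the sum of all three (compare with the commented-out sequential calls).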