python線程、協程

線程python

Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。
git

更多方法:程序員

  • start            線程準備就緒,等待CPU調度github

  • setName      爲線程設置名稱編程

  • getName      獲取線程名稱安全

  • setDaemon   設置爲後臺線程或前臺線程(默認)
                        若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止
                        若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完後,等待前臺線程也執行完成後,中止
    服務器

  • join              逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義網絡

  • run              線程被cpu調度後執行Thread類對象的run方法數據結構

直接調用多線程


#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
 
def show(arg):
   time.sleep(1)
   print('thread'+str(arg))
t_list = []  
for i in range(10):
   t = threading.Thread(target=show, args=(i,))
   t.start()
   t_list.append(t)
for i in t_list:
   t.join()        #等待全部線程結束結束程序
print('main thread stop')

繼承式調用

import threading
import time


class MyThread(threading.Thread):
   def __init__(self,num):
       threading.Thread.__init__(self)
       self.num = num

   def run(self):#定義每一個線程要運行的函數

       
print("running on number:%s" %self.num)

       time.sleep(3)

if __name__ == '__main__':

   t1 = MyThread(1)
   t2 = MyThread(2)
   t1.start()
   t2.start()

守護線程

import time
import threading

def run(n):

   print('[%s]------running----\n' % n)
   time.sleep(2)
   print('--done--')

def main():
   for i in range(5):
       t = threading.Thread(target=run,args=[i,])
       #time.sleep(1)
       
t.start()
       t.join(1)
       print('starting thread', t.getName())
m = threading.Thread(target=main,args=[])
m.setDaemon(True) #將主線程設置爲Daemon線程,它退出時,其它子線程會同時退出,不論是否執行完任務
m.start()
#m.join(timeout=2)
print("---main thread done----")

線程鎖

若是同一時間有多個線程操做同一數據可能會出現混亂現象,因此須要加線程鎖保證同一時間只有一個線程操做這一數據,保證數據一致性

加鎖:lock = threading.RLock()    解鎖:lock.release()

      lock.acquire()

#!/usr/bin/env python
#coding:utf-8

import threading
import time

gl_num = 0

lock = threading.Lock()

def Func():
   lock.acquire()   #修改數據前加鎖
   
global gl_num
   gl_num +=1      #對此公共變量進行-1操做
   
time.sleep(1)
   print gl_num
   lock.release()  #修改後釋放

for i in range(10):
   t = threading.Thread(target=Func)
   t.start()


正常來說,這個num結果應該是0, 但在python 2.7上多運行幾回,會發現,最後打印出來的num結果不老是0,爲何每次運行的結果不同呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。 

RLock(遞歸鎖)

說白了就是在一個大鎖中還要再包含子鎖

import threading,time

def run1():
   print("grab the first part data")
   lock.acquire()
   global num
   num +=1
   
lock.release()
   return num
def run2():
   print("grab the second part data")
   lock.acquire()
   global  num2
   num2+=1
   
lock.release()
   return num2
def run3():
   lock.acquire()
   res = run1()
   print('--------between run1 and run2-----')
   res2 = run2()
   lock.release()
   print(res,res2)


if __name__ == '__main__':

   num,num2 = 0,0
   
lock = threading.RLock()
   for i in range(10):
       t = threading.Thread(target=run3)
       t.start()

while threading.active_count() != 1:
   print('active num:',threading.active_count())
else:
   print('----all threads done---')
   print(num,num2)

Semaphore(信號量)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading,time

def run(n):
   semaphore.acquire()
   time.sleep(1)
   print("run the thread: %s\n" %n)
   semaphore.release()

if __name__ == '__main__':

   num= 0
   
semaphore  = threading.BoundedSemaphore(
5) #最多容許5個線程同時運行
   
for i in range(20):
       t = threading.Thread(
target=run,args=(i,))
       t.start()

while threading.active_count() != 1:
   
pass #print threading.active_count()
else:
   
print('----all threads done---')
   
print(num)

Events

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

  • clear:將「Flag」設置爲False

  • set:將「Flag」設置爲True

import threading,time
def light():
   if not event.isSet():
       event.set() #綠燈狀態
   
count = 0
   
while True:
       if count < 10:
           print('\033[42;1m--green light on---\033[0m')
       elif count <13:
           print('\033[43;1m--yellow light on---\033[0m')
       elif count <20:
           if event.isSet():
               event.clear()
           print('\033[41;1m--red light on---\033[0m')
       else:
           count = 0
           
event.set() #打開綠燈
       
time.sleep(1)
       count +=1
def car(n):
   while 1:
       time.sleep(1)
       if  event.isSet(): #綠燈
           
print("car [%s] is running.." % n)
       else:
           print("car [%s] is waiting for the red light.." %n)
           event.wait()
if __name__ == '__main__':
   event = threading.Event()
   Light = threading.Thread(target=light)
   Light.start()
   for i in range(3):
       t = threading.Thread(target=car,args=(i,))
       t.start()

queue隊列 

queue.qsize() 返回隊列的大小 
queue.empty() 若是隊列爲空,返回True,反之False 
queue.full() 若是隊列滿了,返回True,反之False
queue.full 與 maxsize 大小對應 

queue.get([block[, timeout]])獲取隊列,timeout等待時間 
queue.get_nowait() 至關queue.get(False)
queue.put(itemblock=Truetimeout=None) 非阻塞寫入隊列,timeout等待時間 

queue.put_nowait(item) 至關queue.put(item, False)
queue.task_done() 在完成一項工做以後,queue.task_done()函數向任務已經完成的隊列發送一個信號
queue.join() 實際上意味着等到隊列爲空,再執行別的操做

  • class queue.Queue(maxsize=0) #先入先出

  • class queue.LifoQueue(maxsize=0) #last in fisrt out 

  • class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列(priority_number, data)

1.多線程採用的是分時複用技術,即不存在真正的多線程,cpu作的事是快速地切換線程,以達到相似同步運行的目的,由於高密集運算方面多線程是沒有用的,可是對於存在延遲的狀況(延遲IO,網絡等)多線程能夠大大減小等待時間,避免沒必要要的浪費。

2.原子操做:這件事情是不可再分的,如變量的賦值,不可能一個線程在賦值,到一半切到另一個線程工做去了……可是一些數據結構的操做,如棧的push什麼的,並不是是原子操做,好比要通過棧頂指針上移、賦值、計數器加1等等,在其中的任何一步中斷,切換到另外一線程再操做這個棧時,就會產生嚴重的問題,所以要使用鎖來避免這樣的狀況。好比加鎖後的push操做就能夠認爲是原子的了……

3.阻塞:所謂的阻塞,就是這個線程等待,一直到能夠運行爲止。最簡單的例子就是一線程原子操做下,其它線程都是阻塞狀態,這是微觀的狀況。對於宏觀的狀況,好比服務器等待用戶鏈接,若是始終沒有鏈接,那麼這個線程就在阻塞狀態。同理,最簡單的input語句,在等待輸入時也是阻塞狀態。

4.在建立線程後,執行p.start(),這個函數是非阻塞的,即主線程會繼續執行之後的指令,至關於主線程和子線程都並行地執行。因此非阻塞的函數馬上返回值的~

對於資源,加鎖是個重要的環節。由於python原生的list,dict等,都是not thread safe的。而Queue,是線程安全的,所以在知足使用條件下,建議使用隊列。

隊列適用於 「生產者-消費者」模型。雙方不管數量多少,產生速度有何差別,均可以使用queue。

import time,queue,threading
def consumer(n):
   while True:
       print('consumer [%s] get task: [%s]' % (n,q.get()))
       time.sleep(1)
       q.task_done()
def producer(n):
   count = 1
   
while True:
       print('producer [%s] produced a new task :%s ' % (n,count))
       q.put(count)
       count +=1
       
q.join()
       print('task finished')
q = queue.Queue()
c1 = threading.Thread(target=consumer,args=[1,])
c2 = threading.Thread(target=consumer,args=[2,])
c3 = threading.Thread(target=consumer,args=[3,])

p1 = threading.Thread(target=producer,args=['user1',])
p2 = threading.Thread(target=producer,args=['user2',])
p3 = threading.Thread(target=producer,args=['user3',])

c1.start()
p1.start()

在上面的例子中,Producer在隨機的時間內生產一個「產品」,放入隊列中。Consumer發現隊列中有了「產品」,就去消費它。本例中,因爲Producer生產的速度快於Consumer消費的速度,因此每每Producer生產好幾個「產品」後,Consumer才消費一個產品。

Queue模塊實現了一個支持多producer和多consumer的FIFO隊列。當共享信息須要安全的在多線程之間交換時,Queue很是有用。Queue的默認長度是無限的,可是能夠設置其構造函數的maxsize參數來設定其長度。Queue的put方法在隊尾插入,該方法的原型是:

put( item[, block[, timeout]])

若是可選參數block爲true而且timeout爲None(缺省值),線程被block,直到隊列空出一個數據單元。若是timeout大於0,在timeout的時間內,仍然沒有可用的數據單元,Full exception被拋出。反之,若是block參數爲false(忽略timeout參數),item被當即加入到空閒數據單元中,若是沒有空閒數據單元,Full exception被拋出。

Queue的get方法是從隊首取數據,其參數和put方法同樣。若是block參數爲true且timeout爲None(缺省值),線程被block,直到隊列中有數據。若是timeout大於0,在timeout時間內,仍然沒有可取數據,Empty exception被拋出。反之,若是block參數爲false(忽略timeout參數),隊列中的數據被當即取出。若是此時沒有可取數據,Empty exception也會被拋出。




協程


線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。

協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程。

協程的好處:

  • 無需線程上下文切換的開銷

  • 無需原子操做鎖定及同步的開銷

  • 方便切換控制流,簡化編程模型

  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:

  • 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。

  • 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序



greenlet


from greenlet import greenlet

def test1():
   
print(12)
   
gr2.switch()
   
print(34)
   
gr2.switch()

def test2():
   
print(56)
   
gr1.switch()
   
print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()


gevent


gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

import gevent

def foo():
   
print('Running in foo')
   
gevent.sleep(0)
   
print('Explicit context switch to foo again')

def bar():
   
print('Explicit context to bar')
   
gevent.sleep(0)
   
print('Implicit context switch back to bar')

gevent.joinall([
   
gevent.spawn(foo),
   gevent.spawn(bar),
])

遇到IO操做自動切換:


from gevent import monkey; monkey.patch_all()
import gevent
import urllib2

def f(url):
   
print('GET: %s' % url)
   
resp = urllib2.urlopen(url)
   
data = resp.read()
   
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
       
gevent.spawn(f, 'https://www.python.org/'),
       gevent.spawn(f, 'https://www.yahoo.com/'),
       gevent.spawn(f, 'https://github.com/'),
])
相關文章
相關標籤/搜索