Python之路——線程池

1 線程基礎

  1.1 線程狀態html

  線程有5種狀態,狀態轉換的過程以下圖所示:python

  1.2 線程同步——鎖算法

      多線程的優點在於能夠同時運行多個任務(至少感受起來是這樣,其實Python中是僞多線程)。可是當線程須要共享數據時,可能存在數據不一樣步的問題。考慮這樣一種狀況:一個列表裏全部元素都是0,線程"set"從後向前把全部元素改爲1,而線程"print"負責從前日後讀取列表並打印。那麼,可能線程"set"開始改的時候,線程"print"便來打印列表了,輸出就成了一半0一半1,這就是數據的不一樣步。爲了不這種狀況,引入了鎖的概念。數組

    鎖有兩種狀態—鎖定和未鎖定。每當一個線程好比"set"要訪問共享數據時,必須先得到鎖定;若是已經有別的線程好比"print"得到鎖定了,那麼就讓線程"set"暫停,也就是同步阻塞;等到線程"print"訪問完畢,釋放鎖之後,再讓線程"set"繼續。通過這樣的處理,打印列表時要麼所有輸出0, 要麼所有輸出1,不會再出現一半0一半1的尷尬場面。服務器

    線程與鎖的交互以下圖所示:多線程

  1.3 線程通訊(條件變量)併發

    然而還有另一種尷尬的狀況:列表並非一開始就有的;而是經過線程"create"建立 的。若是"set"或者"print" 在"create"尚未運行的時候就訪問列表,將會出現一個異常。使用鎖能夠解決這個問題,可是"set"和"print"將須要一個無限循環——他們不知道"create"何時會運行,讓"create"在運行後通知"set"和"print"顯然是一個更好的解決方案。因而,引入了條件變量。app

    條件變量容許線程好比"set"和"print"在條件不知足的時候(列表爲None時)等待,等到條件知足的時候(列表已經建立)發出一個通知,告訴"set" 和"print"條件已經有了,大家該起牀幹活了;而後"set"和"print"才繼續運行。dom

   線程與條件變量的交互以下圖所示:ide

   1.4 線程運行和阻塞的狀態轉換

        最後看看線程運行和阻塞狀態的轉換。

     阻塞有三種狀況:
     同步阻塞是指處於競爭鎖定的狀態,線程請求鎖定時將進入這個狀態,一旦成功得到鎖定又恢復到運行狀態;
     等待阻塞是指等待其餘線程通知的狀態,線程得到條件鎖定後,調用「等待」將進入這個狀態,一旦其餘線程發出通知,線程將進入同步阻塞狀態,再次競爭條件鎖定;
     而其餘阻塞是指調用time.sleep()、anotherthread.join()或等待IO時的阻塞,這個狀態下線程不會釋放已得到的鎖定。

2 多線程

     爲充分利用cpu資源,減小資源浪費,Python中也提供了多線程技術,但因爲存在GIL的關係,實際上同一時刻只存在一個線程在執行,即Python中不存在真正意義上的多線程,可是多線程技術仍是提升了CPU資源的利用率,因此仍是頗有價值的。

3 線程池

    咱們知道系統處理任務時,須要爲每一個請求建立和銷燬對象。當有大量併發任務須要處理時,再使用傳統的多線程就會形成大量的資源建立銷燬致使服務器效率的降低。這時候,線程池就派上用場了。線程池技術爲線程建立、銷燬的開銷問題和系統資源不足問題提供了很好的解決方案。

   3.1 優勢:

   (1)能夠控制產生線程的數量。經過預先建立必定數量的工做線程並限制其數量,控制線程對象的內存消耗。

   (2)下降系統開銷和資源消耗。經過對多個請求重用線程,線程建立、銷燬的開銷被分攤到了多個請求上。另外經過限制線程數量,下降虛擬機在垃圾回收方面的開銷。

   (3)提升系統響應速度。線程事先已被建立,請求到達時可直接進行處理,消除了因線程建立所帶來的延遲,另外多個線程可併發處理。

   「池」的概念使得人們能夠定製必定量的資源,而後對這些資源進行復用,而不是頻繁的建立和銷燬。

   3.2 注意事項

    雖然線程池是構建多線程應用程序的強大機制,但使用它並非沒有風險的。在使用線程池時需注意線程池大小與性能的關係,注意併發風險、死鎖、資源不足和線程泄漏等問題。
    一、線程池大小。多線程應用並不是線程越多越好,須要根據系統運行的軟硬件環境以及應用自己的特色決定線程池的大小。
通常來講,若是代碼結構合理的話,線程數目與CPU 數量相適合便可。若是線程運行時可能出現阻塞現象,可相應增長池的大小;若有必要可採用自適應算法來動態調整線程池的大小,以提升CPU 的有效利用率和系統的總體性能。
    二、併發錯誤。多線程應用要特別注意併發錯誤,要從邏輯上保證程序的正確性,注意避免死鎖現象的發生。
    三、線程泄漏。這是線程池應用中一個嚴重的問題,當任務執行完畢而線程沒能返回池中就會發生線程泄漏現象。

  3.3 線程池設計的關鍵點

   一、取任務數與線程數的最小值來決定開啓線程的數量,即min(任務數,線程數);

    二、當前線程池中線程的狀態,即正在運行的線程數量和正在等待的線程數量;

    三、關閉線程。

 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 import Queue
 5 import threading
 6 import time
 7 
 8 '''
 9 要點:
10 一、利用Queue隊列特性,將建立的線程對象放入隊列中;
11 二、當執行任務時,從queue隊列中取線程執行任務;
12 三、當任務執行完畢後,線程放回線程池
13 '''
14 
15 #線程池類
16 class ThreadPool(object): 
17     def __init__(self,max_thread=20):  #默認最大線程數爲20
18         self.queue = Queue.Queue(max_thread) #建立隊列,大小爲max_thread
19         #把線程對象(線程類名)加入到隊列中,此時並無建立線程對象,只佔用很小內存
20         for i in xrange(max_thread):
21             self.queue.put(threading.Thread) #注意此處是類名,若是這裏self.queue.put(threading.Thread())(即把建立的對象放入隊列中),那麼每次都要在內存中開闢空間,這樣內存浪費很大
22             
23 
24     def get_thread(self):#從隊列裏獲取線程
25         return self.queue.get()
26 
27     def add_thread(self):#在隊列裏添加線程
28         self.queue.put(threading.Thread) #注意此處是類名
29 
30 pool = ThreadPool(10) #建立大小爲10的線程池
31 
32 #任務函數
33 def func(arg,p):
34     print arg
35     time.sleep(2)
36     p.add_thread() #當前線程執行完,在隊列裏增長一個線程
37 
38 for i in xrange(300):
39     #獲取隊列中的線程對象(此時線程對象還沒建立),默認queue.get(),若是隊列裏沒有線程就會阻塞等待。
40     thread = pool.get_thread() 
41     t = thread(target=func,args=(i,pool)) #此時才真正建立線程對象,並傳遞參數
42     t.start()  #開啓線程
43 
44 '''
45 for i in xrange(300):
46     thread = pool.get_thread()
47     t = thread(target=func,args=(i,pool))
48     t.start()
49     在threading.Thread的構造函數中:self.__args = args  self.__target = target
50     
51 #固然也能夠經過下面方法賦值,可是不推薦(對於私有字段,通常建議在構造函數中傳參賦值)
52 for i in xrange(300):
53     ret = pool.get_thread()
54     ret._Thread__target = func
55     ret._Thread__args = (i,pool)
56     ret.start()
57 '''
簡單線程池

    3.4  複雜線程池預備知識

 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 import Queue
 5 
 6 obj = object() #先建立一個object對象obj
 7 q = Queue.Queue()  #建立隊列
 8 
 9 #把obj對象放入隊列,注意此處與上述例子中的區別,上例是每次都建立對象,那麼每一個對象都會佔用內存;
10 #而此處是先建立對象,再把此對象加入隊列,即對象只建立了1次,而隊列中加入了10次,那麼對象只會佔用一分內存
11 for i in range(10):
12     q.put(obj) 
13     
14 for i in range(10):
15     print id(q.get())  #實驗結果是此處id徹底相同
預備知識1-佔用內存優化
 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 import contextlib
 5 import threading
 6 import time
 7 import random
 8 
 9 doing = [] #線程列表
10 
11 #打印正在運行的線程個數函數
12 def num(l2):
13     while True:
14         print len(l2)
15         time.sleep(1)
16 
17 #單首創建一個線程,用於打印當前正在運行的線程數量
18 t = threading.Thread(target=num,args=(doing,))  
19 t.start()
20 
21 #管理上下文的裝飾器
22 @contextlib.contextmanager
23 def show(l1,item):
24     l1.append(item)  #把正在執行的當前線程加入列表
25     yield  #運行到此處,程序就會跳出當前函數,去執行with關鍵字中的內容,執行完後在進入當前函數執行
26     l1.remove(item)  #當前線程執行完成後,則從列表中移除
27 
28 #任務函數
29 def task(i):
30     flag = threading.current_thread()
31     with show(doing,flag):  #with管理上下文,進行切換
32         print len(doing)
33         time.sleep(random.randint(1,4)) #等待
34 
35 for i in range(20): #建立20個線程
36     temp = threading.Thread(target=task,args=(i,))
37     temp.start()
預備知識2-with上下文管理

更多with上下文管理知識請參考:

        http://www.cnblogs.com/alan-babyblog/p/5153343.html

        http://www.cnblogs.com/alan-babyblog/p/5153386.html

   3.5 twisted源碼中經典線程池的實現

 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 from Queue import Queue
 5 import contextlib
 6 import threading
 7 
 8 WorkerStop = object()
 9 
10 class ThreadPool:
11     workers = 0
12     threadFactory = threading.Thread  #類名
13     currentThread = staticmethod(threading.currentThread) #靜態方法
14 
15     def __init__(self, maxthreads=20, name=None):
16         self.q = Queue(0) #建立隊列,參數0表示隊列大小不限,隊列用於存聽任務
17         self.max = maxthreads #定義最大線程數
18         self.name = name
19         self.waiters = [] #存放等待線程的列表
20         self.working = [] #存放工做線程的列表
21 
22     def start(self):
23         needSize = self.q.qsize()  #獲取任務所需的線程數
24         while self.workers < min(self.max, needSize): #wokers初始值爲0
25             self.startAWorker()  #調用開啓線程的方法
26 
27     def startAWorker(self):
28         self.workers += 1  #workers自增1
29         newThread = self.threadFactory(target=self._worker, name='test') #建立1個線程並去執行_worker方法
30         newThread.start()
31 
32     def callInThread(self, func, *args, **kw):
33         self.callInThreadWithCallback(None, func, *args, **kw)
34 
35     def callInThreadWithCallback(self, onResult, func, *args, **kw):
36         o = (func, args, kw, onResult)  #把參數組合成元組 
37         self.q.put(o)  #把任務加入隊列
38 
39     #上下文管理裝飾器    
40     @contextlib.contextmanager
41     def _workerState(self, stateList, workerThread):
42         stateList.append(workerThread)  #把當前執行線程加入線程狀態類表stateList
43         try:
44             yield
45         finally:
46             stateList.remove(workerThread)  #執行完後移除
47 
48     def _worker(self):
49         ct = self.currentThread() #獲取當前線程id
50         o = self.q.get() #隊列中獲取執行任務
51         
52         while o is not WorkerStop:  #當獲取的o不是workstop信號時,執行while循環
53             with self._workerState(self.working, ct):  #上下文切換
54                 function, args, kwargs, onResult = o  #元組分別賦值給變量
55                 del o  #刪除元組o
56                 try:
57                     result = function(*args, **kwargs)  
58                     success = True
59                 except:
60                     success = False
61                     if onResult is None:
62                         pass
63                     else:
64                         pass
65 
66                 del function, args, kwargs
67 
68                 if onResult is not None:
69                     try:
70                         onResult(success, result)
71                     except:
72                         #context.call(ctx, log.err)
73                         pass
74 
75                 del onResult, result
76 
77             with self._workerState(self.waiters, ct): 
78                 o = self.q.get()  #獲取任務
79 
80     def stop(self): #關閉線程
81         while self.workers: #循環workers
82             self.q.put(WorkerStop) #在隊列中增長一個信號~
83             self.workers -= 1 #workers值-1,直到全部線程關閉
84 
85 def show(arg):
86     import time
87     time.sleep(1)
88     print arg
89 
90 pool = ThreadPool(10)
91 
92 #建立500個任務,隊列裏添加500個任務
93 #每一個任務都是一個元組(方法名,動態參數,動態參數,默認爲NoNe)
94 for i in range(100):
95     pool.callInThread(show, i)
96 
97 pool.start() #開啓執行任務
98 
99 pool.stop()  #執行關閉線程方法
twisted中經典線程池的實現-部分代碼

 

參考資料:

       http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html

     http://www.cnblogs.com/wupeiqi/articles/4839959.html

相關文章
相關標籤/搜索