Reading the Queue Source

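Reading source code lets you understand Python more thoroughly, and it is probably a step every developer has to take.

My current task is to write a thread pool. The simplest version can be built on Queue, so let's study the Queue source. The listings below come from the Python 2 standard library, where the module is named Queue (Python 3 renamed it to queue).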

 

Source: Queue

class Queue:
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    """

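The docstring says that this creates a queue with a given maximum length; if the length is not greater than 0, the queue is unbounded.

The Queue constructor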

def __init__(self, maxsize=0):
    self.maxsize = maxsize        # upper bound on the number of items
    self._init(maxsize)           # create the underlying container

    # mutex must be held whenever the queue is mutating.  All methods
    # that acquire mutex must release it before returning.  mutex
    # is shared between the three conditions, so acquiring and
    # releasing the conditions also acquires and releases mutex.
    self.mutex = _threading.Lock()                      # one plain lock object

    # Notify not_empty whenever an item is added to the queue; a
    # thread waiting to get is notified then.
    self.not_empty = _threading.Condition(self.mutex)   # Condition built on mutex

    # Notify not_full whenever an item is removed from the queue;
    # a thread waiting to put is notified then.
    self.not_full = _threading.Condition(self.mutex)    # Condition built on mutex

    # Notify all_tasks_done whenever the number of unfinished tasks
    # drops to zero; thread waiting to join() is notified to resume
    self.all_tasks_done = _threading.Condition(self.mutex)  # Condition built on mutex
    self.unfinished_tasks = 0                           # number of unfinished tasks

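Analysis:

  The maxsize argument is passed to _init(). We will look at that method later; what it actually does is create a deque object (a double-ended queue).

After that, the constructor creates one lock object and builds three Condition objects on top of it. Condition belongs to the threading machinery and is covered later.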
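The statement that the conditions share one lock can be checked with the standard library alone. The following is a minimal sketch, not part of the Queue source; the variable names are chosen for illustration, and it uses the Python 3 `threading` module.

```python
import threading

mutex = threading.Lock()
not_empty = threading.Condition(mutex)
not_full = threading.Condition(mutex)

# Acquiring one condition acquires the shared mutex...
not_empty.acquire()
locked_while_held = mutex.locked()

# ...so a second condition built on the same lock cannot be acquired now.
second_acquire = not_full.acquire(False)   # non-blocking attempt

not_empty.release()
locked_after_release = mutex.locked()

print(locked_while_held, second_acquire, locked_after_release)
```

This is why Queue never needs to touch `mutex` and a condition separately: holding any one of the three conditions already excludes every other queue operation.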

 

Methods of class Queue:

1.task_done

def task_done(self):
    """Indicate that a formerly enqueued task is complete.

    Used by Queue consumer threads.  For each get() used to fetch a task,
    a subsequent call to task_done() tells the queue that the processing
    on the task is complete.

    If a join() is currently blocking, it will resume when all items
    have been processed (meaning that a task_done() call was received
    for every item that had been put() into the queue).

    Raises a ValueError if called more times than there were items
    placed in the queue.
    """
    self.all_tasks_done.acquire()                # acquire the lock
    try:
        unfinished = self.unfinished_tasks - 1   # count once this task is done
        if unfinished <= 0:                      # nothing left, or too many calls
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()     # wake every thread blocked in join()
        self.unfinished_tasks = unfinished       # store the decremented count
    finally:
        self.all_tasks_done.release()            # always release the lock

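Analysis:

  This method records that one task has finished. It first acquires the lock through the all_tasks_done condition and decrements the count of unfinished tasks. If the count has reached zero, it notifies every thread waiting in join(); a count below zero raises ValueError. Finally it releases the lock. The notify methods are examined in the Condition section.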

2.join

def join(self):
    """Blocks until all items in the Queue have been gotten and processed.

    The count of unfinished tasks goes up whenever an item is added to the
    queue. The count goes down whenever a consumer thread calls task_done()
    to indicate the item was retrieved and all work on it is complete.

    When the count of unfinished tasks drops to zero, join() unblocks.
    """
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:        # wait while any task is unfinished
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()

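Analysis:

  A blocking method: while the queue still has unfinished tasks, the thread that calls join() blocks until they are all done. The wait method is covered later.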
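The interplay of put(), task_done() and join() is easier to follow in a small run. This is a sketch under a few assumptions: it uses the Python 3 module name `queue`, and the `None` sentinel and the `worker` function are illustrative choices, not something taken from the Queue source.

```python
import queue      # named Queue in Python 2, whose source this article reads
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:          # hypothetical sentinel meaning "stop"
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()             # one task_done() per successful get()

t = threading.Thread(target=worker)
t.start()
for n in range(5):
    q.put(n)                      # each put() increments unfinished_tasks
q.join()                          # returns once the count is back to zero
q.put(None)
t.join()

# Calling task_done() more often than put() raises ValueError.
try:
    q.task_done()
    extra_call_error = None
except ValueError as exc:
    extra_call_error = str(exc)

print(results, extra_call_error)
```

Because there is a single worker, the results arrive in the order the items were put.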

3.qsize

def qsize(self):
    """Return the approximate size of the queue (not reliable!)."""
    self.mutex.acquire()
    n = self._qsize()           # length of the underlying deque
    self.mutex.release()
    return n

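Analysis:

  _qsize, discussed later, returns the length of the deque. The value is only an estimate: another thread may change the queue right after the lock is released, so it is not reliable.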

4.empty

def empty(self):
    """Return True if the queue is empty, False otherwise (not reliable!)."""
    self.mutex.acquire()
    n = not self._qsize()       # True when the length is 0, otherwise False
    self.mutex.release()
    return n

Analysis:

  Checks whether the queue is empty. It is also built on _qsize, so it is just as unreliable.

5.full

def full(self):
    """Return True if the queue is full, False otherwise (not reliable!)."""
    self.mutex.acquire()
    n = 0 < self.maxsize == self._qsize()
    self.mutex.release()
    return n

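Not much to say: it checks whether the queue is full. Because the chained comparison requires 0 < maxsize, an unbounded queue is never reported as full.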
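The three inspection methods can be tried in a single thread, where their results are exact; the "not reliable" warning only matters once other threads may change the queue between the call and the use of its result. A small sketch, assuming the Python 3 module name `queue`:

```python
import queue

bounded = queue.Queue(maxsize=2)
bounded.put("a")
bounded.put("b")
bounded_state = (bounded.qsize(), bounded.empty(), bounded.full())

unbounded = queue.Queue()          # maxsize=0 means "no limit"
for n in range(1000):
    unbounded.put(n)
unbounded_full = unbounded.full()  # 0 < maxsize is False, so never full

# The chained comparison used by full():
# a < b == c  means  (a < b) and (b == c)
maxsize, size = 2, 2
chained = 0 < maxsize == size

print(bounded_state, unbounded_full, chained)
```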

6.put

def put(self, item, block=True, timeout=None):
    """Put an item into the queue.

    If optional args 'block' is true and 'timeout' is None (the default),
    block if necessary until a free slot is available. If 'timeout' is
    a non-negative number, it blocks at most 'timeout' seconds and raises
    the Full exception if no free slot was available within that time.
    Otherwise ('block' is false), put an item on the queue if a free slot
    is immediately available, else raise the Full exception ('timeout'
    is ignored in that case).
    """
    self.not_full.acquire()                     # acquire the lock via not_full
    try:
        if self.maxsize > 0:                    # only a bounded queue can be full
            if not block:                       # non-blocking call
                if self._qsize() == self.maxsize:
                    raise Full                  # full: fail immediately
            elif timeout is None:               # blocking, no timeout: wait
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:                               # blocking with a deadline
                endtime = _time() + timeout     # deadline = now + timeout
                while self._qsize() == self.maxsize:
                    remaining = endtime - _time()
                    if remaining <= 0.0:        # deadline passed: give up
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)                         # store the item
        self.unfinished_tasks += 1              # one more unfinished task
        self.not_empty.notify()                 # wake one thread waiting in get()
    finally:
        self.not_full.release()                 # release the lock

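Analysis:

  By default block is True and timeout is None. If the queue is full, put waits; otherwise it calls _put (introduced later) to append the item to the deque, increments the unfinished-task count, and notifies not_empty.

If block is set to False, a full queue raises the Full exception immediately. If a timeout is given, put blocks until the time runs out and then raises Full. The whole method works through the not_full condition.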
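The three branches of put() can each be triggered on a queue with a single slot. This is a sketch assuming the Python 3 module name `queue`; the result variables are illustrative.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("first")                    # succeeds: one free slot

# block=False: fail immediately when the queue is full
try:
    q.put("second", block=False)
    nonblocking_result = "stored"
except queue.Full:
    nonblocking_result = "Full"

# block=True with a timeout: wait up to `timeout` seconds, then raise Full
start = time.monotonic()
try:
    q.put("second", timeout=0.2)
    timed_result = "stored"
except queue.Full:
    timed_result = "Full"
waited = time.monotonic() - start

# A negative timeout is rejected with ValueError
try:
    q.put("second", timeout=-1)
    negative_result = "stored"
except ValueError:
    negative_result = "ValueError"

print(nonblocking_result, timed_result, negative_result)
```

With the default arguments (block=True, timeout=None) the second put would wait indefinitely, which is why the sketch does not exercise that branch.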

7.put_nowait

def put_nowait(self, item):
    """Put an item into the queue without blocking.

    Only enqueue the item if a free slot is immediately available.
    Otherwise raise the Full exception.
    """
    return self.put(item, False)

This is simply put with block set to False; nothing more to say.

8.get

def get(self, block=True, timeout=None):
    """Remove and return an item from the queue.

    If optional args 'block' is true and 'timeout' is None (the default),
    block if necessary until an item is available. If 'timeout' is
    a non-negative number, it blocks at most 'timeout' seconds and raises
    the Empty exception if no item was available within that time.
    Otherwise ('block' is false), return an item if one is immediately
    available, else raise the Empty exception ('timeout' is ignored
    in that case).
    """
    self.not_empty.acquire()                # acquire the lock via not_empty
    try:
        if not block:                       # non-blocking call
            if not self._qsize():           # empty: fail immediately
                raise Empty
        elif timeout is None:               # no timeout: wait while empty
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = _time() + timeout
            while not self._qsize():
                remaining = endtime - _time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()                  # remove and fetch one item
        self.not_full.notify()              # wake one thread waiting in put()
        return item                         # hand the item to the caller
    finally:
        self.not_empty.release()            # release the lock

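Analysis:

  The logic mirrors put. With the default arguments, get waits while the queue is empty; otherwise it calls _get (see below) to remove and fetch one item, and finally returns that item. The whole method works through the not_empty condition.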
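The same three modes exist on the get() side. The sketch below assumes the Python 3 module name `queue`; the `producer` function and its 0.1 second delay are illustrative.

```python
import queue
import threading
import time

q = queue.Queue()

# block=False on an empty queue raises Empty at once
try:
    q.get(block=False)
    nonblocking_result = "item"
except queue.Empty:
    nonblocking_result = "Empty"

# a timeout waits first, then raises Empty
try:
    q.get(timeout=0.1)
    timed_result = "item"
except queue.Empty:
    timed_result = "Empty"

def producer():
    time.sleep(0.1)
    q.put("hello")     # put() calls not_empty.notify(), waking the getter

t = threading.Thread(target=producer)
t.start()
received = q.get()     # blocks until the producer has put an item
t.join()

print(nonblocking_result, timed_result, received)
```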

9.get_nowait

The block=False version of get.

10._init()

def _init(self, maxsize):
    self.queue = deque()

This creates a deque object. The deque is what actually adds items to and removes items from the queue, so it is worth reading that class as well.

 

Source: deque

class deque(object):
    """
    deque([iterable[, maxlen]]) --> deque object
    
    Build an ordered collection with optimized access from its endpoints.
    """

The docstring says that this class builds an ordered collection with optimized access at both ends.

The deque constructor:

def __init__(self, iterable=(), maxlen=None): # known case of _collections.deque.__init__
    """
    deque([iterable[, maxlen]]) --> deque object

    Build an ordered collection with optimized access from its endpoints.
    # (copied from class doc)
    """
    pass

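The constructor takes two parameters, an iterable and a maximum length. The iterable defaults to an empty tuple and maxlen defaults to None.

Methods of Queue/deque:

There are many methods in the class; let's start with the ones Queue runs into.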
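The two constructor arguments behave as follows. Note that Queue calls deque() with no arguments, so the size limit of a Queue is enforced by put(), not by deque's maxlen. A small sketch using `collections.deque` from the standard library:

```python
from collections import deque

empty = deque()                       # both arguments are optional
from_iterable = deque([1, 2, 3])
bounded = deque([1, 2, 3], maxlen=2)  # only the last 2 items are kept

bounded.append(4)                     # the oldest item drops off the left

print(list(empty), list(from_iterable), list(bounded))
```

A deque with maxlen silently discards items from the opposite end, which is quite different from Queue, where a full queue makes put() block or raise Full.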

11._qsize

def _qsize(self, len=len):
    return len(self.queue)

This method simply takes the length of the deque and returns it.

12._put

def _put(self, item):
    self.queue.append(item)

This method calls deque's append method:

def append(self, *args, **kwargs): # real signature unknown
    """ Add an element to the right side of the deque. """
    pass

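It adds an element to the right end of the deque. Note that the method body is a single pass, with no code that implements the behaviour. The comment "real signature unknown" marks a generated stub: deque is implemented in C (in the _collections module), so there is no Python source to display here.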

13._get

def _get(self):
    return self.queue.popleft()

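This method calls deque's popleft method:

def popleft(self, *args, **kwargs): # real signature unknown
    """ Remove and return the leftmost element. """
    pass

It removes and returns the leftmost element, somewhat like a list's pop method but taking from the left end. This one is also marked real signature unknown.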
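Since all locking lives in put() and get(), the four underscore methods (_init, _qsize, _put, _get) are the only places that decide how items are stored. The sketch below illustrates this by overriding them in a subclass; `StackQueue` is a hypothetical name, and the code assumes the Python 3 module name `queue`.

```python
import queue

class StackQueue(queue.Queue):
    """Hypothetical subclass: same locking, but last-in first-out order."""

    def _init(self, maxsize):
        self.queue = []            # a plain list instead of a deque

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)    # add on the right

    def _get(self):
        return self.queue.pop()    # take from the right instead of the left

fifo = queue.Queue()               # append + popleft: first in, first out
lifo = StackQueue()
for n in (1, 2, 3):
    fifo.put(n)
    lifo.put(n)

fifo_order = [fifo.get() for _ in range(3)]
lifo_order = [lifo.get() for _ in range(3)]

print(fifo_order, lifo_order)
```

Appending on the right and popping from the left is what makes the standard Queue first-in first-out; changing only the pop side turns it into a stack without touching any of the synchronization code.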

 

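That covers the source of the Queue class, but so far it explains only part of how Queue works. Next, let's see how the lock and the Condition objects work inside the queue.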

 

Source: Condition

class _Condition(_Verbose):
    """Condition variables allow one or more threads to wait until they are
       notified by another thread.
    """

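This class inherits from _Verbose, and the docstring says that condition variables allow one or more threads to wait until they are notified by another thread.

The Condition constructor: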

def __init__(self, lock=None, verbose=None):
    _Verbose.__init__(self, verbose)    # call the parent constructor
    if lock is None:
        lock = RLock()                  # no lock given: create an RLock
    self.__lock = lock
    # Export the lock's acquire() and release() methods
    self.acquire = lock.acquire         # both attributes point at the lock's methods
    self.release = lock.release
    # If the lock defines _release_save() and/or _acquire_restore(),
    # these override the default implementations (which just call
    # release() and acquire() on the lock).  Ditto for _is_owned().
    try:
        self._release_save = lock._release_save
    except AttributeError:
        pass
    try:
        self._acquire_restore = lock._acquire_restore
    except AttributeError:
        pass
    try:
        self._is_owned = lock._is_owned
    except AttributeError:
        pass
    self.__waiters = []

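The constructor calls the parent constructor, falls back to a new RLock when no lock is passed in, and then stores references to the lock's methods in several attributes. (Queue always passes in its own mutex, so the RLock fallback is not taken there.) Here is the _Verbose constructor:

class _Verbose(object):
    def __init__(self, verbose=None):
        pass

This variant does nothing; a fuller variant, defined under if __debug__, appears further down. Next, RLock.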

Source: RLock

class _RLock(_Verbose):
    """A reentrant lock must be released by the thread that acquired it. Once a
       thread has acquired a reentrant lock, the same thread may acquire it
       again without blocking; the thread must release it once for each time it
       has acquired it.
    """

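The docstring says that a reentrant lock must be released by the thread that acquired it. The owning thread may acquire it again without blocking, and must release it once for every time it acquired it.

The RLock constructor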
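The difference between a reentrant lock and a plain lock shows up as soon as one thread acquires twice. A minimal sketch using `threading.RLock` and `threading.Lock` from the Python 3 standard library; the variable names are illustrative.

```python
import threading

rlock = threading.RLock()
first = rlock.acquire()
second = rlock.acquire(False)     # same thread: succeeds without blocking
rlock.release()
rlock.release()                   # one release per acquire

plain = threading.Lock()
plain.acquire()
plain_second = plain.acquire(False)   # a plain Lock is not reentrant
plain.release()

# A thread that does not hold the reentrant lock may not release it.
try:
    rlock.release()
    unowned_release = "ok"
except RuntimeError:
    unowned_release = "RuntimeError"

print(first, second, plain_second, unowned_release)
```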

def __init__(self, verbose=None):
    _Verbose.__init__(self, verbose)
    self.__block = _allocate_lock()
    self.__owner = None
    self.__count = 0

It again calls _Verbose's __init__(), and it also calls _allocate_lock():

def allocate_lock(): # real signature unknown; restored from __doc__
    """
    allocate_lock() -> lock object
    (allocate() is an obsolete synonym)
    
    Create a new lock object.  See help(LockType) for information about locks.
    """
    pass

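Again real signature unknown. According to the docstring it creates a new lock object, and that is all it does.

There are two more fields, the owner and the count. Next come the three RLock methods that Condition uses.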

 

1._release_save

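def _release_save(self):
    if __debug__:
        self._note("%s._release_save()", self)
    count = self.__count
    self.__count = 0
    owner = self.__owner
    self.__owner = None
    self.__block.release()
    return (count, owner)

As the name suggests: release and save. If __debug__ is true, it passes a format string and the lock itself to _note for logging. It then saves the count and owner fields, resets them, releases the underlying lock, and returns the saved pair. So what is __debug__, and what does _note do?

__debug__ is a built-in constant; it is True unless the interpreter is started with the -O option. Here is _note: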

_VERBOSE = False

if __debug__:

    class _Verbose(object):

        def __init__(self, verbose=None):
            if verbose is None:
                verbose = _VERBOSE
            self.__verbose = verbose

        def _note(self, format, *args):
            if self.__verbose:
                format = format % args
                # Issue #4188: calling current_thread() can incur an infinite
                # recursion if it has to create a DummyThread on the fly.
                ident = _get_ident()
                try:
                    name = _active[ident].name
                except KeyError:
                    name = "<OS thread %d>" % ident
                format = "%s: %s\n" % (name, format)
                _sys.stderr.write(format)

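Unless a value is passed in, __init__ sets self.__verbose to _VERBOSE, which is False, and when self.__verbose is false _note does nothing at all.

If self.__verbose is true, _note calls _get_ident(). A quick look shows that this function returns a non-zero integer that uniquely identifies the current thread among the other threads existing at the same time. The thread name and the message are then written to stderr.

So the real work of _release_save is to return the lock's owner and count and then release the lock.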

 

2._acquire_restore

def _acquire_restore(self, count_owner):
    count, owner = count_owner
    self.__block.acquire()
    self.__count = count
    self.__owner = owner
    if __debug__:
        self._note("%s._acquire_restore()", self)

As the name suggests: it acquires the lock, then unpacks the count and owner passed in and assigns them back to the object, restoring the fields.

 

3._is_owned

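def _is_owned(self):
    return self.__owner == _get_ident()

This compares owner with the return value of _get_ident(). Since that value is an integer that uniquely identifies a thread, the comparison determines whether the current thread is the holder of this lock.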
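To see how the three methods fit together, here is a simplified model of the owner and count bookkeeping. `TinyRLock` is a hypothetical teaching class, not the real implementation: it omits timeouts, non-blocking acquires and the debug notes, and it uses `threading.get_ident()` from Python 3 in place of `_get_ident()`.

```python
import threading

class TinyRLock:
    """Simplified sketch of the bookkeeping shown above (not the real class)."""

    def __init__(self):
        self._block = threading.Lock()
        self._owner = None
        self._count = 0

    def acquire(self):
        me = threading.get_ident()
        if self._owner == me:          # re-entry by the owning thread
            self._count += 1
            return True
        self._block.acquire()
        self._owner = me
        self._count = 1
        return True

    def release(self):
        if self._owner != threading.get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if self._count == 0:
            self._owner = None
            self._block.release()

    def _release_save(self):
        # Give the lock up completely, remembering how deep we were.
        state = (self._count, self._owner)
        self._count = 0
        self._owner = None
        self._block.release()
        return state

    def _acquire_restore(self, state):
        self._block.acquire()
        self._count, self._owner = state

    def _is_owned(self):
        return self._owner == threading.get_ident()

lock = TinyRLock()
lock.acquire()
lock.acquire()                      # nested acquire, count is now 2
saved = lock._release_save()        # fully released despite the nesting
owned_while_released = lock._is_owned()
lock._acquire_restore(saved)        # back to the saved depth
owned_after_restore = lock._is_owned()
count_after_restore = lock._count
lock.release()
lock.release()

print(owned_while_released, owned_after_restore, count_after_restore)
```

This pairing is what lets a condition variable release a lock that has been acquired several times by the same thread before it goes to sleep, and put it back in exactly the same state afterwards.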

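Methods of class Condition

The methods of Condition that matter most here are wait and notify, which make threads wait or wake them up. Time is limited, so this part is to be continued.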
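Until that part is written, the basic usage pattern of wait and notify can be sketched with the public API. The `consumer` function and the `items` list are illustrative; the code uses `threading.Condition` from the Python 3 standard library.

```python
import threading

cond = threading.Condition()
items = []
received = []

def consumer():
    with cond:                     # acquire the underlying lock
        while not items:           # re-check the predicate after every wake-up
            cond.wait()            # releases the lock while sleeping
        received.append(items.pop())

t = threading.Thread(target=consumer)
t.start()

with cond:
    items.append("job")
    cond.notify()                  # wake one waiting thread, if any

t.join()
print(received)
```

The `while` loop around wait() has the same shape as the loops in put(), get() and join() above: a woken thread always re-checks the condition it was waiting for.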

 

 

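That is most of the Queue source. To summarize:

Queue creates one Lock (the mutex) that threads acquire and release around every operation, and builds three Condition objects on it to make threads wait or wake them up. The deque object handles the elements themselves. RLock, which additionally records the owning thread and an acquisition count, is only created when a Condition is constructed without a lock.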
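Coming back to the task that motivated this reading, the pieces above are enough for a very small thread pool. This is only a sketch under stated assumptions: `TinyPool`, `submit`, `close` and the `None` sentinel are hypothetical names and conventions chosen for illustration, and the code uses the Python 3 module name `queue`.

```python
import queue
import threading

class TinyPool:
    """Hypothetical minimal thread pool; names are illustrative only."""

    def __init__(self, workers=3):
        self.tasks = queue.Queue()
        self.threads = [threading.Thread(target=self._run) for _ in range(workers)]
        for t in self.threads:
            t.start()

    def _run(self):
        while True:
            job = self.tasks.get()          # blocks while the queue is empty
            try:
                if job is None:             # sentinel: shut this worker down
                    return
                func, args = job
                func(*args)
            finally:
                self.tasks.task_done()      # runs for jobs and sentinels alike

    def submit(self, func, *args):
        self.tasks.put((func, args))

    def close(self):
        self.tasks.join()                   # wait for all submitted jobs
        for _ in self.threads:
            self.tasks.put(None)            # one sentinel per worker
        for t in self.threads:
            t.join()

squares = []
guard = threading.Lock()

def record_square(n):
    with guard:                             # protect the shared list
        squares.append(n * n)

pool = TinyPool(workers=3)
for n in range(10):
    pool.submit(record_square, n)
pool.close()

print(sorted(squares))
```

The pool itself contains no locks or conditions: blocking in get(), waking on put() and waiting in join() are all provided by Queue. A job that raises an exception would end its worker thread in this sketch, so a real pool would need error handling around the call.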

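Takeaways

When I first started reading the source, the way methods and classes call back and forth made it hard to find a way in, but after a few read-throughs it mostly made sense. My approach is to tackle the main body first: read the code in Queue, skip deque and Condition when they come up, and deal with them one by one afterwards. At the end, go back and put the pieces together; drawing a class diagram makes it even easier to understand.

Finally, here is a class diagram.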

 
