class Thread(_Verbose):
    """A class that represents a thread of control.

    This class can be safely subclassed in a limited fashion.

    """
    # Class-level default; __init__ shadows it with an instance-level True
    # so other methods can detect a subclass that skipped Thread.__init__().
    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info
    # Keep sys.exc_clear too to clear the exception just before
    # allowing .join() to return.
    __exc_clear = _sys.exc_clear

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """This constructor should always be called with keyword arguments.
        Arguments are:

        *group* should be None; reserved for future extension when a
        ThreadGroup class is implemented.

        *target* is the callable object to be invoked by the run() method.
        Defaults to None, meaning nothing is called.

        *name* is the thread name. By default, a unique name is constructed
        of the form "Thread-N" where N is a small decimal number.

        *args* is the argument tuple for the target invocation.
        Defaults to ().

        *kwargs* is a dictionary of keyword arguments for the target
        invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke
        the base class constructor (Thread.__init__()) before doing anything
        else to the thread.

        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        # Fall back to an auto-generated name (_newname) when none is given.
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        # Daemon flag comes from _set_daemon() (defined elsewhere in this
        # class; presumably inherits the creating thread's status — confirm).
        self.__daemonic = self._set_daemon()
        self.__ident = None
        # Event/Condition pair used for start()/join() signalling.
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr
……
線程是操作系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不同的任務。
#!/usr/bin/env python # _*_ coding:utf-8 _*_ __Author__ = 'KongZhaGen' import threading import time # 定義用於線程執行的函數 def sayhi(num): print "runing on number :%s"%num time.sleep(3) print 'done',num # main函數同時執行5個線程 def main(): for i in range(5): t = threading.Thread(target=sayhi,args=(i,)) t.start() # 定義一個線程m,執行main函數 m = threading.Thread(target=main,args=()) # m線程定義爲守護線程 m.setDaemon(True) m.start() # 守護線程執行結束,其中的全部子線程所有被迫結束運行,接着繼續執行其後的代碼 m.join(timeout=2) print "---------main thread done-----"
一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每個線程可以訪問同一份數據,此時,如果2個線程同時要修改同一份數據,會出現什麼情況?
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import time import threading def addNum(): global num print '---getNum--',num time.sleep(1) num -= 1 num = 100 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) # 同時開啓100個線程 t.start() thread_list.append(t) for t in thread_list: t.join() print "final num",num
其結果是:一會爲0,一會爲其它值。
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import time import threading def addNum(): global num print '---getNum--',num time.sleep(1) lock.acquire() # 數據修改前加鎖 num -= 1 lock.release() # 數據修改後釋放鎖 num = 100 thread_list = [] lock = threading.Lock() # 生成全局鎖 for i in range(100): t = threading.Thread(target=addNum) # 同時開啓100個線程 t.start() thread_list.append(t) for t in thread_list: t.join() print "final num",num
其結果一直爲0,說明鎖起了作用。
Semaphore(信號量)
互斥鎖同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據。
定時器
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import threading def hello(): print "Hello Word.." t = threading.Timer(20,hello) # 20秒鐘以後執行hello函數 t.start()
Queue.qsize() 返回隊列的大小
Queue.empty() 若是隊列爲空,返回True,反之False
Queue.full() 若是隊列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取隊列,timeout等待時間
Queue.get_nowait() 至關Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 至關Queue.put(item, False)
Queue.task_done() 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
Queue.join() 阻塞直到queue被消費完畢
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue q = Queue.Queue(maxsize=3) q.put(1) q.put(3) q.put(4) # q.put(7) # 超出三個會阻塞 print q.get() # 1 print q.get() # 3 print q.get() # 4 # 先進先出 q.put(7) print q.get()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue q = Queue.LifoQueue(maxsize=3) q.put(1) q.put(7) q.put(4) print q.get() # 4 print q.get() # 7 print q.get() # 1 # 結果:後進先出
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue q = Queue.PriorityQueue(maxsize=4) print q.empty() # 空隊列 print q.qsize() # 隊列個數0 q.put((1,5)) q.put((5,1)) print q.qsize() # 隊列個數2 print q.full() # 隊列未滿 q.put((3,6)) print q.full() # 隊列未滿 q.put((2,6)) print q.full() # 隊列已滿 # q.put_nowait(10) # 隊列滿後再填入不阻塞,直接報錯 print q.get() # 1 print q.get() # 2 print q.get() # 3 print q.get() # 5 # 結果:優先小的先出 print q.get_nowait() # 隊列爲空後再取出不阻塞,直接報錯
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序整體處理數據的速度。
爲什麼要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue import threading import time # 生產者 def producer(): for i in range(5): q.put(i) # 往隊列放15個麪包 print "等待取走麪包....\n" q.join() # 阻塞直到隊列消費完畢 print "麪包被所有取走...\n" # 消費者 def consumer(n): # 隊列不爲空則一直消費 while q.qsize() > 0: print "%s 取走麪包 %s"%(n, q.get()) q.task_done() # 告訴queue完成了一次消費,完成消費次數等於qsize,隊列消費完畢,join取消阻塞 time.sleep(1) q = Queue.Queue() p = threading.Thread(target=producer,) p.start() c = threading.Thread(target=consumer, args=(['小鬼'])) c.start()
結果:
等待取走麪包....
小鬼 取走麪包 0
小鬼 取走麪包 1
小鬼 取走麪包 2
小鬼 取走麪包 3
小鬼 取走麪包 4
麪包被所有取走...
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue import threading import time,random q = Queue.Queue() # 生產者 def producer(n): count = 0 # 少於5個麪包就生產 while count < 5: time.sleep(random.randrange(3)) q.put(count) print "%s product 麪包"%n count += 1 print "%s 已生產麪包%s" %(n, q.qsize()) def consumer(n): while True: time.sleep(random.randrange(4)) # 有面包就消費 if not q.empty(): print "\033[31;1m%s 吃了 麪包 %s\033[0m"%(n, q.get()) else: # 沒麪包就退出 print "沒有面包了..." break p1 = threading.Thread(target=producer, args=(['廚師1'])) p2 = threading.Thread(target=producer, args=(['廚師2'])) p1.start() p2.start() c1 = threading.Thread(target=consumer, args=(['客戶1'])) c2 = threading.Thread(target=consumer, args=(['客戶2'])) c1.start() c2.start()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Process,Queue import time q = Queue() # 往隊列傳入三個值 def pro_put(q): q.put(1) q.put(2) q.put(3) if __name__ == '__main__': # 開始一個新進程 p = Process(target=pro_put, args=(q,)) p.start() # 從隊列獲取原進程的數據 print q.get() print q.get() print q.get()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Process,Pipe def f(conn): # 往管道發送 conn.send(1) conn.send(2) conn.send(3) conn.close() if __name__ == '__main__': # Pipe 分兩端,任何一端send, 任何一端recv con_left, con_right = Pipe() p = Process(target=f, args=(con_right,)) p.start() # 從管道接收原進程生成的數據 print con_left.recv() print con_left.recv() print con_left.recv() p.join()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Process, Lock import time def f(l, i): # 當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突 l.acquire() # Lock實現進程同步進行 try: print "hello, ",i finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Pool import time,os print 'parent:',os.getpid() def f(msg): print 'child:',os.getpid() print "start:",msg time.sleep(2) print "end :",msg print "============" if __name__ == '__main__': # 進程池最大數量2 pool = Pool(2) for i in range(5): msg = "hello %s" % i # 同時啓動5個進程 pool.apply_async(func=f, args=(msg,)) print "---------------------" pool.close() pool.join()
結果:
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Pool import time def f(msg): print "start:",msg time.sleep(2) print "end :",msg print "============" if __name__ == '__main__': # 進程池最大數量2 pool = Pool(2) for i in range(5): msg = "hello %s" % i # 同時啓動5個進程 pool.apply(func=f, args=(msg,)) print "---------------------" pool.close() pool.join()
結果: