class Thread(_Verbose): """A class that represents a thread of control. This class can be safely subclassed in a limited fashion. """ __initialized = False # Need to store a reference to sys.exc_info for printing # out exceptions when a thread tries to use a global var. during interp. # shutdown and thus raises an exception about trying to perform some # operation on/with a NoneType __exc_info = _sys.exc_info # Keep sys.exc_clear too to clear the exception just before # allowing .join() to return. __exc_clear = _sys.exc_clear def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): """This constructor should always be called with keyword arguments. Arguments are: *group* should be None; reserved for future extension when a ThreadGroup class is implemented. *target* is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called. *name* is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number. *args* is the argument tuple for the target invocation. Defaults to (). *kwargs* is a dictionary of keyword arguments for the target invocation. Defaults to {}. If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread. """ assert group is None, "group argument must be None for now" _Verbose.__init__(self, verbose) if kwargs is None: kwargs = {} self.__target = target self.__name = str(name or _newname()) self.__args = args self.__kwargs = kwargs self.__daemonic = self._set_daemon() self.__ident = None self.__started = Event() self.__stopped = False self.__block = Condition(Lock()) self.__initialized = True # sys.stderr is not stored in the class like # sys.exc_info since it can be changed between instances self.__stderr = _sys.stderr
#!/usr/bin/env python # _*_ coding:utf-8 _*_ __Author__ = 'KongZhaGen' import threading import time # 定義用於線程執行的函數 def sayhi(num): print "runing on number :%s"%num time.sleep(3) print 'done',num # main函數同時執行5個線程 def main(): for i in range(5): t = threading.Thread(target=sayhi,args=(i,)) t.start() # 定義一個線程m,執行main函數 m = threading.Thread(target=main,args=()) # m線程定義爲守護線程 m.setDaemon(True) m.start() # 守護線程執行結束,其中的全部子線程所有被迫結束運行,接着繼續執行其後的代碼 m.join(timeout=2) print "---------main thread done-----"
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import time import threading def addNum(): global num print '---getNum--',num time.sleep(1) num -= 1 num = 100 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) # 同時開啓100個線程 t.start() thread_list.append(t) for t in thread_list: t.join() print "final num",num
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import time import threading def addNum(): global num print '---getNum--',num time.sleep(1) lock.acquire() # 數據修改前加鎖 num -= 1 lock.release() # 數據修改後釋放鎖 num = 100 thread_list = [] lock = threading.Lock() # 生成全局鎖 for i in range(100): t = threading.Thread(target=addNum) # 同時開啓100個線程 t.start() thread_list.append(t) for t in thread_list: t.join() print "final num",num
互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 dom
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import threading def hello(): print "Hello Word.." t = threading.Timer(20,hello) # 20秒鐘以後執行hello函數 t.start()
Queue.qsize() 返回隊列的大小
Queue.empty() 若是隊列爲空,返回True,反之False
Queue.full() 若是隊列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取隊列,timeout等待時間
Queue.get_nowait() 至關Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 至關Queue.put(item, False)
Queue.task_done() 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
Queue.join() block直到queue被消費完畢ide
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue q = Queue.Queue(maxsize=3) q.put(1) q.put(3) q.put(4) # q.put(7) # 超出三個會阻塞 print q.get() # 1 print q.get() # 3 print q.get() # 4 # 先進先出 q.put(7) print q.get()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue q = Queue.LifoQueue(maxsize=3) q.put(1) q.put(7) q.put(4) print q.get() # 4 print q.get() # 7 print q.get() # 1 # 結果:後進先出
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue q = Queue.PriorityQueue(maxsize=4) print q.empty() # 空隊列 print q.qsize() # 隊列個數0 q.put((1,5)) q.put((5,1)) print q.qsize() # 隊列個數2 print q.full() # 隊列未滿 q.put((3,6)) print q.full() # 隊列未滿 q.put((2,6)) print q.full() # 隊列已滿 # q.put_nowait(10) # 隊列滿後再填入不阻塞,直接報錯 print q.get() # 1 print q.get() # 2 print q.get() # 3 print q.get() # 5 # 結果:優先小的先出 print q.get_nowait() # 隊列爲空後再取出不阻塞,直接報錯
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue import threading import time # 生產者 def producer(): for i in range(5): q.put(i) # 往隊列放15個麪包 print "等待取走麪包....\n" q.join() # 阻塞直到隊列消費完畢 print "麪包被所有取走...\n" # 消費者 def consumer(n): # 隊列不爲空則一直消費 while q.qsize() > 0: print "%s 取走麪包 %s"%(n, q.get()) q.task_done() # 告訴queue完成了一次消費,完成消費次數等於qsize,隊列消費完畢,join取消阻塞 time.sleep(1) q = Queue.Queue() p = threading.Thread(target=producer,) p.start() c = threading.Thread(target=consumer, args=(['小鬼'])) c.start()
小鬼 取走麪包 0
小鬼 取走麪包 1
小鬼 取走麪包 2
小鬼 取走麪包 3
小鬼 取走麪包 4
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' import Queue import threading import time,random q = Queue.Queue() # 生產者 def producer(n): count = 0 # 少於5個麪包就生產 while count < 5: time.sleep(random.randrange(3)) q.put(count) print "%s product 麪包"%n count += 1 print "%s 已生產麪包%s" %(n, q.qsize()) def consumer(n): while True: time.sleep(random.randrange(4)) # 有面包就消費 if not q.empty(): print "\033[31;1m%s 吃了 麪包 %s\033[0m"%(n, q.get()) else: # 沒麪包就退出 print "沒有面包了..." break p1 = threading.Thread(target=producer, args=(['廚師1'])) p2 = threading.Thread(target=producer, args=(['廚師2'])) p1.start() p2.start() c1 = threading.Thread(target=consumer, args=(['客戶1'])) c2 = threading.Thread(target=consumer, args=(['客戶2'])) c1.start() c2.start()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Process,Queue import time q = Queue() # 往隊列傳入三個值 def pro_put(q): q.put(1) q.put(2) q.put(3) if __name__ == '__main__': # 開始一個新進程 p = Process(target=pro_put, args=(q,)) p.start() # 從隊列獲取原進程的數據 print q.get() print q.get() print q.get()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Process,Pipe def f(conn): # 往管道發送 conn.send(1) conn.send(2) conn.send(3) conn.close() if __name__ == '__main__': # Pipe 分兩端,任何一端send, 任何一端recv con_left, con_right = Pipe() p = Process(target=f, args=(con_right,)) p.start() # 從管道接收原進程生成的數據 print con_left.recv() print con_left.recv() print con_left.recv() p.join()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Process, Lock import time def f(l, i): # 當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突 l.acquire() # Lock實現進程同步進行 try: print "hello, ",i finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Pool import time,os print 'parent:',os.getpid() def f(msg): print 'child:',os.getpid() print "start:",msg time.sleep(2) print "end :",msg print "============" if __name__ == '__main__': # 進程池最大數量2 pool = Pool(2) for i in range(5): msg = "hello %s" % i # 同時啓動5個進程 pool.apply_async(func=f, args=(msg,)) print "---------------------" pool.close() pool.join()
#!/usr/bin/env python # -*- coding:utf-8 -*- __Author__ = 'kongZhaGen' from multiprocessing import Pool import time def f(msg): print "start:",msg time.sleep(2) print "end :",msg print "============" if __name__ == '__main__': # 進程池最大數量2 pool = Pool(2) for i in range(5): msg = "hello %s" % i # 同時啓動5個進程 pool.apply(func=f, args=(msg,)) print "---------------------" pool.close() pool.join()