線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象,而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件,繼續執行。python
event.isSet(): 返回event的狀態值True或者False; event.wait(): 若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear(): 恢復event的狀態值爲False。
import threading,time event = threading.Event() def foo(): while not event.is_set(): print('wait....') event.wait() print('Connect to redis server') print('attempt to start redis server') for i in range(5): t = threading.Thread(target=foo) t.start() time.sleep(10) event.set() ''' 運行結果: attempt to start redis server wait.... wait.... wait.... wait.... wait.... Connect to redis server Connect to redis server Connect to redis server Connect to redis server Connect to redis server '''
import threading import time import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main(): readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') t1.start() t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start() logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress readis_ready.set() if __name__=="__main__": main()
import threading,time event = threading.Event() def foo(): while not event.is_set(): print('wait....') event.wait(2) print('Connect to redis server') print('attempt to start redis server') for i in range(2): t = threading.Thread(target=foo) t.start() time.sleep(5) event.set() ''' 運行結果: attempt to start redis server wait.... wait.... wait.... wait.... wait.... wait.... Connect to redis server Connect to redis server '''
def worker(event): while not event.is_set(): logging.debug('Waiting for redis ready...') event.wait(2) logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1)
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.app
''' 建立一個「隊列」對象 import queue q = queue.Queue(maxsize = 10) queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數 maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 將一個值放入隊列中
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 將一個值從隊列中取出 q.get() 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 '''
import queue q = queue.Queue(3) q.put(11) q.put('hello') q.put(3.123) print(q.get()) print(q.get()) print(q.get()) ''' 運行結果: 11 hello 3.123 '''
''' join() 阻塞進程,直到全部任務完成,須要配合另外一個方法task_done。 def join(self): with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() task_done() 表示某個任務完成。每一條get語句後須要一條task_done。 import queue q = queue.Queue(5) q.put(10) q.put(20) print(q.get()) q.task_done() print(q.get()) q.task_done() q.join() print("ending!") '''
import queue,threading q = queue.Queue(3) def foo(): q.put(11) q.put('hello') q.put(3.123) q.join() def bar(): print(q.get()) q.task_done() #註釋掉本行,程序將不會結束。 t1 = threading.Thread(target=foo) t1.start() for i in range(3): t = threading.Thread(target=bar) t.start() ''' 運行結果: 11 hello 3.123 '''
''' 此包中的經常使用方法(q = queue.Queue()): q.qsize() 返回隊列的大小 q.empty() 若是隊列爲空,返回True,反之False q.full() 若是隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 至關q.get(False)非阻塞 q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 至關q.put(item, False) q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列爲空,再執行別的操做 '''
Python queue模塊有三種隊列及構造函數: 一、Python queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 二、LIFO相似於堆棧,即先進後出。 class queue.LifoQueue(maxsize) 三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) import queue #先進後出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) print(q.get()) print(q.get()) print(q.get()) ''' 運行結果: 12 56 34 ''' #優先級 q=queue.PriorityQueue() q.put([5,100]) q.put([7,200]) q.put([3,"hello"]) q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data) ''' 運行結果: [3, 'hello'] [4, {'name': 'alex'}] [5, 100] [7, 200] '''
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(3)) if not q.empty(): data = q.get() print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) p1.start() c1.start()
''' 運行結果: making........ Producer A has produced 0 baozi.. ok...... making........ Consumer B has eat 0 baozi... Producer A has produced 1 baozi.. ok...... making........ Producer A has produced 2 baozi.. ok...... making........ Consumer B has eat 1 baozi... Producer A has produced 3 baozi.. ok...... making........ Consumer B has eat 2 baozi... Consumer B has eat 3 baozi... Producer A has produced 4 baozi.. ok...... making........ Producer A has produced 5 baozi.. ok...... making........ Consumer B has eat 4 baozi... Consumer B has eat 5 baozi... Producer A has produced 6 baozi.. ok...... making........ Producer A has produced 7 baozi.. ok...... making........ Producer A has produced 8 baozi.. ok...... making........ Consumer B has eat 6 baozi... Consumer B has eat 7 baozi... Producer A has produced 9 baozi.. ok...... Consumer B has eat 8 baozi... Consumer B has eat 9 baozi... '''
Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.ide
multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
# Process類調用 from multiprocessing import Process import time def f(name): print('hello', name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin:%s'%i,)) p_list.append(p) p.start() for i in p_list: p.join() print('end') ''' 運行結果: hello alvin:0 Wed Jul 19 16:06:40 2017 hello alvin:2 Wed Jul 19 16:06:40 2017 hello alvin:1 Wed Jul 19 16:06:40 2017 end '''
#繼承Process類調用 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): print ('hello', self.name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end') ''' 運行結果: hello MyProcess-3 Wed Jul 19 16:09:39 2017 hello MyProcess-1 Wed Jul 19 16:09:39 2017 hello MyProcess-2 Wed Jul 19 16:09:39 2017 end '''
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前尚未實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
from multiprocessing import Process import os import time def info(name): print("name:",name) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("------------------") time.sleep(1) if __name__ == '__main__': info('main process line') p1 = Process(target=info, args=('alvin',)) p2 = Process(target=info, args=('egon',)) p1.start() p2.start() p1.join() p2.join() print("ending") ''' 運行結果: name: main process line parent process: 3400 process id: 1712 ------------------ name: alvin parent process: 1712 process id: 8428 ------------------ name: egon parent process: 1712 process id: 8212 ------------------ ending '''
from multiprocessing import Process, Queue def f(q,n): q.put(n*n+1) print("son process",id(q)) if __name__ == '__main__': q = Queue() #若是使用線程間的隊列queue.Queue則沒法運行 print("main process",id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p.start() print(q.get()) print(q.get()) print(q.get()) ''' 運行結果: main process 41655376 son process 45073408 1 son process 44942336 2 son process 44942392 5 '''
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name": "yuan"}, 'hello']) response = conn.recv() print("response", response) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #管道兩個對象 p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) parent_conn.send("兒子你好!") p.join() ''' 運行結果: [12, {'name': 'yuan'}, 'hello'] response 兒子你好! '''
Pipe()返回的兩個鏈接對象表明管道的兩端。 每一個鏈接對象都有send()和recv()方法(等等)。 請注意,若是兩個進程(或線程)嘗試同時讀取或寫入管道的同一端,管道中的數據可能會損壞
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
from multiprocessing import Process, Manager def f(d, l, n): d[n] = n d["name"] ="alvin" l.append(n) #print("l",l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() #字典 l = manager.list(range(5)) #列表 print(d,'\n',l) p_list = [] for i in range(10): p = Process(target=f, args=(d,l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l) ''' 運行結果: {} 初始化的字典 [0, 1, 2, 3, 4] 初始化的列表 {3: 3, 'name': 'alvin', 0: 0, 2: 2, 7: 7, 5: 5, 4: 4, 1: 1, 6: 6, 8: 8, 9: 9} [0, 1, 2, 3, 4, 3, 0, 2, 7, 5, 4, 1, 6, 8, 9] '''
from multiprocessing import Pool import time def foo(args): time.sleep(5) print(args) if __name__ == '__main__': p = Pool(5) for i in range(30): p.apply_async(func=foo, args= (i,)) p.close() # 等子進程執行完畢後關閉進程池 # time.sleep(2) # p.terminate() # 馬上關閉進程池 p.join() # 沒有join會當即結束
1 import threading,time,queue,random 2 3 class Producer(threading.Thread): 4 def __init__(self,name,i): 5 super().__init__() 6 self.name=name 7 self.i=i 8 9 def run(self): 10 while True: 11 time.sleep(self.i) 12 if q.qsize()<10: 13 a=random.choice(['baozi','jianbing','doujiang'])+str(random.randint(1,10)) 14 q.put(a) 15 print('%s produce %s current menu %s'%(self.name,a,q.queue)) 16 17 18 class Consumer(threading.Thread): 19 def __init__(self,name,q): 20 super().__init__() 21 self.name=name 22 23 def run(self): 24 while True: 25 time.sleep(1) 26 if not q.empty(): 27 for i in range(random.randint(1,5)): 28 a=q.get() 29 print('%s eat %s'%(self.name,a)) 30 31 if __name__ == '__main__': 32 33 q = queue.Queue() 34 35 p=Producer('egon0',1) 36 p.start() 37 p = Producer('egon1', 0.5) 38 p.start() 39 40 for i in range(3): 41 c=Consumer('yuan%s'%i,q) 42 c.start()
對於紅綠燈線程: 首先默認是綠燈,作一個計數器,十秒前,每隔一秒打印「light green」;第十秒到第十三秒,每隔一秒打印「light yellow」,13秒到20秒, ‘light red’,20秒之後計數器清零。從新循環。
1 import threading,random,time 2 3 event=threading.Event() 4 def traffic_lights(): 5 count=0 6 lights=['green light','yellow light','red light'] 7 current_light=lights[0] 8 while True: 9 while count<10: 10 print(current_light,9-count) 11 count+=1 12 time.sleep(1) 13 else: 14 current_light=lights[1] 15 event.set() 16 17 while count<13: 18 print(current_light,12-count) 19 count+=1 20 time.sleep(1) 21 else: 22 current_light=lights[2] 23 24 while count<20: 25 print(current_light,19-count) 26 count += 1 27 time.sleep(1) 28 if count == 20: 29 count=0 30 current_light=lights[0] 31 event.clear() 32 break 33 34 35 def car(name): 36 print(name,'starting...') 37 while True: 38 time.sleep(random.randint(1,4)) 39 if not event.is_set(): 40 print('%s is running'%name) 41 else: 42 print('%s is waiting'%name) 43 44 if __name__ == '__main__': 45 t=threading.Thread(target=traffic_lights) 46 t.start() 47 for i in range(5): 48 c=threading.Thread(target=car,args=('car%s'%(i+1),)) 49 c.start()