python queue, pipe, manage

 

 

 

線程中的Queue

 1 import time
 2 import threading
 3 import queue
 4 import random
 5 
 6 def putMessage():
 7     for i in "Hello World!!!":
 8         q.put(i)
 9         time.sleep(random.random())
10         # print("size:%s"%q.qsize())            # 查看隊列長度
11         #
12         # print("full:%s"%q.full())             # 查看隊列是否爲滿的狀態
13         #
14         # print("empty:%s"%q.empty())        # 查看隊列是否爲空的狀態
15 
16 
17 def getMessage():
18     while True:
19         if not q.empty():
20             print(q.get())
21         else:
22             time.sleep(random.random())
23 
24 
25 if __name__ == "__main__":
26     q = queue.Queue()
27 
28     t1 = threading.Thread(target=putMessage)
29     t1.setDaemon(True)
30     t1.start()
31 
32     t2 = threading.Thread(target=getMessage)
33     t2.setDaemon(True)
34     t2.start()
35 
36     time.sleep(10)

 

進程中的Queue

 1 from multiprocessing import Queue
 2 
 3 q = Queue(3)    # 初始化一個Queue對象,最多能夠put三條信息,若是不寫3,那麼久無限制
 4 
 5 q.put("Message01")         # 添加信息的方法
 6 q.put("Message02")
 7 print(q.full())          # 查看 隊列 是否滿了的方法
 8 
 9 q.put("Message03")
10 print(q.full())
11 
12 # 由於隊列已經滿了,因此下面的消息會出現異常,第一個 try 會等待2秒後再拋出異常,
13 # 第二個 try 會馬上拋出異常
14 try:
15     q.put("Message04", True, 2)
16 except:
17     print("消息隊列已滿,現有消息數量:%s"%q.qsize())
18 
19 try:
20     q.put_nowait("Message04")
21 except:
22     print("消息隊列已滿,現有消息數量:%s"%q.qsize())
23 
24 # 推薦使用的方式,先判斷隊列是否已滿,再寫入
25 if not q.full():
26     q.put_nowait("Message04")
27 
28 # 讀取消息的時候,先判斷消息隊列是否爲空,再讀取
29 if not q.empty():
30     for i in range(q.qsize()):
31         print(q.get_nowait())

隊列:
  爲何要用隊列?列表也很好用啊。:數據安全
  建立方法:
    模式1:FIFO -- queue.Queue()
    模式2:FILO -- queue.LifoQueue()
    模式3:priorty -- queue.PriorityQueue()
      q.put([1, 'hello'])
      q.put([2, 'world'])
      級別 1 比 2 低, 1 先出來

  方法的參數:
    put()
      調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常
    get()
      調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

  其它方法:
    q.empty() 若是隊列爲空,返回True,反之False
    q.full() 若是隊列滿了,返回True,反之False
    q.full 與 maxsize 大小對應
    q.get([block[, timeout]]) 獲取隊列,timeout等待時間
    q.get_nowait() 至關q.get(False)
    非阻塞 q.put(item) 寫入隊列,timeout等待時間
    q.put_nowait(item) 至關q.put(item, False)
    q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號

    q.join() 實際上意味着等到隊列爲空,再執行別的操做
    # join多少次,就須要用幾回 task_donepython

多進程優勢:

  1. 能夠利用多核實現並行運算安全

缺點:

  1. 開銷大
  2. 通訊困難併發

 

管道Pipe

multiprocessing.Pipe([duplex])
返回2個鏈接對象(conn1, conn2),表明管道的兩端,
默認是雙向通訊.若是duplex=False,conn1只能用來接收消息,conn2只能用來發送消息.

主要用到的方法:
send() 發送數據
recv() 接收數據
 1 import multiprocessing
 2 
 3 
 4 from multiprocessing import Process, Pipe
 5 
 6 def send(pipe):
 7     pipe.send(['spam'] + [42, 'egg'])
 8     pipe.close()
 9 
10 def talk(pipe):
11     pipe.send(dict(name = 'Bob', spam = 42))
12     reply = pipe.recv()
13     print('talker got:', reply)
14 
15 if __name__ == '__main__':
16     (con1, con2) = Pipe()
17     sender = Process(target = send, name = 'send', args = (con1, ))
18     sender.start()
19     print("con2 got: %s" % con2.recv())#從send收到消息
20     con2.close()
21 
22     (parentEnd, childEnd) = Pipe()
23     child = Process(target = talk, name = 'talk', args = (childEnd,))
24     child.start()
25     print('parent got:', parentEnd.recv())
26     parentEnd.send({x * 2 for x in 'spam'})
27     child.join()
28     print('parent exit')

 

進程間的信息共享Manage

Python中進程間共享數據,處理基本的queue,pipe外,還提供了更高層次的封裝。使用multiprocessing.Manager能夠簡單地使用這些高級接口。

Manager支持的類型有list,dict,Namespace, Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
 1 import multiprocessing
 2 import time
 3 
 4 
 5 def worker(d, key, value):
 6     d[key] = value
 7 
 8 if __name__ == '__main__':
 9     mgr = multiprocessing.Manager()
10 
11     d = mgr.dict()
12     jobs = [multiprocessing.Process(target=worker, args=(d, i, i*2))
13              for i in range(10)
14              ]
15     for j in jobs:
16         j.start()
17     for j in jobs:
18         j.join()
19     print('Results:' )
20     for key, value in enumerate(dict(d)):
21         print("%s=%s:%s" % (key, value, d[value]))
22 
23 
24     print("================================================================")
25 
26     manager = multiprocessing.Manager()
27     Global = manager.Namespace()
28     Global.x = 10
29     Global.y = 'hello'
30     print(Global)
31 
32     print("==================================================================")

 

問題:

列表不可變
在學習python多進程管理manager時候,當不使用join對當前進程(主進程)進行阻塞時會報錯:

這樣進行一下總結:在使用manager管理/進行多進程及其數據交互時候,必須對每個manager內的進程進行join-------待全部子進程完成後再回到主進程。

 

多進程之進程池

 1 import time
 2 from multiprocessing import Pool
 3 
 4 def worker():
 5     for i in range(10):
 6         print("From worker %s"%i)
 7         time.sleep(0.5)
 8 
 9 def foo():
10     for i in range(10):
11         print("From foo %s"%i)
12         time.sleep(0.5)
13 
14 def bar():
15     for i in range(10):
16         print("From bar %s"%i)
17         time.sleep(0.5)
18 
19 if __name__ == "__main__":
20     pool = Pool(4)            # 建立Pool對象, 3 表示同時最多能夠增長 3 條進程
21     pool.apply_async(worker)
22     pool.apply_async(worker)
23     pool.apply_async(worker)
24     pool.apply_async(foo)
25     pool.apply_async(foo)
26     pool.apply_async(foo)
27     pool.apply_async(bar)
28     pool.apply_async(bar)
29     pool.apply_async(bar)
30 
31     pool.close()                  # 關閉進程池,禁止添加任務
32     pool.join()          # 等待子進程結束後,主進程才往下走
33     print("Is done...")

 

併發之協程

 1 import time
 2 
 3 def consumer():
 4     r = ''
 5     while True:
 6         # 三、consumer經過yield拿到消息,處理,又經過yield把結果傳回;
 7         #    yield指令具備return關鍵字的做用。而後函數的堆棧會自動凍結(freeze)在這一行。
 8         #    當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,
 9         #    就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。經過這種方式,迭代器能夠實現無限序列和惰性求值。
10         n = yield r
11         if not n:
12             return
13         print('[CONSUMER] ←← Consuming %s...' % n)
14         time.sleep(1)
15         r = '200 OK'
16 def produce(c):
17     # 一、首先調用c.next()啓動生成器
18     next(c)
19     n = 0
20     while n < 5:
21         n = n + 1
22         print('[PRODUCER] →→ Producing %s...' % n)
23         # 二、而後,一旦生產了東西,經過c.send(n)切換到consumer執行;
24         cr = c.send(n)
25         # 四、produce拿到consumer處理的結果,繼續生產下一條消息;
26         print('[PRODUCER] Consumer return: %s' % cr)
27     # 五、produce決定不生產了,經過c.close()關閉consumer,整個過程結束。
28     c.close()
29 if __name__=='__main__':
30     # 六、整個流程無鎖,由一個線程執行,produce和consumer協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。
31     c = consumer()
32     produce(c)

 

協程封裝之greenlet

 1 import greenlet
 2 import time
 3 import random
 4 
 5 """
 6 建立方法:greenlet.greenlet(self, run=None, parent=None)
 7 主要方法:
 8         a.switch()    切換到 a 裏面執行
 9 """
10 def foo():
11     for i in range(10):
12         print("foo:",i)
13         time.sleep(random.random())
14         gb.switch()
15 
16 
17 def bar():
18     for i in range(10):
19         print("bar:", i)
20         time.sleep(random.random())
21         gf.switch()
22 
23 if __name__ == "__main__":
24     gf = greenlet.greenlet(foo)
25     gb = greenlet.greenlet(bar)
26 
27     gf.switch()

 

協程封裝之 gevent

 1 import gevent
 2 from gevent import monkey
 3 import time
 4 import random
 5 
 6 monkey.patch_all()      #  若是遇到 IO 阻塞,那麼就切換到下一個 協程的程序
 7 
 8 def foo():
 9     for i in range(10):
10         print("foo:",i)
11         time.sleep(random.random())
12 
13 
14 def bar():
15     for i in range(10):
16         print("bar:", i)
17         time.sleep(random.random())
18 
19 
20 if __name__ == "__main__":
21     gevent.joinall([gevent.spawn(foo),
22                     gevent.spawn(bar)])
23 
24 
25     # 固定用法,將裏面的函數放入到 協程的執行序列中
相關文章
相關標籤/搜索