#!/usr/bin/env python #coding:utf8 import threading,Queue import time import random def producer(name,n):#生產者 # while True:#無限循環生產包子 time.sleep(random.randrange(3))#random生成一個3之內不包含3的隨機數,經過隨機數決定等待多長時間,主要便於測試 if q.qsize()<4:#判斷隊列個數若是小於4程序繼續往下執行,#包子剩餘的個數若是小於4個才生產包子,避免浪費.每一個廚師在生產包子都會看還剩餘幾個 for i in range(2):#每一個廚師生產2個包子上傳到隊列 print '%s生產了[%d]個包子\n'%(name,n) q.put(n) q.join()#隊列的個數爲空則阻塞.#由於消費者每吃完一個包子都會告訴廚師,當全部包子都吃完廚師繼續生產包子.#就是繼續下一次循環 print '包子已賣光了,[%d]'%q.qsize() def consumer(name,n):#消費者 while True:#無限循環吃包子 print '%s 吃了[%d]個包子\n'%(name,n) q.get()#吃掉一個包子從隊列減1 time.sleep(1)#每一個消費者吃掉一個包子的時候等待1秒 q.task_done()#每一個消費者吃掉一個包子通知隊列(廚師) if __name__=='__main__': q=Queue.Queue() c_name=['z1','z2','z3','z4']#4個消費者 p_name=['p1','p2']#2個廚師 for name in p_name: p=threading.Thread(target=producer,args=[name,1,])#開啓2個線程調用producer函數,#2個廚師同時生產包子 p.start()#開啓線程,線程的開關 for name in c_name: c=threading.Thread(target=consumer,args=[name,1,])#開啓4個線程調用consumer函數,#4個消費者同時吃包子 c.start()#開啓線程,線程的開關
執行結果:python
p1生產了[1]個包子git
p1生產了[1]個包子程序員
z1 吃了[1]個包子github
z4 吃了[1]個包子
z1 吃了[1]個包子編程
包子已賣光了,[0]
數組
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。安全
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:服務器
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。網絡
協程的好處:多線程
缺點:
使用yield實現協程操做例子
import time import queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n +=1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
#!/usr/bin/env python # -*- coding:utf-8 -*- from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 gr2.switch() def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
輸出:
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
同步與異步的性能區別
import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1,10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn
。 初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
遇到IO阻塞時會自動切換任務
from gevent import monkey; monkey.patch_all() import gevent from urllib.request import urlopen def f(url): print('GET: %s' % url) resp = urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
經過gevent實現單線程下的多socket併發
server side
import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(s): try: while True: data = s.recv(1024) print("recv:", data) s.send(data) if not data: s.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: s.close() if __name__ == '__main__': server(8001)
client side
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print('Received', repr(data)) s.close()
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。
在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。
當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:
當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。
網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。
select-server端代碼:
#!/usr/bin/env python #coding:utf8 import select import socket import time import sys import Queue#用於存放客服端發送過來的消息 server_ip=('0.0.0.0',9003)#定義元組,服務器IP,端口 sk=socket.socket()#實例化socket模塊的socket類建立一個對象爲sk sk.bind(server_ip)#調用sk對象中的bind方法,傳入參數.綁定IP和端口 sk.listen(20)#server端容許的最大鏈接數 sk.setblocking(False)#遇到IO的時候不阻塞 inputs=[sk,]#定義一個列表存放服務端和客服端socket對象 outputs=[]#定義一個列表存放客服端socket對象 message={}#定義一個字典存放"{客服端socket對象:隊列}",隊列中是放的客服端發送過來的消息 while True: """ select一共能夠設置4個參數 rList=inputs=[sk,客服端socket對象] wList=outputs[客服端socket對象] 第三個參數異常信息 第四個參數是超時時間,若是客服端沒有鏈接server端,0.5秒超時,程序會繼續往下執行 程序第一次啓動的時候 rList=sk 感知server的變化,只有客服端鏈接過來server纔會變化 若是客服端已經鏈接進來此時 inputs列表中至少存在2個元素 server的socket對象和client端的socket對象,select會遍歷列表中的每個元素並感知時候是否有變化 若是有變化那麼知足條件select不會阻塞程序繼續向下執行 若是select感知到rList發生變化,好比客服端給服務端發送消息,程序向下執行 """ rList,wList,error=select.select(inputs,outputs,inputs)#讀,寫,錯誤,超時時間 #客服端鏈接過來,rList [<socket._socketobject object at 0x101445750>] for r in rList: #若是rList有變化進入for循環,判斷r == <socket._socketobject object at 0x101445750> if r == sk:# conn,address=r.accept()#監聽客服端socket對象 inputs.append(conn)#把客服端socket對象放入inputs列表中 message[conn]=Queue.Queue()#message={socket對象:隊列} print address#打印客服端IP ####注視中客服端鏈接指的是客服端socket對象##### else: #若是r==客服端鏈接,前提條件是已經感知到客服端socket對象發生變化,程序纔會執行到此處 #監聽客服端發送過來的數據 data=r.recv(1024) if data: #若是有數據把客服端socket對象放入outputs列表中讓select感知wList的變化,也就是感知客服端是否發送過來消息,用於讀寫分離 print data #打印客服端發送過來的消息 outputs.append(r)#把客服端鏈接添加到outputs列表 message[r].put(data)#message[客服端鏈接].put(接受的數據), message{客服端socket對象:客服端發送過來的消息上次到隊列} else: inputs.remove(r)#若是客服端異常斷開,刪除inputs列表中客服端socket對象 del message[w]#若是客服端異常斷開,刪除message字典中客服端socket對象和客服端的消息隊列 #客服端異常斷開的時候會發送空數據,此時在inputs列表中刪除客服端鏈接 for w in wList: #select遍歷wList的時候感知到了變化,也就是服務端已經接受到客服端已經發送過來的消息了 try: data=message[w].get_nowait()#message[客服端socket對象]獲取到消息隊列,最後獲得發送過來的消息,get_nowait若是從隊列中沒有獲取到數據也不會阻塞 w.sendall(data)#發送數據給客服端 #給客服端發送數據 except Queue.Empty as e:#捕捉隊列是否爲空 if message[w]: del message[w]#刪除message字典中客服端socket對象和客服端的消息隊列 outputs.remove(w)#刪除outputs列表中客服端socket對象 #刪除客服端鏈接
select-client端代碼:
import socket server_ip=('127.0.0.1', 9003) sk=socket.socket() sk.connect(server_ip) while True: data=raw_input('Please:').strip() if len(data) ==0:continue sk.sendall(data) server_response=sk.recv(1024) print server_response