回顧:同步、異步、阻塞、非阻塞html
同步:python
所謂同步,就是在發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回。按照這個定義,其實絕大多數函數都是同步調用。可是通常而言,咱們在說同步、異步的時候,特指那些須要其餘部件協做或者須要必定時間完成的任務。程序員
舉例:編程
1. multiprocessing.Pool下的apply #發起同步調用後,就在原地等着任務結束,根本不考慮任務是在計算仍是在io阻塞,總之就是一股腦地等任務結束windows
2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()安全
3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()服務器
異步:網絡
異步的概念和同步相對。當一個異步功能調用發出後,調用者不能馬上獲得結果。當該異步功能完成後,經過狀態、通知或回調來通知調用者。若是異步功能用狀態來通知,那麼調用者就須要每隔必定時間檢查一次,效率就很低(有些初學多線程編程的人,總喜歡用一個循環去檢查某個變量的值,這實際上是一 種很嚴重的錯誤)。若是是使用通知的方式,效率則很高,由於異步功能幾乎不須要作額外的操做。至於回調函數,其實和通知沒太多區別。多線程
舉例:併發
1. multiprocessing.Pool().apply_async() #發起異步調用後,並不會等待任務結束才返回,相反,會當即獲取一個臨時結果(並非最終的結果,多是封裝好的一個對象)。
2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)
3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)
阻塞:
阻塞調用是指調用結果返回以前,當前線程會被掛起(如遇到io操做)。函數只有在獲得結果以後纔會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不一樣的。對於同步調用來講,不少時候當前線程仍是激活的,只是從邏輯上當前函數沒有返回而已。
舉例:
1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果爲止,但並未阻塞住(即使是被搶走cpu的執行權限,那也是處於就緒態);
2. 阻塞調用:當socket工做在阻塞模式的時候,若是沒有數據的狀況下調用recv函數,則當前線程就會被掛起,直到有數據爲止。
非阻塞:
非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程。
對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,該操做會經歷兩個階段:
1)等待數據準備 (Waiting for the data to be ready)
2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)
記住這兩點很重要,由於這些IO模型的區別就是在兩個階段上各有不一樣的狀況。
一、輸入操做:read、readv、recv、recvfrom、recvmsg共5個函數,若是會阻塞狀態,則會經理wait data和copy data兩個階段,若是設置爲非阻塞則在wait 不到data時拋出異常
二、輸出操做:write、writev、send、sendto、sendmsg共5個函數,在發送緩衝區滿了會阻塞在原地,若是設置爲非阻塞,則會拋出異常
三、接收外來連接:accept,與輸入操做相似
四、發起外出連接:connect,與輸出操做相似
在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢?
方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點:
1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的。
方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。
在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。
當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:
一、程序中有許多任務,並且…
二、任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…
三、在等待事件到來時,某些任務會阻塞。
當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。
網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。
此處要提出一個問題,就是,上面的事件驅動模型中,只要一遇到IO就註冊一個事件,而後主程序就能夠繼續幹其它的事情了,只到io處理完畢後,繼續恢復以前中斷的任務,這本質上是怎麼實現的呢?哈哈,下面咱們就來一塊兒揭開這神祕的面紗。。。。
select,poll,epoll都是IO多路複用的機制。I/O多路複用就是經過一種機制,一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。
select
select(rlist, wlist, xlist, timeout=None)
函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用後select函數會阻塞,直到有描述符就緒(有數據 可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠 經過遍歷fdset,來找到就緒的描述符。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢。select的一 個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制,可是這樣也會形成效率的下降。
select 多併發socket 例子
#select socket server
#_*_coding:utf-8_*_
__author__ = 'YL'
import select import socket import sys import queue
server = socket.socket() server.setblocking(0) server_addr = ('localhost',10000) print('starting up on %s port %s' % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
outputs = [] message_queues = {} while True: print("waiting for next event...") readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏
for s in readable: #每一個s就是一個socket
if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
#就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
#新鏈接進來了,接受這個鏈接
conn, client_addr = s.accept() print("new connection from",client_addr) conn.setblocking(0) inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
#就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
#readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的
message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送
else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
#客戶端的數據過來了,在這接收
data = s.recv(1024) if data: print("收到來自[%s]的數據:" % s.getpeername()[0], data) message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
if s not in outputs: outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端
else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
print("客戶端斷開了",s) if s in outputs: outputs.remove(s) #清理已斷開的鏈接
inputs.remove(s) #清理已斷開的鏈接
del message_queues[s] ##清理已斷開的鏈接
for s in writeable: try : next_msg = message_queues[s].get_nowait() except queue.Empty: print("client [%s]" %s.getpeername()[0], "queue is empty..") outputs.remove(s) else: print("sending msg to [%s]"%s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exeptional: print("handling exception for ",s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
#select socket client
#_*_coding:utf-8_*_
__author__ = 'YL'
import socket import sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 10000) # Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] # Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # Send messages on both sockets
for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets
for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )
安裝:http://www.rabbitmq.com/install-windows.html
安裝 python rabbitMQ module :pip install pika
實現最簡單的隊列通訊:
send端
#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #聲明queue
channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()