Python之路--Python基礎13--異步IO、Redis\Memcached緩存、RabbitMQ隊列

1、事件驅動與異步IO

  回顧:同步、異步、阻塞、非阻塞html

同步:python

  所謂同步,就是在發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回。按照這個定義,其實絕大多數函數都是同步調用。可是通常而言,咱們在說同步、異步的時候,特指那些須要其餘部件協做或者須要必定時間完成的任務。程序員

舉例:編程

1. multiprocessing.Pool下的apply #發起同步調用後,就在原地等着任務結束,根本不考慮任務是在計算仍是在io阻塞,總之就是一股腦地等任務結束windows

2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()安全

3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()服務器

 

異步:網絡

  異步的概念和同步相對。當一個異步功能調用發出後,調用者不能馬上獲得結果。當該異步功能完成後,經過狀態、通知或回調來通知調用者。若是異步功能用狀態來通知,那麼調用者就須要每隔必定時間檢查一次,效率就很低(有些初學多線程編程的人,總喜歡用一個循環去檢查某個變量的值,這實際上是一 種很嚴重的錯誤)。若是是使用通知的方式,效率則很高,由於異步功能幾乎不須要作額外的操做。至於回調函數,其實和通知沒太多區別。多線程

舉例:併發

1. multiprocessing.Pool().apply_async()  #發起異步調用後,並不會等待任務結束才返回,相反,會當即獲取一個臨時結果(並非最終的結果,多是封裝好的一個對象)。

2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)

3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

 

阻塞:

  阻塞調用是指調用結果返回以前,當前線程會被掛起(如遇到io操做)。函數只有在獲得結果以後纔會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不一樣的。對於同步調用來講,不少時候當前線程仍是激活的,只是從邏輯上當前函數沒有返回而已。

舉例:

1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果爲止,但並未阻塞住(即使是被搶走cpu的執行權限,那也是處於就緒態);

2. 阻塞調用:當socket工做在阻塞模式的時候,若是沒有數據的狀況下調用recv函數,則當前線程就會被掛起,直到有數據爲止。

 

非阻塞:

  非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程。

 

對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,該操做會經歷兩個階段:

  1)等待數據準備 (Waiting for the data to be ready)

  2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)

記住這兩點很重要,由於這些IO模型的區別就是在兩個階段上各有不一樣的狀況。

 

一、輸入操做:read、readv、recv、recvfrom、recvmsg共5個函數,若是會阻塞狀態,則會經理wait data和copy data兩個階段,若是設置爲非阻塞則在wait 不到data時拋出異常

二、輸出操做:write、writev、send、sendto、sendmsg共5個函數,在發送緩衝區滿了會阻塞在原地,若是設置爲非阻塞,則會拋出異常

三、接收外來連接:accept,與輸入操做相似

四、發起外出連接:connect,與輸出操做相似

 

一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
  一、每收到一個請求,建立一個新的進程,來處理該請求;
  二、每收到一個請求,建立一個新的線程,來處理該請求;
  三、每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第1中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第2種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。
第3種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。
綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式

 

一、事件驅動模型

在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢?
方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點
1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的。

方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;

 

  事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

  讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。

  在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

  在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。

  在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

  一、程序中有許多任務,並且…

  二、任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…

  三、在等待事件到來時,某些任務會阻塞。

當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

  此處要提出一個問題,就是,上面的事件驅動模型中,只要一遇到IO就註冊一個事件,而後主程序就能夠繼續幹其它的事情了,只到io處理完畢後,繼續恢復以前中斷的任務,這本質上是怎麼實現的呢?哈哈,下面咱們就來一塊兒揭開這神祕的面紗。。。。

 

二、Select\Poll\Epoll異步IO

  select,poll,epoll都是IO多路複用的機制。I/O多路複用就是經過一種機制,一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。

select

select(rlist, wlist, xlist, timeout=None)

  函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用後select函數會阻塞,直到有描述符就緒(有數據 可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠 經過遍歷fdset,來找到就緒的描述符。

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢。select的一 個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制,可是這樣也會形成效率的下降。

 

select 多併發socket 例子

#select socket server

#_*_coding:utf-8_*_
__author__ = 'YL'

import select import socket import sys import queue
server
= socket.socket() server.setblocking(0) server_addr = ('localhost',10000) print('starting up on %s port %s' % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] #本身也要監測呀,由於server自己也是個fd outputs = [] message_queues = {} while True: print("waiting for next event...") readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏 for s in readable: #每一個s就是一個socket if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了, #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀 #新鏈接進來了,接受這個鏈接 conn, client_addr = s.accept() print("new connection from",client_addr) conn.setblocking(0) inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接 #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到 #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的 message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送 else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了 #客戶端的數據過來了,在這接收 data = s.recv(1024) if data: print("收到來自[%s]的數據:" % s.getpeername()[0], data) message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端 if s not in outputs: outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端 else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀 print("客戶端斷開了",s) if s in outputs: outputs.remove(s) #清理已斷開的鏈接 inputs.remove(s) #清理已斷開的鏈接 del message_queues[s] ##清理已斷開的鏈接
for s in writeable: try : next_msg = message_queues[s].get_nowait() except queue.Empty: print("client [%s]" %s.getpeername()[0], "queue is empty..") outputs.remove(s) else: print("sending msg to [%s]"%s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exeptional: print("handling exception for ",s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
 
 
#select socket client
#_*_coding:utf-8_*_
__author__ = 'YL'

import socket import sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 10000) # Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] # Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # Send messages on both sockets
    for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets
    for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )

 

2、RabbitMQ隊列

安裝:http://www.rabbitmq.com/install-windows.html

安裝 python rabbitMQ module  :pip install pika

實現最簡單的隊列通訊:

send端

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #聲明queue
channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
相關文章
相關標籤/搜索