Python異步非阻塞IO多路複用Select/Poll/Epoll使用,線程,進程,協程

1.使用select模擬socketserver僞併發處理客戶端請求,代碼以下:php

import socket
import select

sk = socket.socket()
sk.bind(('127.0.0.1', 9999,))
sk.listen(5)

inputs = [sk,]
outputs = []
messages = {}
# del messages[白宇]
# messages = {
#    白宇:[消息1,消息2,]
#    吳文煜:[消息1,消息2,]
# }
while True:
    rlist,wlist,elist, = select.select(inputs, outputs,[sk,],1)
    print(len(inputs),len(rlist),len(wlist), len(outputs))
    # 監聽sk(服務器端)對象,若是sk對象發生變化,表示有客戶端來鏈接了,此時rlist值爲【sk】
    # 監聽conn對象,若是conn發生變化,表示客戶端有新消息發送過來了,此時rlist的之爲 【客戶端】
    # rlist = 【吳文煜,】
    # rlist = 【張磊,白宇,】
    # rlist = [sk,]
    # wlist 發過消息的對象追加到添加到此列表中
    # elist socket出現異常狀況,追加到此列表中
    for r in rlist:
        if r == sk:
            # 新客戶來鏈接
            conn, address = r.accept()
            # conn是什麼?其實socket對象
            inputs.append(conn)
            messages[conn] = []
            conn.sendall(bytes('hello', encoding='utf-8'))
        else:
            # 有人給我發消息了
            print('=======')
            try:
                ret = r.recv(1024)
                # r.sendall(ret)
                if not ret:
                    raise Exception('斷開鏈接')
                else:
                    outputs.append(r)
                    messages[r].append(ret)
            except Exception as e:
                inputs.remove(r)
                del messages[r]

    # 全部給我發過消息的人
    for w in wlist:
        msg = messages[w].pop()
        resp = msg + bytes('response', encoding='utf-8')
        w.sendall(resp)
        outputs.remove(w)



# rlist = [sk,],rlist=[sk1,],rlist = [sk1,sk2]
# rlist = []
socket_select_server
import socket

sk = socket.socket()
sk.connect(("127.0.0.1", 9999,))
data = sk.recv(1024)
print(data)
while True:
    inp = input(">>>")
    sk.sendall(bytes(inp,encoding='utf-8'))
    print(sk.recv(1024))
sk.close()
client

有許多封裝好的異步非阻塞IO多路複用框架,底層在linux基於最新的epoll實現,爲了更好的使用,瞭解其底層原理仍是有必要的。
下面記錄下分別基於Select/Poll/Epoll的echo server實現。
Python Select Server,可監控事件數量有限制:html

#!/usr/bin/python
# -*- coding: utf-8 -*-
import select
import socket
import Queue
  
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)
server_address= ('192.168.1.5',8080)
server.bind(server_address)
server.listen(10)
  
#select輪詢等待讀socket集合
inputs = [server]
#select輪詢等待寫socket集合
outputs = []
message_queues = {}
#select超時時間
timeout = 20
  
while True:
    print "等待活動鏈接......"
    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
  
    if not (readable or writable or exceptional) :
        print "select超時無活動鏈接,從新select...... "
        continue; 
    #循環可讀事件
    for s in readable :
        #若是是server監聽的socket
        if s is server:
            #贊成鏈接
            connection, client_address = s.accept()
            print "新鏈接: ", client_address
            connection.setblocking(0)
            #將鏈接加入到select可讀事件隊列
            inputs.append(connection)
            #新建鏈接爲key的字典,寫回讀取到的消息
            message_queues[connection] = Queue.Queue()
        else:
            #不是本機監聽就是客戶端發來的消息
            data = s.recv(1024)
            if data :
                print "收到數據:" , data , "客戶端:",s.getpeername()
                message_queues[s].put(data)
                if s not in outputs:
                    #將讀取到的socket加入到可寫事件隊列
                    outputs.append(s)
            else:
                #空白消息,關閉鏈接
                print "關閉鏈接:", client_address
                if s in outputs :
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    for s in writable:
        try:
            msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print "鏈接:" , s.getpeername() , '消息隊列爲空'
            outputs.remove(s)
        else:
            print "發送數據:" , msg , "", s.getpeername()
            s.send(msg)
      
    for s in exceptional:
        print "異常鏈接:", s.getpeername()
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]
Python Select Code

Python Poll Server,Select升級版,無可監控事件數量限制,仍是要輪詢全部事件:python

#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import select
import Queue
  
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
server.bind(server_address)
server.listen(5)
print "服務器啓動成功,監聽IP:" , server_address
message_queues = {}
#超時,毫秒
timeout = 5000
#監聽哪些事件
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
#新建輪詢事件對象
poller = select.poll()
#註冊本機監聽socket到等待可讀事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket = {server.fileno():server,}
while True:
    print "等待活動鏈接......"
    #輪詢註冊的事件集合
    events = poller.poll(timeout)
    if not events:
      print "poll超時,無活動鏈接,從新poll......"
      continue
    print "" , len(events), "個新事件,開始處理......"
    for fd ,flag in events:
        s = fd_to_socket[fd]
        #可讀事件
        if flag & (select.POLLIN | select.POLLPRI) :
            if s is server :
                #若是socket是監聽的server表明有新鏈接
                connection , client_address = s.accept()
                print "新鏈接:" , client_address
                connection.setblocking(False)
                  
                fd_to_socket[connection.fileno()] = connection
                #加入到等待讀事件集合
                poller.register(connection,READ_ONLY)
                message_queues[connection] = Queue.Queue()
            else :
                #接收客戶端發送的數據
                data = s.recv(1024)
                if data:
                    print "收到數據:" , data , "客戶端:" , s.getpeername()
                    message_queues[s].put(data)
                    #修改讀取到消息的鏈接到等待寫事件集合
                    poller.modify(s,READ_WRITE)
                else :
                    # Close the connection
                    print " closing" , s.getpeername()
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()
                    del message_queues[s]
        #鏈接關閉事件
        elif flag & select.POLLHUP :
            print " Closing ", s.getpeername() ,"(HUP)"
            poller.unregister(s)
            s.close()
        #可寫事件
        elif flag & select.POLLOUT :
            try:
                msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print s.getpeername() , " queue empty"
                poller.modify(s,READ_ONLY)
            else :
                print "發送數據:" , data , "客戶端:" , s.getpeername()
                s.send(msg)
        #異常事件
        elif flag & select.POLLERR:
            print " exception on" , s.getpeername()
            poller.unregister(s)
            s.close()
            del message_queues[s]
Python Poll server

Python Epoll Server,基於回調的事件通知模式,輕鬆管理大量鏈接:linux

#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket, select
import Queue
 
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
serversocket.bind(server_address)
serversocket.listen(1)
print "服務器啓動成功,監聽IP:" , server_address
serversocket.setblocking(0)
timeout = 10
#新建epoll事件對象,後續要監控的事件添加到其中
epoll = select.epoll()
#添加服務器監聽fd到等待讀事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues = {}
 
fd_to_socket = {serversocket.fileno():serversocket,}
while True:
  print "等待活動鏈接......"
  #輪詢註冊的事件集合
  events = epoll.poll(timeout)
  if not events:
     print "epoll超時無活動鏈接,從新輪詢......"
     continue
  print "" , len(events), "個新事件,開始處理......"
  for fd, event in events:
     socket = fd_to_socket[fd]
     #可讀事件
     if event & select.EPOLLIN:
         #若是活動socket爲服務器所監聽,有新鏈接
         if socket == serversocket:
            connection, address = serversocket.accept()
            print "新鏈接:" , address
            connection.setblocking(0)
            #註冊新鏈接fd到待讀事件集合
            epoll.register(connection.fileno(), select.EPOLLIN)
            fd_to_socket[connection.fileno()] = connection
            message_queues[connection] = Queue.Queue()
         #不然爲客戶端發送的數據
         else:
            data = socket.recv(1024)
            if data:
               print "收到數據:" , data , "客戶端:" , socket.getpeername()
               message_queues[socket].put(data)
               #修改讀取到消息的鏈接到等待寫事件集合
               epoll.modify(fd, select.EPOLLOUT)
     #可寫事件
     elif event & select.EPOLLOUT:
        try:
           msg = message_queues[socket].get_nowait()
        except Queue.Empty:
           print socket.getpeername() , " queue empty"
           epoll.modify(fd, select.EPOLLIN)
        else :
           print "發送數據:" , data , "客戶端:" , socket.getpeername()
           socket.send(msg)
     #關閉事件
     elif event & select.EPOLLHUP:
        epoll.unregister(fd)
        fd_to_socket[fd].close()
        del fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()
Python Epoll server

初始化進程、線程與協成的概念瀏覽器

什麼是進程?安全

  進程,是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。前面的話我也沒懂,用非官方的白話來解釋就是——執行中的程序是進程,好比qq不是進程,可是當咱們雙擊qq開始使用它的時候,它就變成了一個進程。咱們寫的python程序,只有當咱們執行它的時候,它纔是進程。咱們正在執行的IE瀏覽器,QQ,pycharm都是進程,從操做系統的角度來說,每個進程都有它本身的內存空間,進程之間的內存是獨立的。服務器

什麼是線程?多線程

  線程,有時被稱爲輕量級進程,是程序執行流的最小單元。咱們能夠理解爲,線程是屬於進程的,咱們平時寫的簡單程序,是單線程的,多線程和單線程的區別在於多線程能夠同時處理多個任務,這時候咱們能夠理解爲多線程和多進程是同樣的,我能夠在個人進程中開啓一個線程放音樂,也能夠開啓另外的線程聊qq,可是進程之間的內存獨立,而屬於同一個進程多個線程之間的內存是共享的,多個線程能夠直接對它們所在進程的內存數據進行讀寫並在線程間進行交換。併發

進程與線程之間的關係app

先推薦一個連接,這篇文章用漫畫的形式講解了進程與線程的關係:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html

 

在python界一直有着一個古老的傳說,那就是python的多線程是雞肋,那麼這個傳說的信度到底有多少呢?若是咱們的代碼是CPU密集型(涉及到大量的計算),多個線程的代碼頗有可能是線性執行的,因此這種狀況下多線程是雞肋,效率可能還不如單線程,由於有context switch(其實就是線程之間的切換和線程的建立等等都是須要消耗時間的);可是:若是是IO密集型,多線程能夠明顯提升效率。例如製做爬蟲,絕大多數時間爬蟲是在等待socket返回數據。這個時候C代碼裏是有release GIL的,最終結果是某個線程等待IO的時候其餘線程能夠繼續執行。

  那麼,爲何咱們大python會這麼不智能呢?咱們都知道,python是一種解釋性語言,在python執行的過程當中,須要解釋器一邊解釋一邊執行,咱們以前也介紹了,同一個進程的線程之間內存共享,那麼就會出現內存資源的安全問題,python爲了線程安全,就設置了全局解釋器鎖機制,既一個進程中同時只能有一個線程訪問cpu。做爲解釋型語言,python能引入多線程的概念就已經很是不易了,目前看到的資料php和perl等多線程機制都是不健全的。解釋型語言作多線程的艱難程度能夠想見。。。具體下面的連接推薦:python的最難問題。

   正是因爲python多線程的缺陷,咱們在這裏須要引入協成的概念。

什麼是協程?

  協程是一種用戶態的輕量級線程。若是說多進程對於多CPU,多線程對應多核CPU,那麼事件驅動和協程則是在充分挖掘不斷提升性能的單核CPU的潛力。咱們既能夠利用異步優點,又能夠避免反覆系統調用,還有進程切換形成的開銷,這就是協程。協程也是單線程,可是它能讓原來要使用異步+回調方式寫的非人類代碼,能夠用看似同步的方式寫出來。它是實現推拉互動的所謂非搶佔式協做的關鍵。對於python來講,因爲python多線程中全局解釋器致使的同時只能有一個線程訪問cpu,因此對協程需求就相比於其餘語言更爲緊迫。

進程、線程與協程

  從硬件發展來看,從最初的單核單CPU,到單核多CPU,多核多CPU,彷佛已經到了極限了,可是單核CPU性能卻還在不斷提高。server端也在不斷的發展變化。若是將程序分爲IO密集型應用和CPU密集型應用,兩者的server的發展以下:

    IO密集型應用: 多進程->多線程->事件驅動->協程
    CPU密集型應用:多進程-->多線程                                                                                                                                                                    

  調度和切換的時間:進程   >   線程   >  協程

偷懶的同窗看這裏→_→:不須要實現複雜的內存共享且需利用多cpu,用多進程;實現複雜的內存共享及IO密集型應用:多線程或協程;實現複雜的內存共享及CPU密集型應用:協程

在python界一直有着一個古老的傳說,那就是python的多線程是雞肋,那麼這個傳說的信度到底有多少呢?若是咱們的代碼是CPU密集型(涉及到大量的計算),多個線程的代碼頗有可能是線性執行的,因此這種狀況下多線程是雞肋,效率可能還不如單線程,由於有context switch(其實就是線程之間的切換和線程的建立等等都是須要消耗時間的);可是:若是是IO密集型,多線程能夠明顯提升效率。例如製做爬蟲,絕大多數時間爬蟲是在等待socket返回數據。這個時候C代碼裏是有release GIL的,最終結果是某個線程等待IO的時候其餘線程能夠繼續執行。

  那麼,爲何咱們大python會這麼不智能呢?咱們都知道,python是一種解釋性語言,在python執行的過程當中,須要解釋器一邊解釋一邊執行,咱們以前也介紹了,同一個進程的線程之間內存共享,那麼就會出現內存資源的安全問題,python爲了線程安全,就設置了全局解釋器鎖機制,既一個進程中同時只能有一個線程訪問cpu。做爲解釋型語言,python能引入多線程的概念就已經很是不易了,目前看到的資料php和perl等多線程機制都是不健全的。解釋型語言作多線程的艱難程度能夠想見。。。具體下面的連接推薦:python的最難問題。

   正是因爲python多線程的缺陷,咱們在這裏須要引入協成的概念。

什麼是協程?

  協程是一種用戶態的輕量級線程。若是說多進程對於多CPU,多線程對應多核CPU,那麼事件驅動和協程則是在充分挖掘不斷提升性能的單核CPU的潛力。咱們既能夠利用異步優點,又能夠避免反覆系統調用,還有進程切換形成的開銷,這就是協程。協程也是單線程,可是它能讓原來要使用異步+回調方式寫的非人類代碼,能夠用看似同步的方式寫出來。它是實現推拉互動的所謂非搶佔式協做的關鍵。對於python來講,因爲python多線程中全局解釋器致使的同時只能有一個線程訪問cpu,因此對協程需求就相比於其餘語言更爲緊迫。

進程、線程與協程

  從硬件發展來看,從最初的單核單CPU,到單核多CPU,多核多CPU,彷佛已經到了極限了,可是單核CPU性能卻還在不斷提高。server端也在不斷的發展變化。若是將程序分爲IO密集型應用和CPU密集型應用,兩者的server的發展以下:

    IO密集型應用: 多進程->多線程->事件驅動->協程
    CPU密集型應用:多進程-->多線程                                                                                                                                                                    

  調度和切換的時間:進程   >   線程   >  協程

偷懶的同窗看這裏→_→:不須要實現複雜的內存共享且需利用多cpu,用多進程;實現複雜的內存共享及IO密集型應用:多線程或協程;實現複雜的內存共享及CPU密集型應用:協程

相關文章
相關標籤/搜索