併發編程-epoll模型的探索與實踐

前言

咱們知道nginx的效率很是高,能處理上萬級的併發,其之因此高效離不開epoll的支持,python

epoll是什麼呢?,epoll是IO模型中的一種,屬於多路複用IO模型;linux

到這裏你應該想到了,select,的確select也是一種多路複用的IO模型,可是其單個select最多隻能同時處理1024個socket,效率實在算不上高,這時候epoll來救場了nginx

本文從阻塞IO模型的基礎上展開討論,一步步靠近epoll的實現原理,最後以一個簡單的epoll案例程序做爲結束面試

親手寫一個epoll,而後去虐面試官吧!服務器

在select的學習過程當中咱們知道了select 只能同時處理1024個客戶端,多線程

而多線程會遇到資源瓶頸,什麼纔是解決高併發最有效的方式呢併發

linux中提供了epoll 這種高效的多路複用IO模型app

注意其餘平臺沒有相應的實現因此epoll僅在linux中可用socket

程序阻塞過程分析

epoll代碼實現並不複雜,可是要搞清楚其高效的原理仍是須要花一些時間的函數

咱們從最原始的阻塞模型開始分析

假設系統目前運行了三個進程 A B C

進程A正在運行一下socket程序

server = socket.socket()
server.bind(("127.0.0.1",1688))
server.listen()
server.accept()

1.系統會建立文件描述符指向一個socket對象 ,其包含了讀寫緩衝區,已經進行等待隊列

2.當執行到accept / recv 時系統會講進程A 從工做隊列中移除

3.將進程A的引用添加到 socket對象的等待隊列中

進程的喚醒

1.當網卡收到數據後會現將數據寫入到緩衝區

2.發送中斷信號給CPU

3.CPU執行中斷程序,將數據從內核copy到socket的緩衝區

4.喚醒進程,即將進程A切換到就緒態,同時從socket的等待隊列中移除這個進程引用

select監控多個socket

select的實現思路比較直接

1.先將全部socket放到一個列表中,

2.遍歷這個列表將進程A 添加到每一個socket的等待隊列中 而後阻塞進程

3.當數據到達時,cpu執行中斷程序將數據copy給socket 同時喚醒處於等待隊列中的進程A

爲了防止重複添加等待隊列 還須要移除已經存在的進程A

4.進程A喚醒後 因爲不清楚那個socket有數據,因此須要遍歷一遍全部socket列表

從上面的過程當中不難看出

1.select,須要遍歷socket列表,頻繁的對等待隊列進行添加移除操做,

2.數據到達後還須要給變量全部socket才能獲知哪些socket有數據

兩個操做消耗的時間隨着要監控的socket的數量增長而大大增長,

處於效率考慮才規定了最大隻能監視1024個socket

epol要解決的問題

1.避免頻繁的對等待隊列進行操做
2.避免遍歷全部socket
對於第一個問題咱們先看select的處理方式
while True:
    r_list,w_list,x_list = select.select(rlist,wlist,xlist)

每次處理完一次讀寫後,都須要將全部過沖重複一遍,包括移除進程,添加進程,默認就會將進程添加到等待隊列,並阻塞住進程,然而等待隊列的更新操做並不頻繁,

因此對於第一個問題epoll採起的方案是,將對等待隊列的維護和,阻塞進程這兩個操做進行拆分,

相關代碼以下

import socket,select
server = socket.socket()
server.bind(("127.0.0.1",1688))
server.listen(5)

#建立epoll事件對象,後續要監控的事件添加到其中
epoll = select.epoll()
#註冊服務器監聽fd到等待讀事件集合
epoll.register(server.fileno(), select.EPOLLIN)

# 等待事件發生
while True:
    for sock,event in epoll.poll():
    pass

在epoll中register 與 unregister函數用於維護等待隊列

epoll.poll則用於阻塞進程

這樣一來就避免了 每次處理都須要從新操做等待隊列的問題

第二個問題是select中進程沒法獲知哪些socket是有數據的因此須要遍歷

epol爲了解決這個問題,在內核中維護了一個就緒列表,

1.建立epoll對象,epoll也會對應一個文件,由文件系統管理

2.執行register時,將epoll對象 添加到socket的等待隊列中

3.數據到達後,CPU執行中斷程序,將數據copy給socket

4.在epoll中,中斷程序接下來會執行epoll對象中的回調函數,傳入就緒的socket對象

5.將socket,添加到就緒列表中

6.喚醒epoll等待隊列中的進程,

進程喚醒後,因爲存在就緒列表,因此不須要再遍歷socket了,直接處理就緒列表便可

解決了這兩個問題後,併發量獲得大幅度提高,最大可同時維護上萬級別的socket

epoll相關函數

import select 導入select模塊

epoll = select.epoll() 建立一個epoll對象

epoll.register(文件句柄,事件類型) 註冊要監控的文件句柄和事件

事件類型:

  select.EPOLLIN    可讀事件

  select.EPOLLOUT   可寫事件

  select.EPOLLERR   錯誤事件

  select.EPOLLHUP   客戶端斷開事件

epoll.unregister(文件句柄)   銷燬文件句柄

epoll.poll(timeout)  當文件句柄發生變化,則會以列表的形式主動報告給用戶進程,timeout

                     爲超時時間,默認爲-1,即一直等待直到文件句柄發生變化,若是指定爲1

                     那麼epoll每1秒彙報一次當前文件句柄的變化狀況,若是無變化則返回空

epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor)

epoll.modfiy(fineno,event) fineno爲文件描述符 event爲事件類型  做用是修改文件描述符所對應的事件

epoll.fromfd(fileno) 從1個指定的文件描述符建立1個epoll對象

epoll.close()   關閉epoll對象的控制文件描述符

案例:

#coding:utf-8
#客戶端
#建立客戶端socket對象
import socket
clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#服務端IP地址和端口號元組
server_address = ('127.0.0.1',1688)
#客戶端鏈接指定的IP地址和端口號
clientsocket.connect(server_address)

while True:
    #輸入數據
    data = raw_input('please input:')
    if data == "q":
        break
    if not data:
      continue
    #客戶端發送數據
    clientsocket.send(data.encode("utf-8"))
    #客戶端接收數據
    server_data = clientsocket.recv(1024)
    print ('客戶端收到的數據:',server_data)
#關閉客戶端socket
clientsocket.close()

服務器:

# coding:utf-8
import socket, select

server = socket.socket()
server.bind(("127.0.0.1", 1688))
server.listen(5)

msgs = []


fd_socket = {server.fileno(): server}
epoll = select.epoll()
# 註冊服務器的 寫就緒
epoll.register(server.fileno(), select.EPOLLIN)

while True:
    for fd, event in epoll.poll():
        sock = fd_socket[fd]
        print(fd, event)
        # 返回的是文件描述符 須要獲取對應socket
        if sock == server:  # 若是是服務器 就接受請求
            client, addr = server.accept()
            # 註冊客戶端寫就緒
            epoll.register(client.fileno(), select.EPOLLIN)
            # 添加對應關係
            fd_socket[client.fileno()] = client

        # 讀就緒
        elif event == select.EPOLLIN:
            data = sock.recv(2018)
            if not data:
                # 註銷事件
                epoll.unregister(fd)
                # 關閉socket
                sock.close()
                # 刪除socket對應關係
                del fd_socket[fd]
                print(" somebody fuck out...")
                continue

            print(data.decode("utf-8"))
            # 讀完數據 須要把數據發回去因此接下來更改成寫就緒=事件
            epoll.modify(fd, select.EPOLLOUT)
            #記錄數據
            msgs.append((sock,data.upper()))
        elif event == select.EPOLLOUT:
            for item in msgs[:]:
                if item[0] == sock:
                    sock.send(item[1])
                    msgs.remove(item)
            # 切換關注事件爲寫就緒
            epoll.modify(fd,select.EPOLLIN)

注意:上述代碼只能在linux下運行,由於epoll模型是linux內核提供的,上層代碼沒法實現!

相關文章
相關標籤/搜索