Python selectors實現socket併發

selectors模塊
  •   此模塊容許基於選擇模塊原語構建高級別和高效的I / O多路複用。
  •   鼓勵用戶使用此模塊,除非他們想要精確控制使用的os級別的原語。

注:selectors也是包裝了select高級的包裝內置函數,它包裝了select與epoll,優先使用epoll windos內只支持select。併發

 

實現接收上萬併發
服務端:
import selectors
import socket

# 生成select實例對象
sel = selectors.DefaultSelector()

def accept(sock, mask):

    # 接收連接
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)

    # 連接設置非阻塞模式
    conn.setblocking(False)

    # 註冊conn,回調 read函數
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1024)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)

        # 取消註冊
        sel.unregister(conn)

        # 關閉連接
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 9999))
sock.listen(10000)
sock.setblocking(False)

# 註冊server事件:
# 參數一:sock 進行監聽
# 參數二:selectors.EVENT_READ 執行動做
# 參數三:accept,只要來一個連接就回調這個函數
sel.register(sock, selectors.EVENT_READ, accept)

# 第一次調用server,register accept
# 第二次調用client,register read
while True:

    # 調用select:優先使用epoll
    events = sel.select()

    # 只要不阻塞就有調用的數據,返回一個列表
    # 默認阻塞,有活動連接就返回活動的連接列表
    for key, mask in events:

        # callback至關於調accept函數
        callback = key.data

        # 獲取函數內存地址,加入參數
        # key.fileobj = 文件句柄
        callback(key.fileobj, mask)

 

客戶端:socket

import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]

# 傳入連接參數
server_address = ('localhost', 9999)

# Create a TCP/IP socket
# 使用列表生成式,生成多個請求。
# Winuds使用select支持併發並很少 這裏測試40個併發
# Linux默認使用epoll可支持上萬併發可修改10000
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          for i in range(40)]

print('connecting to %s port %s' % server_address)

# 循環socks連續連接4次
for s in socks:

    s.connect(server_address)

# 循環發送數據
for message in messages:

    # Send messages on both sockets
    # 每一個客戶端依次發送數據
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    # 服務端收數據
    for s in socks:
        data = s.recv(1024)

        # getsockname()服務端返回
        print( '%s: received "%s"' % (s.getsockname(), data) )

        # 沒有數據打印客戶端要關閉了
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )
注:創建一個socket就須要創建一個文件句柄,最大數65535
注:Linux修改文件描述符數值 當前執行
ulimit -SHn 65535    #修改連接端口
ulimit -n                #查看連接端口   
基於Linux下 須要修改配置

 

epoll實現多併發socketide

import socket, logging
import select, errno

logger = logging.getLogger("network-server")

def InitLog():
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)


if __name__ == "__main__":
    InitLog()

    try:
        # 建立 TCP socket 做爲監聽 socket
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as  msg:
        logger.error("create socket failed")

    try:
        # 設置 SO_REUSEADDR 選項
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as  msg:
        logger.error("setsocketopt SO_REUSEADDR failed")

    try:
        # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上
        listen_fd.bind(('', 2003))
    except socket.error as  msg:
        logger.error("bind failed")

    try:
        # 設置 listen 的 backlog 數
        listen_fd.listen(10)
    except socket.error as  msg:
        logger.error(msg)

    try:
        # 建立 epoll 句柄
        epoll_fd = select.epoll()
        # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    except select.error as  msg:
        logger.error(msg)

    connections = {}
    addresses = {}
    datalist = {}
    while True:
        # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            # 若爲監聽 fd 被激活
            if fd == listen_fd.fileno():
                # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄
                conn, addr = listen_fd.accept()
                logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                # 將鏈接 socket 設置爲 非阻塞
                conn.setblocking(0)
                # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件
                epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                # 將 conn 和 addr 信息分別保存起來
                connections[conn.fileno()] = conn
                addresses[conn.fileno()] = addr
            elif select.EPOLLIN & events:
                # 有 可讀 事件激活
                datas = ''
                while True:
                    try:
                        # 從激活 fd 上 recv 10 字節數據
                        data = connections[fd].recv(10)
                        # 若當前沒有接收到數據,而且以前的累計數據也沒有
                        if not data and not datas:
                            # 從 epoll 句柄中移除該 鏈接 fd
                            epoll_fd.unregister(fd)
                            # server 側主動關閉該 鏈接 fd
                            connections[fd].close()
                            logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                            break
                        else:
                            # 將接收到的數據拼接保存在 datas 中
                            datas += data
                    except socket.error as  msg:
                        # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況
                        # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理
                        if msg.errno == errno.EAGAIN:
                            logger.debug("%s receive %s" % (fd, datas))
                            # 將已接收數據保存起來
                            datalist[fd] = datas
                            # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫
                            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                            break
                        else:
                            # 出錯處理
                            epoll_fd.unregister(fd)
                            connections[fd].close()
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events:
                # 有 HUP 事件激活
                epoll_fd.unregister(fd)
                connections[fd].close()
                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
            elif select.EPOLLOUT & events:
                # 有 可寫 事件激活
                sendLen = 0
                # 經過 while 循環確保將 buf 中的數據所有發送出去
                while True:
                    # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置
                    sendLen += connections[fd].send(datalist[fd][sendLen:])
                    # 在所有發送完畢後退出 while 循環
                    if sendLen == len(datalist[fd]):
                        break
                # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀
                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
            else:
                # 其餘 epoll 事件不進行處理
                continue
相關文章
相關標籤/搜索