併發網絡通訊模型

常見模型分類

循環服務器模型 :循環接收客戶端請求,處理請求。同一時刻只能處理一個請求,處理完畢後再數據庫

處理下一個。服務器

  • 優勢:實現簡單,佔用資源少
  • 缺點:沒法同時處理多個客戶端請求
  • 適用狀況:處理的任務能夠很快完成,客戶端無需長期佔用服務端程序。udp比tcp更適合循環。

IO併發模型:利用IO多路複用,異步IO等技術,同時處理多個客戶端IO請求。網絡

  • 優勢 : 資源消耗少,能同時高效處理多個IO行爲
  • 缺點 : 只能處理併發產生的IO事件,沒法處理cpu計算
  • 適用狀況:HTTP請求,網絡傳輸等都是IO行爲。

多進程/線程網絡併發模型:每當一個客戶端鏈接服務器,就建立一個新的進程/線程爲該客戶端服務,客戶端退出時再銷燬該進程/線程。多線程

  • 優勢:能同時知足多個客戶端長期佔有服務端需求,能夠處理各類請求。
  • 缺點: 資源消耗較大
  • 適用狀況:客戶端同時鏈接量較少,須要處理行爲較複雜狀況。

基於fork的多進程網絡併發模型

  1. 建立監聽套接字
  2. 等待接收客戶端請求
  3. 客戶端鏈接建立新的進程處理客戶端請求
  4. 原進程繼續等待其餘客戶端鏈接
  5. 若是客戶端退出,則銷燬對應的進程
from socket import *
import os
import signal

# 建立監聽套接字
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)

# 客戶端服務函數
def handle(c):
  while True:
    data = c.recv(1024)
    if not data:
      break
    print(data.decode())
    c.send(b'OK')
  c.close()

s = socket()  # tcp套接字
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)   # 設置套接字端口重用
s.bind(ADDR)
s.listen(3)

signal.signal(signal.SIGCHLD,signal.SIG_IGN)    # 處理殭屍進程

print("Listen the port %d..." % PORT)

# 循環等待客戶端鏈接
while True:
  try:
    c,addr = s.accept()
  except KeyboardInterrupt:
    os._exit(0)
  except Exception as e:
    print(e)
    continue

  # 建立子進程處理這個客戶端
  pid = os.fork()
  if pid == 0:  # 處理客戶端請求
    s.close()
    handle(c)
    os._exit(0)  # handle處理完客戶端請求子進程也退出

  # 不管出錯或者父進程都要循環回去接受請求
  # c對於父進程沒用
  c.close() 

基於threading的多線程網絡併發

  1. 建立監聽套接字
  2. 循環接收客戶端鏈接請求
  3. 當有新的客戶端鏈接建立線程處理客戶端請求
  4. 主線程繼續等待其餘客戶端鏈接
  5. 當客戶端退出,則對應分支線程退出
from socket import *
from threading import Thread
import sys

# 建立監聽套接字
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)

# 處理客戶端請求
def handle(c):
  while True:
    data = c.recv(1024)
    if not data:
      break
    print(data.decode())
    c.send(b'OK')
  c.close()

s = socket()  # tcp套接字
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(ADDR)
s.listen(3)

print("Listen the port %d..."%PORT)
# 循環等待客戶端鏈接
while True:
  try:
    c,addr = s.accept()
  except KeyboardInterrupt:
    sys.exit("服務器退出")
  except Exception as e:
    print(e)
    continue

  # 建立線程處理客戶端請求
  t = Thread(target=handle, args=(c,))
  t.setDaemon(True)   # 父進程結束則全部進程終止
  t.start()

ftp 文件服務器

項目功能 :併發

* 客戶端有簡單的頁面命令提示: 功能包含:app

  1. 查看服務器文件庫中的文件列表(普通文件)
  2. 能夠下載其中的某個文件到本地
  3. 能夠上傳客戶端文件到服務器文件庫 

* 服務器需求 :異步

  1. 容許多個客戶端同時操做
  2. 每一個客戶端可能回連續發送命令 

技術分析:socket

  1. tcp套接字更適合文件傳輸
  2. 併發方案 ---》 fork 多進程併發
  3. 對文件的讀寫操做
  4. 獲取文件列表 ----》 os.listdir() 

粘包的處理tcp

總體結構設計ide

  1. 服務器功能封裝在類中(上傳,下載,查看列表)
  2. 建立套接字,流程函數調用 main()
  3. 客戶端負責發起請求,接受回覆,展現

服務端負責接受請求,邏輯處理

from socket import *
from threading import Thread import os import time # 全局變量
HOST = '0.0.0.0' PORT = 8080 ADDR = (HOST,PORT) FTP = "/home/tarena/FTP/"  # 文件庫位置

# 建立文件服務器服務端功能類
class FTPServer(Thread): def __init__(self,connfd): self.connfd = connfd super().__init__() def do_list(self): # 獲取文件列表
    files = os.listdir(FTP) if not files: self.connfd.send("文件庫爲空".encode()) return
    else: self.connfd.send(b'OK') time.sleep(0.1)  # 防止和後面發送內容粘包

    # 拼接文件列表
    files_ = ""
    for file in files: if file[0] != '.' and \ os.path.isfile(FTP+file): files_ += file + '\n' self.connfd.send(files_.encode()) def do_get(self,filename): try: fd = open(FTP+filename,'rb') except Exception: self.connfd.send("文件不存在".encode()) return
    else: self.connfd.send(b'OK') time.sleep(0.1) # 文件發送
    while True: data = fd.read(1024) if not data: time.sleep(0.1) self.connfd.send(b'##') break self.connfd.send(data) # 循環接收客戶端請求
  def run(self): while True: data = self.connfd.recv(1024).decode() if not data or data == 'Q': return 
      elif data == 'L': self.do_list() elif data[0] == 'G':   # G filename
        filename = data.split(' ')[-1] self.do_get(filename) # 網絡搭建
def main(): # 建立套接字
  sockfd = socket() sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) sockfd.bind(ADDR) sockfd.listen(3) print("Listen the port %d..."%PORT) while True: try: connfd,addr = sockfd.accept() print("Connect from",addr) except KeyboardInterrupt: print("服務器程序退出") return
    except Exception as e: print(e) continue

    # 建立新的線程處理客戶端
    client = FTPServer(connfd) client.setDaemon(True) client.start() # 運行run方法


if __name__ == "__main__": main()
ftp_sever
from socket import *
import sys ADDR = ('127.0.0.1',8080) # 服務器地址

# 客戶端功能處理類
class FTPClient: def __init__(self,sockfd): self.sockfd = sockfd def do_list(self): self.sockfd.send(b'L')  # 發送請求
    # 等待回覆
    data = self.sockfd.recv(128).decode() if data == 'OK': # 一次接收文件列表字符串
      data = self.sockfd.recv(4096) print(data.decode()) else: print(data) def do_get(self,filename): # 發送請求
    self.sockfd.send(('G '+filename).encode()) # 等待回覆
    data = self.sockfd.recv(128).decode() if data == 'OK': fd = open(filename,'wb') # 接收文件
      while True: data = self.sockfd.recv(1024) if data == b'##': break fd.write(data) fd.close() else: print(data) def do_quit(self): self.sockfd.send(b'Q') self.sockfd.close() sys.exit("謝謝使用") # 建立客戶端網絡
def main(): sockfd = socket() try: sockfd.connect(ADDR) except Exception as e: print(e) return ftp = FTPClient(sockfd) # 實例化對象

  # 循環發送請求
  while True: print("\n=========命令選項==========") print("**** list ****") print("**** get file ****") print("**** put file ****") print("**** quit ****") print("=============================") cmd = input("輸入命令:") if cmd.strip() == 'list': ftp.do_list() elif cmd[:3] == 'get': # get filename
      filename = cmd.strip().split(' ')[-1] ftp.do_get(filename) elif cmd[:3] == 'put': # put ../filename
      filename = cmd.strip().split(' ')[-1] ftp.do_put(filename) elif cmd.strip() == 'quit': ftp.do_quit() else: print("請輸入正確命令") if __name__ == "__main__": main()
ftp_client

IO併發

定義:在內存中數據交換的操做被定義爲IO操做,IO------輸入輸出

  內存和磁盤進行數據交換: 文件的讀寫 數據庫更新
  內存和終端數據交換 : input  print
              sys.stdin  sys.stdout   sys.stderr
  內存和網絡數據的交換: 網絡鏈接 recv send recvfrom

  IO密集型程序 : 程序執行中有大量的IO操做,而較少的cpu運算操做。消耗cpu較少,IO運行時間長

  CPU(計算)密集型程序:程序中存在大量的cpu運算,IO操做相對較少,消耗cpu大。

IO分類

IO分爲:阻塞IO、非阻塞IO、IO多路複用、事件驅動IO、異步IO

阻塞IO 

  • 定義: 在執行IO操做時若是執行條件不知足則阻塞。阻塞IO是IO的默認形態。
  • 效率: 阻塞IO是效率很低的一種IO。可是因爲邏輯簡單因此是默認IO行爲。

阻塞狀況:

  • 由於某種執行條件沒有知足形成的函數阻塞  e.g. accept input recv
  • 處理IO的時間較長產生的阻塞狀態  e.g. 網絡傳輸, 大文件讀寫

非阻塞IO

定義 : 經過修改IO屬性行爲, 使本來阻塞的IO變爲非阻塞的狀態。

  • 設置套接字爲非阻塞IO
    • sockfd.setblocking(bool)
    • 功能: 設置套接字爲非阻塞IO
    • 參數: 默認爲True,表示套接字IO阻塞;設置爲False則套接字IO變爲非阻塞
  • 超時檢測 :設置一個最長阻塞時間,超過該時間後則再也不阻塞等待。
    • sockfd.settimeout(sec)
    • 功能:設置套接字的超時時間
    • 參數:設置的時間

IO多路複用

定義 經過一個監測,能夠同時監控多個IO事件的行爲。當哪一個IO事件能夠執行,即讓這個IO事件發生。

rs, ws, xs = select(rlist, wlist, xlist[, timeout])  監控IO事件,阻塞等待監控的IO時間發生

參數 :

  • rlist  列表,存放(被動)等待處理的IO (接收)
  • wlist  列表,存放主動處理的IO(發送)
  • xlist  列表,存放出錯,但願去處理的IO(異常)
  • timeout   超時檢測

返回值:

  • rs  列表  rlist中準備就緒的IO
  • ws  列表  wlist中準備就緒的IO
  • xs  列表  xlist中準備就緒的IO

select 實現tcp服務

  1. 將關注的IO放入對應的監控類別列表
  2. 經過select函數進行監控
  3. 遍歷select返回值列表,肯定就緒IO事件
  4. 處理髮生的IO事件
from socket import *
from select import select

# 建立一個監聽套接字做爲關注的IO
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)

# 設置關注列表
rlist = [s]
wlist = []
xlist = [s]

# 循環監控IO
while True:
  rs,ws,xs = select(rlist,wlist,xlist)
  # 遍歷三個返回列表,處理IO
  for r in rs:
    # 根據遍歷到IO的不一樣使用if分狀況處理
    if r is s:
      c,addr = r.accept()
      print("Connect from",addr)
      rlist.append(c) # 增長新的IO事件
    # else爲客戶端套接字就緒狀況
    else:
      data = r.recv(1024)
      # 客戶端退出
      if not data:
        rlist.remove(r) # 從關注列表移除
        r.close()
        continue # 繼續處理其餘就緒IO
      print("Receive:",data.decode())
      # r.send(b'OK')
      # 咱們但願主動處理這個IO對象
      wlist.append(r)

  for w in ws:
    w.send(b'OK')
    wlist.remove(w) # 使用後移除

  for x in xs:
    pass 

注意: 

  • wlist中若是存在IO事件,則select當即返回給ws
  • 處理IO過程當中不要出現死循環佔有服務端的狀況
  • IO多路複用消耗資源較少,效率較高

擴展: 位運算

將整數轉換爲二進制, 按照二進制位進行運算符操做
  & 按位與   | 按位或   ^ 按位異或    << 左移   >> 右移
  11 1011    14 1110
  (11 & 14    1010)   (11 | 14    1111)   (11 ^ 14    0101 )
  11 << 2 ===> 44 右側補0    14 >> 2 ===> 3 擠掉右側的數字
  使用 : 1. 在作底層硬件時操做寄存器
      2. 作標誌位的過濾

poll方法實現IO多路複用

p = select.poll()    建立poll對象

p.register(fd,event)  註冊關注的IO事件

  • fd       要關注的IO
  • event 要關注的IO事件類型

  經常使用類型:

    1. POLLIN 讀IO事件(rlist)
    2. POLLOUT 寫IO事件 (wlist)
    3. POLLERR 異常IO (xlist)
    4. POLLHUP 斷開鏈接

      e.g. p.register(sockfd,POLLIN|POLLERR)

p.unregister(fd)    取消對IO的關注

  • 參數: IO對象或者IO對象的fileno

events = p.poll()       

  • 功能:    阻塞等待監控的IO事件發生
  • 返回值: 返回發生的IO事件

    events 是一個列表 [(fileno,evnet),(),()....]

    每一個元組爲一個就緒IO,元組第一項是該IO的fileno,第二項爲該IO就緒的事件類型

poll_server 步驟

  1. 建立套接字
  2. 將套接字register
  3. 建立查找字典,並維護
  4. 循環監控IO發生
  5. 處理髮生的IO
from socket import *
from select import *

# 建立套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)

# 建立poll對象關注s
p = poll()

# 創建查找字典,用於經過fileno查找IO對象
fdmap = {s.fileno():s}

# 關注s
p.register(s,POLLIN|POLLERR)

# 循環監控
while True:
  events = p.poll()
  # 循環遍歷發生的事件 fd-->fileno
  for fd,event in events:
    # 區分事件進行處理
    if fd == s.fileno():
      c,addr = fdmap[fd].accept()
      print("Connect from",addr)
      # 添加新的關注IO
      p.register(c,POLLIN|POLLERR)
      fdmap[c.fileno()] = c # 維護字典
    # 按位與斷定是POLLIN就緒
    elif event & POLLIN:
      data = fdmap[fd].recv(1024)
      if not data:
        p.unregister(fd) # 取消關注
        fdmap[fd].close()
        del fdmap[fd]  # 從字典中刪除
        continue
      print("Receive:",data.decode())
      fdmap[fd].send(b'OK')

 

 

epoll方法

1. 使用方法 : 基本與poll相同

  • 生成對象改成 epoll()
  • 將全部事件類型改成EPOLL類型

2. epoll特色

  • epoll 效率比select poll要高
  • epoll 監控IO數量比select要多
  • epoll 的觸發方式比poll要多 (EPOLLET邊緣觸發)

 

from socket import *
from select import *

# 建立套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)

# 建立epoll對象關注s
ep = epoll()

# 創建查找字典,用於經過fileno查找IO對象
fdmap = {s.fileno():s}

# 關注s
ep.register(s,EPOLLIN|EPOLLERR)

# 循環監控
while True:
  events = ep.poll()
  # 循環遍歷發生的事件 fd-->fileno
  for fd,event in events:
    print("親,你有IO須要處理哦")
    # 區分事件進行處理
    if fd == s.fileno():
      c,addr = fdmap[fd].accept()
      print("Connect from",addr)
      # 添加新的關注IO
      # 將觸發方式變爲邊緣觸發
      ep.register(c,EPOLLIN|EPOLLERR|EPOLLET)
      fdmap[c.fileno()] = c # 維護字典
    # 按位與斷定是EPOLLIN就緒
    # elif event & EPOLLIN:
    #   data = fdmap[fd].recv(1024)
    #   if not data:
    #     ep.unregister(fd) # 取消關注
    #     fdmap[fd].close()
    #     del fdmap[fd]  # 從字典中刪除
    #     continue
    #   print("Receive:",data.decode())
    #   fdmap[fd].send(b'OK')
相關文章
相關標籤/搜索