python下高性能的網絡模式: 多進程+多協程

     人生苦短,我用python。衆所周知,python因爲全局鎖的存在,使用多線程模型並不能提升程序性能。使用協程是較好的選擇。但這仍不能發揮機器多cpu的性能。因而乎,有了下文的 多進程+多協程的網絡模型。大致思路是使用gevent的StreamServer監聽一個端口。而後使用multiprocessing.Process啓動多進程。當有客戶端鏈接到來時,會隨機分配到某個進程的ClientHandler實例中去處理,只須要繼承自該實例,處理你的業務便可。若是沒有重載該類的函數,默認會把全部的新鏈接到來,數據到達,鏈接關閉事件經過multiprocessing.Queue傳遞給主進程(即開始啓動監聽端口的進程),而後由主程從隊列中取出事件處理便可。須要特別說明的是window不支持該模型,爲了方便在windows下調試業務代碼,windos下實現爲普通的單程gevent多協程模式。python

具體使用例子看代碼下面__main__部份,用python tcpserver.py啓動便可測試。windows

------------------------------------------------------------------代碼下如-------------------------------------------------------------------------------網絡

import gevent
from gevent import monkey
monkey.patch_all(thread=False)
from gevent.server import StreamServer
import gevent.queue
import multiprocessing
import os多線程

EVENT_CONNECT = 0
EVENT_RECV = 1
EVENT_SEND = 2
EVENT_CLOSE = 3
EVENT_STOP  = 4app


class ClientHandler(object):
    def __init__(self,socket, address,clientprocess):
        self.socket = socket
        self.address = address
        self._sock_id = 0
        self._is_close = False
        self._buffer = ""
        self._clientprocess = clientprocess
        self._send_queues = gevent.queue.Queue() 
        self._send_spawn = gevent.spawn(self._run_send)socket

    def set_socket_id(self,socket_id):
        self._sock_id = socket_idtcp

    def on_connect(self):
        self._clientprocess.on_connect(self._sock_id,self.address)
        return True函數

    def on_recv(self,data):
        self._clientprocess.on_recv(self._sock_id ,data)
        return Trueoop

    def on_close(self):
        self._clientprocess.on_close(self._sock_id)性能

    def send(self,data):
        self._send_queues.put((EVENT_SEND,data))

    def _run_send(self):
        combine_message = b""
        while self._is_close == False:
            msg = self._send_queues.get()
            if msg is None:
                continue
            (event,data) = msg
            if event == EVENT_SEND:
                try:
                    combine_message += data
                except:
                    pass
                if self._send_queues.qsize() > 0:
                    continue
                try:
                    self.socket.sendall(combine_message)
                    combine_message = b""
                except:
                    if self._is_close == False:
                       self._clientprocess.on_close(self._sock_id)
                    break
            elif event == EVENT_STOP:
                if combine_message != b"":
                    self.socket.sendall(combine_message)
                break
            else:
                break

    def close(self):
        if self._is_close:
            return
        self._is_close = True
        self._send_queues.put((EVENT_STOP,0))
        self.socket.close()
        gevent.joinall([self._send_spawn])


class ClientProcess:
    def __init__(self,server,index,queue_cls,hanlder_cls):                   
          self._index = index                 
          self._send_queues = queue_cls()
          self._hanlder_cls = hanlder_cls
          self.server = server
          self._is_stop = False
          
    def run(self):
        self._clients = {}                
        self._send_spawn = gevent.spawn(self._run_send)

    def stop(self):
        self.put((EVENT_STOP,0,0))

    def get(self,block=True,outtime=None):
        return self._send_queues.get(block,outtime)
    
    def put(self,data,block=True,outtime=None):
        self._send_queues.put(data,block,outtime)

    def on_new_connect(self,socket, address):
        client = self._hanlder_cls(socket, address,self)
        sock_id = id(client)*100+self._index
        self._clients[sock_id] = client       
        client.set_socket_id(sock_id)        
        if client.on_connect() == False:
          return

        while True:
            try:
               data = socket.recv(1024)            
               if data is not None and len(data)>0 and client.on_recv(data):                
                  continue
               else: 
                  client.on_close()               
                  break
            except:
                client.on_close()
                break


    def on_connect(self,sock_id,address):
        self.server.put((EVENT_CONNECT,sock_id,address))


    def on_close(self,sock_id):
        self.server.put((EVENT_CLOSE,sock_id,0))

    def on_recv(self,sock_id,data):
        self.server.put((EVENT_RECV,sock_id,data))

    def send(self,sock_id,data):
        try:
           self.put((EVENT_SEND,sock_id,data),True,0.001)
        except:
            print("send put timeout sock_id:%d"%(sock_id)) 
            return False
        return True

    def close_socket(self,sock_id):
        self.put((EVENT_CLOSE,sock_id,0))

    def __send__(self,sock_id,data):
        client = self._clients.get(sock_id,None)
        if client is not None:
           client.send(data)

    def __close_socket__(self,sock_id):
        client = self._clients.get(sock_id,None)
        if client is not None:
           client.close()
           self._clients.pop(sock_id)

    def _run_send(self):        
        msg = None
        print("_run_send pid:%d" %(os.getpid()))
        while True:
            try:
               msg = self.get(False)
            except :
                gevent.sleep(0.01)
                continue

            if msg is None: 
                continue
            (event,socket_id,data) = msg
            if event == EVENT_SEND:
                self.__send__(socket_id,data)
            elif event == EVENT_CLOSE:
                if socket_id != 0:
                   self.__close_socket__(socket_id)
            elif event == EVENT_STOP:
                for sock_id,client in self._clients.items():
                       self.__close_socket__(sock_id)
                break


class TcpServer:

    def __init__(self):
        self._server = StreamServer(('0.0.0.0',0), self.on_new_connect,backlog=100000)
        self._server.reuse_addr = 1
        self._recv_queues = None
        self._clent_process = []
        self._process_num = 0

    def start(self,ip,port,process_count = 1,queue_cls = gevent.queue.Queue,hanlder_cls = ClientHandler):
        self._recv_queues = queue_cls()
        self._process_num = process_count        
        self._server.set_listener((ip,port))        
        self._server.start()
        for i in range(process_count):
           self._clent_process.append(ClientProcess(self,i,queue_cls,hanlder_cls))
        self.event_loop()

    def event_loop(self):
        self.serve_forever(0)
    
    def get_recv_queue(self):
        return self._recv_queues

    def serve_forever(self,i):
        print("server:%d,pid:%d,index = %d" %(id(self),os.getpid(),i))
        self._index = i
        self._clent_process[i].run()
        self._send_spawn = gevent.spawn(self._server.serve_forever)
        

    def stop_server(self):        
        for client in self._clent_process:
            client.stop()
        self._server.close()

    def send(self,sock_id,data):
        index = sock_id %100
        if index >= self._process_num:
            return False
        client = self._clent_process[index]
        if client is not None:
           client.send(sock_id,data)
        return True

    def close_socket(self,sock_id):
        index = sock_id %100
        if index >= self._process_num:
            return False
        client = self._clent_process[index]
        if client is not None:
           client.close_socket(sock_id)
        return True

    def get(self,block=True,outtime=None):
        return self._recv_queues.get(block,outtime)
    
    def put(self,data,block=True,outtime=None):
        self._recv_queues.put(data,block,outtime)

    def on_new_connect(self,sock, address):         
        print("on_new_connect server:%d,pid:%d" %(id(self),os.getpid()))
        self._clent_process[self._index].on_new_connect(sock,address) 

class MultiprocessingTcpServer(TcpServer):

    def start(self,ip,port,process_count = 1,queue_cls =multiprocessing.Queue,hanlder_cls = ClientHandler):
        self._hanlder_cls = hanlder_cls
        self._queue_cls = queue_cls
        TcpServer.start(self,ip,port,process_count,queue_cls,hanlder_cls)

    def event_loop(self):
        for i in range(self._process_num):
           multiprocessing.Process(target=self.serve_forever, args=(i,)).start()

    def serve_forever(self,i):
        TcpServer.serve_forever(self,i)
        gevent.joinall([self._send_spawn])

    def add_process(self):
        index = len(self._clent_process)
        process = ClientProcess(self,index,self._queue_cls,self._hanlder_cls)
        self._clent_process.append(process)
        self._process_num += 1
        multiprocessing.Process(target=self.serve_forever, args=(index,)).start()
        

if __name__ == "__main__":        
    import sys
    process_num = 2    
    server = None
    if sys.platform == 'win32':
       server = TcpServer()  #windows下爲單程進程的gevent
    else:
       server = MultiprocessingTcpServer()   #其餘平臺是多進程+多協程

    server.start('0.0.0.0',8089,process_num)    #監聽端口     msg = None     while True:         try:             msg = server.get(True)     #獲取事件         except KeyboardInterrupt:             break         except :                 gevent.sleep(0.1)                 continue         if msg is not None:             (event,socket_id,data) = msg             if event == EVENT_RECV:   #有數據到來                server.send(socket_id,data)   #數據原樣返回                print("socket_id:%d ,recv data:%d" %(socket_id,len(data)))             elif event == EVENT_CONNECT:   #新鏈接到來                 print("new connect socket_id:%d,address:%s" %(socket_id,data))             else:    #鏈接斷開                server.close_socket(socket_id)                print("socket_id:%d close" %(socket_id))     server.stop_server()

相關文章
相關標籤/搜索