人生苦短,我用python。衆所周知,python因爲全局鎖的存在,使用多線程模型並不能提升程序性能。使用協程是較好的選擇。但這仍不能發揮機器多cpu的性能。因而乎,有了下文的 多進程+多協程的網絡模型。大致思路是使用gevent的StreamServer監聽一個端口。而後使用multiprocessing.Process啓動多進程。當有客戶端鏈接到來時,會隨機分配到某個進程的ClientHandler實例中去處理,只須要繼承自該實例,處理你的業務便可。若是沒有重載該類的函數,默認會把全部的新鏈接到來,數據到達,鏈接關閉事件經過multiprocessing.Queue傳遞給主進程(即開始啓動監聽端口的進程),而後由主程從隊列中取出事件處理便可。須要特別說明的是window不支持該模型,爲了方便在windows下調試業務代碼,windos下實現爲普通的單程gevent多協程模式。python
具體使用例子看代碼下面__main__部份,用python tcpserver.py啓動便可測試。windows
------------------------------------------------------------------代碼下如-------------------------------------------------------------------------------網絡
import gevent
from gevent import monkey
monkey.patch_all(thread=False)
from gevent.server import StreamServer
import gevent.queue
import multiprocessing
import os多線程
EVENT_CONNECT = 0
EVENT_RECV = 1
EVENT_SEND = 2
EVENT_CLOSE = 3
EVENT_STOP = 4app
class ClientHandler(object):
def __init__(self,socket, address,clientprocess):
self.socket = socket
self.address = address
self._sock_id = 0
self._is_close = False
self._buffer = ""
self._clientprocess = clientprocess
self._send_queues = gevent.queue.Queue()
self._send_spawn = gevent.spawn(self._run_send)socket
def set_socket_id(self,socket_id):
self._sock_id = socket_idtcp
def on_connect(self):
self._clientprocess.on_connect(self._sock_id,self.address)
return True函數
def on_recv(self,data):
self._clientprocess.on_recv(self._sock_id ,data)
return Trueoop
def on_close(self):
self._clientprocess.on_close(self._sock_id)性能
def send(self,data):
self._send_queues.put((EVENT_SEND,data))
def _run_send(self):
combine_message = b""
while self._is_close == False:
msg = self._send_queues.get()
if msg is None:
continue
(event,data) = msg
if event == EVENT_SEND:
try:
combine_message += data
except:
pass
if self._send_queues.qsize() > 0:
continue
try:
self.socket.sendall(combine_message)
combine_message = b""
except:
if self._is_close == False:
self._clientprocess.on_close(self._sock_id)
break
elif event == EVENT_STOP:
if combine_message != b"":
self.socket.sendall(combine_message)
break
else:
break
def close(self):
if self._is_close:
return
self._is_close = True
self._send_queues.put((EVENT_STOP,0))
self.socket.close()
gevent.joinall([self._send_spawn])
class ClientProcess:
def __init__(self,server,index,queue_cls,hanlder_cls):
self._index = index
self._send_queues = queue_cls()
self._hanlder_cls = hanlder_cls
self.server = server
self._is_stop = False
def run(self):
self._clients = {}
self._send_spawn = gevent.spawn(self._run_send)
def stop(self):
self.put((EVENT_STOP,0,0))
def get(self,block=True,outtime=None):
return self._send_queues.get(block,outtime)
def put(self,data,block=True,outtime=None):
self._send_queues.put(data,block,outtime)
def on_new_connect(self,socket, address):
client = self._hanlder_cls(socket, address,self)
sock_id = id(client)*100+self._index
self._clients[sock_id] = client
client.set_socket_id(sock_id)
if client.on_connect() == False:
return
while True:
try:
data = socket.recv(1024)
if data is not None and len(data)>0 and client.on_recv(data):
continue
else:
client.on_close()
break
except:
client.on_close()
break
def on_connect(self,sock_id,address):
self.server.put((EVENT_CONNECT,sock_id,address))
def on_close(self,sock_id):
self.server.put((EVENT_CLOSE,sock_id,0))
def on_recv(self,sock_id,data):
self.server.put((EVENT_RECV,sock_id,data))
def send(self,sock_id,data):
try:
self.put((EVENT_SEND,sock_id,data),True,0.001)
except:
print("send put timeout sock_id:%d"%(sock_id))
return False
return True
def close_socket(self,sock_id):
self.put((EVENT_CLOSE,sock_id,0))
def __send__(self,sock_id,data):
client = self._clients.get(sock_id,None)
if client is not None:
client.send(data)
def __close_socket__(self,sock_id):
client = self._clients.get(sock_id,None)
if client is not None:
client.close()
self._clients.pop(sock_id)
def _run_send(self):
msg = None
print("_run_send pid:%d" %(os.getpid()))
while True:
try:
msg = self.get(False)
except :
gevent.sleep(0.01)
continue
if msg is None:
continue
(event,socket_id,data) = msg
if event == EVENT_SEND:
self.__send__(socket_id,data)
elif event == EVENT_CLOSE:
if socket_id != 0:
self.__close_socket__(socket_id)
elif event == EVENT_STOP:
for sock_id,client in self._clients.items():
self.__close_socket__(sock_id)
break
class TcpServer:
def __init__(self):
self._server = StreamServer(('0.0.0.0',0), self.on_new_connect,backlog=100000)
self._server.reuse_addr = 1
self._recv_queues = None
self._clent_process = []
self._process_num = 0
def start(self,ip,port,process_count = 1,queue_cls = gevent.queue.Queue,hanlder_cls = ClientHandler):
self._recv_queues = queue_cls()
self._process_num = process_count
self._server.set_listener((ip,port))
self._server.start()
for i in range(process_count):
self._clent_process.append(ClientProcess(self,i,queue_cls,hanlder_cls))
self.event_loop()
def event_loop(self):
self.serve_forever(0)
def get_recv_queue(self):
return self._recv_queues
def serve_forever(self,i):
print("server:%d,pid:%d,index = %d" %(id(self),os.getpid(),i))
self._index = i
self._clent_process[i].run()
self._send_spawn = gevent.spawn(self._server.serve_forever)
def stop_server(self):
for client in self._clent_process:
client.stop()
self._server.close()
def send(self,sock_id,data):
index = sock_id %100
if index >= self._process_num:
return False
client = self._clent_process[index]
if client is not None:
client.send(sock_id,data)
return True
def close_socket(self,sock_id):
index = sock_id %100
if index >= self._process_num:
return False
client = self._clent_process[index]
if client is not None:
client.close_socket(sock_id)
return True
def get(self,block=True,outtime=None):
return self._recv_queues.get(block,outtime)
def put(self,data,block=True,outtime=None):
self._recv_queues.put(data,block,outtime)
def on_new_connect(self,sock, address):
print("on_new_connect server:%d,pid:%d" %(id(self),os.getpid()))
self._clent_process[self._index].on_new_connect(sock,address)
class MultiprocessingTcpServer(TcpServer):
def start(self,ip,port,process_count = 1,queue_cls =multiprocessing.Queue,hanlder_cls = ClientHandler):
self._hanlder_cls = hanlder_cls
self._queue_cls = queue_cls
TcpServer.start(self,ip,port,process_count,queue_cls,hanlder_cls)
def event_loop(self):
for i in range(self._process_num):
multiprocessing.Process(target=self.serve_forever, args=(i,)).start()
def serve_forever(self,i):
TcpServer.serve_forever(self,i)
gevent.joinall([self._send_spawn])
def add_process(self):
index = len(self._clent_process)
process = ClientProcess(self,index,self._queue_cls,self._hanlder_cls)
self._clent_process.append(process)
self._process_num += 1
multiprocessing.Process(target=self.serve_forever, args=(index,)).start()
if __name__ == "__main__":
import sys
process_num = 2
server = None
if sys.platform == 'win32':
server = TcpServer() #windows下爲單程進程的gevent
else:
server = MultiprocessingTcpServer() #其餘平臺是多進程+多協程
server.start('0.0.0.0',8089,process_num) #監聽端口 msg = None while True: try: msg = server.get(True) #獲取事件 except KeyboardInterrupt: break except : gevent.sleep(0.1) continue if msg is not None: (event,socket_id,data) = msg if event == EVENT_RECV: #有數據到來 server.send(socket_id,data) #數據原樣返回 print("socket_id:%d ,recv data:%d" %(socket_id,len(data))) elif event == EVENT_CONNECT: #新鏈接到來 print("new connect socket_id:%d,address:%s" %(socket_id,data)) else: #鏈接斷開 server.close_socket(socket_id) print("socket_id:%d close" %(socket_id)) server.stop_server()