多線程socket服務器端

import threading
import SocketServer
import json
import time
import  kafka
hosts="172.17.13.1:9092,172.17.13.10:9092"
topic='statis-baseinfo-serverevent'
class _KafkaMgrSingleton(type):
    def __init__(self, name, bases, dict):
        super(_KafkaMgrSingleton, self).__init__(name, bases, dict)
        self._instance = {}
    def __call__(self, hosts):
        if not self._instance.has_key(hosts):
            self._instance[hosts] = super(_KafkaMgrSingleton, self).__call__(hosts)
        return self._instance[hosts]
class KafkaMgr(object):
    "kafka操做類"
    __metaclass__ = _KafkaMgrSingleton
    def __init__(self,hosts):
        self.hosts=hosts
        self.client=kafka.KafkaClient(hosts,timeout=2851200)
        self.productor=kafka.SimpleProducer(self.client)
    def send_message(self,topic,*messages):
        messages=[(i if isinstance(i,str) else i.encode("utf-8")) for i in messages]
        return self.productor.send_messages(topic,*messages)
    def __del__(self):
        self.client.close()
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        start_time=time.time()
        print start_time
        data = self.request.recv(1024)
        jdata = json.loads(data)
        try:
            kafka=KafkaMgr(hosts)
            kafka.send_message(topic,data)
        except Exception as e:
            msg=e.message
        else:
            msg='success'

        # host=self.client_address
        # sub_thread = threading.current_thread()
        response = {'data':data,'message':msg}
        print response
        jresp = json.dumps(response)
        self.request.sendall(jresp)
        print 'count_time---',time.time()-start_time

if __name__ == "__main__":
    server=SocketServer.ThreadingTCPServer(('192.168.15.32',5000),ThreadedTCPRequestHandler)
    # ip, port = server.server_address
    print " .... waiting for connection"
    server.serve_forever()
相關文章
相關標籤/搜索