import threading import SocketServer import json import time import kafka hosts="172.17.13.1:9092,172.17.13.10:9092" topic='statis-baseinfo-serverevent' class _KafkaMgrSingleton(type): def __init__(self, name, bases, dict): super(_KafkaMgrSingleton, self).__init__(name, bases, dict) self._instance = {} def __call__(self, hosts): if not self._instance.has_key(hosts): self._instance[hosts] = super(_KafkaMgrSingleton, self).__call__(hosts) return self._instance[hosts] class KafkaMgr(object): "kafka操做類" __metaclass__ = _KafkaMgrSingleton def __init__(self,hosts): self.hosts=hosts self.client=kafka.KafkaClient(hosts,timeout=2851200) self.productor=kafka.SimpleProducer(self.client) def send_message(self,topic,*messages): messages=[(i if isinstance(i,str) else i.encode("utf-8")) for i in messages] return self.productor.send_messages(topic,*messages) def __del__(self): self.client.close() class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): def handle(self): start_time=time.time() print start_time data = self.request.recv(1024) jdata = json.loads(data) try: kafka=KafkaMgr(hosts) kafka.send_message(topic,data) except Exception as e: msg=e.message else: msg='success' # host=self.client_address # sub_thread = threading.current_thread() response = {'data':data,'message':msg} print response jresp = json.dumps(response) self.request.sendall(jresp) print 'count_time---',time.time()-start_time if __name__ == "__main__": server=SocketServer.ThreadingTCPServer(('192.168.15.32',5000),ThreadedTCPRequestHandler) # ip, port = server.server_address print " .... waiting for connection" server.serve_forever()