socket客戶端:python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Minimal TCP client: send one message to the socket server and print the reply."""
import json
import socket


def _to_wire(message):
    """Return *message* as the byte string that is written to the socket.

    Byte strings pass through untouched, text is UTF-8 encoded, and any
    other object (dict, list, ...) is serialized to JSON first.
    """
    if not isinstance(message, (bytes, type(u""))):
        message = json.dumps(message)
    if not isinstance(message, bytes):
        message = message.encode("utf-8")
    return message


def client(ip, port, message):
    """Send *message* to ``(ip, port)`` over TCP and return the reply.

    Args:
        ip: Host name or IP address of the server.
        port: TCP port of the server.
        message: Payload to send. A byte/text string is sent as is; any
            other object is JSON-encoded, because ``sendall`` only accepts
            byte strings.

    Returns:
        The raw data returned by a single ``recv(1024)`` call.

    Raises:
        socket.error: If connecting, sending or receiving fails.
    """
    payload = _to_wire(message)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect() is inside the try so the socket is closed even when the
        # connection attempt itself fails.
        sock.connect((ip, port))
        print("Send: %s" % (message,))
        sock.sendall(payload)
        response = sock.recv(1024)
        print("Recv:  %s" % (response,))
        return response
    finally:
        sock.close()


if __name__ == "__main__":
    HOST, PORT = "192.168.15.33", 50001
    f = {"event": "sup", "data": {"inqno": 123456}}
    client(HOST, PORT, f)
socket服務器端:python
#coding=utf-8
"""Threaded TCP server (Python 2) that forwards received data to Kafka.

Every payload received on the socket is published to the Kafka topic
``topic`` and a record of the request is appended to a local log file.
"""
import errno
import json
import os
import subprocess  # no longer used by log(); kept so existing imports stay intact
import threading
import time
import traceback

import SocketServer
import kafka

hosts = "192.168.1.1:9092,192.168.1.2:9092"
topic = 'topic_123456'

# Directory that receives the daily ``tj_<YYYYMMDD>_socket_server.log`` files.
LOG_DIR = '/tmp/tj'


class _KafkaMgrSingleton(type):
    """Metaclass that hands out one shared instance per ``hosts`` string."""

    def __init__(cls, name, bases, namespace):
        super(_KafkaMgrSingleton, cls).__init__(name, bases, namespace)
        cls._instance = {}
        # Request handlers run in separate threads (ThreadingTCPServer), so
        # the check-then-create below has to be atomic.
        cls._instance_lock = threading.Lock()

    def __call__(cls, hosts):
        with cls._instance_lock:
            if hosts not in cls._instance:
                cls._instance[hosts] = super(_KafkaMgrSingleton, cls).__call__(hosts)
            return cls._instance[hosts]


class KafkaMgr(object):
    """Kafka helper: owns one client and one producer per broker list."""

    __metaclass__ = _KafkaMgrSingleton

    def __init__(self, hosts):
        self.hosts = hosts
        # NOTE(review): 2851200 equals 33 days if the timeout is expressed in
        # seconds -- confirm the unit against the kafka client in use.
        self.client = kafka.KafkaClient(hosts, timeout=2851200)
        self.productor = kafka.SimpleProducer(self.client)

    def send_message(self, topic, *messages):
        """Publish *messages* to *topic*; text is UTF-8 encoded first.

        Returns whatever the producer's ``send_messages`` returns.
        """
        encoded = [m if isinstance(m, bytes) else m.encode("utf-8") for m in messages]
        return self.productor.send_messages(topic, *encoded)

    def __del__(self):
        # __init__ may have failed before ``client`` was assigned.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()


class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    """Request handler for the multi-threaded socket server.

    Sends the received data to the Kafka queue, records the outcome in the
    log file and returns the same record to the client as JSON.
    """

    def handle(self):
        write_time = time.strftime("%Y-%m-%d:%H:%M:%S", time.localtime())
        start_time = time.time()
        print("%s %s" % (write_time, start_time))

        data = self.request.recv(2048)
        message = {
            # Decoded leniently so that a non-UTF-8 payload cannot make
            # json.dumps() raise; Kafka still receives the raw bytes.
            "recv_data": data.decode("utf-8", "replace"),
            "write_time": write_time,
        }
        try:
            KafkaMgr(hosts).send_message(topic, data)
        except Exception:
            # Keep serving, but leave a trace of why the publish failed.
            traceback.print_exc()
            message["status"] = False
        else:
            message["status"] = True

        log(message)
        self.request.sendall(json.dumps(message))
        print("count_time--- %s" % (time.time() - start_time))


def log(message):
    """Append *message* as one line to today's log file under ``LOG_DIR``.

    The line is written directly from Python. The data comes from the
    network and must never be interpolated into a shell command.

    Args:
        message: The record to log; written as JSON when serializable,
            otherwise as ``str(message)``.
    """
    try:
        os.makedirs(LOG_DIR)
    except OSError as exc:
        # Another handler thread may have created the directory first.
        if exc.errno != errno.EEXIST:
            raise

    try:
        line = json.dumps(message)
    except (TypeError, ValueError):
        line = str(message)

    file_start_name = time.strftime("%Y%m%d", time.localtime())
    log_path = os.path.join(LOG_DIR, "tj_{0}_socket_server.log".format(file_start_name))
    with open(log_path, "a") as log_file:
        log_file.write(line + "\n")


if __name__ == "__main__":
    server = SocketServer.ThreadingTCPServer(('192.168.15.33', 50001), ThreadedTCPRequestHandler)
    print(" .... waiting for connection")
    server.serve_forever()
消費kafka:python
"""Consume ``topic_123456`` and commit offsets manually (auto-commit is off)."""
import traceback

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_123456",
    group_id='group_123456',
    bootstrap_servers=['192.168.1.1:9092', '192.168.1.2:9092'],
    auto_offset_reset='smallest',
    auto_commit_enable=False,
)

# To consume specific partitions instead:
# consumer = KafkaConsumer(group_id='group_123456', bootstrap_servers=['192.168.1.1:9092', '192.168.1.2:9092'], auto_offset_reset='smallest', auto_commit_enable=False)
# consumer.set_topic_partitions(("topic_123456", 0), ("topic_123456", 1))

for message in consumer:
    # Reset per message so the error branch never sees a stale or unbound
    # offset from an earlier iteration.
    offset = None
    try:
        value = message.value
        offset = message.offset
    except Exception:
        traceback.print_exc()
        consumer.task_done(message)
        # Unreadable messages are only committed every 1000 offsets.
        if offset is not None and offset % 1000 == 0:
            consumer.commit()
    else:
        try:
            # User-defined consume function.
            # NOTE(review): it is not defined in this script and does not
            # receive ``value``; supply it (and pass the payload) as needed.
            consumer_function()
        except Exception:
            # Report the failure instead of dropping it silently; the message
            # is still marked done and committed below.
            traceback.print_exc()
        finally:
            consumer.task_done(message)
            consumer.commit()