python 的一個socket和kafka實例

socket客戶端:python

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import json

def client(ip, port, message):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))

    try:
        print "Send: %s"%message
        sock.sendall(message)
        response = sock.recv(1024)
        # jresp = json.loads(response)
        print "Recv: ",response

    finally:
        sock.close()

if __name__ == "__main__":
    HOST, PORT = "192.168.15.33", 50001
    f={"event":"sup","data":{
        "inqno":123456
        }
       }
    client(HOST, PORT, f)

socket服務器端:shell

#coding=utf-8
#接收到數據放到kafka上,同時記錄本地日誌
import subprocess
import SocketServer
import json
import os, time
import kafka
hosts="192.168.1.1:9092,192.168.1.2:9092"
topic='topic_123456'
class _KafkaMgrSingleton(type):
    def __init__(self, name, bases, dict):
        super(_KafkaMgrSingleton, self).__init__(name, bases, dict)
        self._instance = {}
    def __call__(self, hosts):
        if not self._instance.has_key(hosts):
            self._instance[hosts] = super(_KafkaMgrSingleton, self).__call__(hosts)
        return self._instance[hosts]
class KafkaMgr(object):
    "kafka操做類"
    __metaclass__ = _KafkaMgrSingleton
    def __init__(self,hosts):
        self.hosts=hosts
        self.client=kafka.KafkaClient(hosts,timeout=2851200)
        self.productor=kafka.SimpleProducer(self.client)
    def send_message(self,topic,*messages):
        messages=[(i if isinstance(i,str) else i.encode("utf-8")) for i in messages]
        return self.productor.send_messages(topic,*messages)
    def __del__(self):
        self.client.close()
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    '''基於socket的多線程的server端,處理包括了發送接收到的數據到kafka隊列,對數據進行的驗證,並將其寫入日誌文件'''
    def handle(self):
        write_time = time.strftime("%Y-%m-%d:%H:%M:%S",time.localtime())
        start_time=time.time()
        print write_time,start_time
        data = self.request.recv(2048)
        message = {}
        message["recv_data"] = data
        message['write_time'] = write_time
        try:
            kafka=KafkaMgr(hosts)
            kafka.send_message(topic,data)
        except Exception as e:
            message["status"]=False
        else:
            message["status"]=True
        log(message)
        msg = json.dumps(message)
        self.request.sendall(msg)
        print 'count_time---',time.time()-start_time
def log(message):
    log_file = 'tj_'
    if not os.path.exists('/tmp/tj'):
        os.makedirs('/tmp/tj')
    file_start_name = time.strftime("%Y%m%d",time.localtime())
    subprocess.Popen("cd /tmp/tj/;echo {0} >> {1}{2}_socket_server.log".format(message,log_file,file_start_name),shell=True,stdout=None,stderr=None)

if __name__ == "__main__":
    from threading import Thread
    server = SocketServer.ThreadingTCPServer(('192.168.15.33',50001), ThreadedTCPRequestHandler)
    # Port 0 means to select an arbitrary unused port
    # SocketServer.TCPServer.allow_reuse_address = True

    # ip, port = server.server_address
    # home_thread = threading.Thread(target=server.serve_forever)
    # home_thread.start()
    # print "Server loop running in thread:", home_thread.name
    print " .... waiting for connection"
    server.serve_forever()

 

消費kafka:json

from kafka import KafkaConsumer
consumer=KafkaConsumer("topic_123456",group_id='group_123456',bootstrap_servers=['192.168.1.1:9092','192.168.1.2:9092'],auto_offset_reset='smallest',auto_commit_enable=False)
#指定分區消費
#consumer=KafkaConsumer(group_id='group_123456',bootstrap_servers=['192.168.1.1:9092','192.168.1.2:9092'],auto_offset_reset='smallest',auto_commit_enable=False)
#consumer.set_topic_partitions(("topic_123456",0),("topic_123456",1))

for message in consumer:
    try:
        value=message.value
        offset=message.offset
    except Exception, e :
        # print e
        consumer.task_done(message)
        if offset%1000==0:
            # print '-----'
            consumer.commit()
        pass
    else:

        try:
            consumer_function()#自定義消費函數
        except Exception as e:
            # print body
            # print e
            pass
        finally:
            consumer.task_done(message)
            consumer.commit()
相關文章
相關標籤/搜索