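Background: the Kafka client used here (pykafka) offers four producer modes: synchronous, multithreaded, gevent async, and rdkafka async. However, when it is combined with Celery and gevent, some of these modes fail.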
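For reference, each of the four modes is selected by a different pykafka call. The sketch below records that mapping as data. `PRODUCER_MODES` and `make_producer` are hypothetical names. It assumes pykafka's `KafkaClient(hosts=..., use_greenlets=...)` constructor and the `Topic.get_sync_producer()` / `Topic.get_producer(use_rdkafka=...)` methods. The pykafka import is deferred so the table can be inspected without a broker.

```python
# Hypothetical table: mode -> (KafkaClient kwargs, Topic method name, method kwargs)
PRODUCER_MODES = {
    # produce() blocks until each message is delivered
    "sync": ({}, "get_sync_producer", {}),
    # default async producer, sending from background worker threads
    "threaded": ({}, "get_producer", {}),
    # same async producer, but the client runs on gevent greenlets
    "gevent": ({"use_greenlets": True}, "get_producer", {}),
    # async producer backed by librdkafka (needs pykafka's rdkafka extension)
    "rdkafka": ({}, "get_producer", {"use_rdkafka": True}),
}


def make_producer(broker_list, topic_name, mode="threaded"):
    """Create a pykafka producer in the requested mode (sketch)."""
    from pykafka import KafkaClient  # deferred: the table itself needs no pykafka

    client_kwargs, method_name, method_kwargs = PRODUCER_MODES[mode]
    client = KafkaClient(hosts=broker_list, **client_kwargs)
    topic = client.topics[topic_name]
    return getattr(topic, method_name)(**method_kwargs)
```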
Below are the results of running my code.
Conclusion: use the multithreaded mode!
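With the synchronous mode, the data is sent successfully:

```python
import json

from pykafka import KafkaClient

# broker_list, topic_name and LOGGER are defined elsewhere in the module


def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        with topic.get_sync_producer() as producer:
            for d in data:
                print("send data")
                msg = json.dumps(d)
                producer.produce(msg)  # str is bytes on Python 2; encode it on Python 3
            producer.stop()  # redundant: leaving the with-block also calls stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```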
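With the rdkafka async mode, only one "send data" line is printed, and then the thread hangs:

```python
import json

from pykafka import KafkaClient

# broker_list, topic_name and LOGGER are defined elsewhere in the module


def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        with topic.get_producer(use_rdkafka=True) as producer:
            for d in data:
                print("send data")
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```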
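With the multithreaded mode (the default `get_producer()`), all data is produced normally:

```python
import json

from pykafka import KafkaClient

# broker_list, topic_name and LOGGER are defined elsewhere in the module


def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        with topic.get_producer() as producer:
            for d in data:
                print("send data")
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```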
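The code in this post is Python 2, where `json.dumps()` already returns a byte string. Under Python 3, pykafka's `produce()` expects `bytes`, so the JSON text must be encoded first. Below is a minimal Python 3 sketch of the working multithreaded variant. It assumes UTF-8 encoding and takes `broker_list` and `topic_name` as parameters. `encode_messages` is a hypothetical helper name.

```python
import json
import logging

LOGGER = logging.getLogger(__name__)


def encode_messages(data):
    """Serialize each record as UTF-8 encoded JSON bytes."""
    return [json.dumps(d).encode("utf-8") for d in data]


def send_data_kafka(data, broker_list, topic_name):
    # Imported here so encode_messages() works without pykafka installed.
    from pykafka import KafkaClient

    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        # get_producer() without use_rdkafka selects the threaded producer;
        # leaving the with-block calls producer.stop(), so no explicit stop().
        with topic.get_producer() as producer:
            for msg in encode_messages(data):
                producer.produce(msg)
    except Exception:
        LOGGER.exception("error in send_data_kafka")
```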
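Creating the rdkafka async producer once at module level, without `with`, every "send data" line is printed, but then it hangs:

```python
import json

from pykafka import KafkaClient

# broker_list, topic_name and LOGGER are defined elsewhere in the module
client = KafkaClient(hosts=broker_list)
topic = client.topics[topic_name]
producer = topic.get_producer(use_rdkafka=True)  # async via the rdkafka library: the fastest option


def send_data_kafka(data):
    try:
        for d in data:
            print("send data")
            msg = json.dumps(d)
            producer.produce(msg)
        # note: this stops the shared module-level producer, so any later
        # call would be producing to an already-stopped producer
        producer.stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```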
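The module-level variant calls `producer.stop()` on a shared producer inside the function, so the first call shuts it down. If you want a single long-lived producer (for example the threaded one recommended above) reused across calls, one option is to create it lazily in each process and stop it only at interpreter exit. This is a sketch under assumptions: `SharedProducer` and the two-argument `send_data_kafka(data, shared)` are hypothetical, and the factory is any zero-argument callable returning an object with `produce()` and `stop()`, such as `lambda: topic.get_producer()`. Whether this also avoids the rdkafka hang was not tested here.

```python
import atexit
import json
import os
import threading


class SharedProducer(object):
    """Hypothetical helper: one producer per process, created on first use."""

    def __init__(self, factory):
        self._factory = factory  # zero-arg callable returning a producer
        self._lock = threading.Lock()
        self._producer = None
        self._pid = None
        atexit.register(self.stop)  # flush and stop once, at interpreter exit

    def get(self):
        with self._lock:
            # Build on first use, or again after a fork: only the forking
            # thread exists in a child process, so a producer built in the
            # parent would have no running worker threads there.
            if self._producer is None or self._pid != os.getpid():
                self._producer = self._factory()
                self._pid = os.getpid()
            return self._producer

    def stop(self):
        with self._lock:
            # Only stop a producer that was created in this process.
            if self._producer is not None and self._pid == os.getpid():
                self._producer.stop()
            self._producer = None


def send_data_kafka(data, shared):
    producer = shared.get()
    for d in data:
        producer.produce(json.dumps(d).encode("utf-8"))
    # no stop() here: the producer stays alive for the next call
```

Keeping `stop()` out of the per-call function is the key design choice: the producer's lifetime is tied to the process rather than to a single batch of messages.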