python3鏈接kafka模塊pykafka生產者簡單封裝

1.1安裝模塊

pip install pykafka

1.2基本使用

# -* coding:utf8 *-  
  from pykafka import KafkaClient  
  host = 'IP:9092, IP:9092, IP:9092'
  client = KafkaClient(hosts = host)  
  # 生產者  
  topicdocu = client.topics['my-topic']  
  producer = topicdocu.get_producer()  
  for i in range(100):  
      print i  
      producer.produce('test message ' + str(i ** 2))  
  producer.stop()

1.3簡單封裝

class KafkaProduct():

      def __init__(self,hosts,topic):
          """
          初始化實例
          :param hosts: 鏈接地址
          :param topic:
          """
          self.__client = KafkaClient(hosts=hosts)
          self.__topic = self.__client.topics[topic.encode()]
  
      def __set_topic(self, topic):
          self.__topic = self.__client.topics[topic.encode()]
  
      def set_topic(self, topic):
          """
          設置topic
          :param topic:
          :return:
          """
          self.__set_topic(topic)
  
      def get_topics(self):
          """
          獲取當前全部topic
          :return:
          """
          return self.__client.topics
  
      def get_topic(self):
          """
          獲取當前topic
          :return:
          """
          return self.__topic
  
      def Producer(self):
          """
          生產者對象
          :return:
          """
          with self.__topic.get_producer(delivery_reports=True) as producer:
              next_data = ''
              while True:
                  if next_data:
                      producer.produce(str(next_data).encode())
                  next_data = yield True
  
      def send_data(self,datas):
          """
          發送數據
          :param datas:須要傳入的可迭代對象
          :return:
          """
          c = self.Producer()
          next(c)
          for i in datas:
              c.send(i)

if __name__ == '__main__':

    hosts = "1.2.3.4:9999,2.3.4.5:9090" #鏈接hosts
    topic = "test_523"
    K = KafkaProduct(hosts=hosts, topic=topic)  #
    #K.set_topic("test")  #切換設置新的topic
    K.get_topic()  #獲取當前設置的topic
    #K.get_topics() #獲取全部topic
    data = range(10000)  #要發送的可迭代對象
    K.send_data(data)

1.4引用來源

博客園:Python測試Kafka集羣(pykafka)html

知乎:使用生成器把Kafka寫入效率提升1000倍測試

相關文章
相關標籤/搜索