python消費和生產kafka消息

-------------------------------------------------------------------------------
 kafka = KafkaProducer(bootstrap_servers==: 10: : : : : : =, msg, partition=-------------------------------------------------------------------------------- kafka = KafkaConsumer(, bootstrap_servers=[ msg =  %--------------------------------------------------------------------------------
二、若是想要完成負載均衡,就須要知道kafka的分區機制,同一個主題,能夠爲其分區,在生產者不指定分區的狀況,kafka會將多個消息分發到不一樣的分區,消費者訂閱時候若是不指定服務組,
會收到全部分區的消息,若是指定了服務組,則同一服務組的消費者會消費不一樣的分區,若是2個分區兩個消費者的消費者組消費,則,每一個消費者消費一個分區,若是有三個消費者的服務組,
則會出現一個消費者消費不到數據;若是想要消費同一分區,則須要用不一樣的服務組。以此爲原理,咱們對消費者作以下修改:


 kafka = KafkaConsumer(, group_id=, bootstrap_servers=[ msg =  %------------------------------------------------------------------------------------

三、kafka提供了偏移量的概念,容許消費者根據偏移量消費以前遺漏的內容,這基於kafka名義上的全量存儲,能夠保留大量的歷史數據,歷史保存時間是可配置的,通常是7天,若是偏移量定位到了已刪除的位置那也會有問題,可是這種狀況可能很小;每一個保存的數據文件都是以偏移量命名的,當前要查的偏移量減去文件名就是數據在該文件的相對位置。要指定偏移量消費數據,須要指定該消費者要消費的分區,不然代碼會找不到分區而沒法消費,代碼以下:bootstrap

 kafka  kafka.structs = KafkaConsumer(group_id=, bootstrap_servers=[=, partition=0), TopicPartition(topic=, partition=1 consumer.partitions_for_topic()  =, partition= msg =  %-----------------------------------------------------------------------------------

四、有時候,咱們並不須要實時獲取數據,由於這樣可能會形成性能瓶頸,咱們只須要定時去獲取隊列裏的數據而後批量處理就能夠,這種狀況,咱們能夠選擇主動拉取數據負載均衡


 kafka = KafkaConsumer(group_id=, bootstrap_servers=[=(== consumer.poll(timeout_ms=5)  
    2+= 1
  %-----------------------------------------------------------------------------------
相關文章
相關標籤/搜索