kafka

什麼是Kafka

Kafka是一個分佈式流處理系統,流處理系統使它能夠像消息隊列同樣publish或者subscribe消息,分佈式提供了容錯性,併發處理消息的機制。

Kafka的基本概念python

kafka運行在集羣上,集羣包含一個或多個服務器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時間戳(timestamp)。

kafka有如下一些基本概念:bootstrap

  • Producer

消息生產者,就是向kafka broker發消息的客戶端。
  • Consumer

消息消費者,是消息的使用方,負責消費Kafka服務器上的消息。
  • Topic

主題,由用戶定義並配置在Kafka服務器,用於創建Producer和Consumer之間的訂閱關係。生產者發送消息到指定的Topic下,消息者從這個Topic下消費消息。
  • Partition

消息分區,一個topic能夠分爲多個 partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。
  • Broker

一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。
  • Consumer Group

消費組,用於歸組同類消費者。每一個consumer屬於一個特定的consumer group,多個消費組能夠共同消息一個Topic下的消息,每消費組中的消費者消費Topic的部分消息,這些消費者就組成了一個分組。
  • Offset

消息在partition中的偏移量。每一條消息在partition都有惟一的偏移量,消息者能夠指定偏移量來指定要消費的消息。

Kafka分佈式架構

clipboard.png

如上圖所示,kafka將topic中的消息存在不一樣的partition中。若是存在鍵值(key),消息按照鍵值(key)作分類存在不一樣的partiition中,若是不存在鍵值(key),消息按照輪詢(Round Robin)機制存在不一樣的partition中。默認狀況下,鍵值(key)決定了一條消息會被存在哪一個partition中。服務器

partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)來指定消息的位置。一個topic的一個partition只能被一個consumer group中的一個consumer消費,多個consumer消費同一個partition中的數據是不容許的,可是一個consumer能夠消費多個partition中的數據。架構

kafka將partition的數據複製到不一樣的broker,提供了partition數據的備份。每個partition都有一個broker做爲leader,若干個broker做爲follower。全部的數據讀寫都經過leader所在的服務器進行,而且leader在不一樣broker之間複製數據。併發

clipboard.png

上圖中,對於Partition 0,broker 1是它的leader,broker 2和broker 3是follower。對於Partition 1,broker 2是它的leader,broker 1和broker 3是follower。分佈式

clipboard.png

在上圖中,當有Client(也就是Producer)要寫入數據到Partition 0時,會寫入到leader Broker 1,Broker 1再將數據複製到follower Broker 2和Broker 3。spa

clipboard.png

在上圖中,Client向Partition 1中寫入數據時,會寫入到Broker 2,由於Broker 2是Partition 1的Leader,而後Broker 2再將數據複製到follower Broker 1和Broker 3中。3d

上圖中的topic一共有3個partition,對每一個partition的讀寫都由不一樣的broker處理,所以總的吞吐量獲得了提高。code

clipboard.png

實驗一:kafka-python實現生產者消費者

kafka-python是一個python的Kafka客戶端,能夠用來向kafka的topic發送消息、消費消息。server

這個實驗會實現一個producer和一個consumer,producer向kafka發送消息,consumer從topic中消費消息。結構以下圖

clipboard.png

producer代碼

#-*- coding: utf-8 -*-

from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers='localhost:9092')

i = 1000
while True:
    ts = int(time.time() * 1000)
    producer.send(topic='py_test', value=str(i), key=str(i), timestamp_ms=ts)
    producer.flush()
    print i
    i += 1
    time.sleep(1)

consumer代碼

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer('py_test', bootstrap_servers=["localhost:9092"])
for message in consumer:
    print message

接下來建立test topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

打開兩個窗口中,咱們在window1中運行producer,以下:
clipboard.png
在window2中運行consumer,以下:
clipboard.png

實驗二:消費組實現容錯性機制

這個實驗將展現消費組的容錯性的特色。這個實驗中將建立一個有2個partition的topic,和2個consumer,這2個consumer共同消費同一個topic中的數據。結構以下所示

clipboard.png

producer部分代碼和實驗一相同,這裏再也不重複。consumer須要指定所屬的consumer group,代碼以下

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer('py_test', group_id='testgt', bootstrap_servers=["localhost:9092"])
for message in consumer:
    print message

接下來咱們建立topic,名字test,設置partition數量爲2

clipboard.png

打開三個窗口,一個窗口運行producer,還有兩個窗口運行consumer。
運行consumer的兩個窗口的輸出以下:
clipboard.png
能夠看到兩個consumer同時運行的狀況下,它們分別消費不一樣partition中的數據。window1中的consumer消費partition 0中的數據,window2中的consumer消費parition 1中的數據。
咱們嘗試關閉window1中的consumer,能夠看到以下結果
clipboard.png
剛開始window2中的consumer只消費partition1中的數據,當window1中的consumer退出後,window2中的consumer中也開始消費partition 0中的數據了。

實驗三:offset管理

kafka容許consumer將當前消費的消息的offset提交到kafka中,這樣若是consumer因異常退出後,下次啓動仍然能夠從上次記錄的offset開始向後繼續消費消息。

這個實驗的結構和實驗一的結構是同樣的,使用一個producer,一個consumer,test topic的partition數量設爲1。

producer的代碼和實驗一中的同樣,這裏再也不重複。consumer的代碼稍做修改,這裏consumer中打印出下一個要被消費的消息的offset。consumer代碼以下

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition('py_test', 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id='test_g', auto_offset_reset='earliest', enable_auto_commit=False)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    # pass
    print message.value
auto.offset.reset值含義解釋
earliest 
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 
latest 
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 
none 
topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

在一個窗口中啓動producer,在另外一個窗口而且啓動consumer。consumer的輸出以下

clipboard.png
能夠嘗試退出consumer,再啓動consumer。每一次從新啓動,consumer都是從offset=98的消息開始消費的。
修改consumer的代碼以下 在consumer消費每一條消息後將offset提交回kafka

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition('py_test', 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id='test', auto_offset_reset='earliest', enable_auto_commit=True)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    print message.offset
    # consumer.commit() 也能夠主動提交offset

啓動consumer

clipboard.png

能夠看到consumer從offset=98的消息開始消費,到offset=829時,咱們Ctrl+C退出consumer。

咱們再次啓動consumer

clipboard.png

能夠看到從新啓動後,consumer從上一次記錄的offset開始繼續消費消息。以後每一次consumer從新啓動,consumer都會從上一次中止的地方繼續開始消費。

不一樣的消費組有不一樣的offset管理,相互不影響

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition('py_test', 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id='test_1', auto_offset_reset='earliest', enable_auto_commit=False)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    print message.offset
    # consumer.commit()

換一個group_id test_1,會從starting offset is 0開始輸出:

starting offset is 0
0
相關文章
相關標籤/搜索