kafka環境搭建和使用(python API)

引言

上一篇文章瞭解了kafka的重要組件zookeeper,用來保存broker、consumer等相關信息,作到平滑擴展。這篇文章就實際操做部署下kafka,用幾個簡單的例子加深對kafka的理解,學會基本使用kafka。html

環境搭建

我將會在本地部署一個三臺機器的zookeeper集羣,和一個2臺機器的kafka集羣。python

zookeeper集羣

zookeeper的搭建能夠看個人上一篇文章分佈式系統中zookeeper實現配置管理+集羣管理,按照步驟,一步步能夠很容易的搭建3太服務器的zookeeper集羣。跟以前同樣,我仍是在本地的3個端口搭建了3臺服務器,地址以下所示:apache

192.168.0.105:2181
192.168.0.105:2182
192.168.0.105:2183

這三臺服務器一下子會在kafka配置中用到。json

kafka集羣

第一步. 下載kafkabootstrap

到kafka官網下載apache kafka,解壓到/path/to/kafka目錄。服務器

第二步. 修改配置文件
複製/path/to/kafka/config/server.properties,到/path/to/kafka/config/server-1.properties/path/to/kafka/config/server-2.properties異步

配置文件中修改的差別內容以下所示:
server-1.properties分佈式

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183

server-2.properties函數

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183

其中broker.id是broker的惟一標示,集羣中的broker標識必須惟一。
listeners是broker監聽的地址和端口,advertised.listeners用於和producer、consumer交互,後者未配置會默認使用前者,listeners的完整格式是listeners = listener_name://host_name:port,其中PLAINTEXT是協議,還有一種是SSL,具體還沒太搞明白(TODO)。
log.dirs是日誌數據的存放目錄,也就是producer產生的數據存放的目錄。
zookeeper.connect配置是zookeeper的集羣,broker啓動以後將信息註冊到zookeeper集羣中。性能

第三步. 啓動服務器

cd /path/to/kafka
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

使用jps命令能夠看見2個kafka進程,證實啓動成功了。

第四步. 建立topic
建立topic通常使用kafka自帶的腳本建立:

bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 2 --partitions 10 --topic user-event

其中--zookeeper就是後面就是咱們上面配置的zookeeper集羣,--replication-factor表明每一個分區在集羣中複製的份數,後面的值要小於kafka集羣中服務器數量,--partitions表示建立主題的分區數量,通常分區越大,性能越好,--topic後邊兒就是建立主題的名字,運行成功以後會看到Created topic "user-event".字樣,表示建立成功,會在kafka配置的日誌目錄下建立主題信息,好比下面的:
ll /tmp/kafka-logs-1

drwxr-xr-x  7 ritoyan  wheel  224  6  3 21:21 clock-tick-0
drwxr-xr-x  7 ritoyan  wheel  224  6  3 21:21 clock-tick-2
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-0
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-1
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-2
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-3
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-4
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-5
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-6
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-7
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-8
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-9

ll /tmp/kafka-logs-2

drwxr-xr-x  7 ritoyan  wheel  224  6  3 21:21 clock-tick-1
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-0
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-1
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-2
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-3
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-4
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-5
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-6
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-7
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-8
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-9

能夠看到兩個broker中都建立了主題user-event的10個分區。可能也有人要問了,clock-tick這個主題怎麼在broker1中有2個分區,broker2中有1個分區,這個是我以前建立的一個分區,用了下面的命令bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 1 --partitions 3 --topic clock-tick,只有一份日誌記錄,3個分區,分區會均勻的分佈在全部broker上。

至此kafka環境配置好了,西面咱們看看如何使用。

基本使用

安裝kafka-python,用來操做kafka,pip3 install kafka-python,這裏是他的文檔,文檔寫的不錯,簡潔易懂kafka-python

producer 向broker發送消息

bootstrap_servers是kafka集羣地址信息,下面事項主題user-event發送一條消息,send發送消息是異步的,會立刻返回,所以咱們要經過阻塞的方式等待消息發送成功(或者flush()也能夠,flush會阻塞知道全部log都發送成功),不然消息可能會發送失敗,但也不會有提示,關於上面這個能夠經過刪除send以後的語句試試,會發現broker不會收到消息,而後在send後加上time.sleep(10)以後,會看到broker收到消息。

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=[
        "localhost:9093",
  "localhost:9094"
  ]
)

future = producer.send("user-event", b'I am rito yan')
try:
    record_metadata = future.get(timeout=10)
    print_r(record_metadata)
except KafkaError as e:
    print(e)

阻塞等待發送成功以後,會看到返回插入記錄的信息:
RecordMetadata(topic='user-event', partition=7, topic_partition=TopicPartition(topic='user-event', partition=7), offset=1, timestamp=1528034253757, checksum=None, serialized_key_size=-1, serialized_value_size=13),裏面包括了插入log的主題、分區等信息。

格式化發送的信息

建立producer的時候能夠經過value_serializer指定格式化函數,好比咱們數據是個dict,能夠指定格式化函數,將dict轉化爲byte:

import json

producer = KafkaProducer(
    bootstrap_servers=[
        "localhost:9093",
        "localhost:9094"
    ],
    value_serializer=lambda m: json.dumps(m).encode('ascii')
)

future = producer.send("user-event", {
    "name": "燕睿濤",
    "age": 26,
    "friends": [
        "ritoyan",
        "luluyrt"
    ]
})

這樣就能夠將格式化以後的信息發送給broker,不用每次發送的時候都本身格式化,真是不要太好用。

consumer 消費數據

建立一個consumer,其中group_id是分組,broker中的每個數據只能被consumer組中的一個consumer消費。

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-event",
    group_id = "user-event-test",
    bootstrap_servers = [
        "localhost:9093",
        "localhost:9094"
    ]
)
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

啓動以後,進程會一直阻塞在哪裏,等broker中有消息的時候就會去消費,啓動多個進程,只要保證group_id一致,就能夠保證消息只被組內的一個consumer消費,上面的程序會輸出:

user-event:8:2: key=None value=b'{"name": "\\u71d5\\u777f\\u6d9b", "age": 26, "friends": ["ritoyan", "luluyrt"]}'

一樣,進入的時候有value_serializer,出來的時候對應的也有value_deserializer,消費者能夠配置value_deserializer來格式化內容,跟producer對應起來

consumer = KafkaConsumer(
    "user-event",
  group_id = "user-event-test",
  bootstrap_servers = [
        "localhost:9093",
  "localhost:9094"
  ],
  value_deserializer=lambda m: json.loads(m.decode('ascii'))
)

輸出內容user-event:8:3: key=None value={'name': '燕睿濤', 'age': 26, 'friends': ['ritoyan', 'luluyrt']}

kafka其餘命令

查看分組

咱們的consumer可能有不少分組,能夠經過西面的命令查看分組信息:

cd /path/to/kafka
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093,localhost:9094 --list

能夠看到我使用中的分組有4個,分別以下所示

clock-tick-test3
user-event-test
clock-tick-test2
clock-tick-test

查看特定分組信息

能夠經過bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9093 --group user-event-test --describe,查看分組user-event-test的信息,能夠看到西面的信息,包含消費的主題、分區信息,以及consumer在分區中的offset和分區的總offset。(爲了格式化顯示,刪了部分列的部分字母)

TOPIC       PARTITION   CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID HOST    CLIENT-ID
user-event  3   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  0   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  1   1   1   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  2   1   1   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  4   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  9   1   1   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  8   4   4   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  7   2   2   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  6   1   1   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  5   0   0   0   kafka-python-78517 /127.0.0.1   kafka-python

結語

至此,kafka的基本使用算是掌握了,之後要是有機會在項目中實踐就行了,在實際工程中的各類問題能夠更加深入的理解其中的原理。

相關文章
相關標籤/搜索