Kafka is a distributed message broker originally developed at LinkedIn.
First, download the latest release from the official downloads page; at the time of writing, the latest version is 2.3.0 (released on June 25, 2019).
❯ wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
❯ tar -xzf kafka_2.12-2.3.0.tgz
❯ cd kafka_2.12-2.3.0
Kafka requires ZooKeeper to run. ZooKeeper, a key component of Hadoop and HBase, provides coordination services for distributed applications. So we need to start a ZooKeeper server first; the Kafka distribution ships with a convenience script for quickly spinning up a simple single-node ZooKeeper instance:
❯ bin/zookeeper-server-start.sh config/zookeeper.properties # run in terminal 1 or inside tmux
Then start the Kafka server (OpenJDK should already be installed):
❯ bin/kafka-server-start.sh config/server.properties
Now the Kafka server is running. Let's first get a feel for the bundled scripts by creating a topic and publishing and consuming messages from the terminal:
# Create a topic called "strconv" with one partition and one replica
❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic strconv
# List all topics
❯ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
strconv
test
# Start a producer and type 2 messages in interactive mode
❯ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic strconv
>Message 1
>Message 2
# Start a consumer from the beginning; it dumps messages to standard output
❯ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic strconv --from-beginning
Message 1
Message 2
I won't go on to demonstrate multi-broker clusters and other usage here; see the official documentation for those.
Next, let's write producers and consumers in Go. There are currently two mainstream Go clients; we'll try them one by one.
As an aside: although LinkedIn open-sourced Kafka, the company's core language is Java and it rarely uses Go, so it does not maintain a Go client of its own.
confluent-kafka-go is Confluent's open-source Go client. It is in fact a Go wrapper around the C/C++ client librdkafka. Install it first:
❯ brew install librdkafka pkg-config
❯ go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
The examples directory in the project contains plenty of samples to refer to. I'll write one modeled on them; first the producer:
package main

import (
	"fmt"
	"os"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n", os.Args[0])
		os.Exit(1)
	}
	broker := os.Args[1]
	topic := os.Args[2]

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created Producer %v\n", p)

	// Produce is asynchronous; the delivery report arrives on deliveryChan.
	deliveryChan := make(chan kafka.Event)
	value := "Hello Go!"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(value),
		Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
	}, deliveryChan)
	if err != nil {
		fmt.Printf("Failed to produce message: %v\n", err)
		os.Exit(1)
	}

	// Block until the broker acknowledges (or rejects) the message.
	e := <-deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
	} else {
		fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
			*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
	}
	close(deliveryChan)
}
The example uses os.Args to read positional arguments from the command line; confluent_producer.go takes two arguments: broker and topic. A single Produce call publishes one message whose value is "Hello Go!", together with a header whose key is myTestHeader and whose value is "header values are binary". Finally, the resulting Message tells us whether delivery succeeded; on success the program prints the partition and the offset. Then the consumer:
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n", os.Args[0])
		os.Exit(1)
	}
	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     broker,
		"broker.address.family": "v4",
		"group.id":              group,
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe: %s\n", err)
		os.Exit(1)
	}

	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// Poll blocks for at most 100ms waiting for an event.
			ev := c.Poll(100)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
			case kafka.Error:
				// Errors are generally informational; give up only when
				// all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}
The consumer takes three arguments: the broker address, the group ID, and the topic name. The group ID identifies a consumer group: consumers sharing a group ID coordinate to divide up all partitions of the subscribed topics, so a fresh group ID lets you consume every message of the subscribed topics over again. First, produce 2 messages:
❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 0
❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 1
Each run publishes one message. Since strconv has only one partition, the output is always [0], and the offset increments from 0. Now start the consumers:
# terminal 1
❯ go run confluent_consumer.go localhost:9092 1 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])
# terminal 2
❯ go run confluent_consumer.go localhost:9092 2 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])
Since the two terminals use different group IDs, each of them consumed all of the messages (2 each).
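Incidentally, if you want to watch how partitions get assigned within a group, confluent-kafka-go can route rebalance events through the Poll loop. The following is only a sketch of the pattern from the project's consumer example, and it assumes you add "go.application.rebalance.enable": true to the ConfigMap; with that option set, the application itself is responsible for calling Assign/Unassign:
// Extra cases for the Poll switch in the consumer above. Assumes
// "go.application.rebalance.enable": true in the kafka.ConfigMap,
// which hands rebalance handling over to the application.
case kafka.AssignedPartitions:
	fmt.Fprintf(os.Stderr, "%% Assigned: %v\n", e.Partitions)
	c.Assign(e.Partitions) // accept the new assignment
case kafka.RevokedPartitions:
	fmt.Fprintf(os.Stderr, "%% Revoked: %v\n", e.Partitions)
	c.Unassign()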
To clarify: for all the code in this post, producers take two arguments (the broker address and the topic name) and consumers take three (the broker address, the group ID, and the topic name).
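Before leaving confluent-kafka-go, one producer variation is worth a sketch. Blocking on a private delivery channel after every Produce serializes publishing; alternatively you can pass nil as the delivery channel, collect delivery reports from the producer's global Events() channel, and call Flush() before exiting. A minimal sketch under those assumptions (same <broker> <topic> arguments as above, argument checks omitted):
package main

import (
	"fmt"
	"os"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	broker, topic := os.Args[1], os.Args[2]
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create producer: %s\n", err)
		os.Exit(1)
	}
	defer p.Close()

	// Delivery reports for messages produced with a nil delivery channel
	// arrive on the global Events() channel.
	go func() {
		for e := range p.Events() {
			if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
				fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
			}
		}
	}()

	for i := 0; i < 10; i++ {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(fmt.Sprintf("Hello Go! #%d", i)),
		}, nil) // nil: report on p.Events() instead of a private channel
	}
	p.Flush(15 * 1000) // wait up to 15s for outstanding deliveries
}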
sarama is Shopify's open-source Go client. Step one, as before, is to install it:
❯ go get -u github.com/Shopify/sarama
To demonstrate multi-partition messages, create a fresh topic this time (called sarama), with 2 partitions:
❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic sarama
Then the producer:
package main

import (
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n", os.Args[0])
		os.Exit(1)
	}
	broker := os.Args[1]
	topic := os.Args[2]

	config := sarama.NewConfig()
	config.Producer.Retry.Max = 5
	config.Producer.RequiredAcks = sarama.WaitForAll

	producer, err := sarama.NewAsyncProducer([]string{broker}, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
	var enqueued, errors int
	doneCh := make(chan struct{})
	go func() {
		for {
			time.Sleep(1 * time.Second)
			// Build a random 4-character value, keyed by the current
			// Unix timestamp.
			buf := make([]byte, 4)
			for i := 0; i < 4; i++ {
				buf[i] = chars[rand.Intn(len(chars))]
			}
			strTime := strconv.Itoa(int(time.Now().Unix()))
			msg := &sarama.ProducerMessage{
				Topic: topic,
				Key:   sarama.StringEncoder(strTime),
				Value: sarama.StringEncoder(buf),
			}
			select {
			case producer.Input() <- msg: // enqueue asynchronously
				enqueued++
				fmt.Printf("Produce message: %s\n", buf)
			case err := <-producer.Errors():
				errors++
				fmt.Println("Failed to produce message:", err)
			case <-signals:
				doneCh <- struct{}{}
				return
			}
		}
	}()
	<-doneCh
	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}
sarama provides two producers, AsyncProducer and SyncProducer; this one is asynchronous. Each message uses the current Unix timestamp as its key and a random 4-character string as its value, and is published through the producer.Input() channel.
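For comparison, here is a minimal sketch of the synchronous variant: SendMessage blocks until the broker acknowledges and returns the partition and offset directly. Note that the SyncProducer requires Producer.Return.Successes to be true; argument checks are omitted here:
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	broker, topic := os.Args[1], os.Args[2]
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{broker}, config)
	if err != nil {
		log.Panicf("Error creating sync producer: %v", err)
	}
	defer producer.Close()

	// SendMessage blocks until the broker acks, then reports where the
	// message landed.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("Hello Go!"),
	})
	if err != nil {
		log.Panicf("Failed to send message: %v", err)
	}
	fmt.Printf("Delivered to partition %d at offset %d\n", partition, offset)
}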
The consumer program again uses a consumer group:
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

// Consumer implements sarama.ConsumerGroupHandler.
type Consumer struct {
	ready chan bool
}

// Setup runs at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(consumer.ready) // signal that the consumer is ready
	return nil
}

// Cleanup runs at the end of a session, after all ConsumeClaim goroutines exit.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim processes the messages of one assigned partition.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: key = %s, value = %s, topic = %s, partition = %v, offset = %v",
			string(message.Key), string(message.Value), message.Topic, message.Partition, message.Offset)
		session.MarkMessage(message, "") // mark the message as consumed
	}
	return nil
}

func main() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n", os.Args[0])
		os.Exit(1)
	}
	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	version, err := sarama.ParseKafkaVersion("2.3.0")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}
	config := sarama.NewConfig()
	config.Version = version

	consumer := Consumer{
		ready: make(chan bool),
	}
	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup([]string{broker}, group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume blocks for the duration of a session and returns
			// when a rebalance happens, so it is called in a loop.
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil { // context cancelled: shut down
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // wait until the consumer has been set up
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}
The consumer is a bit more involved. It starts by declaring a Consumer struct with Setup, Cleanup, and ConsumeClaim methods, all of which are required for message handling. In consumer-group mode you also need sarama.ParseKafkaVersion("2.3.0") to specify the Kafka version. Signal handling is wired in as well: when the program terminates, it reports why, printing terminating: via signal for Ctrl+C and the like. The consuming logic runs in a goroutine, with sync.WaitGroup making sure the goroutine finishes before shutdown. Also note that the context created with context.WithCancel comes with a cancel function that must be called at the end; otherwise the signal cannot terminate the program.
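For reference, the three methods come from sarama's ConsumerGroupHandler interface, which is what ConsumerGroup.Consume expects (ConsumeClaim is called in a separate goroutine for each assigned partition):
// As defined in the sarama package: any value with these three methods
// can be passed to ConsumerGroup.Consume.
type ConsumerGroupHandler interface {
	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}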
Run it:
❯ go run sarama_producer.go localhost:9092 sarama
Produce message: BZGB
Produce message: CTCU
Produce message: SJFB
Produce message: DNJO
Produce message: EZQL
Produce message: JZPF
Produce message: SBZR
Produce message: FDZD
❯ go run sarama_consumer.go localhost:9092 1 sarama
2019/07/19 13:01:07 Message claimed: key = 1563512466, value = BZGB, topic = sarama, partition = 0, offset = 0
2019/07/19 13:01:07 Message claimed: key = 1563512467, value = CTCU, topic = sarama, partition = 1, offset = 0
2019/07/19 13:01:08 Message claimed: key = 1563512468, value = SJFB, topic = sarama, partition = 0, offset = 1
2019/07/19 13:01:09 Message claimed: key = 1563512469, value = DNJO, topic = sarama, partition = 1, offset = 1
2019/07/19 13:01:10 Message claimed: key = 1563512470, value = EZQL, topic = sarama, partition = 1, offset = 2
2019/07/19 13:01:11 Message claimed: key = 1563512471, value = JZPF, topic = sarama, partition = 0, offset = 2
2019/07/19 13:01:12 Message claimed: key = 1563512472, value = SBZR, topic = sarama, partition = 1, offset = 3
2019/07/19 13:01:13 Message claimed: key = 1563512473, value = FDZD, topic = sarama, partition = 0, offset = 3
You can see the messages are spread fairly evenly across partitions 0 and 1. You can also subscribe from another terminal with a different group ID, e.g. go run sarama_consumer.go localhost:9092 2 sarama (group ID 2). If several terminals share the same group ID, each process consumes only the partitions bound to it, with no duplicate consumption. For example: with two terminals running go run sarama_consumer.go localhost:9092 1 sarama, terminal A consumes partition 0 and terminal B consumes partition 1. With three terminals, since there are only 2 partitions, terminal C is bound to no partition and consumes nothing.
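The even spread comes from sarama's partitioning: by default the producer hashes the message key to choose a partition, and since our keys are per-second timestamps they scatter across both partitions. The partitioner is swappable in the producer config; a sketch of the options:
config := sarama.NewConfig()
// Default: the hash of the message key picks the partition.
config.Producer.Partitioner = sarama.NewHashPartitioner
// Alternatives: sarama.NewRandomPartitioner,
// sarama.NewRoundRobinPartitioner, or sarama.NewManualPartitioner
// (which sends to the partition set in ProducerMessage.Partition).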
Sarama falls short on both documentation and API design; I don't quite understand why it has so many more stars than confluent-kafka-go, perhaps simply because Sarama was created earlier.
I've decided to use confluent-kafka-go in my future projects. If you have experience running either client in production, please leave a comment and let me know!
Original post: strconv.com/posts/use-k…
The complete code can be found at this address.