Using Kafka (with Golang Code)

Kafka is a distributed message broker originally developed at LinkedIn.

Installation

First, download the latest release from the official download page. At the time of writing, the latest version is 2.3.0 (released June 25, 2019).

❯ wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
❯ tar -xzf kafka_2.12-2.3.0.tgz
❯ cd kafka_2.12-2.3.0

Kafka requires ZooKeeper, a coordination service for distributed applications that is also an important component of Hadoop and HBase. So you need to start a ZooKeeper server first; the Kafka distribution ships with a convenience script for quickly spinning up a simple single-node ZooKeeper instance:

❯ bin/zookeeper-server-start.sh config/zookeeper.properties  # run in terminal 1 or inside tmux

Then start the Kafka server (you should have OpenJDK installed beforehand):

❯ bin/kafka-server-start.sh config/server.properties

With that, the Kafka server is running. Let's first try the bundled scripts in the terminal: creating a topic, then publishing and consuming messages:

# Create a topic called "strconv" with one partition and one replica
❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic strconv
# List all topics
❯ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
strconv
test
# Start a producer and enter two messages interactively
❯ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic strconv
>Message 1
>Message 2
# Start a consumer from the beginning; messages are dumped to stdout
❯ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic strconv --from-beginning
Message 1
Message 2

I won't go on to demonstrate multi-broker clusters and other setups here; see the official documentation for details.

Next, let's write producers and consumers in Golang. There are currently two mainstream Golang clients; we'll try each in turn.

As an aside: although LinkedIn open-sourced Kafka, the company's core language is Java and it has few Golang applications, so it does not maintain a Golang client of its own.

confluent-kafka-go

confluent-kafka-go is Confluent's open-source Golang client. It is actually a Golang wrapper around the C/C++ client librdkafka. Install it first:

❯ brew install librdkafka pkg-config
❯ go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

The examples directory in the project contains quite a few examples for reference. I'll write one modeled on them; first, the producer:

package main

import (
	"fmt"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
	"os"
)

func main() {

	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	topic := os.Args[2]

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})

	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Producer %v\n", p)
	deliveryChan := make(chan kafka.Event)

	value := "Hello Go!"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(value),
		Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
	}, deliveryChan)
	if err != nil {
		fmt.Printf("Produce failed: %v\n", err)
		os.Exit(1)
	}

	// Block until the delivery report for this message arrives.
	e := <-deliveryChan
	m := e.(*kafka.Message)

	if m.TopicPartition.Error != nil {
		fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
	} else {
		fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
			*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
	}

	close(deliveryChan)
}

The example uses os.Args to read positional command-line arguments; confluent_producer.go takes two: broker and topic. A single Produce call publishes one message whose value is "Hello Go!" and which also carries a header with key myTestHeader and value "header values are binary". Finally, we inspect the resulting Message to see whether delivery succeeded; on success, the partition and the offset are printed.
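As a variation of my own (a sketch, not part of the original example): Produce also accepts a nil delivery channel, in which case delivery reports arrive on the producer's global Events() channel, and Flush waits for outstanding messages before exit. Assuming the same broker and topic values as above:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	broker := "localhost:9092" // assumed broker address
	topic := "strconv"         // assumed topic name

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// With a nil delivery channel, delivery reports are emitted on p.Events().
	go func() {
		for e := range p.Events() {
			if m, ok := e.(*kafka.Message); ok {
				if m.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered to %v\n", m.TopicPartition)
				}
			}
		}
	}()

	for i := 0; i < 3; i++ {
		if err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(fmt.Sprintf("message %d", i)),
		}, nil); err != nil {
			fmt.Printf("Produce failed: %v\n", err)
		}
	}

	// Wait up to 15 seconds for all outstanding deliveries before exiting.
	p.Flush(15 * 1000)
}

Now for the consumer: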

package main

import (
	"fmt"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
	"os"
	"os/signal"
	"syscall"
)

func main() {

	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		"broker.address.family": "v4",
		"group.id":              group,
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe to topics: %s\n", err)
		os.Exit(1)
	}

	run := true

	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}

The consumer takes three arguments: the broker address, a GroupID, and topic names. The GroupID identifies a consumer group: consumers with the same GroupID belong to one group and coordinate to consume all partitions of the subscribed topics, so using a new GroupID lets you consume all of a topic's messages once more. First, produce two messages:

❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 0
❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 1

Each run publishes one message. Since strconv has only one partition, the output is always [0], while the offset increments from 0. Next, start the consumers:

# Terminal 1
❯ go run confluent_consumer.go localhost:9092 1 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])
# Terminal 2
❯ go run confluent_consumer.go localhost:9092 2 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])

The GroupIDs differ between the two terminals, so each consumed the full set of messages (both of them).

To clarify: in all the code in this article, producers take two arguments (the broker address and the topic name), and consumers take three (the broker address, the GroupID, and the topic name).

Sarama

sarama is a Golang client open-sourced by Shopify. Step one, again, is to install it:

❯ go get -u github.com/Shopify/sarama

To demonstrate multi-partition messages, create a fresh topic this time (called sarama) with two partitions:

❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic sarama

Then the producer:

package main

import (
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	topic := os.Args[2]

	config := sarama.NewConfig()
	config.Producer.Retry.Max = 5
	config.Producer.RequiredAcks = sarama.WaitForAll
	producer, err := sarama.NewAsyncProducer([]string{broker}, config)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
	rand.Seed(time.Now().UnixNano()) // seed so each run produces different payloads

	var enqueued, errors int
	doneCh := make(chan struct{})
	go func() {
		for {

			time.Sleep(1 * time.Second)

			buf := make([]byte, 4)
			for i := 0; i < 4; i++ {
				buf[i] = chars[rand.Intn(len(chars))]
			}

			strTime := strconv.Itoa(int(time.Now().Unix()))
			msg := &sarama.ProducerMessage{
				Topic: topic,
				Key:   sarama.StringEncoder(strTime),
				Value: sarama.StringEncoder(buf),
			}
			select {
			case producer.Input() <- msg:
				enqueued++
				fmt.Printf("Produce message: %s\n", buf)
			case err := <-producer.Errors():
				errors++
				fmt.Println("Failed to produce message:", err)
			case <-signals:
				doneCh <- struct{}{}
				return // stop producing once a signal arrives
			}
		}
	}()

	<-doneCh
	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}

sarama provides two producer types, AsyncProducer and SyncProducer; this example uses the asynchronous one. Every second it publishes a message through the producer.Input() channel, with the current Unix timestamp as the key and four random characters as the value.
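For comparison, here is a minimal sketch of the synchronous variant (my own addition, not code from the original post, assuming the same broker and topic). Note that SyncProducer requires config.Producer.Return.Successes = true, and SendMessage blocks until the broker responds:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	broker := "localhost:9092" // assumed broker address
	topic := "sarama"          // assumed topic name

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{broker}, config)
	if err != nil {
		log.Fatalln("Failed to create producer:", err)
	}
	defer producer.Close()

	// SendMessage blocks until the broker acknowledges, then reports
	// exactly where the message landed.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("Hello Go!"),
	})
	if err != nil {
		log.Fatalln("Failed to send message:", err)
	}
	fmt.Printf("Delivered to partition %d at offset %d\n", partition, offset)
}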

The consumer program also uses a consumer group:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

type Consumer struct {
	ready chan bool
}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(consumer.ready)
	return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: key = %s, value = %v, topic = %s, partition = %v, offset = %v", string(message.Key), string(message.Value), message.Topic, message.Partition, message.Offset)
		session.MarkMessage(message, "")
	}

	return nil
}

func main() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	version, err := sarama.ParseKafkaVersion("2.3.0")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	config := sarama.NewConfig()
	config.Version = version
	consumer := Consumer{
		ready: make(chan bool, 0),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup([]string{broker}, group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1) // Add before starting the goroutine so wg.Wait() cannot race it
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool, 0)
		}
	}()

	<-consumer.ready

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

The consumer side is more involved. It starts by declaring a Consumer struct with the Setup, Cleanup, and ConsumeClaim methods required for processing. In consumer-group mode you also need sarama.ParseKafkaVersion("2.3.0") to specify the Kafka version. Signal handling is included too: when the program terminates, the cause is checked, and if it was triggered by Ctrl+C or a similar signal, it prints terminating: via signal.
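For reference, those three methods are what satisfy sarama's ConsumerGroupHandler interface, which Consume expects:

// From the sarama package: the interface our Consumer struct implements.
type ConsumerGroupHandler interface {
	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}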

The consumption logic runs in a goroutine, with sync.WaitGroup ensuring the goroutine finishes before the client is closed. Note also that the context created by context.WithCancel comes with a cancel function, which must be called at the end; otherwise signals cannot terminate the program.

Run it:

❯ go run sarama_producer.go localhost:9092 sarama
Produce message: BZGB
Produce message: CTCU
Produce message: SJFB
Produce message: DNJO
Produce message: EZQL
Produce message: JZPF
Produce message: SBZR
Produce message: FDZD

❯ go run sarama_consumer.go localhost:9092 1 sarama
2019/07/19 13:01:07 Message claimed: key = 1563512466, value = BZGB, topic = sarama, partition = 0, offset = 0
2019/07/19 13:01:07 Message claimed: key = 1563512467, value = CTCU, topic = sarama, partition = 1, offset = 0
2019/07/19 13:01:08 Message claimed: key = 1563512468, value = SJFB, topic = sarama, partition = 0, offset = 1
2019/07/19 13:01:09 Message claimed: key = 1563512469, value = DNJO, topic = sarama, partition = 1, offset = 1
2019/07/19 13:01:10 Message claimed: key = 1563512470, value = EZQL, topic = sarama, partition = 1, offset = 2
2019/07/19 13:01:11 Message claimed: key = 1563512471, value = JZPF, topic = sarama, partition = 0, offset = 2
2019/07/19 13:01:12 Message claimed: key = 1563512472, value = SBZR, topic = sarama, partition = 1, offset = 3
2019/07/19 13:01:13 Message claimed: key = 1563512473, value = FDZD, topic = sarama, partition = 0, offset = 3

You can see the messages are distributed fairly evenly across the two partitions, 0 and 1. You can also subscribe from other terminals with a different GroupID, e.g. go run sarama_consumer.go localhost:9092 2 sarama (GroupID 2).

If several terminals use the same GroupID, the processes each consume the messages of the partitions bound to them, without duplicate consumption. For example: with two terminals running go run sarama_consumer.go localhost:9092 1 sarama, terminal A consumes partition 0 and terminal B consumes partition 1. With three terminals, since there are only two partitions, terminal C gets no partition bound to it and consumes nothing. The sketch below shows one way to observe this assignment.
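One small tweak for watching the binding happen (my own sketch, not from the original code) is to log session.Claims() in the handler's Setup method, which runs after every rebalance:

// A variant of the Setup method above that logs which partitions this
// group member was assigned; session.Claims() maps each subscribed topic
// to the partition IDs claimed by this member.
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	for topic, partitions := range session.Claims() {
		log.Printf("assigned topic %s, partitions %v", topic, partitions)
	}
	close(consumer.ready)
	return nil
}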

Afterword

Sarama's documentation and API both fall well short; I don't quite understand why its star count is so much higher than confluent-kafka-go's. Perhaps it's simply that Sarama was created earlier.

I've decided to use confluent-kafka-go in future projects. If you have experience running it in production, please leave a comment and let me know!

Code

Original post: strconv.com/posts/use-k…

The complete code can be found at this address.

Further reading

  1. kafka.apache.org/quickstart