Part 1: Kafka cluster setup
As for what Kafka is, I won't spend much time introducing it; it is already covered in great detail online.
(If you don't have a Java environment yet, install one first: yum -y install java-1.8.0-openjdk*)
1. Download ZooKeeper: https://zookeeper.apache.org/releases.html
2. Download Kafka: http://kafka.apache.org/downloads
3. Start the ZooKeeper cluster (my example uses 3 machines, and the same goes for Kafka below; I'll show the steps on 1 machine to stand for all 3. Of course, you can also run just 1.)
1) Configure ZooKeeper. Copy zookeeper-3.4.13/conf/zoo_sample.cfg and rename the copy to zoo.cfg. Modify the following parameters to suit your machines:
dataDir=/home/test/zookeeper/data
dataLogDir=/home/test/zookeeper/log
# server.N=ip:quorum-port:leader-election-port
server.1=10.22.1.1:2888:3888
server.2=10.22.1.2:2888:3888
server.3=10.22.1.3:2888:3888
2) Create the myid file, which assigns each machine its ID. In the /home/test/zookeeper/data directory on each of the 3 machines, run echo 1 > myid (note: on 10.22.1.2, change the 1 to 2, matching the configuration above).
3) Start the ZooKeeper cluster. On each machine, go into zookeeper-3.4.13/bin and run sh zkServer.sh start
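Once all three nodes are up, you can check that the quorum actually formed (a verification step I'm adding here; zkServer.sh status is a standard ZooKeeper command that reports whether each node is running as leader or follower):

sh zkServer.sh status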
4. Start the Kafka cluster
1) Configure Kafka. Go into kafka_2.11-2.2.0/config and make 3 copies of server.properties, named server1.properties, server2.properties, and server3.properties. Modify the following items (mind which machine each id belongs to):
log.dirs and zookeeper.connect are the same on every broker; broker.id and listeners get each machine's own id and ip:
broker.id=1
listeners=PLAINTEXT://10.22.1.1:9092
log.dirs=/home/test/kafka/log
zookeeper.connect=10.22.1.1:2181,10.22.1.2:2181,10.22.1.3:2181
2) Start the Kafka cluster. On each machine, go into kafka_2.11-2.2.0/bin and run sh kafka-server-start.sh ../config/server1.properties (on the 2nd machine, use server2.properties, and so on).
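To confirm the brokers can see each other, you can create and list a test topic with the stock kafka-topics.sh script (a hedged example: the topic name test_log, 3 partitions, and replication factor 2 are placeholder values of mine):

sh kafka-topics.sh --create --zookeeper 10.22.1.1:2181 --partitions 3 --replication-factor 2 --topic test_log
sh kafka-topics.sh --list --zookeeper 10.22.1.1:2181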
Part 2: Golang producer and consumer
There are currently two popular Kafka client libraries for Golang:
1. https://github.com/Shopify/sarama
2. https://github.com/confluentinc/confluent-kafka-go
As for which is better, judge for yourself; I use the first one, which has more stars.
1. Kafka producer code
Two points to note here:
1) config.Producer.Partitioner = sarama.NewRandomPartitioner: I pick partitions randomly. If you want a stable partition assignment, you can write your own partitioner; sarama also ships round-robin and hash variants (see the sketch after this list).
2) My topic comes from external configuration; change this to fit your own needs.
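For reference, a minimal sketch of swapping in one of sarama's built-in alternatives. sarama.NewHashPartitioner and sarama.NewRoundRobinPartitioner are real constructors in the library; the helper name newStableConfig is just for illustration:

package kafka_producer

import "github.com/Shopify/sarama"

// newStableConfig builds a producer config whose partitioner sends the
// same key to the same partition on every send.
func newStableConfig() *sarama.Config {
	config := sarama.NewConfig()
	// swap in sarama.NewRoundRobinPartitioner to spread messages evenly instead
	config.Producer.Partitioner = sarama.NewHashPartitioner
	return config
}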
// Package kafka_producer wraps a sarama async Kafka producer.
package kafka_producer

import (
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	l4g "github.com/alecthomas/log4go"
)

// Config is the producer configuration.
type Config struct {
	Topic      string `xml:"topic"`
	Broker     string `xml:"broker"`
	Frequency  int    `xml:"frequency"`
	MaxMessage int    `xml:"max_message"`
}

type Producer struct {
	producer sarama.AsyncProducer

	topic     string
	msgQ      chan *sarama.ProducerMessage
	wg        sync.WaitGroup
	closeChan chan struct{}
}

// NewProducer constructs a Producer.
func NewProducer(cfg *Config) (*Producer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.NoResponse       // fire and forget: don't wait for any ack
	config.Producer.Compression = sarama.CompressionSnappy // compress messages
	config.Producer.Flush.Frequency = time.Duration(cfg.Frequency) * time.Millisecond // flush batches at the configured interval
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	p, err := sarama.NewAsyncProducer(strings.Split(cfg.Broker, ","), config)
	if err != nil {
		return nil, err
	}

	ret := &Producer{
		producer:  p,
		topic:     cfg.Topic,
		msgQ:      make(chan *sarama.ProducerMessage, cfg.MaxMessage),
		closeChan: make(chan struct{}),
	}
	return ret, nil
}

// Run starts the send loop.
func (p *Producer) Run() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
	LOOP:
		for {
			select {
			case m := <-p.msgQ:
				p.producer.Input() <- m
			case err := <-p.producer.Errors():
				if nil != err && nil != err.Msg {
					l4g.Error("[producer] err=[%s] topic=[%s] key=[%s] val=[%s]", err.Error(), err.Msg.Topic, err.Msg.Key, err.Msg.Value)
				}
			case <-p.closeChan:
				break LOOP
			}
		}

		// drain whatever is still queued after Close has been requested
		for hasTask := true; hasTask; {
			select {
			case m := <-p.msgQ:
				p.producer.Input() <- m
			default:
				hasTask = false
			}
		}
	}()
}

// Close shuts the producer down and waits for the send loop to finish.
func (p *Producer) Close() error {
	close(p.closeChan)
	l4g.Warn("[producer] is quiting")
	p.wg.Wait()
	l4g.Warn("[producer] quit over")
	return p.producer.Close()
}

// Log enqueues one key/value message, dropping it if the queue is full.
func (p *Producer) Log(key string, val string) {
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(val),
	}
	select {
	case p.msgQ <- msg:
		return
	default:
		l4g.Error("[producer] err=[msgQ is full] key=[%s] val=[%s]", msg.Key, msg.Value)
	}
}
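A minimal usage sketch of the package above (the broker list and topic are placeholders, and "your/module/kafka_producer" is a hypothetical import path):

package main

import (
	kp "your/module/kafka_producer" // hypothetical import path for the package above
)

func main() {
	p, err := kp.NewProducer(&kp.Config{
		Topic:      "test_log",
		Broker:     "10.22.1.1:9092,10.22.1.2:9092,10.22.1.3:9092",
		Frequency:  500,   // flush every 500ms
		MaxMessage: 10000, // capacity of the in-memory queue
	})
	if err != nil {
		panic(err)
	}
	p.Run()
	p.Log("key1", "hello kafka")
	_ = p.Close() // drains the queue before quitting
}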
2. Kafka consumer
A few notes:
1) Be sure to choose a Kafka version with cluster (consumer group) support; the code pins it via config.Version.
2) The code includes small tools for creating, deleting, and listing topics.
3) The replication factor comes from external configuration (see the sample consumer.xml after this list).
4) Running multiple consumers requires creating the topic with multiple partitions. The official example crashes when you start multiple consumers; this version doesn't. I've submitted a PR upstream, but I don't know yet whether it will be accepted.
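Since the topics, group, and replication factor are all read from an external file, here is a sample consumer.xml matching the xml tags in the structs below (all values are placeholders of mine):

<config>
    <consumer>
        <topic>test_log</topic>
        <broker>10.22.1.1:9092,10.22.1.2:9092,10.22.1.3:9092</broker>
        <partition>3</partition>
        <replication>2</replication>
        <group>test_group</group>
        <version>2.2.0</version>
    </consumer>
</config>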
// Package main is the Kafka consumer.
package main

import (
	"context"
	"encoding/xml"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/alecthomas/log4go"
)

// ConsumerConfig is the consumer configuration.
type ConsumerConfig struct {
	Topic       []string `xml:"topic"`
	Broker      string   `xml:"broker"`
	Partition   int32    `xml:"partition"`
	Replication int16    `xml:"replication"`
	Group       string   `xml:"group"`
	Version     string   `xml:"version"`
}

var (
	configFile = "" // config file path
	initTopic  = false
	listTopic  = false
	delTopic   = ""
	cfg        = &Config{}
)

// Config is the top-level configuration.
type Config struct {
	Consumer ConsumerConfig `xml:"consumer"`
}

func init() {
	flag.StringVar(&configFile, "config", "../config/consumer.xml", "config file ")
	flag.BoolVar(&initTopic, "init", initTopic, "create topic")
	flag.BoolVar(&listTopic, "list", listTopic, "list topic")
	flag.StringVar(&delTopic, "del", delTopic, "delete topic")
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	flag.Parse() // the original listing omitted this; without it the flags never take effect
	defer func() {
		time.Sleep(time.Second)
		log4go.Warn("[main] consumer quit over!")
		log4go.Global.Close()
	}()

	contents, err := ioutil.ReadFile(configFile)
	if err != nil {
		panic(err)
	}
	if err := xml.Unmarshal(contents, cfg); err != nil {
		panic(err)
	}

	// sarama's own logger
	sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags)

	// pin the Kafka version; it must be one with consumer-group (cluster) support
	version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)
	if err != nil {
		panic(err)
	}
	config := sarama.NewConfig()
	config.Version = version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// topic admin tools
	if tool(cfg, config) {
		return
	}

	// kafka consumer group client
	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)
	if err != nil {
		panic(err)
	}

	consumer := Consumer{}
	go func() {
		for {
			if err := client.Consume(ctx, cfg.Consumer.Topic, &consumer); err != nil {
				log4go.Error("[main] client.Consume error=[%s]", err.Error())
				// retry after 5 seconds
				time.Sleep(time.Second * 5)
			}
			// stop retrying once the context has been cancelled
			if ctx.Err() != nil {
				return
			}
		}
	}()

	// wait for SIGINT/SIGTERM
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm

	cancel()
	if err = client.Close(); err != nil {
		panic(err)
	}
	log4go.Info("[main] consumer is quiting")
}

// tool handles the -init/-list/-del admin flags; it returns true if it ran.
func tool(cfg *Config, config *sarama.Config) bool {
	if initTopic || listTopic || len(delTopic) > 0 {
		ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)
		if nil != err {
			panic(err)
		}
		if len(delTopic) > 0 { // delete a topic
			if err := ca.DeleteTopic(delTopic); nil != err {
				panic(err)
			}
			log4go.Info("delete ok topic=[%s]", delTopic)
		} else if initTopic { // create the configured topics, or grow their partition count
			if detail, err := ca.ListTopics(); nil != err {
				panic(err)
			} else {
				for _, v := range cfg.Consumer.Topic {
					if d, ok := detail[v]; ok {
						if cfg.Consumer.Partition > d.NumPartitions {
							if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {
								panic(err)
							}
							log4go.Info("alter topic ok topic=[%s] partition=[%d]", v, cfg.Consumer.Partition)
						}
					} else {
						if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {
							panic(err)
						}
						log4go.Info("create topic ok topic=[%s]", v)
					}
				}
			}
		}

		// print the topic list
		if detail, err := ca.ListTopics(); nil != err {
			log4go.Error("ListTopics error=[%s]", err.Error())
		} else {
			for k := range detail {
				log4go.Info("[%s] %+v", k, detail[k])
			}
		}
		if err := ca.Close(); nil != err {
			panic(err)
		}
		return true
	}
	return false
}

// Consumer implements sarama.ConsumerGroupHandler.
type Consumer struct{}

func (consumer *Consumer) Setup(s sarama.ConsumerGroupSession) error { return nil }

func (consumer *Consumer) Cleanup(s sarama.ConsumerGroupSession) error { return nil }

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		key := string(message.Key)
		val := string(message.Value)
		log4go.Info("%s-%s", key, val)
		session.MarkMessage(message, "")
	}
	return nil
}
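With the flags registered in init(), the topic tools above are driven from the command line (the binary name consumer is a placeholder for whatever you build):

./consumer -config ../config/consumer.xml -list          # print all topics
./consumer -config ../config/consumer.xml -init          # create the configured topics, or add partitions
./consumer -config ../config/consumer.xml -del test_log  # delete one topic

Start it without any of the tool flags and it simply joins the consumer group and begins consuming.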