golang實現kafka的消息推送

Kafka的安裝與啓動

kafka中涉及的名詞
  1. 消息記錄:由一個key,一個value和一個時間戳構成,消息最終存儲在主題下的分區中,記錄在生產中稱爲生產者記錄,在消費者中稱爲消費記錄。Kafka集羣保持了全部發布的消息,直到它們過時,不管消息是否被消費了,在一個可配置的時間段內,Kafka集羣保留了全部發布的消息。好比消息的保存策略被設置爲2天,那麼在一個消息被髮布的兩天時間內,它都是能夠被消費的。Kafka的性能是和數據量無關的常量級的,因此保留太多數據並非問題
  2. 生成者:生產者用於發佈消息
  3. 消費者:消費者用於訂閱消息
  4. 消費者組:相同的groupID的消費者將視爲同一個消費者組,每一個消費者都須要設置一個組id,每條消息只能被consumer group中的一個Consumer消費,可是能夠被多個consumer group消費
  5. 主題(topic):消息的一種邏輯分組,用於對消息分門別類,每一類消息稱之爲一個主題,相同主題的消息放在一個隊列中
  6. 分區(partition):消息的一種物理分組,一個主題被拆成多個分區,每個分區就是一個順序的,不可變的消息隊列,而且能夠持續添加,分區中的每一個消息都被分配了一個惟一的id,稱之爲偏移量(offset),在每一個分區中偏移量都是惟一的。每一個分區對應一個邏輯log,有多個segment組成
  7. 偏移量:分區中每一個消息都有一個惟一的Id,稱之爲偏移量,表明已經消費的位置
  8. 代理(broker):一臺kafka服務器稱之爲一個broker
  9. 副本(replica):副本只是一個分區(partition)的備份。副本不讀取或寫入數據。它們用於防止數據丟失
  10. 領導者:leader是負責給定分區的全部讀取和寫入的節點
  11. 追隨者:跟隨領導者指令的節點被稱爲Follower。
  12. zookeeper:Kafka代理是無狀態的,因此它們使用Zookeeper來維護它們的集羣狀態。Zookeeper用於管理和協調Kafka代理
kafka功能
  • 發佈訂閱:生產者生產消息(數據流),將消息發送給kafka指定的主題隊列中,也能夠發送到topic中的指定分區中,消費者從kafka的指定隊列中獲取消息,而後來處理消息
  • 一. Mac版安裝
brew install kafka

安裝kafka須要依賴zookeeper的,因此安裝kafka的時候也會包含zookergit

  • kafka的安裝目錄:/usr/local/Cellar/kafka
  • kafka的配置文件目錄:/usr/local/etc/kafka
  • kafka服務的配置文件:/usr/local/etc/kafka/server.properties
  • zookeeper配置文件:/usr/local/etc/kafka/zookeeper.properties

server.properties中重要配置

  1. broker.id=0
  2. listeners=PLAINTEXT://:9092
  3. advertised.listeners=PLAINTEXT://127.0.0.1:9092
  4. log.dirs=/usr/local/var/lib/kafka-logs

zookeeper.properties重要配置

  1. dataDir=/usr/local/var/lib/zookeeper
  2. clientPort=2181
  3. maxClientCnxns=0
二. 啓動zookeeper

新建立終端啓動zookeeper

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
  3. 打印臺顯示:INFO Reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  4. ...便是啓動成功
三.啓動kafka

新建立終端啓動kafka(啓動kafka以前必須先啓動zookeeper)

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
  3. 打印臺顯示:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
  4. ...即啓動成功
  5. 啓動了kafka以後,zookeeper端會報一些Error:KeeperErrorCode = NoNode for /config/topics/test之類的錯誤,這個是沒有問題的,這是由於kafka向zookeeper發送了關於該路徑的一些請求信息,可是不存在,因此這是沒有問題的
四.建立topic

新建立終端

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. 建立一個名爲「test」的主題:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  3. 查看全部的topic:./bin/kafka-topics --list --zookeeper localhost:2181
  4. 查看某個topic的信息,好比test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
五.發送消息

新建立一個終端,做爲生產者,用於發送消息,每一行就是一條信息,將消息發送到kafka服務器

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
  3. send one message
  4. send two message
六.消費消息(接受消息)

新建立一個終端做爲消費者,接受消息

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
  3. send one message
  4. send two message(這些即是從生產者得到的消息)
注意:發送消息與接受消息必須啓動kafka與zookeeper

GoLang實現kafka的信息發佈與訂閱

生產者

import (
    "fmt"
    "github.com/Shopify/sarama"
)


func main() {
    config := sarama.NewConfig()
    // 等待服務器全部副本都保存成功後的響應
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 隨機的分區類型:返回一個分區器,該分區器每次選擇一個隨機分區
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // 是否等待成功和失敗後的響應
    config.Producer.Return.Successes = true

    // 使用給定代理地址和配置建立一個同步生產者
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }

    defer producer.Close()

    //構建發送的消息,
    msg := &sarama.ProducerMessage {
        //Topic: "test",//包含了消息的主題
        Partition: int32(10),//
        Key:        sarama.StringEncoder("key"),//
    }

    var value string
    var msgType string
    for {
        _, err := fmt.Scanf("%s", &value)
        if err != nil {
            break
        }
        fmt.Scanf("%s",&msgType)
        fmt.Println("msgType = ",msgType,",value = ",value)
        msg.Topic = msgType
        //將字符串轉換爲字節數組
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)
        //SendMessage:該方法是生產者生產給定的消息
        //生產成功的時候返回該消息的分區和所在的偏移量
        //生產失敗的時候返回error
        partition, offset, err := producer.SendMessage(msg)

        if err != nil {
            fmt.Println("Send message Fail")
        }
        fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
    }
}

消費者

import (
    "fmt"
    "github.com/Shopify/sarama"
    "sync"
    )
var (
    wg  sync.WaitGroup
)
func main() {
    // 根據給定的代理地址和配置建立一個消費者
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }
    //Partitions(topic):該方法返回了該topic的全部分區id
    partitionList, err := consumer.Partitions("test")
    if err != nil {
        panic(err)
    }

    for partition := range partitionList {
        //ConsumePartition方法根據主題,分區和給定的偏移量建立建立了相應的分區消費者
        //若是該分區消費者已經消費了該信息將會返回error
        //sarama.OffsetNewest:代表了爲最新消息
        pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
        defer pc.AsyncClose()
        wg.Add(1)
        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            //Messages()該方法返回一個消費消息類型的只讀通道,由代理產生
            for msg := range pc.Messages() {
                fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

kafka使用場景

  • kafka的應用很普遍,在這裏簡單介紹幾種
    • 服務解耦github

      好比咱們發了一個帖子,除了寫入數據庫以外還有不少聯動操做,好比給關注這個用戶的人發送通知,推送到首頁的時間線列表,若是用代碼實現的話,發帖服務就要調用通知服務,時間線服務,這樣的耦合很大,而且若是增長一個功能依賴發帖,除了要增長新功能外還要修改發帖代碼。golang

      解決方法:引入kafka,將發完貼的消息放入kafka消息隊列中,對這個主題感興趣的功能就本身去消費這個消息,那麼發帖功能就可以徹底獨立。同時即便發帖進程掛了,其餘功能還可以使用,這樣能夠將bug隔離在最小範圍內數據庫

    • 流量削峯apache

    流量削峯在消息隊列中也是經常使用場景,通常在秒殺或團購活動中使用比較普遍。當流量太大的時候達到服務器瓶頸的時候能夠將事件放在kafka中,下游服務器當接收到消息的時候本身去消費,有效防止服務器被擠垮bootstrap

    • 消息通信

    消息隊列通常都內置了高效的通訊機制,所以也能夠用在純的消息通信中,好比客戶端A跟客戶端B都使用同一隊列進行消息通信,客戶端A,客戶端B,客戶端N都訂閱了同一個主題進行消息發佈和接受不了實現相似聊天室效果數組

參考代碼服務器

相關文章
相關標籤/搜索