brew install kafka
安裝kafka須要依賴zookeeper的,因此安裝kafka的時候也會包含zookergit
server.properties中重要配置
- broker.id=0
- listeners=PLAINTEXT://:9092
- advertised.listeners=PLAINTEXT://127.0.0.1:9092
- log.dirs=/usr/local/var/lib/kafka-logs
zookeeper.properties重要配置
- dataDir=/usr/local/var/lib/zookeeper
- clientPort=2181
- maxClientCnxns=0
新建立終端啓動zookeeper
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
- 打印臺顯示:INFO Reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
- ...便是啓動成功
新建立終端啓動kafka(啓動kafka以前必須先啓動zookeeper)
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
- 打印臺顯示:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
- ...即啓動成功
- 啓動了kafka以後,zookeeper端會報一些Error:KeeperErrorCode = NoNode for /config/topics/test之類的錯誤,這個是沒有問題的,這是由於kafka向zookeeper發送了關於該路徑的一些請求信息,可是不存在,因此這是沒有問題的
新建立終端
- cd /usr/local/Cellar/kafka/2.1.0
- 建立一個名爲「test」的主題:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 查看全部的topic:./bin/kafka-topics --list --zookeeper localhost:2181
- 查看某個topic的信息,好比test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
新建立一個終端,做爲生產者,用於發送消息,每一行就是一條信息,將消息發送到kafka服務器
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
- send one message
- send two message
新建立一個終端做爲消費者,接受消息
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
- send one message
- send two message(這些即是從生產者得到的消息)
import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() // 等待服務器全部副本都保存成功後的響應 config.Producer.RequiredAcks = sarama.WaitForAll // 隨機的分區類型:返回一個分區器,該分區器每次選擇一個隨機分區 config.Producer.Partitioner = sarama.NewRandomPartitioner // 是否等待成功和失敗後的響應 config.Producer.Return.Successes = true // 使用給定代理地址和配置建立一個同步生產者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.Close() //構建發送的消息, msg := &sarama.ProducerMessage { //Topic: "test",//包含了消息的主題 Partition: int32(10),// Key: sarama.StringEncoder("key"),// } var value string var msgType string for { _, err := fmt.Scanf("%s", &value) if err != nil { break } fmt.Scanf("%s",&msgType) fmt.Println("msgType = ",msgType,",value = ",value) msg.Topic = msgType //將字符串轉換爲字節數組 msg.Value = sarama.ByteEncoder(value) //fmt.Println(value) //SendMessage:該方法是生產者生產給定的消息 //生產成功的時候返回該消息的分區和所在的偏移量 //生產失敗的時候返回error partition, offset, err := producer.SendMessage(msg) if err != nil { fmt.Println("Send message Fail") } fmt.Printf("Partition = %d, offset=%d\n", partition, offset) } }
import ( "fmt" "github.com/Shopify/sarama" "sync" ) var ( wg sync.WaitGroup ) func main() { // 根據給定的代理地址和配置建立一個消費者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } //Partitions(topic):該方法返回了該topic的全部分區id partitionList, err := consumer.Partitions("test") if err != nil { panic(err) } for partition := range partitionList { //ConsumePartition方法根據主題,分區和給定的偏移量建立建立了相應的分區消費者 //若是該分區消費者已經消費了該信息將會返回error //sarama.OffsetNewest:代表了爲最新消息 pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest) if err != nil { panic(err) } defer pc.AsyncClose() wg.Add(1) go func(sarama.PartitionConsumer) { defer wg.Done() //Messages()該方法返回一個消費消息類型的只讀通道,由代理產生 for msg := range pc.Messages() { fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) } wg.Wait() consumer.Close() }
服務解耦github
好比咱們發了一個帖子,除了寫入數據庫以外還有不少聯動操做,好比給關注這個用戶的人發送通知,推送到首頁的時間線列表,若是用代碼實現的話,發帖服務就要調用通知服務,時間線服務,這樣的耦合很大,而且若是增長一個功能依賴發帖,除了要增長新功能外還要修改發帖代碼。golang
解決方法:引入kafka,將發完貼的消息放入kafka消息隊列中,對這個主題感興趣的功能就本身去消費這個消息,那麼發帖功能就可以徹底獨立。同時即便發帖進程掛了,其餘功能還可以使用,這樣能夠將bug隔離在最小範圍內數據庫
流量削峯apache
流量削峯在消息隊列中也是經常使用場景,通常在秒殺或團購活動中使用比較普遍。當流量太大的時候達到服務器瓶頸的時候能夠將事件放在kafka中,下游服務器當接收到消息的時候本身去消費,有效防止服務器被擠垮bootstrap
消息隊列通常都內置了高效的通訊機制,所以也能夠用在純的消息通信中,好比客戶端A跟客戶端B都使用同一隊列進行消息通信,客戶端A,客戶端B,客戶端N都訂閱了同一個主題進行消息發佈和接受不了實現相似聊天室效果數組
參考代碼服務器