概要java
本文中咱們將討論如何藉助Kafka實現分佈式消息管理,使用事件溯源(Event Sourcing)模式實現原子化數據處理,使用CQRS模式(Command-Query Responsibility Segregation )實現查詢職責分離,使用消費者羣組解決單點故障問題,理解分佈式協調框架Zookeeper的運行機制。整個應用的代碼實現使用Go語言描述。git
爲何須要微服務程序員
微服務自己並不算什麼新概念,它要解決的問題在軟件工程歷史中早已經有人提出:解耦、擴展性、靈活性,解決「爛架構」膨脹後帶來的複雜度問題。github
Conway's law(康威定律)golang
Any organization that designs a system (defined broadly) will produce a design whose structure is a copy of the organization's communication structure.(任何組織在設計一套系統(廣義概念上的系統)時,所交付的設計方案在結構上都與該組織的通訊結構保持一致)面試
-- Melvyn Conway, 1967redis
爲了趕進度加程序員就像用水去滅油鍋裏的火同樣,緣由在於:溝通成本 = n(n-1)/2,溝通成本隨着項目或者組織的人員增長呈指數級增加。不少項目在通過一段時間的發展以後,都會有很多恐龍級代碼,無人敢挑戰。好比一個類的規模就多達數千行,核心方法近千行,大量重複代碼,每次調整都以失敗了結。龐大的系統規模致使團隊新成員接手困難,項目組人員增長致使的代碼衝突問題,系統複雜度的增長致使的不肯定上線風險、引入新技術困難等。docker
微服務 (Microservices)是解決這些困難的衆多方案之一。它本質上是一種軟件架構風格,它是以專一於單一責任與功能的小型功能區塊 (Small Building Blocks) 爲基礎,利用模組化的方式組合出複雜的大型應用程序,各功能區塊使用與語言無關 (Language-Independent/Language agnostic) 的 API 集相互通信。shell
Event Sourcing(事件溯源)數據庫
真正構建一個微服務是很是具備挑戰性的。其中一個最重要的挑戰就是原子化————如何處理分佈式數據,如何設計服務的粒度。例如,常見的客戶、工單場景,若是拆分紅兩個服務,查詢都變成了一個難題:
select * from order o, customer c where o.customer_id = c.id and o.gross_amount > 50000 and o.status = 'PAID' and c.country = 'INDONESIA';
在DDD領域(Domain-Driven Design,領域驅動設計)有一種架構風格被普遍應用,即CQRS (Command Query Responsibility Seperation,命令查詢職責分離)。CQRS最核心的概念是Command、Event,「將數據(Data)看作是事實(Fact)。每一個事實都是過去的痕跡,雖然這種過去能夠遺忘,但卻沒法改變。」 這一思想直接發展了Event Source,即將這些事件的發生過程記錄下來,使得咱們能夠追溯業務流程。CQRS對設計者的影響,是將領域邏輯,尤爲是業務流程,皆看作是一種領域對象狀態遷移的過程。這一點與REST將HTTP應用協議看作是應用狀態遷移的引擎,有着殊途同歸之妙。
實現方案
Kafka in a Nutshell
Apache Kafka是由Apache軟件基金會開發的一個開源消息中間件項目,由Scala寫成。Kafka最初是由LinkedIn開發,並於2011年初開源。2012年10月從Apache Incubator畢業。該項目的目標是爲處理實時數據提供一個統1、高吞吐、低延遲的平臺。Kafka使用Zookeeper做爲其分佈式協調框架,很好的將消息生產、消息存儲、消息消費的過程結合在一塊兒。同時藉助Zookeeper,kafka可以生產者、消費者和broker在內的因此組件在無狀態的狀況下,創建起生產者和消費者的訂閱關係,並實現生產者與消費者的負載均衡。
總體設計
案例:假設一個銀行帳戶系統。通過一段時間的經營發展,該行客戶數量和交易規模都有了巨大的增加,系統內部變得異常複雜,每個部分都變得沉重不堪。咱們嘗試對他的業務單元進行解耦,例如將餘額計算邏輯從原有的核心繫統拆分出來。根據銀行帳戶業務特色,咱們設計一個生產者——負責根據業務事件觸發生成一個事件,全部事件基於Kafka存儲,再設計一個消費者——負責從Kafka抓去未處理事件,經過調用業務邏輯處理單元完成後續持久化操做。這樣一個帳戶的全部業務操做均可以有完整的快照歷史,符合金融業務Audit(審計)的須要。並且經過使用事件,咱們能夠很方便地重建數據。
業務事件列表:
領域模型:帳戶(Account)
holder's name:持有人名稱
balance:餘額
registration date:開戶日期
......
領域模型:事件(Event)
name:事件名稱
ID:序號
......
環境準備
$ wget http://mirror.bit.edu.cn/apache/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz $ tar -xvf kafka_2.10-0.10.1.0.tgz $ cd kafka_2.10-0.10.1.0 $ bin/zookeeper-server-start.sh config/zookeeper.properties $ netstat -an | grep 2181 tcp46 0 0 *.2181 *.* LISTEN
$ bin/kafka-server-start.sh config/server.properties [2017-06-13 14:03:08,168] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) [2017-06-13 14:03:08,172] INFO Kafka version : 0.10.1.0 (org.apache.kafka.common.utils.AppInfoParser) [2017-06-13 14:03:08,172] INFO Kafka commitId : 3402a74efb23d1d4 (org.apache.kafka.common.utils.AppInfoParser) [2017-06-13 14:03:08,173] INFO [Kafka Server 0], started (kafka.server.KafkaServer) $ lsof -nP -iTCP -sTCP:LISTEN | sort -n $ netstat -an | grep 9092 tcp46 0 0 *.9092 *.* LISTEN
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic x-microservice-transactions-t1 Created topic "x-microservice-transactions-t1".
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 // 啓動多個broker,須指定不一樣的屬性文件 $ bin/kafka-server-start.sh config/server-1.properties $ bin/kafka-server-start.sh config/server-2.properties
domain model
package main // domain model: bank_account.go type BankAccount struct { Id string Name string Balance int } //定義下列函數: //1. FetchAccount(id) 從Redis讀取帳戶實例信息 //2. updateAccount(id, data) 更新指定帳戶信息 //3. ToAccount(map) 將從Redis讀到的帳戶信息轉換爲模型數據,return *BankAccount object.
Kafka & Redis library
// main.go import ( "github.com/go-redis/redis" // Redis通信庫:go-redis ) var ( Redis = initRedis() ) func initRedis() *redis.Client { redisUrl := os.Getenv("REDIS_URL") if redisUrl == "" { redisUrl = "127.0.0.1:6379" } return redis.NewClient(&redis.Options{ Addr: redisUrl, Password: "", DB: 0, }) } package main //kafka.go import ( "encoding/json" "fmt" "github.com/Shopify/sarama" //Kafka通信庫:Sarama "os" ) var ( brokers = []string{"127.0.0.1:9092"} topic = "go-microservice-transactions" topics = []string{topic} ) func newKafkaConfiguration() *sarama.Config { conf := sarama.NewConfig() conf.Producer.RequiredAcks = sarama.WaitForAll conf.Producer.Return.Successes = true conf.ChannelBufferSize = 1 conf.Version = sarama.V0_10_1_0 return conf } func newKafkaSyncProducer() sarama.SyncProducer { kafka, err := sarama.NewSyncProducer(brokers, newKafkaConfiguration()) if err != nil { fmt.Printf("Kafka error: %s ", err) os.Exit(-1) } return kafka } func newKafkaConsumer() sarama.Consumer { consumer, err := sarama.NewConsumer(brokers, newKafkaConfiguration()) if err != nil { fmt.Printf("Kafka error: %s ", err) os.Exit(-1) } return consumer }
消息生產者Producer
package main //消息生產者 producer.go import ( "bufio" "fmt" "os" "strconv" "strings" ) func mainProducer() { var err error reader := bufio.NewReader(os.Stdin) kafka := newKafkaSyncProducer() for { fmt.Print("-> ") text, _ := reader.ReadString(' ') text = strings.Replace(text, " ", "", -1) args := strings.Split(text, "###") cmd := args[0] switch cmd { case "create": if len(args) == 2 { accName := args[1] event := NewCreateAccountEvent(accName) sendMsg(kafka, event) } else { fmt.Println("Only specify create###Account Name") } default: fmt.Printf("Unknown command %s, only: create, deposit, withdraw, transfer ", cmd) } if err != nil { fmt.Printf("Error: %s ", err) err = nil } } } // kafka.go // 增長髮送消息的方法 func sendMsg(kafka sarama.SyncProducer, event interface{}) error { json, err := json.Marshal(event) if err != nil { return err } msgLog := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(string(json)), } partition, offset, err := kafka.SendMessage(msgLog) if err != nil { fmt.Printf("Kafka error: %s ", err) } fmt.Printf("Message: %+v ", event) fmt.Printf("Message is stored in partition %d, offset %d ", partition, offset) return nil } package main //啓動入口,main.go func main() { mainProducer() } $ go build $ ./go-microservice -> create Only specify create###Account Name -> create###Yanrui Message: {Event:{AccId:49a23d27-4ffe-4c86-ab9a-fbc308ecff1c Type:CreateEvent} AccName:Yanrui} Message is stored in partition 0, offset 0 ->
Consumer負責從Kafka加載消息隊列。另外,咱們須要爲每個事件建立process()函數。
package main //processor.go import ( "errors" ) func (e CreateEvent) Process() (*BankAccount, error) { return updateAccount(e.AccId, map[string]interface{}{ "Id": e.AccId, "Name": e.AccName, "Balance": "0", }) } func (e InvalidEvent) Process() error { return nil } func (e AcceptEvent) Process() error { return nil } // other Process() codes ... package main //consumer.go func mainConsumer(partition int32) { kafka := newKafkaConsumer() defer kafka.Close() //注:開發環境中咱們使用sarama.OffsetOldest,Kafka將從建立以來第一條消息開始發送。 //在生產環境中切換爲sarama.OffsetNewest,只會將最新生成的消息發送給咱們。 consumer, err := kafka.ConsumePartition(topic, partition, sarama.OffsetOldest) if err != nil { fmt.Printf("Kafka error: %s ", err) os.Exit(-1) } go consumeEvents(consumer) fmt.Println("Press [enter] to exit consumer ") bufio.NewReader(os.Stdin).ReadString(' ') fmt.Println("Terminating...") }
Go語言經過goroutine提供了對於併發編程的直接支持,goroutine是Go語言運行庫的功能,做爲一個函數入口,在堆上爲其分配的一個堆棧。因此它很是廉價,咱們能夠很輕鬆的建立上萬個goroutine,但它們並非被操做系統所調度執行。除了被系統調用阻塞的線程外,Go運行庫最多會啓動$GOMAXPROCS個線程來運行goroutine。
func consumeEvents(consumer sarama.PartitionConsumer) { var msgVal []byte var log interface{} var logMap map[string]interface{} var bankAccount *BankAccount var err error for { //goruntine exec select { // blocking <- channel operator case err := <-consumer.Errors(): fmt.Printf("Kafka error: %s ", err) case msg := <-consumer.Messages(): msgVal = msg.Value // if err = json.Unmarshal(msgVal, &log); err != nil { fmt.Printf("Failed parsing: %s", err) } else { logMap = log.(map[string]interface{}) logType := logMap["Type"] fmt.Printf("Processing %s: %s ", logMap["Type"], string(msgVal)) switch logType { case "CreateEvent": event := new(CreateEvent) if err = json.Unmarshal(msgVal, &event); err == nil { bankAccount, err = event.Process() } default: fmt.Println("Unknown command: ", logType) } if err != nil { fmt.Printf("Error processing: %s ", err) } else { fmt.Printf("%+v ", *bankAccount) } } } } }
重構main
package main //main.go //支持producer和consumer啓動模式 import ( "flag" ... ) func main() { act := flag.String("act", "producer", "Either: producer or consumer") partition := flag.String("partition", "0", "Partition which the consumer program will be subscribing") flag.Parse() fmt.Printf("Welcome to go-microservice : %s ", *act) switch *act { case "producer": mainProducer() case "consumer": if part32int, err := strconv.ParseInt(*partition, 10, 32); err == nil { mainConsumer(int32(part32int)) } } }
經過--act參數,能夠啓動一個消費者進程。當進程運行時,他將從Kafka一個一個拿出消息進行處理,按照咱們以前在每一個事件定義的Process() 方法。
$ go build $ ./go-microservice --act=consumer Welcome to go-microservice : consumer Press [enter] to exit consumer Processing CreateEvent: {"AccId":"49a23d27-4ffe-4c86-ab9a-fbc308ecff1c","Type":"CreateEvent","AccName":"Yanrui"} {Id:49a23d27-4ffe-4c86-ab9a-fbc308ecff1c Name:Yanrui Balance:0} Terminating...
集羣化消息消費者
問題:若是一個Consumer宕機了怎麼辦?(例如:程序崩潰、網絡異常等緣由)
解決方案:將多個Consumer編組爲集羣實現高可用。具體來講就是打標籤,當有一個新的Log發送時,Kafka將其發送給其中一個實例。當該實例沒法接收Log時,Kafka將Log傳遞給另外一個包含相同標籤的Consumer。
注意:Kafka 版本 0.9 +,另外還須要使用sarama-cluster庫
#使用govendor獲取 govendor fetch github.com/bsm/sarama-cluster //修改mainConsumer方法使用sarama-cluster library鏈接Kafka config := cluster.NewConfig() config.Consumer.Offsets.Initial = sarama.OffsetNewest consumer, err := cluster.NewConsumer(brokers, "go-microservice-consumer", topics, config) //topics定義 var ( topics = []string{topic} ) //調整consumeEvents() case err, more := <-consumer.Errors(): if more { fmt.Printf("Kafka error: %s ", err) } //consumer.Messages() : MarkOffset //consumer.go //func mainConsumer(partition int32) consumer.MarkOffset(msg, "") //增長的行 msgVal = msg.Value
即便程序崩潰,MarkOffset也會將消息標記爲 processed ,標籤包括元數據以及這個時間點的狀態。元數據能夠被另一個Consumer恢復數據狀態,也就能被從新消費。即即便一樣的消息被處理兩次,結果也是同樣的,這個過程理論上是 冪等 的(idempotent)。
Kafka Consumers
//運行多個consumer實例 $ ./go-microservice --act=consumer $ ./go-microservice --act=consumer $ ./go-microservice --act=consumer
使用vendor管理Golang項目依賴
用govendor fetch <url1> <url2>新增的第三方包直接被get到根目錄的vendor文件夾下,不會與其它的項目混用第三方包,完美避免多個項目同用同一個第三方包的不一樣版本問題。只須要對vendor/vendor.json進行版本控制,便可對第三包依賴關係進行控制。
$ // $ go get -u github.com/kardianos/govendor $ cd $PROJECT_PATH $ govendor init $ govendor add +external $
單元測試:ginkgo Test Suite
$ go get github.com/onsi/ginkgo/ginkgo $ go get github.com/onsi/gomega $ ginkgo bootstrap Generating ginkgo test suite bootstrap for main in: go_microservice_suite_test.go package main_test //go_microservice_suite_test.go,單元測試類 import ( "github.com/onsi/ginkgo" "github.com/onsi/gomega" ) var _ = Describe("Event", func() { Describe("NewCreateAccountEvent", func() { It("can create a create account event", func() { name := "John Smith" event := NewCreateAccountEvent(name) Expect(event.AccName).To(Equal(name)) Expect(event.AccId).NotTo(BeNil()) Expect(event.Type).To(Equal("CreateEvent")) }) }) }) $ ginkgo Running Suite: go-microservice Suite ========================== Random Seed: 1490709758 Will run 1 of 1 specs Ran 1 of 1 Specs in 0.000 seconds SUCCESS! -- 1 Passed | 0 Failed | 0 Pending | 0 Skipped PASS Ginkgo ran 1 suite in 905.68195ms Test Suite Passed
單元測試的四個階段
Docker部署
Docker 容器中須要包含下列組件:
在根目錄建立一個Dockerfile
FROM golang:1.8.0 MAINTAINER Yanrui //install our dependencies RUN go get -u github.com/kardianos/govendor RUN go get github.com/onsi/ginkgo/ginkgo RUN go get github.com/onsi/gomega //將整個目錄拷貝到容器 ADD . /go/src/go-microservice //檢查工做目錄 WORKDIR /go/src/go-microservice //安裝依賴項 RUN govendor sync //測試 $ docker build -t go-microservice . $ docker run -i -t go-microservice /bin/bash $ ginkgo ....................... .......Failed..........
因爲容器本地並無一個Redis實例運行在上面,這時運行ginkgo測試就會報錯。咱們爲何不在這個Dockerfile中包含一個Redis呢?這就違背了Docker分層解耦的初衷,咱們能夠經過docker-compose將兩個服務鏈接起來一塊兒工做。
建立一個docker-compose.yml文件(與Dockerfile目錄一致):
version: "2.0" services: app: environment: REDIS_URL: redis:6379 build: . working_dir: /go/src/go-microservice links: - redis redis: image: redis:alpine
本地構建完成以後,再次運行 docker-compose run app ginkgo 測試經過。
Infrastructure as Code(基礎設施即代碼)
The enabling idea of infrastructure as code is that the systems and devices which are used to run software can be treated as if they, themselves, are software. — Kief Morris
雲帶來的好的一方面是它讓公司中的任何人均可以輕鬆部署、配置和管理他們須要的基礎設施。雖然不少基礎設施團隊採用了雲和自動化技術,卻沒有采用相應的自動化測試和發佈流程。它們把這些看成一門過於複雜的腳本語言來使用。他們會爲每一次具體的改動編寫手冊、配置文件和執行腳本,再針對一部分指定的服務器手工運行它們,也就是說每一次改動都還須要花費專業知識、時間和精力。這種工做方式意味着基礎設施團隊沒有把他們本身從平常的重複性勞動中解放出來。目前已經有不少商業雲平臺提供了Docker服務,只須要將本身的 git repository 連接到平臺,便可以自動幫你完成部署,在雲上完成集成測試。
docker-compose build docker-compose run app ginkgo
加Java架構師羣獲取Java工程化、高性能及分佈式、高性能、深刻淺出。高架構。性能調優、Spring,MyBatis,Netty源碼分析和大數據等多個知識點高級進階乾貨的直播免費學習權限 都是大牛帶飛 讓你少走不少的彎路的 羣..號是:855801563 對了 小白勿進 最好是有開發經驗
注:加羣要求
一、具備工做經驗的,面對目前流行的技術不知從何下手,須要突破技術瓶頸的能夠加。
二、在公司待久了,過得很安逸,但跳槽時面試碰壁。須要在短期內進修、跳槽拿高薪的能夠加。
三、若是沒有工做經驗,但基礎很是紮實,對java工做機制,經常使用設計思想,經常使用java開發框架掌握熟練的,能夠加。
四、以爲本身很牛B,通常需求都能搞定。可是所學的知識點沒有系統化,很難在技術領域繼續突破的能夠加。
5.阿里Java高級大牛直播講解知識點,分享知識,多年工做經驗的梳理和總結,帶着你們全面、科學地創建本身的技術體系和技術認知!