測試環境:ubuntu 15.10 64位
cpu:Intel Core i7-4790 3.60GHz * 8
內存:16GB
硬盤:SSD 120GB
軟件環境:rabbitmq 3.6.0 kafka 0.8.1 (均爲單機本機運行)
PS: 測試結果均爲單操做測試,即生產的時候沒有消費操做
測試結果:
kafka :消費速度: 37,586 /s 生產速度: 448,753 /s
rabbitmq: 消費速度: 20,807 /s 生產速度: 16,413 /s
出現問題:
rabbitmq 生產4分鐘左右出現隊列阻塞,沒法繼續添加數據,1分鐘後恢復,再過大約1分鐘又出現此現象並以約1分鐘爲間隔出現此問題。
rabbitmq 生產對象時有不小的概率(約 1/20)添加隊列失敗,報出的錯誤是「tcp連接重置」
其餘並沒有任何問題
結論:
很明顯的看出kafka的性能遠超rabbitmq。不過這也是理所當然的,畢竟2個消息隊列實現的協議是不同的,處理消息的場景也大有不一樣。rabbitmq適合處理一些數據嚴謹的消息,好比說支付消息,社交消息等不能丟失的數據。kafka是批量操做且不保證數據是否能完整的到達消費者端,因此適合一些大量的營銷消息的場景。
代碼:
kafka:
package main import ( "github.com/Shopify/sarama" "os" "os/signal" "sync" "log" "time" ) func main() { go producer() // go consumer() time.Sleep(10*time.Minute) } func producer() { config :=sarama.NewConfig() config.Producer.Return.Successes = true proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config) if err != nil { panic(err) } signals :=make(chan os.Signal,1) signal.Notify(signals,os.Interrupt) var ( wg sync.WaitGroup enqueued, successes, errors int ) wg.Add(1) go func() { defer wg.Done() for _=range proder.Successes(){ successes++ } }() wg.Add(1) go func() { defer wg.Done() for err := range proder.Errors(){ log.Println(err) errors++ } }() go func() { t1 := time.NewTicker(time.Second) for{ <- t1.C log.Println(enqueued) } }() ProducerLoop: for{ message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")} select { case proder.Input() <- message: enqueued++ case <- signals: proder.AsyncClose() break ProducerLoop } } wg.Wait() log.Println("Successfully produced:%d;errors:%d\n",successes,errors) } func consumer() { coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil) if err != nil { panic(err) } defer func() { if err :=coner.Close(); err !=nil{ log.Fatalln(err) } }() partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest) if err != nil { panic(err) } defer func() { if err := partitionConsumer.Close();err!=nil{ log.Fatalln(err) } }() signals := make(chan os.Signal,1) signal.Notify(signals,os.Interrupt) consumed:=0 go func() { t1 := time.NewTicker(time.Second) for{ <- t1.C log.Println(consumed) } }() ConsumerLoop: for{ select { case _ = <-partitionConsumer.Messages(): consumed++ // log.Println( string(msg.Value)," => ",consumed) case <-signals: break ConsumerLoop } } log.Printf("Consumed: %d\n", consumed) }
rabbitmq:
package main import ( "github.com/streadway/amqp" "time" "fmt" "log" ) const ( queueName = "push.msg.q" exchange = "t.msg.ex" mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push" ) var conn *amqp.Connection var channel *amqp.Channel func main() { fmt.Println(1) // push() receive() // fmt.Println("end") // close() } func failOnErr(err error, msg string) { if err != nil { log.Fatalf("%s:%s", msg, err) panic(fmt.Sprintf("%s:%s", msg, err)) } } func mqConnect() { var err error conn, err = amqp.Dial(mqurl) if err != nil { log.Println(1) log.Fatalln(err) } fmt.Println(5) channel, err = conn.Channel() if err != nil { fmt.Println(2) log.Fatalln(err) }else { fmt.Println("a") } } func push() { count := 0 if channel == nil { fmt.Println(2) mqConnect() }else { fmt.Println(3) } msgContent := "hello world!" t1 := time.NewTicker(time.Second) go func() { for{ <- t1.C log.Println(count) } }() for{ err := channel.Publish(exchange, "test", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msgContent), }) if err != nil { }else { count ++ } } } func receive() { if channel == nil { mqConnect() } count :=0 msgs, err := channel.Consume(queueName, "", true, false, false, false, nil) failOnErr(err, "") forever := make(chan bool) t1 := time.NewTicker(time.Second) go func() { for{ <- t1.C log.Println(count) } }() go func() { //fmt.Println(*msgs) for _= range msgs { count ++ // s := BytesToString(&(d.Body)) // count++ // fmt.Printf("receve msg is :%s -- %d\n", *s, count) } }() fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n") <-forever }