Apache Pulsar是一個開源的分佈式發佈-訂閱消息系統,與Kafka相似,但比後者更增強大。Pulsar最初由Yahoo開發並維護,目前已經成爲Apache軟件組織的一個孵化子項目,當前最新版本號爲2.1.0-incubating。官網地址:http://pulsar.apache.org/。git
Pulsar目前僅僅支持MacOS和Linux系統,不支持Windows系統。而且要求系統中安裝了Java 8環境。github
咱們以CentOS系統做爲搭建環境,系統爲CentOS7.2。web
首先,訪問官網下載網頁http://pulsar.apache.org/en/download/,以下圖所示:apache
因爲我本地環境的限制,因此本文中全部的文件下載都是首先在Windows系統中下載,而後手動拷貝到Linux服務器上的。vim
此處,咱們點擊下載第一個,即二進制發佈。而後拷貝到CentOS服務器上,並解壓該壓縮包,結果以下:服務器
進入對應解壓獲得的文件夾,該文件夾下文件以下:分佈式
進入conf文件夾下,並使用vi或vim打開文件client.conf,修改裏面的webServiceUrl和brokerServiceUrl字段中對應的IP爲該服務器IP,以下所示(其中塗改部分爲服務器IP):spa
保存並退出,而後進入到bin目錄下,之後臺運行模式啓動pulsar服務,以下:.net
因爲以前我已經啓動了後臺服務,因此上圖中提示已經在運行該服務。
如此簡單,pulsar單機版就這麼順利的運行起來了。然而,如何驗證是否正常啓動了呢?一種是經過查看日誌文件來確保正常啓動,此處略去這種方式。直接使用指令來驗證是否正常啓動:
(1)建立消費者consumer
指令:3d
$ ./pulsar-client consume -s "my-subscription" my-topic
含義:建立一個consumer,該consumer訂閱的topic名稱爲my-topic,本訂閱名稱爲my-subscription。建立成功會打印以下信息(只截圖了部分信息):
建立成功後,該consumer就處於等待接收消息狀態。
(2)建立生產者producer
$ ./pulsar-client produce my-topic --messages "test message from producer"
含義:建立一個producer,該producer對應的topic名稱爲my-topic(與上面建立的consumer訂閱的topic相同),發送的消息由--messages指定,此處內容爲「test message from producer」。建立成功會打印以下信息(只截圖了部分信息):
此時,咱們會在1中建立的consumer端接收到producer發送的消息,以下圖:
至此,說明咱們的pulsar服務正常運行。
前提條件:開發電腦本地或Linux服務器中已經安裝好了Go開發環境。
在Windows系統中開發Pulsar時須要安裝GCC編譯環境,因此須要安裝MinGW,因爲環境限制,這裏我沒法下載MinGW,因此就直接在CentOS系統中搭建開發環境了。
當前版本(2.1.0-incubating)下,Pulsar官方僅僅提供了C++、Java、Python、Go四種語言的客戶端開發包。且四種語言的支持特性不盡相同,以下所示:
此外,還有一些第三方的客戶端包,以下:
因爲Pulsar Go客戶端庫是基於C++客戶端庫的,因此在安裝Go庫以前必需要確保已經成功安裝了C++客戶端庫。
在Pulsar C++客戶端網頁中,下載下圖中所示的三個文件:
而後將下載的三個文件拷貝到CentOS服務器上,以下:
而後執行以下命令來安裝這三個RPM包:
$ rpm -ivh apache-pulsar-client*.rpm
此處暫且先不驗證是否安裝成功。
因爲我環境所限制,沒法使用go get的方式來下載Pulsar的Go語言包,我是直接在GitHub上面下載的incubator-pulsar-branch-2.1.zip,解壓該文件獲得以下內容:
此處,咱們僅僅須要裏面的pulsar-client-go文件夾裏面的內容,根據官網上的示例程序可知該go語言包的路徑以下:
因此咱們將pulsar-client-go拷貝到CentOS服務器上$GOPATH/src/github.com/apache/incubator-pulsar下,若是中間文件夾不存在就本身建立,最終以下:
到此,Pulsar Go客戶端包安裝完成。
此時,咱們使用一個簡單的Pulsar Go程序來驗證上面安裝是否正常,程序內容以下:
1 package main 2 3 import ( 4 "fmt" 5 "runtime" 6 "context" 7 "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar" 8 "log" 9 ) 10 11 func main (){ 12 fmt.Println("Pulsar Producer") 13 14 ctx := context.Background() 15 16 //實例化Pulsar client 17 client,err := pulsar.NewClient(pulsar.ClientOptions{ 18 URL:"pulsar://xx.xx.xx.xx:6650", //xx.xx.xx.xx表明Pulsar IP 19 OperationTimeoutSeconds:5, 20 MessageListenerThreads:runtime.NumCPU()/2, 21 }) 22 23 if err != nil { 24 log.Fatalf("Could not instantiate Pulsar client:%v",err) 25 } 26 27 28 // 建立producer 29 producer,err := client.CreateProducer(pulsar.ProducerOptions{ 30 Topic:"my-topic", 31 }) 32 33 if err != nil { 34 log.Fatalf("Could not instantiate Pulsar producer:%v",err) 35 } 36 37 defer producer.Close() 38 39 msg := pulsar.ProducerMessage{ 40 Payload:[]byte("Hello,This is a message from Pulsar Producer!"), 41 } 42 43 if err := producer.Send(ctx,msg);err != nil { 44 log.Fatalf("Producer could not send message:%v",err) 45 } 46 47 }
編譯並運行,結果以下:
但其實咱們查看/usr/lib路徑下發現,實際上是存在libpulsar.so.2.1.0-incubating這個庫文件的,因此應該是系統沒有引用到這個路徑:
因此,須要將該文件所在的路徑添加到到/etc/ld.so.conf中,以下:
此時,再次運行程序時則成功:
此處直接給出代碼,裏面有必要的註釋:
文件:pulsar-producer.go
1 package main 2 3 import ( 4 "fmt" 5 "runtime" 6 "context" 7 "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar" 8 "log" 9 ) 10 11 func main (){ 12 fmt.Println("Pulsar Producer") 13 14 ctx := context.Background() 15 16 //實例化Pulsar client 17 client,err := pulsar.NewClient(pulsar.ClientOptions{ 18 URL:"pulsar://xx.xx.xx.xx:6650", // xx.xx.xx.xx表明Pulsar IP 19 OperationTimeoutSeconds:5, 20 MessageListenerThreads:runtime.NumCPU()/2, 21 }) 22 23 if err != nil { 24 log.Fatalf("Could not instantiate Pulsar client:%v",err) 25 } 26 27 28 // 建立producer 29 producer,err := client.CreateProducer(pulsar.ProducerOptions{ 30 Topic:"my-topic", 31 }) 32 33 if err != nil { 34 log.Fatalf("Could not instantiate Pulsar producer:%v",err) 35 } 36 37 defer producer.Close() 38 39 msg := pulsar.ProducerMessage{ 40 Payload:[]byte("Hello,This is a message from Pulsar Producer!"), 41 } 42 43 if err := producer.Send(ctx,msg);err != nil { 44 log.Fatalf("Producer could not send message:%v",err) 45 } 46 47 }
文件:pulsar-consumer.go
1 package main 2 3 import ( 4 "fmt" 5 "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar" 6 "log" 7 "context" 8 ) 9 10 func main() { 11 fmt.Println("Pulsar Consumer") 12 13 //實例化Pulsar client 14 client,err := pulsar.NewClient(pulsar.ClientOptions{ 15 URL:"pulsar://xx.xx.xx.xx:6650", // xx.xx.xx.xx表明Pulsar IP 16 }) 17 18 if err != nil { 19 log.Fatal(err) 20 } 21 22 //使用client對象實例化consumer 23 consumer,err := client.Subscribe(pulsar.ConsumerOptions{ 24 Topic:"my-topic", 25 SubscriptionName:"sub-demo", 26 }) 27 28 if err != nil { 29 log.Fatal(err) 30 } 31 32 defer consumer.Close() 33 34 ctx := context.Background() 35 36 //無限循環監聽topic 37 for { 38 msg,err := consumer.Receive(ctx) 39 if err != nil { 40 log.Fatal(err) 41 } else { 42 fmt.Printf("Received message : %v",string(msg.Payload())) 43 } 44 45 consumer.Ack(msg) 46 47 } 48 49 }
這兩個go文件分別處於兩個項目中,其項目結構分別以下:
而後,分別編譯這兩個go項目,並生成可執行文件。首先運行pulsar-consumer,打開消費者程序,此時該消費者程序處於監聽消息狀態,以下:
而後,運行pulsar-producer,打開生產者程序,以下:
該生產者程序發送完一條消息以後即運行結束並退出。
此時,再回到消費者程序運行界面,就能夠看到消費者這邊已經接收到了生產者發送的那條消息:
到此,Go語言版本的最簡單的producer和consumer代碼就完成了。