高性能消息中間件——NATS

前 言

這段時間個人主要工做內容是將公司系統中使用的RabbitMQ替換成NATS,而此以前我對Nats一無所知。通過一段時間緊張的學習和開發以後我順利的完成了任務,並對消息中間件有了更深的瞭解。在此感謝同事鍾亮在此過程當中對個人幫助。NATS屬於比較小衆的一款中間件產品,中文資料基本上是沒有的,故寫以記之,爲想學習Nats的同窗提供一點幫助。java

 

在介紹NATS以前先了解下什麼是分佈式系統和消息中間件git

對於分佈式系統的定義,一直以來我都沒有找到或者想到特別簡練而又合適的定義,這裏引用一下Distributed System Concepts and Design (Thrid Edition)中的一句話A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages,從這句話咱們能夠看到幾個重點,一是組件分佈在網絡計算機上,二是組件之間僅僅經過消息傳遞來通訊並協調行動。消息中間件維基百科給出的定義爲Message-oriented middleware(MOM) is software infrastructure focused on sending and receiving messages between distrubuted systems,意思就是面向消息的系統(消息中間件)是在分佈式系統中完成消息的發送和接收的基礎軟件github

消息中間件常被說起的好處即異步和解耦,市面上經常被使用到的中間件有RabbitMQ, ActiveMQ, Kafka等,他們的關注度和使用率都很是的高,而且使用起來也很是的方便。公司的WiseCloud產品就集成了RabbitMQ。而在下一個版本的更新中將會使用NATS來替換RabbitMQ。使用NATS的好處比較多首先就是其性能很是好,下面引用官網的性能對比圖:spring

NATS介紹docker

NATS是一個開源、輕量級、高性能的分佈式消息中間件,實現了高可伸縮性和優雅的Publish/Subscribe模型,使用Golang語言開發。NATS的開發哲學認爲高質量的QoS應該在客戶端構建,故只創建了Request-Reply,不提供 1.持久化 2.事務處理 3.加強的交付模式 4.企業級隊列。服務器

NATS消息傳遞模型網絡

NATS支持各類消息傳遞模型,包括:架構

發佈訂閱(Publish Subscribe)app

請求回覆(Request Reply)負載均衡

隊列訂閱(Queue Subscribers )

提供的功能:

純粹的發佈訂閱模型(Pure pub-sub)

服務器集羣(Cluster mode server)

自動精簡訂閱者(Auto-pruning of subscribers)

基於文本協議(Text-based protocol)

多服務質量保證(Multiple qualities of service - QoS)

發佈訂閱(Publish Subscribe)

NATS將publish/subscribe消息分發模型實現爲一對多通訊,發佈者在Subject上發送消息,而且監聽該Subject在任何活動的訂閱者都會收到該消息

java:

//publish

Connection nc = Nats.connect("nats://127.0.0.1:4222");

nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));
//subscribe

Subscription sub = nc.subscribe("subject");

Message msg = sub.nextMessage(Duration.ofMillis(500));

String response = new String(msg.getData(), StandardCharsets.

UTF_8);

或者是基於回調的subscribe

//subscribe

Dispatcher d = nc.createDispatcher(msg - >{

String response = new String(msg.getData(), StandardCharsets.UTF_8)

//do something

})

d.subscribe("subject");

請求響應(Request Reply)

NATS支持兩種請求響應消息:點對點或多對多。點對點涉及最快或首先響應。在一對多的消息交換中,須要限制請求響應的限制

在Request Reply過程當中,發佈請求發佈帶有響應主題的消息,指望對該subject作出響應操做

java:

// publish 

Connection connection = Nats.connect("nats://127.0.0.1:4222");

String reply = "replyMsg"; 

//請求迴應方法回調

Dispatcher d = connection.createDispatcher(msg ->  

System.out.println("reply: " + JSON.toJSONString(msg));

}) 

d.unsubscribe(repl , 1);

//訂閱請求

d.subscribe(reply);

//發佈請求

connection.publish("requestSub", reply, "request".getBytes(StandardCharsets.

UTF_8));
//subscribe

Connection nc = Nats.connect("nats://127.0.0.1:4222");

//註冊訂閱

Dispatcher dispatcher = nc.createDispatcher(msg -> {

System.out.println(JSON.toJSONString(msg));

nc.publish(msg.getReplyTo(), "this is reply".getBytes(StandardCharsets.UTF_8));

});

dispatcher.subscribe("requestSub");

隊列訂閱&分享工做(Queue Subscribers & Sharing Work)

NATS提供稱爲隊列訂閱的負載均衡功能,雖然名字爲queue(隊列)可是並非咱們所認爲的那樣。他的主要功能是將具備相同queue名字的subject進行負載均衡。使用隊列訂閱功能消息發佈者不須要作任何改動,消息接受者須要具備相同的對列名歡迎工做一到五年的Java工程師朋友們加入Java架構交流:874811168 羣內提供免費的Java架構學習資料

// Subscribe

Connection nc = Nats.connect();

Dispatcher d = nc.createDispatcher(msg -> {

//do something

System.out.println("msg: " + new String(msg.getData(),StandardCharsets.UTF_8));

});

d.subscribe("queSub", "queName");

Nats-Spring集成

NATS雖然說是一個性能很是好的消息中間鍵,可是和Spring的集成不是很好。這裏提供兩個集成的思路

  • CloudFoundry-Community/java-nats
  • Wanlinus/nats-spring

java-nats

這是一個由CloudFoundry主導的一個NATS java客戶端。提供了區別於官方的nats客戶端,支持註解配置,對Spring有比較好的支持,可是此項目已經有1年多沒有更新且不支持NATS Streaming。相應用法參考Github,這裏不作詳細講解. 

nats-spring

因爲開源社區只提供一個簡單的NATS Client,缺乏對註解和Spring的支持,因此我基於官方jnats客戶端寫了一個SpringBoot的兼容插件.主要是爲了兼容spring boot amqp開發模式,儘可能使用註解解決問題開發出來的,因此使用方法相似於在代碼中使用@RabbitListener.具體使用方法以下

{{git clone https://github.com/wanlinus/nats-spring.git

cd nats-spring

mvn clean install}}}

<dependency>

<groupId>cn.wanlinus</groupId>

<artifactId>nats-spring</artifactId>

<version>1.0.0.RELEASE</version>

</dependency>

application.yml

spring:

nats:

urls:

 - nats://127.0.0.1:4222
@EnableNats

@SpringBootApplication

public class NatsDemo2Application {

public static void main(String[] args) {
    歡迎Java工程師朋友們加入Java架構交流:874811168 
   SpringApplication.run(NatsDemo2Application.class, args);
    
}

}

@Component

public class Foo{

@NatsSubscribe("haha")

public void message(Message message) {

 System.out.println(message.getSubject() + " : " + new String(message.getData()));

}

}

 

NATS Streaming介紹

NATS因爲不能保證消息的投遞正確性和存在其餘的缺點,NATS Streaming就孕育而生.他是一個由NATS提供支持的數據流系統,採用Go語言編寫,NATS Streaming與核心NATS平臺無縫嵌入,擴展和互操做.除了核心NATS平臺的功能外,他還提供瞭如下功能:

NATS Streaming特徵

加強消息協議(Enhanced message protocol)

消息/事件持久化(Message/event persistence)

至少一次數據傳輸(At-least-once-delivery)

Publisher限速(Publisher rate limiting)

Subscriber速率匹配(Rate matching/limiting per subscriber)

按主題重發消息(Historical message replay by subject)

持續訂閱(Durable subscriptions)

基本用法

在使用NATS Streaming以前首先要啓動服務器,在這裏我選擇使用docker容器

# 4222 client默認鏈接端口

8222 Web端口

6222 集羣通訊端口

$ docker run -d -p 4222:4222 -p 8222:8222 -p 6222:6222 nats-streaming

STREAM: Starting nats-streaming-server[test-cluster] version 0.11.0

STREAM: ServerID: bzkKJL3jI4KW9Hqb0bC1Ae

STREAM: Go version: go1.11

Starting nats-server version 1.3.0

Git commit [not set]

Starting http monitor on 0.0.0.0:8222

Listening for client connections on 0.0.0.0:4222

Server is ready

STREAM: Recovering the state...

STREAM: No recovered state

STREAM: Message store is MEMORY

STREAM: ---------- Store Limits ----------

STREAM: Channels:                  100 *

STREAM: --------- Channels Limits --------

STREAM:   Subscriptions:          1000 *

STREAM:   Messages     :       1000000 *

STREAM:   Bytes        :     976.56 MB *

STREAM:   Age          :     unlimited *

STREAM:   Inactivity   :     unlimited *

STREAM: ----------------------------------

java:

// 第一個參數表示clusterId,在啓動NATS Streaming容器的時候肯定

// 第二個參數表示clientID,鏈接客戶端的惟一標識符

StreamingConnectionFactory cf = new StreamingConnectionFactory

("test-cluster", "bar");

//設置Nats服務器地址和端口,默認是nats://127.0.0.1:4222

cf.setNatsConnection(Nats.connect("nats://127.0.0.1:4222"));

StreamingConnection sc = cf.createConnection();

Publish: sc.publish("foo", "Hello World".getBytes());

Subscribe:

sc.subscribe("foo", msg -> {

System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));

}, new SubscriptionOptions.Builder()

        .durableName("aa")

        .deliverAllAvailable().build());

在使用NATS Streaming的時候須要注意訂閱主題不支持通配符,在訂閱消息時傳入MessageHandler函數是接口實現和SubscriptionOptions對象.MessageHandler提供消息回調處理, SubscriptionOptions用於設置訂閱選項,好比設置Queue, durableName, ack等。

Streaming-Spring集成

做爲一款優秀的消息中間件,卻沒有對Spring作集成,這是很是的惋惜的事情.因此爲了在工做中方便的使用他,我開發了一個很小的插件.雖然還有很大的改進空間,不過在公司的項目中卻可以很好的運行.他開發思路和nats-spring差很少,因此使用方式也是大同小異,具體以下:

{{git clone https://github.com/wanlinus/na ... g.git

cd nats-streaming-spring

mvn clean install}}}

<dependency>

<groupId>cn.wanlinus</groupId>

<artifactId>nats-streaming-spring</artifactId>
歡迎Java工程師朋友們加入Java架構交流:874811168 
羣內提供免費的Java架構學習資料

<version>1.0.0-SNAPSHOT</version>

</dependency>

application.yml

spring:

nats:

streaming:

 nats-url: nats://127.0.0.1:4222

 cluster-id: test-cluster
@EnableNatsStreaming

@SpringBootApplication

public class StreamingDemoApplication {

public static void main(String[] args) {

   SpringApplication.run(StreamingDemoApplication.class, args);

}



//發佈消息只須要注入StreamingConnection

@Autowired

private StreamingConnection sc;



public void sendMsg(){

   sc.publish("foo", "publish message".getBytes())

}

}



@Service

public class A {

@Subscribe(value = "foo", durableName = "dname", queue = "queue")

public void asd(Message message) throws IOException {

   System.out.println(new String(message.getData(), StandardCharsets.UTF_8));

}

}

兩個插件因爲是爲告終合項目所寫的,因此裏面有些部分並不通用。後續的開發中我將會繼續進行抽象和改進。

相關文章
相關標籤/搜索