使用 ActiveMQ 實現JMS 異步調用

簡介

服務之間的同步調用,可使用 HTTP 或 RPC 來完成,但並不是全部的調用都須要同步,有些場景下,當客戶端調用服務端時,並不須要等待服務端作出響應,此時就應該使用異步調用。異步調用的經常使用方式是基於 MQ (Message Queue) 來實現的。下文會以 ActiveMQ 爲例進行講解。java

ActiveMQ 是 Java 世界中最爲流行的開源消息中間件,它不只功能強大,並且性能穩定。它可全面支持 JMS(Java 消息服務)技術規範,爲 Java 應用程序提供標準的 JMS API。web

此外 ActiveMQ 具有與 Spring 框架整合的能力,它一直都是 Spring 應用程序的消息中間件標配。一樣, Spring Boot 也提供了 ActiveMQ 的開箱即用的插件,只須要幾項配置,就能接入 ActiveMQ,並輕鬆使用 JMS API 編寫異步消息通訊程序。spring

Active MQ 官網地址以下docker

http://activemq.apache.orgshell

啓動 ActiveMQ 服務器

先使用 docker 安裝 ActiveMQ ,目前 ActiveMQ 官方並未提供相應的 Docker 鏡像,咱們選擇使用第三方鏡像 webcenter/activemqapache

docker pull webcenter/activemq:5.14.3

接下來運行 ActiveMQ瀏覽器

docker run  -d -p 8161:8161 -p 61616:61616 -e ACTIVEMQ_ADMIN_LOGIN=admin -e ACTIVEMQ_ADMIN_PASSWORD=admin --name activemq webcenter/activemq:5.14.3

在啓動 ActiveMQ 容器時,容器對宿主機暴露了兩個端口號:springboot

  • 8161: 表示 ActiveMQ 控制檯端口號,可在瀏覽器中經過控制檯來執行 ActiveMQ 的相關操做
  • 61616: 表示 ActiveMQ 所監聽的 TCP 端口號,應用程序可經過該端口號與 ActiveMQ 創建 TCP 鏈接,並完成後續的異步消息通訊

此外,在啓動 ActiveMQ 容器時,還提供了兩個環境變量服務器

  • ACTIVEMQ_ADMIN_LOGIN: 用於設置控制檯管理員的用戶名,默認爲 admin
  • ACTIVEMQ_ADMIN_PASSWORD: 用於設置控制檯管理員的密碼,默認爲 admin

查看控制檯

webcenter/activemq 鏡像擁有一個基於 Web 的控制檯,可經過瀏覽器訪問。容器啓動完畢後,能夠打開瀏覽器,並在地址欄中輸入 http://localhost:8161

點擊 Manage ActiveMQ borker 連接,瀏覽器將彈出一個對話框,此時輸入用戶名和密碼,認證經過後會進入管理界面

在管理界面中,包括 8 個功能菜單

  1. Home: 基本信息
  2. Queues: 管理的隊列
  3. Topics: 查看所管理的主題
  4. Subscribers: 查看相關主題的訂閱者
  5. Connections: 查看客戶端的鏈接信息
  6. Network: 查看網絡相關信息
  7. Scheduled: 查看 ActiveMQ 內部運行的定時任務
  8. Send: 經過表單方式查看向隊列或主題發送具體消息

ActiveMQ 的消息通道

ActiveMQ 管理了兩類消息通道,一類是隊列(Queue),另外一類叫作主題(Topic)。

Queue

Queue 用於解決消息的 點對點 通訊問題,也就是說,消息從生產者(Producer) 發出後,首先進入 ActiveMQ 某個指定的 Queue 中,而後再將消息傳送給其中一個消費者(Consumer)。

Topic

Topic 用於解決消息的發佈與訂閱(Publish-subscribe) 通訊問題,也就是說,消息從 Producer 發出後,首先將其發佈到 ActiveMQ 某個指定的 Topic 上,而後將此消息分發給每一個訂閱者(Subscriber) 。

比較

在具體場合下,靈活使用以上兩種通訊模式來實現 Producer 與 Consumer/Subscriber 間的異步調用,從而解決調用方的耦合問題。可見,Queue 能解決調用緩衝問題,Topic 能解決消息廣播問題, Queue 與 Topic 都能解決掉調用耦合問題,這些技術都爲一個好的軟件架構提供了有效的支撐。

開發生產者和消費者

下面就以 Queue 爲例,將 ActiveMQ 與 Spring Boot 進行整合,將 Producer 做爲客戶端, Consumer 做爲服務端,經過 Queue 實現客戶端與服務端的異步調用

開發服務端(消費者)

首先建立一個名爲 acitvemq-hello-server 的 spring boot 項目,若是在 eclipse 中安裝了 Spring Tools ,能夠在新建時選擇 New Spring Starter Project 選項。或者新建 Maven 工程。對應的 maven 依賴以下

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
    </dependencies>

在 Spring Boot 框架中已經內置了對 ActiveMQ 的支持,咱們只須要依賴 spring-boot-starter-mq 就能啓動 ActiveMQ,此時還須要在 application.properties 文件中添加 ActiveMQ 配置項

spring.activemq.broker-url=tcp://10.104.10.1:61616
spring.activemq.user=admin
spring.activemq.password=admin

接下來建立 HelloServer 的類,封裝服務端相關代碼

@Component
public class HelloServer {
    @JmsListener(destination="hello-queue")
    public void receive(String message) {
        System.out.println(message);
    }
}

使用 @Component 註解,說明它可被 Spring IoC 容器所管理。此時只須要使用 @JmsListener 註解,並將其綁定到 receive() 方法上,就能從 ActiveMQ 中接收響應的消息。

@JmsListener 註解中須要添加一個 destination 屬性來指定 Queue/Topic 的名稱,該名稱具備惟一性。消息將以一個 String 類型參數的形式傳入方法體中,也能夠接收其餘類型的消息,這取決於客戶端發送的消息是哪一種類型。Spring JMS 將消息放入 ActiveMQ 時會進行序列化,當消息從 ActiveMQ 取出時將進行反序列化,應用程序無需關注這些底層細節,只須要將精力放在業務邏輯上。

最後,編寫一個 Spring Boot 應用程序啓動類來啓動服務端(使用 spring tools 工具會自動生成)

@SpringBootApplication
public class ActivemqHelloServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivemqHelloServerApplication.class, args);
    }
}

當服務端啓動完畢後,將一直監聽 ActiveMQ 的 hello-queue 隊列中即將到來的消息,消息由客戶端來發送。

開發客戶端(生產者)

建立一個名爲 active-mq-client 的 Maven 項目, pom.xml 文件內容與服務端類似。application.properties 文件與服務端相同。

接下來建立一個名爲 HelloClient 的類,將其做爲客戶端。

@Component
public class HelloClient {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    public void send(String message) {
        jmsTemplate.convertAndSend("hello-queue", message);
    }
}

這裏使用了 @Autowired 註解, JmsTemplate 對象注入進來,還編寫了一個 send() 方法,在該方法中調用 JmsTemplate 對象的 convertAndSend 來轉換併發送消息。

最後使用 Spring Boot 應用程序啓動類來啓動客戶端

@SpringBootApplication
public class ActivemqHelloClientApplication {

    @Autowired
    private HelloClient helloClient;
    
    @PostConstruct
    public void init() {
        helloClient.send("hello world");
    }
    
    public static void main(String[] args) {
        SpringApplication.run(ActivemqHelloClientApplication.class, args);
    }

}

須要注意的是, init() 方法帶有 @PostConstruct 註解,表示 Spring IoC 容器實例化 ActivemqHelloClientApplication 類後將調用該方法。

運行 main() 方法能夠啓動客戶端應用程序,並能夠在服務端應用程序控制臺中看到 client 發送的消息,也能夠在 ActiveMQ 控制檯中查看隊列的當前狀態

Queue 表格中列明的含義以下

  • Name 表示隊列名稱,可在應用程序中自動建立,也可在 ActiveMQ 控制檯中手動建立
  • Number Of Pending Messages 表示阻塞在隊列中未經消費的消息條數
  • Number Of Consumers 表示正在與 ActiveMQ 創建鏈接的消費者數量
  • Messages Enqueued 表示進入隊列的消息數量
  • Messages Dequeued 表示離開隊列的消息數量

此外,還有下面幾種操做

  • Browser 用於查看當前隊列中消息的相關細節
  • Active Consumers 用於查看當前活動消費者的相關信息
  • Active Producers 用於查看當前活動生產者的相關信息
  • Send To 用於向當前隊列中發送具體消息
  • Purge 用於清空隊列中的消息
  • Delete 用於刪除當前隊列

參考

  • 《架構探險—輕量級微服務架構》
相關文章
相關標籤/搜索