分佈式消息隊列ActiveMQ+Spring整合

消息隊列MQ簡介

消息隊列技術是分佈式應用間交換信息的一種技術。使用消息隊列能夠很好的將任務以異步的方式進行處理或者進行數據傳送和儲存等。例如當你頻繁地向數據庫中插入數據、頻繁的向搜索引擎提交數據,就可採起消息隊列來異步插入。另外,還能夠將較慢/較複雜的處理邏輯、有併發數量限制的處理邏輯,經過消息隊列放在後臺處理。
常規的使用場景:短信服務、電子郵件服務、圖片處理服務、好友動態推送服務等。
特性:異步、順序讀寫、高性能、協議簡單。因此通常會用於解決大量的服務器端異步請求,同時能夠實現服務端的負載均衡和業務的容災。html

ActiveMQ簡介java

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE
1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位

消息模型: 點對點(p2p)、發佈/廣播(Pub/Sub)。spring

流程: 數據庫

p2p:消息生產者>消息隊列>消息消費者apache

Pub/Sub:主題>發佈者>訂閱者緩存

ActiveMQ準備性能優化

apache官網下載activeMQ:服務器

http://activemq.apache.org/do...session

進行解壓後運行其bin目錄下面的win32或者win64文件夾下的activemq.bat文件啓動activeMQ。架構

  • Maven配置

clipboard.png

配置connectionFactory

connectionFactory是Spring用於建立到JMS服務器連接的,Spring提供了多種connectionFactory.

PooledConnectionFactory: JmsTemlate每次發送消息時只會緩存connection,session和productor,不會緩存consumer。所以只適合於生產者發送消息,這個只是在要求消息處理的及時性不是特別高的狀況下。

SingleConnectionFactory: 對於創建JMS服務器連接的請求會一直返回同一個連接,而且會忽略Connection的close方法調用。

CachingConnectionFactory: 繼承了SingleConnectionFactory,因此它擁有SingleConnectionFactory的全部功能,同時它還新增了緩存功能,它能夠緩存Session、MessageProducer和MessageConsumer。咱們使用CachingConnectionFactory來做爲示例。

在此我向你們推薦一個架構學習交流羣。交流學習羣號:478030634 裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多

Spring配置

applicationContext.xml 設置:

<!-- 獲取ActiveMQ提供的ConnectionFactory -->
<amq:connectionFactory id="amqConnectionFactory"
<!--設置ActiveMQ服務器地址及端口-->
    brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />

<!-- 配置ActiveMQ服務器鏈接conneciotnFactory -->
<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- 將ActiveMQ提供的ConnectionFactory注入到Spring管理的connectionFactory中 -->
    <constructor-arg ref="amqConnectionFactory" />
    <!--設置緩存大小-->
    <property name="sessionCacheSize" value="100" />
</bean>

<!-- 配置JMS消息生產者 -->
<!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory" />
    <!-- 非pub/sub模型(發佈/訂閱),即隊列模式 -->
    <property name="pubSubDomain" value="false" />
</bean>

<!-- 定義Queue監聽器 -->
<jms:listener-container concurrency="10"
    destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
    <!--監聽的java類-->
    <jms:listener destination="queue" ref="ConsumerMessageListener" />
</jms:listener-container>

clipboard.png

clipboard.png

實現消息監聽,獲取消息並消費

clipboard.png

相關文章
相關標籤/搜索