最近得閒,探索了一下ActiveMQ。java
ActiveMQ消息隊列,信息收發的容器,做用有異步消息,流量削鋒,應用耦合。
同行還有 Kafka、RabbitMQ、RocketMQ、ZeroMQ、MetaMQ 。
下載地址:http://activemq.apache.org/co...mysql
window版本的解壓後雙擊/bin/activemq.bat
便可啓動。
或者以服務方式啓動:右鍵管理員運行InstallService.bat
,而後在Windows系統的服務中啓動。spring
它有本身的可視化頁面:http://localhost:8161/admin/sql
默認訪問密碼是:admin/admin
若是須要修改在:/conf/jetty-realm.properties 中修改數據庫
在springboot
上整合的,使用spring 的JmsTemplate
來操做ActiveMQ
1、首先在pom文件中導入所需的jar包座標:apache
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2、新增一個ActiveMQ的配置文件spring-jms.xml
安全
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- 配置JMS鏈接工廠 --> <bean id="innerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${spring.activemq.broker-url}" /> </bean> <!--配置鏈接池--> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory" ref="innerConnectionFactory" /> <property name="maxConnections" value="100"></property> </bean> <!-- 配置JMS模板,Spring提供的JMS工具類 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="pooledConnectionFactory" /> <property name="defaultDestination" ref="JmsSenderDestination" /> <property name="receiveTimeout" value="10000" /> </bean> </beans>
3、在啓動類上配置以生效springboot
@ImportResource(locations={"classpath:/config/spring-jms.xml"})
4、在application.properties中配置ActiveMQ 的鏈接地址session
spring.activemq.broker-url=tcp://localhost:61616
準備就緒;開始寫生產者和消費者,我這裏把生產者和消費者寫在一個項目裏面。在這以前須要明白兩個概念
隊列(Queue)和主題(Topic)
隊列(Queue)和主題(Topic)是JMS支持的兩種消息傳遞模型:app
先在spring-jms.xml
裏添加配置一個隊列名稱Queue_love
<bean id="JmsSenderDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>Queue_love</value> </constructor-arg> </bean>
建立一個生產者來發送消息;@Qualifier("JmsSenderDestination")
指定了發送到上面配置的Queue_love
隊列
@Component public class JmsSender { @Autowired private JmsTemplate jmsTemplate; @Qualifier("JmsSenderDestination") @Autowired protected Destination destination; public void sendMessage(final String msg) { logger.info("QUEUE destination :" + destination.toString() + ", 發送消息:" + msg); jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(final Session session) throws JMSException { return session.createTextMessage(msg); } }); }}
建立一個消費者來消費消息:
@Component public class JmsTemplateListener implements MessageListener { @Override public void onMessage(Message message) { final TextMessage tm = (TextMessage) message; try { logger.info("QUEUE接收信息==="+tm.getText()); } catch (JMSException e) { e.printStackTrace(); } }}
消費者須要在spring-jms.xml
配置一下並指明該消費者須要消費哪一個隊列的消息
<!-- 配置消息隊列監聽者 --> <bean id="JmsListener" class="com.mashu.activeMq.jmsTemplate.JmsTemplateListener" /> <!-- 使用spring進行配置 監聽 --> <bean id="JmsTemplateListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="pooledConnectionFactory"></property> <property name="destination" ref="JmsSenderDestination"></property> <property name="messageListener" ref="JmsListener"></property> <property name="sessionTransacted" value="false"></property> <property name="concurrentConsumers" value="6"></property> <property name="concurrency" value="2-4"></property> <property name="maxConcurrentConsumers" value="10"></property> </bean>
這樣一個簡單的消息隊列收發程序已經寫好了,調用生產者方法,看看結果
發出的消息立馬就被消費了!
咱們能夠先把消費者註釋掉,只用生產者發送消息就能夠在可視化頁面上看到尚未被消費的消息內容。
Topic的方式和Queue相似,只須要在定義隊列的時候calss
=org.apache.activemq.command.ActiveMQTopic
便可
<bean id="JmsSenderTDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg> <value>Topic_love</value> </constructor-arg> </bean>
除了使用createTextMessage()
方法發送純字符串消息,還有
session.createObjectMessage()
;session.createStreamMessage()
;session.createBytesMessage()
;session.createMapMessage()
;ActiveMQ在使用的時候和MySQL
同樣,也能夠配置用戶名密碼,默認不沒有,咱們能夠打開:
在conf/activemq.xml
添加如下信息(務必在<systemUsage>
標籤上面)
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins>
對應的用戶名密碼在/conf/credentials.properties
中配置
activemq.username=admin activemq.password=123456 guest.password=password
那麼咱們在項目中的application.properties
須要加上也要用戶名密碼:
spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=123456
ActiveMQ的持久化機制包含JDBC,KahaDB(默認)、LevelDB
默認保存的消息在\data\kahadb目錄下;下面方法修改成用MySQL(JDBC)
保存,
生產者發送消息存儲到MySQL
數據庫,消費者消費後消息從數據庫消失。
修改/conf/activemq.xml
將:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
修改成:
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/> </persistenceAdapter>
createTablesOnStartup
默認值是true,每次ActiveMQ啓動的時候都從新建立數據表,通常是首次啓動設置爲true,以後設置爲false。
添加:在/conf/activemq.xml 中加入MySQL鏈接信息
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/db_activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="123456"/> <property name="poolPreparedStatements" value="true"/> </bean>
重啓ActiveMQ
後數據庫產生三個表activemq_acks
、activemq_lock
、activemq_msgs