一,activeMq安裝java
1,下載activeMq,放置到電腦某個目錄,web
2,直接運行spring
/bin/activemq.bat腳本,便可啓動activeMqapache
3,訪問網址localhost:8161,便可查看activeMq網頁界面springboot
4,activeMq目錄中有一些例子,能夠參考;dom
二,springboot集成activeMQtcp
1,添加依賴ide
<!-- mq begin --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <!-- <version>5.7.0</version> --> </dependency> <!-- mq end -->
2,添加activeMQ配置信息spring-boot
spring.activemq.broker-url=tcp://localhost:61616 #activeMQ地址信息,注意訪問端口是61616 #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) spring.activemq.close-timeout=5000 spring.activemq.in-memory=false #spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 spring.activemq.send-timeout=3000 #activeMQ發送topic消息,須要此配置 #spring.jms.pub-sub-domain=true #信任全部的包 spring.activemq.packages.trust-all=true #queque 及 topic配置信息 queque.police=queque.police queue.edu=queue.edu topic.pbc=topic.pbc topic.veh=topic.veh
3,配置類this
package org.spring.web.component; import javax.jms.Queue; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; //MQ configuration class
//主要配置 配置信息中的 spring.jms.pub-sub-domain 屬性值信息,能夠實現同時發送queue和topic信息
@Configuration public class MqConfig { /* @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616"); }*/ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); System.out.println("注入的connectionFactory>>>>>"+connectionFactory.getUserName()); return bean; }
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); bean.setPubSubDomain(false); return bean; } /* @Bean public JmsMessagingTemplate jmsMessagingTemplate(ActiveMQConnectionFactory connectionFactory){ return new JmsMessagingTemplate(connectionFactory); }*/ }
4,注入mqBeans
package org.spring.web.mq; import javax.jms.Queue; import javax.jms.Topic; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; /** * * 項目名稱:spring-web 類名稱:MqBans 類描述: 建立人:john 建立時間:2018年7月30日 上午7:30:18 修改人:john * 修改時間:2018年7月30日 上午7:30:18 修改備註: * * @version * */ @Configuration @EnableJms public class MqBans { @Value("${queque.police}") private String policeQueue; @Value("${queue.edu}") private String eduQueue; @Value("${topic.pbc}") private String pbcTopic; @Value("${topic.veh}") private String vehTopic; @Bean public Queue policeQueue() { return new ActiveMQQueue(policeQueue); } @Bean public Queue eduQueue() { return new ActiveMQQueue(eduQueue); } @Bean public Topic pbcTopic() { return new ActiveMQTopic(pbcTopic); } @Bean public Topic vehTopic() { return new ActiveMQTopic(vehTopic); } }
5,生產者
package org.spring.web.mq; import javax.jms.Queue; import javax.jms.Topic; import org.spring.web.entity.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; @Component public class Producer implements CommandLineRunner { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue policeQueue; @Autowired private Topic pbcTopic; @Autowired private Topic vehTopic; @Override public void run(String... args) throws Exception { for(int i=0;i<1;i++){ send("Sample message"+i); } sendTopic("topic send"); } public void send(String msg) { this.jmsMessagingTemplate.convertAndSend(this.policeQueue, msg); System.out.println("Message was sent to the policeQueue"); } public void sendTopic(String msg){ System.out.println("消費者發送topic消息"); this.jmsMessagingTemplate.convertAndSend(this.pbcTopic,msg); System.out.println("Message was sent to the pbcTopic"); } }
6,消費者
package org.spring.web.mq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import org.spring.web.entity.User; import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; @Component public class Consumer { @JmsListener(destination = "${queque.police}",containerFactory="jmsListenerContainerQueue") @SendTo("out.queue") public String receiveQueue(String text) { System.out.println("police消費者1接受到的消息"); System.out.println(text); return "sample.queue接受的消息>>>>>" + text; } @JmsListener(destination = "${queque.police}",containerFactory="jmsListenerContainerQueue") public void receiveQueue2(String text){ System.out.println("police消費者2接受到的消息"); System.out.println(text); } @JmsListener(destination = "out.queue",containerFactory="jmsListenerContainerQueue") public void receiveOutQueue(String text) { System.out.println("out.Queue接受到的信息" + text); } @JmsListener(destination = "${topic.pbc}",containerFactory="jmsListenerContainerTopic") public void receiveTopic(String text){ System.out.println("pbc1消費者接受的信息"); System.out.println(text); } @JmsListener(destination = "${topic.pbc}",containerFactory="jmsListenerContainerTopic") public void receiveTopic2(String text){ System.out.println("pbc2消費者接受的信息"+text); } @JmsListener(destination = "${topic.veh}",containerFactory="jmsListenerContainerTopic") public void receiveTopicUser(Message m){ System.out.println("接受到的對象信息message>>>>"+m); User user=new User(); if(m instanceof ObjectMessage){ ObjectMessage objectMessage=(ObjectMessage) m; System.out.println("objectMessage>>>>>>>"+objectMessage); try { user=(User) objectMessage.getObject(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("消費者接受的用戶對象信息"+user); } }
總結:
1,activeMq的端口號是61616;
2,使用topic,須要配置 spring.jms.pub-sub-domain=true;
3,queque若是沒有消費者,會將信息存儲到queue中;
4,主方法啓動的時候,消費者的監聽已經生效;
5,發送的消息爲對象的時候,須要將對象序列化;消費者接受對象信息的時候須要使用ObjectMessage進行轉化;
6,使用JmsListener註解中的containerFactory屬性,能夠配置spring.jms.pub-sub屬性,實現同事接收queque和topic;
7,queque爲點對點模式;tipic爲發佈訂閱模式;