消息的發佈有2種形式, 隊列式(點對點) 和主題式(pub/sub) 模式, 隊列式發佈後, 接收者從隊列中獲取消息後, 消息就會消失, 但任意消費者均可以從隊列中接受消息, 消息只能被接受一次html
主題式則爲接受後消息不消失java
JMS 是(java message service) 是 基於JVM代理的規範, ActiveMQ是他的一種實現linux
2、JMS消息基本組件 2.1、ConnectionFactory 建立Connection對象的工廠,針對兩種不一樣的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。能夠經過JNDI來查找ConnectionFactory對象。 2.2、Destination Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來講,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來講,它的Destination也是某個隊列或主題(即消息來源)。 因此,Destination實際上就是兩種類型的對象:Queue、Topic能夠經過JNDI來查找Destination。 2.3、Connection Connection表示在客戶端和JMS系統之間創建的連接(對TCP/IP socket的包裝)。Connection能夠產生一個或多個Session。跟ConnectionFactory同樣,Connection也有兩種類型:QueueConnection和TopicConnection。 2.4、Session Session是操做消息的接口。能夠經過session建立生產者、消費者、消息等。Session提供了事務的功能。當須要使用session發送/接收多個消息時,能夠將這些發送/接收動做放到一個事務中。一樣,也分QueueSession和TopicSession。 2.5、消息的生產者 消息生產者由Session建立,並用於將消息發送到Destination。一樣,消息生產者分兩種類型:QueueSender和TopicPublisher。能夠調用消息生產者的方法(send或publish方法)發送消息。 2.6、消息消費者 消息消費者由Session建立,用於接收被髮送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別經過session的createReceiver(Queue)或createSubscriber(Topic)來建立。固然,也能夠session的creatDurableSubscriber方法來建立持久化的訂閱者。 2.7、MessageListener 消息監聽器。若是註冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
1), linux安裝spring
訪問: http://activemq.apache.org/activemq-5111-release.html
2), docker安裝docker
sudo docker run -d -p 61616:61616 -p 8161:8161 cloudesire/activemq
安裝成功後, 可訪問: apache
3) 使用springboot的內嵌 activemqspringboot
<!--springboot 內嵌 activemq--> <dependency> <groupId>org.apacke.activemq</groupId> <artifactId>activemq-broker</artifactId> </dependency>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> </dependency>
spring: activemq: broker-url: tcp://192.168.50.202:61616
支持的配置有: session
activemq: broker-url: tcp://192.168.50.202:61616 user: password: in-memory:
3) 消息發送者: dom
package com.wenbronk.enterprise.jms.message; import org.springframework.jms.core.MessageCreator; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * 消息建立者 * Created by wenbronk on 2017/6/13. */ public class CreateMessage implements MessageCreator{ @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("測試消息"); } }
4) 消息接受着: socket
package com.wenbronk.enterprise.jms.message; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; /** * 消息接受者 * Created by wenbronk on 2017/6/13. */ @Component public class ReceiMessage { /** * 使用 @JmsListener 指定要監聽的域, 有消息發送時就會發送到此域中 * @param message */ @JmsListener(destination = "my-destination") public void receiveMessage(String message) { System.out.println("接受到的消息是: " + message); } }
5), 消息發送:
package com.wenbronk.enterprise.jms.config; import com.wenbronk.enterprise.jms.message.CreateMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; /** * 消息發送及目的地定義 * 實現commandLineRunner, 使得程序啓動後執行該類的run() 方法 * Created by wenbronk on 2017/6/13. */ @Component public class MessageConfig implements CommandLineRunner { @Autowired private JmsTemplate jmsTemplate; /** * 消息域爲 my-destination * 發送者爲 createMessage() * @param args * @throws Exception */ @Override public void run(String... args) throws Exception { jmsTemplate.send("my-destination", new CreateMessage()); } }
若是想自定義使用 topic 或者 queue 模式, 須要本身指定:
MessageConfig
package com.wenbronk.enterprise.jms.config; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic; /** * 建立 點對點模式的對象和發佈訂閱模式的對象 * Created by wenbronk on 2017/6/13. */ @Configuration public class TopicConfig { @Bean public Queue queue() { return new ActiveMQQueue("sample.queue"); } @Bean public Topic topic() { return new ActiveMQTopic("sample.topic"); } }
消息發佈者:
package com.wenbronk.enterprise.jms.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.jms.Queue; import javax.jms.Topic; /** * 點對點和訂閱模式的消息發佈 * Created by wenbronk on 2017/6/13. */ @Component @EnableScheduling public class MessageProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; @Scheduled(fixedDelay=3000)//每3s執行1次 public void send() { //send queue. this.jmsMessagingTemplate.convertAndSend(this.queue, "hi,activeMQ"); //send topic. this.jmsMessagingTemplate.convertAndSend(this.topic, "hi,activeMQ(topic)"); } }
消息接受者
package com.wenbronk.enterprise.jms.message; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; /** * 點對點和訂閱模式的消息接受着 * Created by wenbronk on 2017/6/13. */ @Component public class MessageConsumer { @JmsListener(destination = "sample.topic") public void receiveQueue(String text) { System.out.println("Consumer2="+text); } @JmsListener(destination = "sample.topic") public void receiveTopic(String text) { System.out.println("Consumer3="+text); } }
須要在配置文件中開啓 發佈訂閱模式的支持:
spring:
activemq: broker-url: tcp://192.168.50.202:61616 jms: pub-sub-domain: true