前面講的案例都是點對點的消息,即一個生產者發送的一條消息只能被一個消費者消費,而後就移除了。 而topic模式一條消息能夠被多個消費者訂閱,關係以下:
java
package com.sxt.demo; import java.net.URI; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQTextMessage; import com.sxt.bean.User; /** * 定義生產者 * @author Administrator * */ public class ActivemqdemoProducer { private static String userName="admin"; private static String password="admin"; private static String brokerURL="tcp://192.168.119.12:61616"; public static void main(String[] args) throws Exception { TopicConnectionFactory factory = new ActiveMQConnectionFactory(userName,password,brokerURL); TopicConnection connection = factory.createTopicConnection(); connection.start(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic-hello"); TopicPublisher publisher = session.createPublisher(topic); MapMessage message = session.createMapMessage(); message.setString("name", "lisi"); message.setString("age", "18"); publisher.send(message ); publisher.close(); session.close(); connection.close(); } }
package com.sxt.demo; import java.net.URI; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQTextMessage; import com.sxt.bean.User; /** * 定義消費者 * @author Administrator * */ public class ActivemqdemoConsumer { private static String userName="admin"; private static String password="admin"; private static String brokerURL="tcp://192.168.119.12:61616"; public static void main(String[] args) throws Exception { TopicConnectionFactory factory = new ActiveMQConnectionFactory(userName,password,brokerURL); TopicConnection connection = factory.createTopicConnection(); connection.start(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic-hello"); TopicPublisher publisher = session.createPublisher(topic); TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof MapMessage){ MapMessage mapMessage=(MapMessage)message; try { String name = mapMessage.getString("name"); String age = mapMessage.getString("age"); System.out.println(name+"+"+age); } catch (JMSException e) { // TODO Auto-generated catch block System.out.println("錯誤信息"); } } } }); Thread.sleep(30000); publisher.close(); session.close(); connection.close(); } }
先啓動消費者
啓動生產者
spring
<dependencies> <!-- ActiveMQ客戶端完整jar包依賴 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <!-- ActiveMQ和Spring整合配置文件標籤處理jar包依賴 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.5</version> </dependency> <!-- Spring-JMS插件相關jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- Spring框架上下文jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> </dependencies>
package com.sxt.bean; import java.io.Serializable; public class User implements Serializable{ /** * */ private static final long serialVersionUID = 1L; private String id; private String name; private String password; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "User [id=" + id + ", name=" + name + ", password=" + password + "]"; } }
package com.sxt.producer; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import com.sxt.bean.User; public class OrderProducer { private JmsTemplate template; public JmsTemplate getTemplate() { return template; } public void setTemplate(JmsTemplate template) { this.template = template; } /** * 發送消息的方法 */ public void sendOrder(String destinationName,User user){ template.send(destinationName, new MessageCreator() { /** * 模板模式中暴露給調用者的方法 */ @Override public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); } }
package com.sxt.consumer; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import org.apache.activemq.command.ActiveMQObjectMessage; public class OrderConsumer implements MessageListener{ @Override public void onMessage(Message message) { ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message; try { System.out.println(msg.getObject()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd"> <!-- ActiveMQ 鏈接工廠 --> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <!-- 需提供訪問路徑tcp://ip:61616;以及用戶名,密碼 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.119.12:61616" userName="admin" password="admin" /> <!-- Spring Caching鏈接工廠 --> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 消息生產者 start --> <!-- 定義JmsTemplate對象. 此類型由Spring框架JMS組件提供. 用於訪問ActiveMQ使用. --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(發佈/訂閱),即隊列模式, 默認數據可省略配置 --> <!-- <property name="pubSubDomain" value="false" /> --> </bean> <!-- 定義生成者對象 --> <bean id="orderProducer" class="com.sxt.producer.OrderProducer"> <!-- 爲屬性賦值 --> <property name="template" ref="jmsQueueTemplate"></property> </bean> <!--消息生產者 end --> <!-- 消息消費者 start --> <!-- 定義消息監聽器, 此組件爲spring-jms組件定義. 能夠一次註冊若干消息監聽器. 屬性解釋: destination-type - 目的地類型, queue表明消息隊列 可選值: queue | topic | durableTopic queue - 默認值. 表明消息隊列 topic - 表明消息隊列集合 durableTopic - 持久化的消息隊列集合. ActiveMQ會保證消息的消費者必定接收到此消息. container-type - 容器類型 可選值: default | simple default - 默認值. 默認容器類型, 對應DefaultMessageListenerContainer simple - 簡單容器類型, 對應SimpleMessageListenerContainer connection-factory - 連接工廠, 注入的是Spring-JMS組件提供的連接工廠對象. acknowledge - 確認方式 可選值: auto | client | dups-ok | transacted auto - 默認值, 即自動確認消息 client - 客戶端確認消息 dups-ok - 可以使用副本的客戶端確認消息 transacted - 有事務的持久化消息確認機制. 需開啓對ActiveMQ的事務控制纔可應用. --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <!-- 註冊消息監聽器. 若是須要註冊多個, 重複定義下述標籤. --> <jms:listener destination="spring-MQ" ref="orderReciver" /> </jms:listener-container> <!-- 容器管理消息監聽器實現類對象 --> <bean id="orderReciver" class="com.sxt.consumer.OrderConsumer"/> <!-- 消息消費者 end --> </beans>
package com.sxt.test; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.ApplicationContext; import com.sxt.bean.User; import com.sxt.producer.OrderProducer; public class Test { public static void main(String[] args) { ApplicationContext ac = new ClassPathXmlApplicationContext("application.xml"); OrderProducer bean = ac.getBean(OrderProducer.class); User user = new User(); user.setId("2"); user.setName("lisi"); user.setPassword("123456"); bean.sendOrder("spring-MQ", user); System.out.println("消息發送完成"); } }