ActiveMQ 使用的是標準生產者和消費者模型
有兩種數據結構 Queue、 Topic
1、 Queue 隊列 ,生產者生產了一個消息,只能由一個消費者進行消費
2、 Topic 話題,生產者生產了一個消息,能夠由多個消費者進行消費 spring
默認 tcp 鏈接 activeMQ 端口 61616
緩存
導入jar網絡
2、 編寫配置生產者
配置 activemq 鏈接工廠session
<!-- ActiveMQ 鏈接工廠 --> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <!-- 若是鏈接網絡:tcp://ip:61616;未鏈接網絡:tcp://localhost:61616 以及用戶名,密碼--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching鏈接工廠 --> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactor <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- Spring JmsTemplate 的消息生產者 start--> <!-- 定義JmsTemplate的Queue類型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <constructor-arg ref="mqConnectionFactory" /> <!-- 非pub/sub模型(發佈/訂閱),即隊列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定義JmsTemplate的Topic類型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <constructor-arg ref="mqConnectionFactory" /> <!-- pub/sub模型(發佈/訂閱) --> <property name="pubSubDomain" value="true" /> </bean> <!-- 定義Queue監聽器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <!-- 默認註冊bean名稱,應該是類名首字母小寫 --> <jms:listener destination="bos_sms" ref="topicConsumer"/> </jms:listener-container>
queue模式生產者模板,topic模式同,只是注入類不一樣:數據結構
@Service public class QueueSender { // 注入jmsTemplate @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; //建立一個send方法,傳入隊列名,和一個用於發送的消息 public void send(String queueName, final String message) { jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
queue消費者模板app
@Service public class QueueConsumer1 implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out .println("消費者QueueConsumer獲取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
生產者測試tcp
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-mq.xml") public class ProducerTest { @Autowired private QueueSender queueSender; @Autowired private TopicSender topicSender; @Test public void testSendMessage() { queueSender.send("spring_queue", "我是生產者"); } }
消費者測試測試
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml") public class ConsumerTest { @Test public void testConsumerMessage() { while (true) { // junit退出,防止進程死掉 } } }
結果運行生產者測試後對應測試者會輸出對應消息spa