ActiveMQ消息隊列使用和配置

 

安裝ActiveMQ

使用brew安裝https://blog.csdn.net/u010046908/article/details/54728375java

直接下載安裝https://blog.csdn.net/ytangdigl/article/details/77740100git

啓動

activemq start

賬號密碼都是admingithub

默認端口8161web

 

搭建項目框架

這邊拿一個簡易的spring+springMVC爲框架spring

添加pom依賴apache

    <!--activity工做流依賴-->
    <dependency>
      <groupId>org.activiti</groupId>
      <artifactId>activiti-engine</artifactId>
      <version>${activiti.version}</version>
    </dependency>
    <!-- activiti 與 Spring 集成 -->
    <dependency>
      <groupId>org.activiti</groupId>
      <artifactId>activiti-spring</artifactId>
      <version>${activiti.version}</version>
    </dependency>


    <!--activeMq依賴包-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.13.2</version>
    </dependency>
    <!--spring和mq的依賴包-->
    <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>4.3.8.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-messaging</artifactId>
      <version>4.3.8.RELEASE</version>
    </dependency>

 

添加spring-config.xml中的schema

xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"

http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd

  

添加配置

   <!--mq配置-->
    <!--mq的消息中心,brokerURL中心的位置-->
    <amq:connectionFactory brokerURL="tcp://localhost:61616" userName="admin" password="admin"
                           id="amqconnectionFactory"/>

    <!--spring和jms的鏈接targetConnectionFactory是寫死的-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqconnectionFactory"/>
        <!--緩存大小-->
        <property name="sessionCacheSize" value="10"/>
    </bean>

    <!--配置兩種模式-->
    <!--點對點模式-->
    <!--發佈訂閱-->
    <!--id爲jmsTemplate,可能id會重複-->
    <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"/>
        <!--不接收 pubSubDomain:是否是隊列-->
        <property name="pubSubDomain" value="false"/>
        <!--消息不會持久化了-->
        <!--<property name="deliveryMode" value="1"/>-->
    </bean>

    <!--訂閱者-->
    <bean id="topicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"/>
        <!--接收-->
        <property name="pubSubDomain" value="true"/>
    </bean>

    <!--監聽-->
    <!--acknowledge消息課靠性傳輸,屬性auto默認自動確認機制,-->
    <!-- prefetch="100" 傳輸次數,和冪有關係-->
    <!--destination-type=urableTopic能夠設置持久化和非持久化-->
    <!--接收信息queue點對點-->
    <jms:listener-container destination-type="queue" connection-factory="connectionFactory">
        <!-- destination監聽的對列-->
        <jms:listener destination="oldboy.queue" ref="queueReceicer_one"/>
        <jms:listener destination="oldboy.queue" ref="queueReceicer_two"/>
    </jms:listener-container>


    <!--發送信息queue發佈訂閱-->
    <jms:listener-container destination-type="topic" connection-factory="connectionFactory">
        <!--destination監聽的對列-->
        <jms:listener destination="oldboy.topic" ref="topicReceicer_one"/>
        <jms:listener destination="oldboy.topic" ref="topicReceicer_two"/>
    </jms:listener-container>

  

建立和配置文件對應的類

P2P模式(點point對點point)

QueueReceicer_one.java緩存

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by peng on 18/6/26.
 */
@Component("queueReceicer_one")
public class QueueReceicer_one implements MessageListener {

    @Override
    public void onMessage(Message message) {

        try {
            System.out.println("queueReceicer_one:"+((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  

QueueReceicer_two.javasession

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by peng on 18/6/26.
 */
@Component("queueReceicer_two")
public class QueueReceicer_two implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("queueReceicer_two:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  

QueueSender.javaapp

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Created by peng on 18/6/26.
 */
@Component
public class QueueSender {

    @Autowired
    @Qualifier("queueTemplate")
    private JmsTemplate template;

    public void send(String queueName , final String message){
        template.send(queueName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }

}

  

 

發佈訂閱模式

TopicReceicer_one.java框架

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by peng on 18/6/26.
 */
@Component("topicReceicer_one")
public class TopicReceicer_one implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("topicReceicer_one:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  

TopicReceicer_two.java

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by peng on 18/6/26.
 */
@Component("topicReceicer_two")
public class TopicReceicer_two implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("topicReceicer_two:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  

TopicSender.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Created by peng on 18/6/26.
 */
@Component
public class TopicSender {

    @Autowired
    @Qualifier("topicTemplate")
    private JmsTemplate template;

    public void send(String queueName , final String message){
        template.send(queueName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }

}

  

調用

MqTest.java

import oldboy.vip.controller.service.QueueSender;
import oldboy.vip.controller.service.TopicSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Created by peng on 18/6/26.
 */
@Controller
@RequestMapping("mq")
public class MqTest {

    @Autowired
    private QueueSender queueSender;

    @Autowired
    private TopicSender topicSender;


    @RequestMapping("/test1")
    @ResponseBody
    public String test1(){
        queueSender.send("oldboy.queue","oldboy");//oldboy.queue和配置文件中destination對應
        topicSender.send("oldboy.topic","oldboy");//oldboy.topic和配置文件中destination對應

        return "oldboy.vip";
    }

}

  

最終目錄結構

 

啓動項目

 

控制檯接收到消息

點對點的只能收一我的收到

消息訂閱的能夠全部人收到

 

能夠在acitveMQ中看到(能夠查看12是由於測試的次數比較多。。)

 

代碼已經扔到github上了https://github.com/oldboyooxx/activeMQDemo對你有幫助喜歡點個喜歡~

相關文章
相關標籤/搜索