第十七章:springboot 整合 activeMQ

首先介紹 JMShtml

  JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持(百度百科給出的概述)。咱們能夠簡單的理解:兩個應用程序之間須要進行通訊,咱們使用一個JMS服務,進行中間的轉發,經過JMS 的使用,咱們能夠解除兩個程序之間的耦合。JMS是一種與廠商無關的 API,用來訪問消息收發系統消息,它相似於JDBC(Java Database Connectivity)。java

概念    
        JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
        JMS生產者(Message Producer)
        JMS消費者(Message Consumer)
        JMS消息
        JMS隊列
        JMS主題spring

 

JMS消息一般有兩種類型:點對點(Point-to-Point)、發佈/訂閱(Publish/Subscribe)apache

 

接着介紹 activeMQ瀏覽器

ActiveMQ 是Apache出品,最流行的. 功能強大的即時通信和集成模式的開源服務器springboot

特色:
            1)支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各類跨語言客戶端和協議
            2)支持許多高級功能,如消息組,虛擬目標,通配符和複合目標
            3) 徹底支持JMS 1.1和J2EE 1.4,支持瞬態,持久,事務和XA消息
            4) Spring支持,ActiveMQ能夠輕鬆嵌入到Spring應用程序中,並使用Spring的XML配置機制進行配置
            5) 支持在流行的J2EE服務器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中進行測試
            6) 使用JDBC和高性能日誌支持很是快速的持久化
            ...
 服務器

下載地址:http://activemq.apache.org/activemq-5153-release.htmlapp

 

下載解壓,進入 bin 文件夾 ,若是咱們是32位的機器,就雙擊win32目錄下的activemq.bat,若是是64位機器,則雙擊win64目錄下的activemq.bat異步

若是沒有異常,表明啓動成功,常見啓動失敗緣由是端口占用的問題tcp

咱們能夠關閉佔用的端口,或者修改 activeMQ 的端口。打開 conf/activemq.xml,修改被佔用的端口。

  <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5671?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

瀏覽器打開 http://localhost:8161 進入 控制檯,輸入 默認的用戶名和密碼 admin/admin 

打開 queue 頁面

Name:隊列名稱。
        Number Of Pending Messages:等待消費的消息個數。
        Number Of Consumers:當前鏈接的消費者數目
        Messages Enqueued:進入隊列的消息總個數,包括出隊列的和待消費的,這個數量只增不減。
        Messages Dequeued:已經消費的消息數量

建立 隊列,輸入隊列名稱,點擊 create 按鈕

 

-----------------------------------------分割線-------------------------------------------------------------------------

如今咱們整合 springboot 和 activeMQ

第一步、引入 pom 依賴

<!-- 整合消息隊列ActiveMQ -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

		<!-- 若是配置線程池則加入 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
		</dependency>

 

第二步、配置文件 application.properties 修改

#整合jms測試,安裝在別的機器,防火牆和端口號記得開放
spring.activemq.broker-url=tcp://127.0.0.1:61616
#集羣配置
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增長依賴
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

第三步、啓動類

加入註解

@EnableJms

加入 bean

@Bean
ConnectionFactory connectionFactory() {
   return new ActiveMQConnectionFactory();
}

@Bean
JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
   JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
   jmsTemplate.setPriority(999);
   return jmsTemplate;
}

@Bean(value="jmsMessagingTemplate")
JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
   JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(jmsTemplate);
   return messagingTemplate;
}

第四步、消息發佈者

public interface ProducerService {

   /**
    * 功能描述:指定消息隊列,還有消息
    * @param destination
    * @param message
    */
   public void sendMessage(Destination destination, final String message);
    
}
@Service
public class ProducerServiceImpl implements ProducerService{

   @Autowired
   private JmsMessagingTemplate jmsTemplate; //用來發送消息到broker的對象
   
   //發送消息,destination是發送到的隊列,message是待發送的消息
   @Override
   public void sendMessage(Destination destination, String message) {
      
      jmsTemplate.convertAndSend(destination, message);
      
   }
     
}

第五步、消息消費者

@Component
public class OrderConsumer {
// 接收 order.queue 隊列的消息
   @JmsListener(destination="order.queue")
   public void receiveQueue(String text){
       System.out.println("OrderConsumer收到的報文爲:"+text);
   }
}

第六步、測試類進行測試

@RestController
@RequestMapping("/queue")
public class QueueController {
   
   @Autowired
   private ProducerService producerService;


   /**
    * 點對點消息發送 ,指定 隊列
    * @param msg
    * @return
    */
   @RequestMapping("/order")
   public Object order(String msg){
      Destination destination = new ActiveMQQueue("order.queue");
      producerService.sendMessage(destination, msg);
        return "ok";
   }

}

打開  activeMQ 控制檯,能夠看到 隊列中的消息數量 發生了 變化

 

以上是 activeMQ 的點對點消息隊列。點對點並非只A發送的消息只能指定B接收,而是隻A發送的任意一條消息只能由一我的接收處理,也就是每條消息只能被消費一次。

JMS 的另外一種模式 發佈/訂閱模式。A發送的消息能夠被全部監聽A的對象的接收,就比如學校的廣播,全部的學生均可以收聽校園廣播信息。

正常狀況下,activeMQ 只支持一種消息模式,這裏作出配置修改,讓其可以同時支持 兩種 模式

 

啓動類中 加入 bean

@Bean
            public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
                DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
                bean.setPubSubDomain(true);
                bean.setConnectionFactory(activeMQConnectionFactory);
                return bean;
            }
@Bean
public Topic topic(){
   return new ActiveMQTopic("topic.queue");
}

 

Topic  發佈/訂閱 使用 topic.queue 隊列

新建訂閱者,@JmsListener若是不指定獨立的containerFactory的話是隻能消費queue消息

@Component
public class TopicSub {

   
   @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic")
   public void receive1(String text){
      System.out.println("topic.queue 消費者:receive1="+text);
   }
   
   
   @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic")
   public void receive2(String text){
      System.out.println("topic.queue 消費者:receive2="+text);
   }


   @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic")
   public void receive3(String text){
      System.out.println("topic.queue 消費者:receive3="+text);
   }
   
   
}

 

修改 信息發送者 ProducerServiceImpl.java,增長代碼

@Autowired
private Topic topic;


@Override
public void publish(String msg) {
   this.jmsTemplate.convertAndSend(this.topic, msg);
}

 

修改測試類 QueueController.java ,增長代碼

/**
 * 發佈、訂閱消息
 * @param msg
 * @return
 */
@RequestMapping("/publish")
public Object publish(String msg){
   producerService.publish(msg);
   return "ok";
}

 

啓動項目,訪問 /queue/order ,打印出一條數據 

OrderConsumer收到的報文爲:訂單信息

訪問  /queue/publish ,打印出三條數據

topic.queue 消費者:receive3=發佈、訂閱
topic.queue 消費者:receive1=發佈、訂閱
topic.queue 消費者:receive2=發佈、訂閱
相關文章
相關標籤/搜索