Java消息中間件--初級篇

    1、 爲何使用消息中間件?java

           假設用戶登陸系統   傳統方式 用戶登陸  調用短息服務   積分服務  日誌服務等各類服務  若是短息服務出現問題就沒法發送短信並且用戶登陸成功必須全部調用所有完成返回給用戶登陸系統一條用戶登陸成功信息。從總體業務上講  用戶只是要登陸系統  並不關心短信服務  日誌服務怎麼樣就想登陸成功就好  這種操做讓用戶等待了時間。spring

           2)經過消息中間件解耦服務調用apache

            用戶登陸系統會將登陸消息發送給消息中間件  ---消息中間件會將用戶登陸消息異步一條一條推送給---短息服務  日誌服務等其餘相關服務   用戶就不須要等待其餘服務處理完成在給我返回結果。安全

 

   2、 消息中間件的好處:服務器

    1)系統解耦session

    2)異步併發

    3)橫向擴展mvc

    4)安全可靠   消息中間件會把咱們的消息進行保存  若是其餘業務系統出現問題  或者業務系統沒有對消息進行消費  業務系統能夠下一次繼續對消息進行消費異步

    5)順序保存  maven

3、中間件是什麼:

    中間件做用在業務系統之間  不是操做系統軟件  還不是業務軟件,用戶不能直接使用的軟件同一叫法。

4、消息中間件:

  用於數據接收和發送,利用高效可靠的異步消息傳遞機制集成分佈式系統

5、JMS (java Message Service )

   Java消息服務 java消息中間件的API,用於在兩個應用程序之間或者分佈式系統中發送消息,進行異步通訊的規範。

6、AMQP

   提供統一消息服務的應用層標準協議,遵循這個協議客戶端與消息中間件能夠傳遞消息,不會受到客戶端和中間件不一樣產品,是不一樣開發語言影響  只要遵循這種協議就能夠傳遞消息。

7、常見消息中間件

     activeMQ 是一個徹底支持JMS1.1和J2EE1.4規範的 

     rabbitMQ 是一個開源的AMQP實現,用於分佈式系統中存儲轉發消息

      kafka  是一個高吞吐量的分佈式發佈訂閱消息系統,是一個分佈式的,分區的,可靠的分佈式日誌存儲服務。(不是一個嚴格消息中間件 ) 

                  1)高吞吐量:即便很是普通的硬件kafka也能夠支持每秒數百萬的消息

8、JMS規範

     提供者:實現JMS規範的消息中間件服務器

     客戶端:接收或發送消息的應用程序

     生產者/發佈者:建立併發送消息的客戶端

    消費者/訂閱者:接收並處理消息的客戶端

    消息:應用程序之間傳遞的數據內容

   消息模式:在客戶端之間傳遞消息的方式,JMS中定義了主題和隊列兩種模式

9、JMS消息模式

  1)隊列模式

       客戶端包括生產者和消費者

       隊列中的消息只能被一個消費者消費

      消費者能夠隨時消費隊列的消息  

       舉例:生產者  應用1  應用2  向JMS隊列中發送消息  應用1發送 1 3 5   應用2 發送2   4   6  JMS消息隊列中會存在  1 2 3 4 5 6 消息     時存在消費者  應用3  應用4   應用3與JMS 有兩個連接   應用4有一個連接  在消費消息的時候  三個連接會平均分配6各消息

  2)主題模式

       客戶端:包括髮布者和訂閱者

      主題中的消息被全部訂閱者消費

      消費者不能消費訂閱以前就發送到主題中的消息(消費者要消費隊列中的消息要先訂閱在消費   若是不提早訂閱是接收不到消息的)

  舉例:應用3 與應用4 向隊列中訂閱消息  應用3創建了兩個連接  應用4創建了一個連接   發佈者 應用1 應用2  向隊列中發佈消息 123456  當訂閱者消費消息的時候三個連接都消費了6個消息

      

10、JMS編碼接口

       ConnectionFactory 用於建立連接到消息中間件的連接工廠

       Connection 表明可應用程序和消息服務器之間的通訊鏈路

       Destination (目的地) 指消息發佈和接收的地點,包括隊列和主題

       Session 表示一個單線程的上下文,用於發送和接收消息

       MessageConsumer 由會話建立,用於接收發送到目標的消息

  MessageProducer 由會話建立,用於發送消息到目標

       Message 是在消費者和生產者之間傳送的對象,消息頭,一組消息屬性,一個消息體

   

   

十一:JMS代碼演示

  1)使用JMS接口規範連接activeMQ  隊列模式

   引入activemq依賴jar  注意:引入相關jar  必須與相應的jdk匹配不然會報異常

1 java.lang.UnsupportedClassVersionError: org/apache/lucene/store/Directory : Unsupported major.minor version 51.0

2 at java.lang.ClassLoader.defineClass1(Native Method)

3 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)

4 at java.lang.ClassLoader.defineClass(ClassLoader.java:615)

5 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

6 at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)

7 at java.net.URLClassLoader.access$000(URLClassLoader.java:58) 8 at java.net.URLClassLoader$1.run(URLClassLoader.java:197)

<dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.0</version>
        </dependency>
    </dependencies>

建立消費提供方(主題模式消息發佈方)

public class JmsProduce {

    //聲明服務器地址
    private static final String url = "tcp://127.0.0.1:61616";
    //聲明隊列名稱
    //private static final String queue = "queue_test";
    private static final string topic = "topic_test";
    public static void main(String []args)throws Exception{
  
        //建立鏈接工廠  由消息服務商提供
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //根據消息工廠建立鏈接
        Connection connection = factory.createConnection();
        //開啓鏈接
        connection.start();
        //根據鏈接建立會話   參數一  是否使用事務  參數二 應答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //建立目標  也就是隊列
        // Destination destination =  session.createQueue(JmsProduce.queue);
//建立主題目標
Destination destination = session.createTopic(topic);
//建立一個生產者 MessageProducer producer = session.createProducer(destination); // for (int i=0;i<100;i++){ //建立消息 TextMessage textMessage = session.createTextMessage("test" + i); //生產者將消息發送給隊列 producer.send(textMessage); System.out.println("生產者"+textMessage); } connection.close(); } }

消息消費方(主題模式訂閱者)  

public class JmsConsumer {

    private static final String url="tcp://127.0.0.1:61616";
    //private static final String queue = "queue_test";
private static final String topic = "topic_test";
public static void main(String [] args) throws JMSException { //建立鏈接工廠 由消息服務商提供 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); //根據消息工廠建立鏈接 Connection connection = factory.createConnection(); //開啓鏈接 connection.start(); //根據鏈接建立會話 參數一 是否使用事務 參數二 應答模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目標 也就是隊列 //Destination destination = session.createQueue(queue);
//建立主題目標
Destination destination = session.createTopic(topic);
//建立消費者 MessageConsumer consumer = session.createConsumer(destination); //建立一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage message1 = (TextMessage) message; System.out.println("接收消息"+message1); } }); } }

 

隊列模式是點對點形式

主題模式  消費者須要先對主題進行訂閱  而後發佈者在發佈過程當中消費者才能消費消息

 

Spring 整合JMS ActiveMq

建立一個maven項目   

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.ac.bcc</groupId>
  <artifactId>Jms-Activemq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>war</packaging>
  <properties>
        <spring.version>4.1.3.RELEASE</spring.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
            <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
View Code

 消息提供方實現

1)定義消息服務方接口  

package cn.ac.bcc.jms.service;

public interface ProducerService {
    //定義發送消息的方法
    public void sendMessage(String message);
}
View Code

2)配置公共common.xml文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-4.0.xsd 
        http://www.springframework.org/schema/jms 
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">

        
        <!--開啓註解掃描  -->
        <context:annotation-config/>
        <!--配置activemq鏈接工廠   在spring提供的鏈接工廠中須要提供activemq提供的工廠  -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- 配置activemq服務器地址  經過地址建立鏈接 -->
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
        <!-- 配置spring jms 提供的鏈接工廠 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
        <!-- 配置activeMq目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- 指定隊列名稱  經過構造方式 -->
            <constructor-arg value="queue-test"/>
        </bean>
    
</beans>
View Code

3)配置spring配置文件 producer.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-4.0.xsd 
        http://www.springframework.org/schema/jms 
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd" >

    <!--引入公共配置文件 -->
    <import resource="common.xml" />
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--注入鏈接工廠 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <bean class="cn.ac.bcc.jms.service.impl.ProducerServiceImpl"></bean>
</beans>

4)實現消息發送接口

package cn.ac.bcc.jms.service.impl;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import cn.ac.bcc.jms.service.ProducerService;

@Service
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name = "queueDestination")
    private Destination destination;

    @Override
    public void sendMessage(final String message) {

        //經過jmsTemplate 模板發送消息  傳遞兩個參數  消息的目的地  也就是activemq服務    參數2  建立一個消息體 封裝消息信息
        jmsTemplate.send(destination, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {

                TextMessage textMessage = session.createTextMessage(message);
                System.out.println("發送消息" + textMessage.getText());
                return textMessage;
            }
        });

    }

}
View Code

5)測試類

package cn.ac.bcc.test;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import cn.ac.bcc.jms.service.ProducerService;

public class JmsProducer {

    public static void main(String[] args) {

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        // 獲取提供者接口實例
        ProducerService producerService = context.getBean(ProducerService.class);
        for (int i = 0; i < 100; i++) {
            // 調用發送消息方法
            producerService.sendMessage("消息發送來了" + i);
        }
        //關閉鏈接
        context.close();
    }

}
View Code

消息消費方實現

  1)自定義消息消費方監聽實現spring提供的MessageListener監聽

package cn.ac.bcc.jms.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage)message;
        try {
            System.out.println("消息消費"+textMessage.getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
View Code

2)配置消費方spring 配置文件 consumer.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-4.0.xsd 
        http://www.springframework.org/schema/jms 
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">
        <!--引入公共配置文件  -->
        <import resource="common.xml"/>
        <!--建立自定義監聽  -->
        <bean id = "consumerListener" class="cn.ac.bcc.jms.listener.ConsumerListener"></bean>
        <!--配置jms監聽器  -->
        <bean id="jmsLisener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
         <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destination" ref="queueDestination"></property>
        <property name="messageListener" ref="consumerListener"></property>
        </bean>        
</beans>
View Code

3)消費方測試實現

package cn.ac.bcc.test;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerText {
    
    public static void main(String[] args) {
        
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
        
    }

}
View Code

以上爲spring整合JMS 實現消息接收發送隊列模式實現   在消息接收與發送過程當中要啓動activemq 

 //設置消息的有效期  當24小時內消息接收爲有效期

jmsTemplate.setTimeToLive(86400000)

activemq入門好文章推薦

https://blog.csdn.net/lifetragedy/article/details/51836557 

相關文章
相關標籤/搜索