消息隊列篇—詳談ActiveMQ消息隊列模式的分析及使用

消息隊列(Message Queue)是分佈式系統中重要的組件,通用使用場景能夠簡單地描述爲當不須要當即得到結果,可是併發量需控制時就須要使用消息隊列。消息列隊有兩種消息模式,一種是點對點的消息模式,另外一種是訂閱\發佈的消息模式。java

點對點的消息模式apache

點對點的模式主要創建在一個隊列上,當鏈接一個列隊時,發送方不須要知道接收方是否正在接收消息,能夠直接向ActiveMQ發送消息,而發送的消息將直接進入隊列中,若是接收方啓動着監聽,則會向接收方發送消息,若接收方沒有接收到消息,則會保存在ActiveMQ服務器中,直到接收方接收消息爲止。點對點的消息模式能夠有多個接收方和發送方,可是一條消息只會被一個接收方接收到,先連上ActiveMQ接收方,則會先接收到消息,而以後的接收方則接收不到已被接收過的消息。服務器

Java實現ActiveMQ點對點模式,使用ActiveMQ服務器版本爲5.15.3,項目使用Maven構建,其中pom.xml增長ActiveMQ依賴jar配置以下:session

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

點對點的發送方邏輯代碼併發

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//鏈接名是"textMsg"的隊列,此會話將會到該隊列中,若 該隊列不存在,則被建立
            Destination destination = session.createQueue("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息內容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

點對點的接收方代碼tcp

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

訂閱/發佈的消息模式分佈式

訂閱/發佈模式有多個接收方和發送方,可是接收方與發送方存在時間上的依賴,若是發送方發送消息時接收方沒有監聽消息,那麼ActiveMQ將不會保存該消息,認爲消息已經發送。這個模式還有一個特色就是發送方發送的消息會被全部的接收方接收到,與點對點模式偏偏相反。ide

Java實現ActiveMQ訂閱/發佈模式,使用ActiveMQ服務器版本爲5.15.3,項目使用Maven構建,其中pom.xml增長ActiveMQ依賴jar配置以下:url

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

訂閱/發佈的發送方代碼code

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//鏈接名是"textMsg"的隊列,此會話將會到該隊列中,若 該隊列不存在,則被建立
            Destination destination = session.createTopic("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息內容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

訂閱/發佈的接收方代碼

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索