2. ActiveMQ: Introduction, Installation, and Getting Started | August Writing Challenge

1. ActiveMQ introduction and installation

 

1. ActiveMQ overview

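ActiveMQ is an open-source messaging system from Apache. It is implemented entirely in Java, so it supports the JMS (Java Message Service) specification defined by J2EE well. JMS is a set of Java APIs that provides services for creating, sending, and reading messages. It defines a common set of interfaces and the corresponding syntax, much as JDBC provides unified access to databases: it is a vendor-neutral API that lets Java programs communicate with messaging components from different vendors.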

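Message-queue middleware is an important component of distributed systems. It mainly addresses asynchronous messaging, application decoupling, and traffic peak shaving, which helps achieve an architecture with high performance, high availability, scalability, and eventual consistency.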
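To make the decoupling idea concrete, here is a minimal in-memory sketch that uses only the JDK. It is not ActiveMQ code: the class name `DecouplingSketch` and the `order-N` message names are made up for illustration, and a `LinkedBlockingQueue` stands in for the broker. The producer only puts messages on the queue and never calls the consumer directly, while the consumer processes them on its own thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustration only: an in-memory queue standing in for a message broker. */
public class DecouplingSketch {

    /** Sends messageCount messages through the queue and returns what the consumer processed. */
    public static List<String> run(int messageCount) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        // Consumer side: takes messages whenever they are available.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    processed.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer side: hands messages to the queue without waiting for them to be processed.
        for (int i = 0; i < messageCount; i++) {
            queue.put("order-" + i);
        }

        consumer.join(); // wait so that the result list is complete and safely visible
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

With a real broker the queue lives in a separate process, so the producer and consumer do not even need to be running at the same time.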

Commonly used message queues include ActiveMQ, RabbitMQ, Kafka, and MetaMQ.

2. Downloading and installing ActiveMQ

  1. activemq.apache.org/download.ht…

2. Directory structure

To start ActiveMQ: bin --> win64 (or win32) --> activemq.bat, then double-click it to run.

3. Once startup has finished, open http://localhost:8161/admin/index.jsp (username: admin, password: admin).

ActiveMQ getting-started code

 

You can start several consumers to see the difference between queue mode and topic mode.
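Before running the real examples, the difference can be sketched in memory. The following is an illustration only, with no ActiveMQ classes involved; the class and method names (`QueueVsTopicDemo`, `deliverAsQueue`, `deliverAsTopic`) are hypothetical. It assumes the usual JMS semantics: a queue message goes to exactly one consumer, while a topic message is copied to every subscriber. Round-robin is used here for simplicity; how a real broker splits queue messages between consumers depends on its dispatch settings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: models queue and topic delivery with plain lists. */
public class QueueVsTopicDemo {

    /** Queue semantics: each message is handed to exactly one consumer (round-robin here). */
    public static List<List<String>> deliverAsQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic semantics: every subscribed consumer gets its own copy of each message. */
    public static List<List<String>> deliverAsTopic(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue: " + deliverAsQueue(messages, 2)); // queue: [[m1, m3], [m2, m4]]
        System.out.println("topic: " + deliverAsTopic(messages, 2)); // topic: [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

Running two copies of the queue consumer and two copies of the topic consumer against a real broker should show the same pattern: the queue consumers share the messages, and the topic consumers each see all of them.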

1. Queue (session)

  1. Producer
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * Description: producer
 */
public class Producer {

    /**
     * User name
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * Password
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * Broker URL
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void send(String message) {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();
            // Passing true creates a transacted session, which must then be committed
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            // Create the queue
            final Queue queue = session.createQueue("test");
            final MessageProducer producer = session.createProducer(queue);

            TextMessage textMessage = session.createTextMessage(message);

            // Send the message
            producer.send(textMessage);

            // With a transacted session, the send must be committed
            // session.commit();
            producer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.send("hello world");
    }
}


  2. Consumer

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * Description: consumer
 */

public class Consumer {

    /**
     * User name
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * Password
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * Broker URL
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void receive() {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            // Create the queue
            final Queue queue = session.createQueue("test");

            final MessageConsumer messageConsumer = session.createConsumer(queue);


            messageConsumer.setMessageListener(n -> {

                try {
                    TextMessage msg = (TextMessage) n;

                    final String text = msg.getText();

                    if (text.equalsIgnoreCase("hello world")) {
                        System.out.println("               Received message:         " + text);
                    } else { // the default is 6 redeliveries
                        System.out.println("     Testing the redelivery count ");
                        // Deliberately throw an ArithmeticException so that the message is redelivered
                        int i = 1 / 0;
                    }


                } catch (JMSException e) {
                    e.printStackTrace();
                }

            });

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.receive();
    }
}
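The "default is 6" remark in the consumer refers to redelivery: when the listener throws, the message is delivered again until the limit is reached. The sketch below models only the counting, in plain Java with no ActiveMQ classes; `RedeliverySketch`, `Listener`, `deliver`, and `handle` are hypothetical names. It assumes that the limit counts redeliveries after the first attempt, so a limit of 6 means up to 7 deliveries in total, after which a real broker would typically move the message to a dead-letter queue.

```java
/** Illustration only: counts delivery attempts for a listener that may throw. */
public class RedeliverySketch {

    /** Minimal stand-in for a message listener. */
    public interface Listener {
        void onMessage(String text);
    }

    /**
     * Delivers once, then redelivers while the listener keeps throwing,
     * up to maxRedeliveries extra attempts. Returns the number of attempts made.
     */
    public static int deliver(String text, int maxRedeliveries, Listener listener) {
        int attempts = 0;
        while (attempts <= maxRedeliveries) {
            attempts++;
            try {
                listener.onMessage(text);
                return attempts; // processed successfully, no further redelivery
            } catch (RuntimeException e) {
                // processing failed; loop around to redeliver if the limit allows
            }
        }
        return attempts; // limit exhausted; a real broker would dead-letter the message here
    }

    /** Same shape as the listener in the consumer: anything but "hello world" fails. */
    public static void handle(String text) {
        if (!text.equalsIgnoreCase("hello world")) {
            throw new ArithmeticException("simulated processing failure");
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello world", 6, RedeliverySketch::handle)); // 1
        System.out.println(deliver("something else", 6, RedeliverySketch::handle)); // 7
    }
}
```

To try this against the real consumer, send any text other than "hello world" from the producer and count how many times the "testing" line is printed.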

2. Topic (start the consumer first so that it is subscribed before anything is published)

1. Producer

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * Description: topic producer
 */
public class TopicProducer {

    /**
     * User name
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * Password
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * Broker URL
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void send(String message) {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);


            // Create the topic
            final Topic topic = session.createTopic("topic-test");
            final MessageProducer producer = session.createProducer(topic);

            TextMessage textMessage = session.createTextMessage(message);

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//            producer.setTimeToLive(10);

            // Send the message
            producer.send(textMessage);
            // Alternative overload: delivery mode, priority, time-to-live (in milliseconds)
//            producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24);

//            session.commit();

            producer.close();

            session.close();

            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicProducer producer = new TopicProducer();
        producer.send("hello world");

    }
}

2. Consumer

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * Description: topic consumer
 */


public class TopicConsumer {

    /**
     * User name
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * Password
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * Broker URL
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void receive() {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();
            // Set the client ID (needed for a durable subscription)
            // connection.setClientID("client-1");

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            // Create the topic
            final Topic topic = session.createTopic("topic-test");

            final MessageConsumer messageConsumer = session.createConsumer(topic); // non-durable subscription
//            MessageConsumer consumer = session.createDurableSubscriber(topic,"bb"); // durable subscription

            messageConsumer.setMessageListener(n -> {

                try {
                    TextMessage msg = (TextMessage) n;

                    final String text = msg.getText();

                    if (text.equalsIgnoreCase("hello world")) {
                        System.out.println("               Received message:         " + text);
                    } else {
                        System.out.println("     Testing the redelivery count ");
                        // Deliberately throw an ArithmeticException so that the message is redelivered
                        int i = 1 / 0;
                    }


                } catch (JMSException e) {
                    e.printStackTrace();
                }

            });

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicConsumer consumer = new TopicConsumer();
        consumer.receive();
    }


}
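The reason the topic consumer has to be started first, and what the commented-out durable subscription changes, can be sketched in memory. This is an illustration only, not ActiveMQ code: `SubscriptionSketch` and its methods are hypothetical, and the subscriber names `plain` and `bb` are just labels. It assumes the usual JMS behaviour: a non-durable subscriber only receives messages published while it is connected, whereas a durable subscription keeps collecting messages while its client is offline.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustration only: models non-durable and durable topic subscriptions. */
public class SubscriptionSketch {

    // Messages each client has received, or will receive when it reconnects.
    private final Map<String, List<String>> inbox = new HashMap<>();
    // Subscriptions the broker currently delivers to.
    private final Set<String> subscriptions = new HashSet<>();
    // Subscriptions that survive a disconnect.
    private final Set<String> durable = new HashSet<>();

    public void subscribe(String client, boolean isDurable) {
        subscriptions.add(client);
        if (isDurable) {
            durable.add(client);
        }
        inbox.putIfAbsent(client, new ArrayList<>());
    }

    /** A non-durable subscription disappears on disconnect; a durable one is kept. */
    public void disconnect(String client) {
        if (!durable.contains(client)) {
            subscriptions.remove(client);
        }
    }

    /** A message published with no matching subscription is simply dropped. */
    public void publish(String message) {
        for (String client : subscriptions) {
            inbox.get(client).add(message);
        }
    }

    public List<String> received(String client) {
        return inbox.getOrDefault(client, new ArrayList<>());
    }

    public static void main(String[] args) {
        SubscriptionSketch topic = new SubscriptionSketch();
        topic.publish("early");          // nobody is subscribed yet, so this is lost
        topic.subscribe("plain", false); // like createConsumer(topic)
        topic.subscribe("bb", true);     // like createDurableSubscriber(topic, "bb")
        topic.publish("m1");
        topic.disconnect("plain");
        topic.disconnect("bb");
        topic.publish("m2");             // only the durable subscription keeps this
        System.out.println("plain: " + topic.received("plain")); // plain: [m1]
        System.out.println("bb: " + topic.received("bb"));       // bb: [m1, m2]
    }
}
```

In the real consumer above, switching to the durable variant means enabling both commented-out lines: the client ID on the connection and `createDurableSubscriber` in place of `createConsumer`.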
