ActiveMQ 兩種實現方式

 
 

第一種:點對點


#生產者
public class Producer { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER; private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD; private static final String brokerURL = "tcp://192.168.178.X:61616"; public static void main(String[] args) throws JMSException { //1.建立鏈接工廠類 ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL); //2.建立鏈接 Connection connection = factory.createConnection(); //3.啓動鏈接 connection.start(); //4.建立會話對象session(事務transacted爲true,參數2不生效) //acknowledgeMode: Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地 Queue queue = session.createQueue("mq-test-01"); //7.建立發送者 MessageProducer producer = session.createProducer(queue); for(int i=1;i<=10;i++) { //6.消息對象 TextMessage message = session.createTextMessage(); message.setText("消息"+i); //8.發送消息 producer.send(message); } //9.會話提交 // session.commit(); //10.關閉鏈接 connection.close(); } }
#消費者
public class Consumer1 {
    
    private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
    private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
    private static final String brokerURL = "tcp://192.168.178.X:61616";

    public static void main(String[] args) throws JMSException {
        //1.建立鏈接工廠類
        ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
        //2.建立鏈接
        Connection connection = factory.createConnection();
        //3.啓動鏈接
        connection.start();
        
        //4.建立會話對象session(事務transacted爲true,參數2不生效)
        //acknowledgeMode:
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        
        //5.目的地
        Queue queue = session.createQueue("mq-test-01");
        
        //6.接收消息對象
        MessageConsumer consumer = session.createConsumer(queue);
        
        //7.經過監聽器接收消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                //獲取消息
                try {
                    String msg = textMessage.getText();
                    System.out.println(msg);
                } catch (JMSException e) {
                }
            }
        });
        
    }
}

第二種: 發佈者/訂閱者

啓動順序:先啓動訂閱者,再啓動發佈者(非持久訂閱者只能收到訂閱之後發佈的消息)

#訂閱者
public class Subscriber1 {
    
    private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
    private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
    private static final String brokerURL = "tcp://192.168.129.10:61616";

    public static void main(String[] args) throws JMSException {
        //1.建立鏈接工廠類
        ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
        //2.建立鏈接
        Connection connection = factory.createConnection();
        //3.啓動鏈接
        connection.start();
        
        //4.建立會話對象session(事務transacted爲true,參數2不生效)
        //acknowledgeMode:
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        
        //5.目的地
        Topic topic = session.createTopic("mq-test-02");
        
        //6.接收消息對象
        MessageConsumer consumer = session.createConsumer(topic);
        
        //7.經過監聽器接收消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                //獲取消息
                try {
                    String msg = textMessage.getText();
                    System.out.println(msg);
                } catch (JMSException e) {
                }
            }
        });
        
    }
}

 

#發佈者
/**
 * Publish/subscribe sample: publishes ten text messages to the topic {@code mq-test-02}.
 *
 * <p>The session is non-transacted, so every {@code send} is handed to the broker
 * immediately. The previous version asked for a transacted session but had the
 * {@code commit()} call commented out; under standard JMS semantics an uncommitted
 * transaction is rolled back when the connection closes, so nothing would be published.
 */
public class Publisher {

    private static final String USER_NAME = ActiveMQXAConnectionFactory.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
    private static final String BROKER_URL = "tcp://192.168.129.10:61616";
    private static final String TOPIC_NAME = "mq-test-02";
    private static final int MESSAGE_COUNT = 10;

    /**
     * Connects to the broker, publishes the messages and closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or closing fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the connection factory.
        // NOTE(review): this is the XA flavour of the factory; a standalone sample
        // presumably wants the plain ActiveMQConnectionFactory - confirm and switch.
        ConnectionFactory factory =
                new ActiveMQXAConnectionFactory(USER_NAME, PASSWORD, BROKER_URL);
        // 2. Open the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection.
            connection.start();

            // 4. Create a non-transacted, auto-acknowledging session.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5. Destination topic.
            Topic topic = session.createTopic(TOPIC_NAME);

            // 6. Producer bound to the topic. The delivery mode is configured once,
            //    before the first send, so it applies to every message.
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            for (int i = 1; i <= MESSAGE_COUNT; i++) {
                // 7. Build and send one text message.
                TextMessage message = session.createTextMessage();
                message.setText("消息" + i);
                producer.send(message);
            }
        } finally {
            // 8. Always release the connection, even when a send fails.
            connection.close();
        }
    }
}
相關文章
相關標籤/搜索