Java操作RabbitMQ簡單隊列

一、建立工具類

package com.kobe.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Helper that opens connections to the RabbitMQ broker used by the examples.
 *
 * <p>All broker settings (host, port, virtual host, credentials) are fixed
 * values written directly in this class.
 */
public class ConnectionUtils {

    /**
     * Opens a new broker connection with the fixed settings of this class.
     *
     * @return the connection returned by {@link ConnectionFactory#newConnection()}
     * @throws TimeoutException propagated from {@code newConnection()}
     * @throws IOException propagated from {@code newConnection()}
     */
    public static Connection getConnection() throws TimeoutException,IOException {
        return configuredFactory().newConnection();
    }

    /** Builds a factory carrying the broker address, virtual host and credentials. */
    private static ConnectionFactory configuredFactory() {
        ConnectionFactory brokerFactory = new ConnectionFactory();

        // Where the broker listens.
        brokerFactory.setHost("127.0.0.1");
        brokerFactory.setPort(5672);

        // Which virtual host to use, and as which user.
        brokerFactory.setVirtualHost("/vhost_kobe");
        brokerFactory.setUsername("kobe");
        brokerFactory.setPassword("123");

        return brokerFactory;
    }

}

二、建立生產者

package com.kobe.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Minimal producer: declares {@code simple_queue} and publishes one text
 * message to it, then releases the channel and the connection.
 */
public class SendSms {
    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Publishes a single timestamped message to {@link #QUEUE_NAME}.
     *
     * <p>Any failure is printed to standard error; the channel and connection
     * are closed afterwards whether or not publishing succeeded.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtils.getConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "hello rabbitmq : " + System.currentTimeMillis();
            // Encode explicitly as UTF-8 so the bytes match the "utf-8" decoding done by
            // the consumer, instead of depending on the platform default charset.
            byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // An empty exchange name with the queue name as routing key targets the queue directly.
            channel.basicPublish("", QUEUE_NAME, null, payload);
            System.out.println("send msg to rabbitmq:" + msg );
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close each resource independently: either may still be null if setup failed,
            // and a failure while closing the channel must not skip closing the connection.
            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

三、建立消費者

package com.kobe.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Minimal consumer: declares {@code simple_queue} and prints every message
 * delivered from it to standard output.
 */
public class ReceiveSms {

    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Opens a channel, declares the queue and registers a printing consumer on it.
     *
     * <p>The connection and channel are intentionally not closed here, so
     * deliveries can keep arriving after this method returns. Any failure
     * during setup is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            Channel listenChannel = ConnectionUtils.getConnection().createChannel();
            listenChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Start listening on the queue; the second argument enables automatic acknowledgement.
            listenChannel.basicConsume(QUEUE_NAME, true, new PrintingConsumer(listenChannel));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Consumer that decodes each delivered body as UTF-8 text and prints it. */
    private static final class PrintingConsumer extends DefaultConsumer {

        PrintingConsumer(Channel channel) {
            super(channel);
        }

        // Triggered whenever a message enters the queue.
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("receive msg :" + new String(body, "utf-8"));
        }
    }

}

四、運行生產者,往隊列裏存數據

輸出結果:send msg to rabbitmq:hello rabbitmq : 1534087498613

五、查看RabbitMQ Management

可以看到數據已經存入隊列

六、運行消費者進行消息監聽

輸出結果:receive msg :hello rabbitmq : 1534087498613

七、再次運行生產者

輸出結果:send msg to rabbitmq:hello rabbitmq : 1534087638186

消費者監聽到之後打印出 receive msg :hello rabbitmq : 1534087638186

八、查看RabbitMQ Management 

相關文章
相關標籤/搜索