凱哥java 凱哥java
快速入門:消息的生產者和消費者
生產者的代碼步驟:
1:獲取到鏈接的工廠 ConnectionFactory
2:從工廠中獲取到一個鏈接:connection
3:重建一個數據通訊的通道,能夠發送和接收消息對象:channel
4:經過channel發送消息
5:關閉流
代碼編寫:
java
public class Procuder { public static void main(String[] args) throws IOException, TimeoutException { //1:建立一個connectioFactory工廠對象,並進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); //設置ip 端口 vhost等 connectionFactory.setHost("192.168.31.128"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2:經過工廠對象獲取到connection對象 Connection connection = connectionFactory.newConnection(); //3:經過connection對象獲取到一個消息通訊的通道 channel Channel channel = connection.createChannel(); // 4:經過channel發送數據 /** * 參數說明: * exchange: 數據路由 routingKey: props: 消息描述body:消息體。字節數組 */ for(int i = 0;i<5;i++){ String mst = "hi Rabbit mq!"+i; channel.basicPublish("","mytest001",null,mst.getBytes()); } System.out.println("===>>>生產者發送消息完成。。。"); //5:關閉流 channel.close(); connection.close(); } }
消費者的代碼步驟:
前三步是同樣的。
4:聲明一個隊列
5:建立一個消費者
6:設置channel
7:獲取消息
代碼以下:
.數組
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1:建立一個connectioFactory工廠對象,並進行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置ip 端口 vhost等
connectionFactory.setHost("192.168.31.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");瀏覽器
//2:經過工廠對象獲取到connection對象 Connection connection = connectionFactory.newConnection(); //3:公共connection對象獲取到一個消息通訊的通道 channel Channel channel = connection.createChannel(); //4:聲明(建立)一個隊列 String queueName = "mytest001"; //這裏能夠使用routingKey /** * Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; */ channel.queueDeclare(queueName,true,false,false,null); //5:建立消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //6:設置channel // String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; /* * 參數說明: queue:隊列名稱 autoAck:是否自動簽收 consumer:消費者對象 */ channel.basicConsume(queueName,true,consumer); System.out.println("===>>消費者開始處理消息。"); while(true){ //7 獲取消息 QueueingConsumer.Delivery delivery = consumer.nextDelivery();; String msg = new String(delivery.getBody()); System.err.println("消費端: " + msg); } }
}
運行測試:
生產者:
啓動消費者:
接着咱們經過瀏覽器查看管理頁面:
查看channel:
在queues中:
咱們發現多了個channel和多了個queues
下節預告:下節咱們將講解重要對象之:exchange 交換機ide